seaweedfs/weed/mq/logstore/log_to_parquet.go
Chris Lu dc784bf217
merge current message queue code changes (#6201)
* listing files to convert to parquet

* write parquet files

* save logs into parquet files

* pass by value

* compact logs into parquet format

* can skip existing files

* refactor

* refactor

* fix compilation

* when no partition found

* refactor

* add untested parquet file read

* rename package

* refactor

* rename files

* remove unused

* add merged log read func

* parquet wants to know the file size

* rewind by time

* pass in stop ts

* add stop ts

* adjust log

* minor

* adjust log

* skip .parquet files when reading message logs

* skip non message files

* Update subscriber_record.go

* send messages

* skip message data with only ts

* skip non log files

* update parquet-go package

* ensure a valid record type

* add new field to a record type

* Update read_parquet_to_log.go

* fix parquet file name generation

* separating reading parquet and logs

* add key field

* add skipped logs

* use in memory cache

* refactor

* refactor

* refactor

* refactor, and change compact log

* refactor

* rename

* refactor

* fix format

* prefix v to version directory
2024-11-04 12:08:25 -08:00

455 lines
14 KiB
Go

package logstore
import (
"encoding/binary"
"fmt"
"github.com/parquet-go/parquet-go"
"github.com/parquet-go/parquet-go/compress/zstd"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto"
"io"
"os"
"strings"
"time"
)
const (
SW_COLUMN_NAME_TS = "_ts_ns"
SW_COLUMN_NAME_KEY = "_key"
)
func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
// list the topic partition versions
topicVersions, err := collectTopicVersions(filerClient, t, timeAgo)
if err != nil {
return fmt.Errorf("list topic files: %v", err)
}
// compact the partitions
for _, topicVersion := range topicVersions {
partitions, err := collectTopicVersionsPartitions(filerClient, t, topicVersion)
if err != nil {
return fmt.Errorf("list partitions %s/%s/%s: %v", t.Namespace, t.Name, topicVersion, err)
}
for _, partition := range partitions {
err := compactTopicPartition(filerClient, t, timeAgo, recordType, partition, preference)
if err != nil {
return fmt.Errorf("compact partition %s/%s/%s/%s: %v", t.Namespace, t.Name, topicVersion, partition, err)
}
}
}
return nil
}
func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) {
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
t, err := topic.ParseTopicVersion(entry.Name)
if err != nil {
// skip non-partition directories
return nil
}
if t.Unix() < time.Now().Unix()-int64(timeAgo/time.Second) {
partitionVersions = append(partitionVersions, t)
}
return nil
})
return
}
func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) {
version := topicVersion.Format(topic.PartitionGenerationFormat)
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
if !entry.IsDirectory {
return nil
}
start, stop := topic.ParsePartitionBoundary(entry.Name)
if start != stop {
partitions = append(partitions, topic.Partition{
RangeStart: start,
RangeStop: stop,
RingSize: topic.PartitionCount,
UnixTimeNs: topicVersion.UnixNano(),
})
}
return nil
})
return
}
func compactTopicPartition(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, partition topic.Partition, preference *operation.StoragePreference) error {
partitionDir := topic.PartitionDir(t, partition)
// compact the partition directory
return compactTopicPartitionDir(filerClient, t.Name, partitionDir, timeAgo, recordType, preference)
}
func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, partitionDir string, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
// read all existing parquet files
minTsNs, maxTsNs, err := readAllParquetFiles(filerClient, partitionDir)
if err != nil {
return err
}
// read all log files
logFiles, err := readAllLogFiles(filerClient, partitionDir, timeAgo, minTsNs, maxTsNs)
if err != nil {
return err
}
if len(logFiles) == 0 {
return nil
}
// divide log files into groups of 128MB
logFileGroups := groupFilesBySize(logFiles, 128*1024*1024)
// write to parquet file
parquetLevels, err := schema.ToParquetLevels(recordType)
if err != nil {
return fmt.Errorf("ToParquetLevels failed %+v: %v", recordType, err)
}
// create a parquet schema
parquetSchema, err := schema.ToParquetSchema(topicName, recordType)
if err != nil {
return fmt.Errorf("ToParquetSchema failed: %v", err)
}
// TODO parallelize the writing
for _, logFileGroup := range logFileGroups {
if err = writeLogFilesToParquet(filerClient, partitionDir, recordType, logFileGroup, parquetSchema, parquetLevels, preference); err != nil {
return err
}
}
return nil
}
func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGroups [][]*filer_pb.Entry) {
var logFileGroup []*filer_pb.Entry
var groupSize int64
for _, logFile := range logFiles {
if groupSize+int64(logFile.Attributes.FileSize) > maxGroupSize {
logFileGroups = append(logFileGroups, logFileGroup)
logFileGroup = nil
groupSize = 0
}
logFileGroup = append(logFileGroup, logFile)
groupSize += int64(logFile.Attributes.FileSize)
}
if len(logFileGroup) > 0 {
logFileGroups = append(logFileGroups, logFileGroup)
}
return
}
func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) {
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
if strings.HasSuffix(entry.Name, ".parquet") {
return nil
}
if entry.Attributes.Crtime > time.Now().Unix()-int64(timeAgo/time.Second) {
return nil
}
logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name)
if err != nil {
// glog.Warningf("parse log time %s: %v", entry.Name, err)
return nil
}
if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs {
return nil
}
logFiles = append(logFiles, entry)
return nil
})
return
}
func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) {
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
if !strings.HasSuffix(entry.Name, ".parquet") {
return nil
}
if len(entry.Extended) == 0 {
return nil
}
// read min ts
minTsBytes := entry.Extended["min"]
if len(minTsBytes) != 8 {
return nil
}
minTs := int64(binary.BigEndian.Uint64(minTsBytes))
if minTsNs == 0 || minTs < minTsNs {
minTsNs = minTs
}
// read max ts
maxTsBytes := entry.Extended["max"]
if len(maxTsBytes) != 8 {
return nil
}
maxTs := int64(binary.BigEndian.Uint64(maxTsBytes))
if maxTsNs == 0 || maxTs > maxTsNs {
maxTsNs = maxTs
}
return nil
})
return
}
func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
tempFile, err := os.CreateTemp(".", "t*.parquet")
if err != nil {
return fmt.Errorf("create temp file: %v", err)
}
defer func() {
tempFile.Close()
os.Remove(tempFile.Name())
}()
writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
rowBuilder := parquet.NewRowBuilder(parquetSchema)
var startTsNs, stopTsNs int64
for _, logFile := range logFileGroups {
fmt.Printf("compact %s/%s ", partitionDir, logFile.Name)
var rows []parquet.Row
if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {
if startTsNs == 0 {
startTsNs = entry.TsNs
}
stopTsNs = entry.TsNs
if len(entry.Key) == 0 {
return nil
}
// write to parquet file
rowBuilder.Reset()
record := &schema_pb.RecordValue{}
if err := proto.Unmarshal(entry.Data, record); err != nil {
return fmt.Errorf("unmarshal record value: %v", err)
}
record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{
Int64Value: entry.TsNs,
},
}
record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{
BytesValue: entry.Key,
},
}
if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil {
return fmt.Errorf("add record value: %v", err)
}
rows = append(rows, rowBuilder.Row())
return nil
}); err != nil {
return fmt.Errorf("iterate log entry %v/%v: %v", partitionDir, logFile.Name, err)
}
fmt.Printf("processed %d rows\n", len(rows))
if _, err := writer.WriteRows(rows); err != nil {
return fmt.Errorf("write rows: %v", err)
}
}
if err := writer.Close(); err != nil {
return fmt.Errorf("close writer: %v", err)
}
// write to parquet file to partitionDir
parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
}
return nil
}
func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
uploader, err := operation.NewUploader()
if err != nil {
return fmt.Errorf("new uploader: %v", err)
}
// get file size
fileInfo, err := sourceFile.Stat()
if err != nil {
return fmt.Errorf("stat source file: %v", err)
}
// upload file in chunks
chunkSize := int64(4 * 1024 * 1024)
chunkCount := (fileInfo.Size() + chunkSize - 1) / chunkSize
entry := &filer_pb.Entry{
Name: parquetFileName,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
FileMode: uint32(os.FileMode(0644)),
FileSize: uint64(fileInfo.Size()),
Mime: "application/vnd.apache.parquet",
},
}
entry.Extended = make(map[string][]byte)
minTsBytes := make([]byte, 8)
binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
entry.Extended["min"] = minTsBytes
maxTsBytes := make([]byte, 8)
binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
entry.Extended["max"] = maxTsBytes
for i := int64(0); i < chunkCount; i++ {
fileId, uploadResult, err, _ := uploader.UploadWithRetry(
filerClient,
&filer_pb.AssignVolumeRequest{
Count: 1,
Replication: preference.Replication,
Collection: preference.Collection,
TtlSec: 0, // TODO set ttl
DiskType: preference.DiskType,
Path: partitionDir + "/" + parquetFileName,
},
&operation.UploadOption{
Filename: parquetFileName,
Cipher: false,
IsInputCompressed: false,
MimeType: "application/vnd.apache.parquet",
PairMap: nil,
},
func(host, fileId string) string {
return fmt.Sprintf("http://%s/%s", host, fileId)
},
io.NewSectionReader(sourceFile, i*chunkSize, chunkSize),
)
if err != nil {
return fmt.Errorf("upload chunk %d: %v", i, err)
}
if uploadResult.Error != "" {
return fmt.Errorf("upload result: %v", uploadResult.Error)
}
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano()))
}
// write the entry to partitionDir
if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: partitionDir,
Entry: entry,
})
}); err != nil {
return fmt.Errorf("create entry: %v", err)
}
fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)
return nil
}
func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry, eachLogEntryFn func(entry *filer_pb.LogEntry) error) error {
lookupFn := filer.LookupFn(filerClient)
_, err := eachFile(logFile, lookupFn, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
if err := eachLogEntryFn(logEntry); err != nil {
return true, err
}
return false, nil
})
return err
}
func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
if len(entry.Content) > 0 {
// skip .offset files
return
}
var urlStrings []string
for _, chunk := range entry.Chunks {
if chunk.Size == 0 {
continue
}
if chunk.IsChunkManifest {
fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
return
}
urlStrings, err = lookupFileIdFn(chunk.FileId)
if err != nil {
err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
return
}
if len(urlStrings) == 0 {
err = fmt.Errorf("no url found for %s", chunk.FileId)
return
}
// try one of the urlString until util.Get(urlString) succeeds
var processed bool
for _, urlString := range urlStrings {
var data []byte
if data, _, err = util_http.Get(urlString); err == nil {
processed = true
if processedTsNs, err = eachChunk(data, eachLogEntryFn); err != nil {
return
}
break
}
}
if !processed {
err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
return
}
}
return
}
func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
if pos+4+int(size) > len(buf) {
err = fmt.Errorf("reach each log chunk: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
return
}
entryData := buf[pos+4 : pos+4+int(size)]
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
pos += 4 + int(size)
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
return
}
if _, err = eachLogEntryFn(logEntry); err != nil {
err = fmt.Errorf("process log entry %v: %v", logEntry, err)
return
}
processedTsNs = logEntry.TsNs
pos += 4 + int(size)
}
return
}