From dc784bf217172b0e81eb4b3e5eb0e0e38b91849a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 4 Nov 2024 12:08:25 -0800 Subject: [PATCH] merge current message queue code changes (#6201) * listing files to convert to parquet * write parquet files * save logs into parquet files * pass by value * compact logs into parquet format * can skip existing files * refactor * refactor * fix compilation * when no partition found * refactor * add untested parquet file read * rename package * refactor * rename files * remove unused * add merged log read func * parquet wants to know the file size * rewind by time * pass in stop ts * add stop ts * adjust log * minor * adjust log * skip .parquet files when reading message logs * skip non message files * Update subscriber_record.go * send messages * skip message data with only ts * skip non log files * update parquet-go package * ensure a valid record type * add new field to a record type * Update read_parquet_to_log.go * fix parquet file name generation * separating reading parquet and logs * add key field * add skipped logs * use in memory cache * refactor * refactor * refactor * refactor, and change compact log * refactor * rename * refactor * fix format * prefix v to version directory --- go.mod | 2 +- go.sum | 2 + weed/command/benchmark.go | 4 +- weed/command/upload.go | 18 +- weed/filer/reader_at.go | 4 + weed/filer/topics.go | 5 +- weed/filer_client/filer_client_accessor.go | 36 +- weed/mq/broker/broker_grpc_assign.go | 3 +- weed/mq/broker/broker_grpc_configure.go | 2 +- weed/mq/broker/broker_grpc_pub_follow.go | 5 +- weed/mq/broker/broker_grpc_sub_follow.go | 9 +- .../mq/broker/broker_topic_conf_read_write.go | 5 +- .../broker_topic_partition_read_write.go | 137 +----- .../cmd/weed_pub_record/publisher_record.go | 13 +- .../cmd/weed_sub_record/subscriber_record.go | 15 +- .../mq/client/sub_client/on_each_partition.go | 12 + weed/mq/client/sub_client/subscriber.go | 1 + weed/mq/logstore/log_to_parquet.go | 454 ++++++++++++++++++ weed/mq/logstore/merged_read.go | 41 ++ weed/mq/logstore/read_log_from_disk.go | 144 ++++++ weed/mq/logstore/read_parquet_to_log.go | 162 +++++++ weed/mq/pub_balancer/broker_stats.go | 4 +- weed/mq/schema/schema_builder.go | 7 + .../partition_consumer_mapping.go | 10 +- weed/mq/sub_coordinator/partition_list.go | 14 +- weed/mq/topic/local_manager.go | 2 +- weed/mq/topic/local_partition.go | 1 + weed/mq/topic/partition.go | 18 +- weed/mq/topic/topic.go | 44 +- weed/mq/topic/topic_partition.go | 2 +- weed/operation/submit.go | 80 +-- weed/shell/command_mq_topic_compact.go | 94 ++++ .../util/chunk_cache/chunk_cache_in_memory.go | 20 + 33 files changed, 1106 insertions(+), 264 deletions(-) create mode 100644 weed/mq/logstore/log_to_parquet.go create mode 100644 weed/mq/logstore/merged_read.go create mode 100644 weed/mq/logstore/read_log_from_disk.go create mode 100644 weed/mq/logstore/read_parquet_to_log.go create mode 100644 weed/shell/command_mq_topic_compact.go diff --git a/go.mod b/go.mod index a0301cb96..a270518b6 100644 --- a/go.mod +++ b/go.mod @@ -141,7 +141,7 @@ require ( github.com/hashicorp/raft v1.7.1 github.com/hashicorp/raft-boltdb/v2 v2.3.0 github.com/orcaman/concurrent-map/v2 v2.0.1 - github.com/parquet-go/parquet-go v0.23.0 + github.com/parquet-go/parquet-go v0.23.1-0.20241011155651-6446d1d0d2fe github.com/rabbitmq/amqp091-go v1.10.0 github.com/rclone/rclone v1.68.1 github.com/rdleal/intervalst v1.4.0 diff --git a/go.sum b/go.sum index be5f013dd..307e1f370 100644 --- a/go.sum +++ b/go.sum @@ -1338,6 +1338,8 @@ 
github.com/panjf2000/ants/v2 v2.9.1 h1:Q5vh5xohbsZXGcD6hhszzGqB7jSSc2/CRr3QKIga8 github.com/panjf2000/ants/v2 v2.9.1/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/parquet-go/parquet-go v0.23.0 h1:dyEU5oiHCtbASyItMCD2tXtT2nPmoPbKpqf0+nnGrmk= github.com/parquet-go/parquet-go v0.23.0/go.mod h1:MnwbUcFHU6uBYMymKAlPPAw9yh3kE1wWl6Gl1uLdkNk= +github.com/parquet-go/parquet-go v0.23.1-0.20241011155651-6446d1d0d2fe h1:oUJ5TPnrEK/z+/PeoLL+jCgfngAZIDMyhZASetRcYYg= +github.com/parquet-go/parquet-go v0.23.1-0.20241011155651-6446d1d0d2fe/go.mod h1:OqBBRGBl7+llplCvDMql8dEKaDqjaFA/VAPw+OJiNiw= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index bc7ee1292..08db2ef3d 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -21,8 +21,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/wdclient" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) type BenchmarkOptions struct { @@ -242,7 +242,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { DiskType: *b.diskType, } if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil { - fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection + fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection if !isSecure && assignResult.Auth != "" { isSecure = true } diff --git a/weed/command/upload.go b/weed/command/upload.go index 7135a707a..9f9ac1107 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -97,7 +97,14 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, operation.StoragePreference{ + Replication: *upload.replication, + Collection: *upload.collection, + DataCenter: *upload.dataCenter, + Ttl: *upload.ttl, + DiskType: *upload.diskType, + MaxMB: *upload.maxMB, + }, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -119,7 +126,14 @@ func runUpload(cmd *Command, args []string) bool { fmt.Println(e.Error()) return false } - results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, operation.StoragePreference{ + Replication: *upload.replication, + Collection: *upload.collection, + DataCenter: *upload.dataCenter, + Ttl: *upload.ttl, + DiskType: *upload.diskType, + 
MaxMB: *upload.maxMB, + }, *upload.usePublicUrl) if err != nil { fmt.Println(err.Error()) return false diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 24162995e..d7617b740 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -97,6 +97,10 @@ func NewChunkReaderAtFromClient(readerCache *ReaderCache, chunkViews *IntervalLi } } +func (c *ChunkReadAt) Size() int64 { + return c.fileSize +} + func (c *ChunkReadAt) Close() error { c.readerCache.destroy() return nil diff --git a/weed/filer/topics.go b/weed/filer/topics.go index 3a2fde8c4..707a4f878 100644 --- a/weed/filer/topics.go +++ b/weed/filer/topics.go @@ -1,6 +1,7 @@ package filer const ( - TopicsDir = "/topics" - SystemLogDir = TopicsDir + "/.system/log" + TopicsDir = "/topics" + SystemLogDir = TopicsDir + "/.system/log" + TopicConfFile = "topic.conf" ) diff --git a/weed/filer_client/filer_client_accessor.go b/weed/filer_client/filer_client_accessor.go index be70f2b82..20646d343 100644 --- a/weed/filer_client/filer_client_accessor.go +++ b/weed/filer_client/filer_client_accessor.go @@ -1,16 +1,12 @@ package filer_client import ( - "bytes" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" - jsonpb "google.golang.org/protobuf/encoding/protojson" ) type FilerClientAccessor struct { @@ -22,41 +18,23 @@ func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(file return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn) } -func (fca *FilerClientAccessor) SaveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error { +func (fca *FilerClientAccessor) SaveTopicConfToFiler(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) error { glog.V(0).Infof("save conf for topic %v to filer", t) // save the topic configuration on filer - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - if err := fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - filer.ProtoToText(&buf, conf) - return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes()) - }); err != nil { - return fmt.Errorf("save topic to %s: %v", topicDir, err) - } - return nil + return fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return t.WriteConfFile(client, conf) + }) } func (fca *FilerClientAccessor) ReadTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) { - glog.V(0).Infof("load conf for topic %v from filer", t) + glog.V(1).Infof("load conf for topic %v from filer", t) - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) if err = fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") - if err == filer_pb.ErrNotFound { - return err - } - if err != nil { - return fmt.Errorf("read topic.conf of %v: %v", t, err) - } - // parse into filer conf object - conf = &mq_pb.ConfigureTopicResponse{} - if err = jsonpb.Unmarshal(data, conf); err != nil { - return fmt.Errorf("unmarshal topic %v conf: %v", t, err) - } - return nil + conf, err = t.ReadConfFile(client) + return err }); err != nil { return nil, err } diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go 
index 48ec0d5bd..9a9b34c0b 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -26,7 +27,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } else { var localPartition *topic.LocalPartition if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { - localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition)) b.localTopicManager.AddLocalPartition(t, localPartition) } } diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 7222c8359..361af5c43 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -67,7 +67,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. resp.RecordType = request.RecordType // save the topic configuration on filer - if err := b.fca.SaveTopicConfToFiler(request.Topic, resp); err != nil { + if err := b.fca.SaveTopicConfToFiler(t, resp); err != nil { return nil, fmt.Errorf("configure topic: %v", err) } diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 8995b0cc2..291f1ef62 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -2,7 +2,6 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -93,9 +92,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi time.Sleep(113 * time.Millisecond) } - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) - partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop) + partitionDir := topic.PartitionDir(t, p) // flush the remaining messages inMemoryBuffers.CloseInput() diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go index f7f4ac7e9..bed906c30 100644 --- a/weed/mq/broker/broker_grpc_sub_follow.go +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -9,7 +9,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" "io" - "time" ) func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) { @@ -65,9 +64,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (offset int64, err error) { t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.PartitionOffset.Partition) - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) - partitionDir := 
fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop) + partitionDir := topic.PartitionDir(t, p) offsetFileName := fmt.Sprintf("%s.offset", initMessage.ConsumerGroup) err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { @@ -86,9 +83,7 @@ func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.Subscrib func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error { - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) - partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop) + partitionDir := topic.PartitionDir(t, p) offsetFileName := fmt.Sprintf("%s.offset", consumerGroup) offsetBytes := make([]byte, 8) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index ea5cb71b9..222ff16ba 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -3,6 +3,7 @@ package broker import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -40,7 +41,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition self := b.option.BrokerAddress() for _, assignment := range conf.BrokerPartitionAssignments { if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { - localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition)) b.localTopicManager.AddLocalPartition(t, localPartition) isGenerated = true break @@ -55,7 +56,7 @@ func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *m hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments) if hasChanges { glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments) - if err = b.fca.SaveTopicConfToFiler(t.ToPbTopic(), conf); err != nil { + if err = b.fca.SaveTopicConfToFiler(t, conf); err != nil { return err } } diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index 4c1b9a1e2..d6513b2a2 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -2,24 +2,15 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" - "google.golang.org/protobuf/proto" - "math" "sync/atomic" "time" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) -func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType { - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, 
t.Namespace, t.Name) - partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) - partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop) +func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType { + partitionDir := topic.PartitionDir(t, p) return func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) { if len(buf) == 0 { @@ -45,7 +36,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par b.accessLock.Lock() defer b.accessLock.Unlock() - p := topic.FromPbPartition(partition) if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil { localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs) } @@ -53,126 +43,3 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf)) } } - -func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType { - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) - partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop) - - lookupFileIdFn := func(fileId string) (targetUrls []string, err error) { - return b.MasterClient.LookupFileId(fileId) - } - - eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { - for pos := 0; pos+4 < len(buf); { - - size := util.BytesToUint32(buf[pos : pos+4]) - if pos+4+int(size) > len(buf) { - err = fmt.Errorf("LogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf)) - return - } - entryData := buf[pos+4 : pos+4+int(size)] - - logEntry := &filer_pb.LogEntry{} - if err = proto.Unmarshal(entryData, logEntry); err != nil { - pos += 4 + int(size) - err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err) - return - } - if logEntry.TsNs < starTsNs { - pos += 4 + int(size) - continue - } - if stopTsNs != 0 && logEntry.TsNs > stopTsNs { - println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs) - return - } - - if _, err = eachLogEntryFn(logEntry); err != nil { - err = fmt.Errorf("process log entry %v: %v", logEntry, err) - return - } - - processedTsNs = logEntry.TsNs - - pos += 4 + int(size) - - } - - return - } - - eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { - if len(entry.Content) > 0 { - // skip .offset files - return - } - var urlStrings []string - for _, chunk := range entry.Chunks { - if chunk.Size == 0 { - continue - } - if chunk.IsChunkManifest { - glog.Warningf("this should not happen. 
unexpected chunk manifest in %s/%s", partitionDir, entry.Name) - return - } - urlStrings, err = lookupFileIdFn(chunk.FileId) - if err != nil { - err = fmt.Errorf("lookup %s: %v", chunk.FileId, err) - return - } - if len(urlStrings) == 0 { - err = fmt.Errorf("no url found for %s", chunk.FileId) - return - } - - // try one of the urlString until util.Get(urlString) succeeds - var processed bool - for _, urlString := range urlStrings { - // TODO optimization opportunity: reuse the buffer - var data []byte - if data, _, err = util_http.Get(urlString); err == nil { - processed = true - if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil { - return - } - break - } - } - if !processed { - err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId) - return - } - - } - return - } - - return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) { - startFileName := startPosition.UTC().Format(topic.TIME_FORMAT) - startTsNs := startPosition.Time.UnixNano() - stopTime := time.Unix(0, stopTsNs) - var processedTsNs int64 - err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { - if entry.IsDirectory { - return nil - } - if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) { - isDone = true - return nil - } - if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) { - return nil - } - if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil { - return err - } - return nil - - }, startFileName, true, math.MaxInt32) - }) - lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2) - return - } -} diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go index f340dd1c8..9b28200bc 100644 --- a/weed/mq/client/cmd/weed_pub_record/publisher_record.go +++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go @@ -7,11 +7,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "log" "strings" "sync" + "sync/atomic" "time" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -25,11 +26,17 @@ var ( namespace = flag.String("ns", "test", "namespace") t = flag.String("t", "test", "t") seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + + counter int32 ) func doPublish(publisher *pub_client.TopicPublisher, id int) { startTime := time.Now() - for i := 0; i < *messageCount / *concurrency; i++ { + for { + i := atomic.AddInt32(&counter, 1) + if i > int32(*messageCount) { + break + } // Simulate publishing a message myRecord := genMyRecord(int32(i)) if err := publisher.PublishRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil { @@ -38,7 +45,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { } if *messageDelay > 0 { time.Sleep(*messageDelay) - fmt.Printf("sent %+v\n", myRecord) + fmt.Printf("sent %+v\n", string(myRecord.Key)) } } if err := publisher.FinishPublish(); err != nil { diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index 00fe83feb..7bdff3715 100644 --- 
a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -8,12 +8,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" "strings" "time" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -22,6 +22,7 @@ var ( seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") + timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") ) @@ -65,7 +66,7 @@ func main() { contentConfig := &sub_client.ContentConfiguration{ Topic: topic.NewTopic(*namespace, *t), Filter: "", - StartTime: time.Unix(1, 1), + StartTime: time.Now().Add(-*timeAgo), } brokers := strings.Split(*seedBrokers, ",") @@ -75,9 +76,13 @@ func main() { subscriber.SetEachMessageFunc(func(key, value []byte) error { counter++ record := &schema_pb.RecordValue{} - proto.Unmarshal(value, record) - fmt.Printf("record: %v\n", record) - time.Sleep(1300 * time.Millisecond) + err := proto.Unmarshal(value, record) + if err != nil { + fmt.Printf("unmarshal record value: %v\n", err) + } else { + fmt.Printf("%s %d: %v\n", string(key), len(value), record) + } + //time.Sleep(1300 * time.Millisecond) return nil }) diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 58d87d9ad..56cedb32e 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" "io" + "reflect" ) func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error { @@ -24,6 +25,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig perPartitionConcurrency = 1 } + var stopTsNs int64 + if !sub.ContentConfig.StopTime.IsZero() { + stopTsNs = sub.ContentConfig.StopTime.UnixNano() + } + if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ Init: &mq_pb.SubscribeMessageRequest_InitMessage{ @@ -32,6 +38,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig Topic: sub.ContentConfig.Topic.ToPbTopic(), PartitionOffset: &mq_pb.PartitionOffset{ Partition: assigned.Partition, + StartTsNs: sub.ContentConfig.StartTime.UnixNano(), + StopTsNs: stopTsNs, StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, }, Filter: sub.ContentConfig.Filter, @@ -101,6 +109,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose) continue } + if len(m.Data.Key) == 0 { + fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m)) + continue + } executors.Execute(func() { processErr := 
sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) if processErr == nil { diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 922593b77..3e5316b67 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -21,6 +21,7 @@ type ContentConfiguration struct { Topic topic.Topic Filter string StartTime time.Time + StopTime time.Time } type OnEachMessageFunc func(key, value []byte) (err error) diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go new file mode 100644 index 000000000..30cad8cc1 --- /dev/null +++ b/weed/mq/logstore/log_to_parquet.go @@ -0,0 +1,454 @@ +package logstore + +import ( + "encoding/binary" + "fmt" + "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/compress/zstd" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "google.golang.org/protobuf/proto" + "io" + "os" + "strings" + "time" +) + +const ( + SW_COLUMN_NAME_TS = "_ts_ns" + SW_COLUMN_NAME_KEY = "_key" +) + +func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error { + // list the topic partition versions + topicVersions, err := collectTopicVersions(filerClient, t, timeAgo) + if err != nil { + return fmt.Errorf("list topic files: %v", err) + } + + // compact the partitions + for _, topicVersion := range topicVersions { + partitions, err := collectTopicVersionsPartitions(filerClient, t, topicVersion) + if err != nil { + return fmt.Errorf("list partitions %s/%s/%s: %v", t.Namespace, t.Name, topicVersion, err) + } + for _, partition := range partitions { + err := compactTopicPartition(filerClient, t, timeAgo, recordType, partition, preference) + if err != nil { + return fmt.Errorf("compact partition %s/%s/%s/%s: %v", t.Namespace, t.Name, topicVersion, partition, err) + } + } + } + return nil +} + +func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) { + err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error { + t, err := topic.ParseTopicVersion(entry.Name) + if err != nil { + // skip non-partition directories + return nil + } + if t.Unix() < time.Now().Unix()-int64(timeAgo/time.Second) { + partitionVersions = append(partitionVersions, t) + } + return nil + }) + return +} + +func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) { + version := topicVersion.Format(topic.PartitionGenerationFormat) + err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error { + if !entry.IsDirectory { + return nil + } + start, stop := topic.ParsePartitionBoundary(entry.Name) + if start != stop { + partitions = append(partitions, topic.Partition{ + RangeStart: start, + RangeStop: stop, + RingSize: topic.PartitionCount, + UnixTimeNs: topicVersion.UnixNano(), + 
}) + } + return nil + }) + return +} + +func compactTopicPartition(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, partition topic.Partition, preference *operation.StoragePreference) error { + partitionDir := topic.PartitionDir(t, partition) + + // compact the partition directory + return compactTopicPartitionDir(filerClient, t.Name, partitionDir, timeAgo, recordType, preference) +} + +func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, partitionDir string, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error { + // read all existing parquet files + minTsNs, maxTsNs, err := readAllParquetFiles(filerClient, partitionDir) + if err != nil { + return err + } + + // read all log files + logFiles, err := readAllLogFiles(filerClient, partitionDir, timeAgo, minTsNs, maxTsNs) + if err != nil { + return err + } + if len(logFiles) == 0 { + return nil + } + + // divide log files into groups of 128MB + logFileGroups := groupFilesBySize(logFiles, 128*1024*1024) + + // write to parquet file + parquetLevels, err := schema.ToParquetLevels(recordType) + if err != nil { + return fmt.Errorf("ToParquetLevels failed %+v: %v", recordType, err) + } + + // create a parquet schema + parquetSchema, err := schema.ToParquetSchema(topicName, recordType) + if err != nil { + return fmt.Errorf("ToParquetSchema failed: %v", err) + } + + // TODO parallelize the writing + for _, logFileGroup := range logFileGroups { + if err = writeLogFilesToParquet(filerClient, partitionDir, recordType, logFileGroup, parquetSchema, parquetLevels, preference); err != nil { + return err + } + } + + return nil +} + +func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGroups [][]*filer_pb.Entry) { + var logFileGroup []*filer_pb.Entry + var groupSize int64 + for _, logFile := range logFiles { + if groupSize+int64(logFile.Attributes.FileSize) > maxGroupSize { + logFileGroups = append(logFileGroups, logFileGroup) + logFileGroup = nil + groupSize = 0 + } + logFileGroup = append(logFileGroup, logFile) + groupSize += int64(logFile.Attributes.FileSize) + } + if len(logFileGroup) > 0 { + logFileGroups = append(logFileGroups, logFileGroup) + } + return +} + +func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) { + err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error { + if strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + if entry.Attributes.Crtime > time.Now().Unix()-int64(timeAgo/time.Second) { + return nil + } + logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name) + if err != nil { + // glog.Warningf("parse log time %s: %v", entry.Name, err) + return nil + } + if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs { + return nil + } + logFiles = append(logFiles, entry) + return nil + }) + return +} + +func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) { + err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error { + if !strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + if len(entry.Extended) == 0 { + return nil + } + + // read min ts + minTsBytes := entry.Extended["min"] + if len(minTsBytes) != 8 { + return nil + } + minTs := 
int64(binary.BigEndian.Uint64(minTsBytes)) + if minTsNs == 0 || minTs < minTsNs { + minTsNs = minTs + } + + // read max ts + maxTsBytes := entry.Extended["max"] + if len(maxTsBytes) != 8 { + return nil + } + maxTs := int64(binary.BigEndian.Uint64(maxTsBytes)) + if maxTsNs == 0 || maxTs > maxTsNs { + maxTsNs = maxTs + } + return nil + }) + return +} + +func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) { + + tempFile, err := os.CreateTemp(".", "t*.parquet") + if err != nil { + return fmt.Errorf("create temp file: %v", err) + } + defer func() { + tempFile.Close() + os.Remove(tempFile.Name()) + }() + + writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel})) + rowBuilder := parquet.NewRowBuilder(parquetSchema) + + var startTsNs, stopTsNs int64 + + for _, logFile := range logFileGroups { + fmt.Printf("compact %s/%s ", partitionDir, logFile.Name) + var rows []parquet.Row + if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error { + + if startTsNs == 0 { + startTsNs = entry.TsNs + } + stopTsNs = entry.TsNs + + if len(entry.Key) == 0 { + return nil + } + + // write to parquet file + rowBuilder.Reset() + + record := &schema_pb.RecordValue{} + if err := proto.Unmarshal(entry.Data, record); err != nil { + return fmt.Errorf("unmarshal record value: %v", err) + } + + record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{ + Int64Value: entry.TsNs, + }, + } + record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{ + BytesValue: entry.Key, + }, + } + + if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil { + return fmt.Errorf("add record value: %v", err) + } + + rows = append(rows, rowBuilder.Row()) + + return nil + + }); err != nil { + return fmt.Errorf("iterate log entry %v/%v: %v", partitionDir, logFile.Name, err) + } + + fmt.Printf("processed %d rows\n", len(rows)) + + if _, err := writer.WriteRows(rows); err != nil { + return fmt.Errorf("write rows: %v", err) + } + } + + if err := writer.Close(); err != nil { + return fmt.Errorf("close writer: %v", err) + } + + // write to parquet file to partitionDir + parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05")) + if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil { + return fmt.Errorf("save parquet file %s: %v", parquetFileName, err) + } + + return nil + +} + +func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error { + uploader, err := operation.NewUploader() + if err != nil { + return fmt.Errorf("new uploader: %v", err) + } + + // get file size + fileInfo, err := sourceFile.Stat() + if err != nil { + return fmt.Errorf("stat source file: %v", err) + } + + // upload file in chunks + chunkSize := int64(4 * 1024 * 1024) + chunkCount := (fileInfo.Size() + chunkSize - 1) / chunkSize + entry := &filer_pb.Entry{ + Name: parquetFileName, + Attributes: &filer_pb.FuseAttributes{ + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + FileMode: uint32(os.FileMode(0644)), + 
FileSize: uint64(fileInfo.Size()), + Mime: "application/vnd.apache.parquet", + }, + } + entry.Extended = make(map[string][]byte) + minTsBytes := make([]byte, 8) + binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs)) + entry.Extended["min"] = minTsBytes + maxTsBytes := make([]byte, 8) + binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs)) + entry.Extended["max"] = maxTsBytes + + for i := int64(0); i < chunkCount; i++ { + fileId, uploadResult, err, _ := uploader.UploadWithRetry( + filerClient, + &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: preference.Replication, + Collection: preference.Collection, + TtlSec: 0, // TODO set ttl + DiskType: preference.DiskType, + Path: partitionDir + "/" + parquetFileName, + }, + &operation.UploadOption{ + Filename: parquetFileName, + Cipher: false, + IsInputCompressed: false, + MimeType: "application/vnd.apache.parquet", + PairMap: nil, + }, + func(host, fileId string) string { + return fmt.Sprintf("http://%s/%s", host, fileId) + }, + io.NewSectionReader(sourceFile, i*chunkSize, chunkSize), + ) + if err != nil { + return fmt.Errorf("upload chunk %d: %v", i, err) + } + if uploadResult.Error != "" { + return fmt.Errorf("upload result: %v", uploadResult.Error) + } + entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano())) + } + + // write the entry to partitionDir + if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: partitionDir, + Entry: entry, + }) + }); err != nil { + return fmt.Errorf("create entry: %v", err) + } + fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName) + + return nil +} + +func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry, eachLogEntryFn func(entry *filer_pb.LogEntry) error) error { + lookupFn := filer.LookupFn(filerClient) + _, err := eachFile(logFile, lookupFn, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + if err := eachLogEntryFn(logEntry); err != nil { + return true, err + } + return false, nil + }) + return err +} + +func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) { + if len(entry.Content) > 0 { + // skip .offset files + return + } + var urlStrings []string + for _, chunk := range entry.Chunks { + if chunk.Size == 0 { + continue + } + if chunk.IsChunkManifest { + fmt.Printf("this should not happen. 
unexpected chunk manifest in %s", entry.Name) + return + } + urlStrings, err = lookupFileIdFn(chunk.FileId) + if err != nil { + err = fmt.Errorf("lookup %s: %v", chunk.FileId, err) + return + } + if len(urlStrings) == 0 { + err = fmt.Errorf("no url found for %s", chunk.FileId) + return + } + + // try each urlString until util_http.Get(urlString) succeeds + var processed bool + for _, urlString := range urlStrings { + var data []byte + if data, _, err = util_http.Get(urlString); err == nil { + processed = true + if processedTsNs, err = eachChunk(data, eachLogEntryFn); err != nil { + return + } + break + } + } + if !processed { + err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId) + return + } + + } + return +} + +func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) { + for pos := 0; pos+4 < len(buf); { + + size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + err = fmt.Errorf("eachChunk: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf)) + return + } + entryData := buf[pos+4 : pos+4+int(size)] + + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + pos += 4 + int(size) + err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err) + return + } + + if _, err = eachLogEntryFn(logEntry); err != nil { + err = fmt.Errorf("process log entry %v: %v", logEntry, err) + return + } + + processedTsNs = logEntry.TsNs + + pos += 4 + int(size) + + } + + return +} diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go new file mode 100644 index 000000000..03a47ace4 --- /dev/null +++ b/weed/mq/logstore/merged_read.go @@ -0,0 +1,41 @@ +package logstore + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" +) + +func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType { + fromParquetFn := GenParquetReadFunc(filerClient, t, p) + readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p) + return mergeReadFuncs(fromParquetFn, readLogDirectFn) +} + +func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType { + var exhaustedParquet bool + var lastProcessedPosition log_buffer.MessagePosition + return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) { + if !exhaustedParquet { + // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC()) + lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn) + // glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err) + if isDone { + isDone = false + } + if err != nil { + return + } + lastProcessedPosition = lastReadPosition + } + exhaustedParquet = true + + if startPosition.Before(lastProcessedPosition.Time) { + startPosition = lastProcessedPosition + } + + // glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC()) + lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn) + return + } +} diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go new file mode 100644 index 000000000..c3c679b87 --- /dev/null +++ 
b/weed/mq/logstore/read_log_from_disk.go @@ -0,0 +1,144 @@ +package logstore + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "google.golang.org/protobuf/proto" + "math" + "strings" + "time" +) + +func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType { + partitionDir := topic.PartitionDir(t, p) + + lookupFileIdFn := filer.LookupFn(filerClient) + + eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { + for pos := 0; pos+4 < len(buf); { + + size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + err = fmt.Errorf("GenLogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf)) + return + } + entryData := buf[pos+4 : pos+4+int(size)] + + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + pos += 4 + int(size) + err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err) + return + } + if logEntry.TsNs < starTsNs { + pos += 4 + int(size) + continue + } + if stopTsNs != 0 && logEntry.TsNs > stopTsNs { + println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs) + return + } + + // fmt.Printf(" read logEntry: %v, ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC()) + if _, err = eachLogEntryFn(logEntry); err != nil { + err = fmt.Errorf("process log entry %v: %v", logEntry, err) + return + } + + processedTsNs = logEntry.TsNs + + pos += 4 + int(size) + + } + + return + } + + eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { + if len(entry.Content) > 0 { + // skip .offset files + return + } + var urlStrings []string + for _, chunk := range entry.Chunks { + if chunk.Size == 0 { + continue + } + if chunk.IsChunkManifest { + glog.Warningf("this should not happen. 
unexpected chunk manifest in %s/%s", partitionDir, entry.Name) + return + } + urlStrings, err = lookupFileIdFn(chunk.FileId) + if err != nil { + err = fmt.Errorf("lookup %s: %v", chunk.FileId, err) + return + } + if len(urlStrings) == 0 { + err = fmt.Errorf("no url found for %s", chunk.FileId) + return + } + + // try one of the urlString until util.Get(urlString) succeeds + var processed bool + for _, urlString := range urlStrings { + // TODO optimization opportunity: reuse the buffer + var data []byte + // fmt.Printf("reading %s/%s %s\n", partitionDir, entry.Name, urlString) + if data, _, err = util_http.Get(urlString); err == nil { + processed = true + if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil { + return + } + break + } + } + if !processed { + err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId) + return + } + + } + return + } + + return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) { + startFileName := startPosition.UTC().Format(topic.TIME_FORMAT) + startTsNs := startPosition.Time.UnixNano() + stopTime := time.Unix(0, stopTsNs) + var processedTsNs int64 + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + return nil + } + if strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + // FIXME: this is a hack to skip the .offset files + if strings.HasSuffix(entry.Name, ".offset") { + return nil + } + if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) { + isDone = true + return nil + } + if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) { + return nil + } + if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil { + return err + } + return nil + + }, startFileName, true, math.MaxInt32) + }) + lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2) + return + } +} diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go new file mode 100644 index 000000000..f55d5e3b7 --- /dev/null +++ b/weed/mq/logstore/read_parquet_to_log.go @@ -0,0 +1,162 @@ +package logstore + +import ( + "encoding/binary" + "fmt" + "github.com/parquet-go/parquet-go" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "google.golang.org/protobuf/proto" + "io" + "math" + "strings" +) + +var ( + chunkCache = chunk_cache.NewChunkCacheInMemory(256) // 256 entries, 8MB max per entry +) + +func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType { + partitionDir := topic.PartitionDir(t, p) + + lookupFileIdFn := filer.LookupFn(filerClient) + + // read topic conf from filer + var topicConf *mq_pb.ConfigureTopicResponse + var err error + if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + topicConf, err = t.ReadConfFile(client) + return err + }); err != nil { + return nil + } + recordType := topicConf.GetRecordType() + 
recordType = schema.NewRecordTypeBuilder(recordType). + WithField(SW_COLUMN_NAME_TS, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). + RecordTypeEnd() + + parquetSchema, err := schema.ToParquetSchema(t.Name, recordType) + if err != nil { + return nil + } + parquetLevels, err := schema.ToParquetLevels(recordType) + if err != nil { + return nil + } + + // eachFileFn reads a parquet file and calls eachLogEntryFn for each log entry + eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { + // create readerAt for the parquet file + fileSize := filer.FileSize(entry) + visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) + chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) + readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) + readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize)) + + // create parquet reader + parquetReader := parquet.NewReader(readerAt, parquetSchema) + rows := make([]parquet.Row, 128) + for { + rowCount, readErr := parquetReader.ReadRows(rows) + + for i := 0; i < rowCount; i++ { + row := rows[i] + // convert parquet row to schema_pb.RecordValue + recordValue, err := schema.ToRecordValue(recordType, parquetLevels, row) + if err != nil { + return processedTsNs, fmt.Errorf("ToRecordValue failed: %v", err) + } + processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value() + if processedTsNs < starTsNs { + continue + } + if stopTsNs != 0 && processedTsNs >= stopTsNs { + return processedTsNs, nil + } + + data, marshalErr := proto.Marshal(recordValue) + if marshalErr != nil { + return processedTsNs, fmt.Errorf("marshal record value: %v", marshalErr) + } + + logEntry := &filer_pb.LogEntry{ + Key: recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(), + TsNs: processedTsNs, + Data: data, + } + + // fmt.Printf(" parquet entry %s ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC()) + + if _, err = eachLogEntryFn(logEntry); err != nil { + return processedTsNs, fmt.Errorf("process log entry %v: %v", logEntry, err) + } + } + + if readErr != nil { + if readErr == io.EOF { + return processedTsNs, nil + } + return processedTsNs, readErr + } + } + return + } + + return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) { + startFileName := startPosition.UTC().Format(topic.TIME_FORMAT) + startTsNs := startPosition.Time.UnixNano() + var processedTsNs int64 + + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + + return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + return nil + } + if !strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + if len(entry.Extended) == 0 { + return nil + } + + // read minTs from the parquet file + minTsBytes := entry.Extended["min"] + if len(minTsBytes) != 8 { + return nil + } + minTsNs := int64(binary.BigEndian.Uint64(minTsBytes)) + + // read max ts + maxTsBytes := entry.Extended["max"] + if len(maxTsBytes) != 8 { + return nil + } + maxTsNs := int64(binary.BigEndian.Uint64(maxTsBytes)) + + if stopTsNs != 0 && stopTsNs <= minTsNs { + isDone = true + return nil + } + + if maxTsNs < startTsNs { + return nil + } + + if processedTsNs, err = 
eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil { + return err + } + return nil + + }, startFileName, true, math.MaxInt32) + }) + lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2) + return + } +} diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go index c579c275e..e72703d5f 100644 --- a/weed/mq/pub_balancer/broker_stats.go +++ b/weed/mq/pub_balancer/broker_stats.go @@ -53,7 +53,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { } publisherCount += topicPartitionStats.PublisherCount subscriberCount += topicPartitionStats.SubscriberCount - key := tps.TopicPartition.String() + key := tps.TopicPartition.TopicPartitionId() bs.TopicPartitionStats.Set(key, tps) delete(currentTopicPartitions, key) } @@ -79,7 +79,7 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti PublisherCount: 0, SubscriberCount: 0, } - key := tps.TopicPartition.String() + key := tps.TopicPartition.TopicPartitionId() if isAdd { bs.TopicPartitionStats.SetIfAbsent(key, tps) } else { diff --git a/weed/mq/schema/schema_builder.go b/weed/mq/schema/schema_builder.go index db89ce34c..35272af47 100644 --- a/weed/mq/schema/schema_builder.go +++ b/weed/mq/schema/schema_builder.go @@ -19,10 +19,12 @@ type RecordTypeBuilder struct { recordType *schema_pb.RecordType } +// RecordTypeBegin creates a new RecordTypeBuilder; it should be followed by a series of WithField calls and RecordTypeEnd func RecordTypeBegin() *RecordTypeBuilder { return &RecordTypeBuilder{recordType: &schema_pb.RecordType{}} } +// RecordTypeEnd finishes building the RecordType func (rtb *RecordTypeBuilder) RecordTypeEnd() *schema_pb.RecordType { // be consistent with parquet.node.go `func (g Group) Fields() []Field` sort.Slice(rtb.recordType.Fields, func(i, j int) bool { @@ -31,6 +33,11 @@ func (rtb *RecordTypeBuilder) RecordTypeEnd() *schema_pb.RecordType { return rtb.recordType } +// NewRecordTypeBuilder creates a new RecordTypeBuilder from an existing RecordType; it should be followed by a series of WithField calls and RecordTypeEnd +func NewRecordTypeBuilder(recordType *schema_pb.RecordType) (rtb *RecordTypeBuilder) { + return &RecordTypeBuilder{recordType: recordType} +} + func (rtb *RecordTypeBuilder) WithField(name string, scalarType *schema_pb.Type) *RecordTypeBuilder { rtb.recordType.Fields = append(rtb.recordType.Fields, &schema_pb.Field{ Name: name, diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go index 5d1cf158a..e900e4a33 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go @@ -11,13 +11,6 @@ type PartitionConsumerMapping struct { prevMappings []*PartitionSlotToConsumerInstanceList } -func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping { - newVersion := time.Now().UnixNano() - return &PartitionConsumerMapping{ - currentMapping: NewPartitionSlotToConsumerInstanceList(ringSize, newVersion), - } -} - // Balance goal: // 1. max processing power utilization // 2. 
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go
index 5d1cf158a..e900e4a33 100644
--- a/weed/mq/sub_coordinator/partition_consumer_mapping.go
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go
@@ -11,13 +11,6 @@ type PartitionConsumerMapping struct {
 	prevMappings []*PartitionSlotToConsumerInstanceList
 }
 
-func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
-	newVersion := time.Now().UnixNano()
-	return &PartitionConsumerMapping{
-		currentMapping: NewPartitionSlotToConsumerInstanceList(ringSize, newVersion),
-	}
-}
-
 // Balance goal:
 // 1. max processing power utilization
 // 2. allow one consumer instance to be down unexpectedly
@@ -27,8 +20,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToB
 	if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstances) == 0 {
 		return
 	}
-	newVersion := time.Now().UnixNano()
-	newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, newVersion)
+	newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, time.Now())
 	var prevMapping *PartitionSlotToConsumerInstanceList
 	if len(pcm.prevMappings) > 0 {
 		prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go
index 384c1b875..16bf1ff0c 100644
--- a/weed/mq/sub_coordinator/partition_list.go
+++ b/weed/mq/sub_coordinator/partition_list.go
@@ -1,6 +1,6 @@
 package sub_coordinator
 
-import "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+import "time"
 
 type PartitionSlotToConsumerInstance struct {
 	RangeStart int32
@@ -17,17 +17,9 @@ type PartitionSlotToConsumerInstanceList struct {
 	Version  int64
 }
 
-func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *PartitionSlotToConsumerInstanceList {
+func NewPartitionSlotToConsumerInstanceList(ringSize int32, version time.Time) *PartitionSlotToConsumerInstanceList {
 	return &PartitionSlotToConsumerInstanceList{
 		RingSize: ringSize,
-		Version:  version,
+		Version:  version.UnixNano(),
 	}
 }
-
-func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition {
-	partitions := make([]*topic.Partition, 0, len(slots))
-	for _, slot := range slots {
-		partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, slot.UnixTimeNs))
-	}
-	return partitions
-}
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index d87eff911..9f273723d 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -88,7 +88,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
 			Topic:     Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
 			Partition: localPartition.Partition,
 		}
-		stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{
+		stats.Stats[topicPartition.TopicPartitionId()] = &mq_pb.TopicPartitionStats{
 			Topic: &mq_pb.Topic{
 				Namespace: string(localTopic.Namespace),
 				Name:      localTopic.Name,
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 8911c1841..e32fc2398 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -34,6 +34,7 @@ type LocalPartition struct {
 }
 
 var TIME_FORMAT = "2006-01-02-15-04-05"
+var PartitionGenerationFormat = "v2006-01-02-15-04-05"
 
 func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
 	lp := &LocalPartition{
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
index 192af6c98..7edf979b5 100644
--- a/weed/mq/topic/partition.go
+++ b/weed/mq/topic/partition.go
@@ -3,6 +3,7 @@ package topic
 import (
 	"fmt"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"time"
 )
 
 const PartitionCount = 4096
@@ -89,6 +90,19 @@ func (partition Partition) String() string {
 	return fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
 }
 
-func ToString(partition *mq_pb.Partition) string {
-	return fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
+func ParseTopicVersion(name string) (t time.Time, err error) {
+	return time.Parse(PartitionGenerationFormat, name)
+}
+
+func ParsePartitionBoundary(name string) (start, stop int32) {
+	_, err := fmt.Sscanf(name, "%04d-%04d", &start, &stop)
+	if err != nil {
+		return 0, 0
+	}
+	return start, stop
+}
+
+func PartitionDir(t Topic, p Partition) string {
+	partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(PartitionGenerationFormat)
+	return fmt.Sprintf("%s/%s/%04d-%04d", t.Dir(), partitionGeneration, p.RangeStart, p.RangeStop)
 }
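Illustrative only (not part of this change): with the new "v"-prefixed generation format, a partition's files live under a versioned directory. A sketch of the resulting path, assuming the Partition struct fields as used in this package; namespace and topic names are made up.

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
    )

    func main() {
    	t := topic.NewTopic("test", "events") // hypothetical namespace and topic
    	p := topic.Partition{
    		RangeStart: 0,
    		RangeStop:  1023,
    		RingSize:   topic.PartitionCount,
    		UnixTimeNs: time.Date(2024, 11, 4, 12, 8, 25, 0, time.UTC).UnixNano(),
    	}
    	// Prints e.g. /topics/test/events/v2024-11-04-12-08-25/0000-1023
    	fmt.Println(topic.PartitionDir(t, p))
    }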
diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go
index 6932fcb56..5e9012e70 100644
--- a/weed/mq/topic/topic.go
+++ b/weed/mq/topic/topic.go
@@ -1,8 +1,13 @@
 package topic
 
 import (
+	"bytes"
+	"errors"
 	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	jsonpb "google.golang.org/protobuf/encoding/protojson"
 )
 
 type Topic struct {
@@ -23,13 +28,42 @@ func FromPbTopic(topic *mq_pb.Topic) Topic {
 	}
 }
 
-func (tp Topic) ToPbTopic() *mq_pb.Topic {
+func (t Topic) ToPbTopic() *mq_pb.Topic {
 	return &mq_pb.Topic{
-		Namespace: tp.Namespace,
-		Name:      tp.Name,
+		Namespace: t.Namespace,
+		Name:      t.Name,
 	}
 }
 
-func (tp Topic) String() string {
-	return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name)
+func (t Topic) String() string {
+	return fmt.Sprintf("%s.%s", t.Namespace, t.Name)
+}
+
+func (t Topic) Dir() string {
+	return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+}
+
+func (t Topic) ReadConfFile(client filer_pb.SeaweedFilerClient) (*mq_pb.ConfigureTopicResponse, error) {
+	data, err := filer.ReadInsideFiler(client, t.Dir(), filer.TopicConfFile)
+	if errors.Is(err, filer_pb.ErrNotFound) {
+		return nil, err
+	}
+	if err != nil {
+		return nil, fmt.Errorf("read topic.conf of %v: %v", t, err)
+	}
+	// parse into filer conf object
+	conf := &mq_pb.ConfigureTopicResponse{}
+	if err = jsonpb.Unmarshal(data, conf); err != nil {
+		return nil, fmt.Errorf("unmarshal topic %v conf: %v", t, err)
+	}
+	return conf, nil
+}
+
+func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.ConfigureTopicResponse) error {
+	var buf bytes.Buffer
+	filer.ProtoToText(&buf, conf)
+	if err := filer.SaveInsideFiler(client, t.Dir(), filer.TopicConfFile, buf.Bytes()); err != nil {
+		return fmt.Errorf("save topic %v conf: %v", t, err)
+	}
+	return nil
 }
diff --git a/weed/mq/topic/topic_partition.go b/weed/mq/topic/topic_partition.go
index 20b33a7e4..b14bc9c46 100644
--- a/weed/mq/topic/topic_partition.go
+++ b/weed/mq/topic/topic_partition.go
@@ -7,6 +7,6 @@ type TopicPartition struct {
 	Partition
 }
 
-func (tp *TopicPartition) String() string {
+func (tp *TopicPartition) TopicPartitionId() string {
 	return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
 }
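For orientation (not from the patch): the new Dir helper anchors both the topic.conf file and the versioned partition directories, so a topic's layout can be derived without a filer call. A tiny sketch with made-up names:

    package main

    import (
    	"fmt"

    	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
    )

    func main() {
    	t := topic.NewTopic("test", "events") // hypothetical names
    	fmt.Println(t.String()) // test.events
    	fmt.Println(t.Dir())    // e.g. /topics/test/events; topic.conf and the v<ts> partition dirs live here
    }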
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 73e50cc48..9470afced 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -19,19 +19,15 @@ import (
 )
 
 type FilePart struct {
-	Reader      io.Reader
-	FileName    string
-	FileSize    int64
-	MimeType    string
-	ModTime     int64 //in seconds
-	Replication string
-	Collection  string
-	DataCenter  string
-	Ttl         string
-	DiskType    string
-	Server      string //this comes from assign result
-	Fid         string //this comes from assign result, but customizable
-	Fsync       bool
+	Reader   io.Reader
+	FileName string
+	FileSize int64
+	MimeType string
+	ModTime  int64 //in seconds
+	Pref     StoragePreference
+	Server   string //this comes from assign result
+	Fid      string //this comes from assign result, but customizable
+	Fsync    bool
 }
 
 type SubmitResult struct {
@@ -42,20 +38,29 @@ type SubmitResult struct {
 	Error    string `json:"error,omitempty"`
 }
 
+type StoragePreference struct {
+	Replication string
+	Collection  string
+	DataCenter  string
+	Ttl         string
+	DiskType    string
+	MaxMB       int
+}
+
 type GetMasterFn func(ctx context.Context) pb.ServerAddress
 
-func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
+func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*FilePart, pref StoragePreference, usePublicUrl bool) ([]SubmitResult, error) {
 	results := make([]SubmitResult, len(files))
 	for index, file := range files {
 		results[index].FileName = file.FileName
 	}
 	ar := &VolumeAssignRequest{
 		Count:       uint64(len(files)),
-		Replication: replication,
-		Collection:  collection,
-		DataCenter:  dataCenter,
-		Ttl:         ttl,
-		DiskType:    diskType,
+		Replication: pref.Replication,
+		Collection:  pref.Collection,
+		DataCenter:  pref.DataCenter,
+		Ttl:         pref.Ttl,
+		DiskType:    pref.DiskType,
 	}
 	ret, err := Assign(masterFn, grpcDialOption, ar)
 	if err != nil {
@@ -73,12 +78,8 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []F
 		if usePublicUrl {
 			file.Server = ret.PublicUrl
 		}
-		file.Replication = replication
-		file.Collection = collection
-		file.DataCenter = dataCenter
-		file.Ttl = ttl
-		file.DiskType = diskType
-		results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
+		file.Pref = pref
+		results[index].Size, err = file.Upload(pref.MaxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
 		if err != nil {
 			results[index].Error = err.Error()
 		}
@@ -88,8 +89,8 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []F
 	return results, nil
 }
 
-func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
-	ret = make([]FilePart, len(fullPathFilenames))
+func NewFileParts(fullPathFilenames []string) (ret []*FilePart, err error) {
+	ret = make([]*FilePart, len(fullPathFilenames))
 	for index, file := range fullPathFilenames {
 		if ret[index], err = newFilePart(file); err != nil {
 			return
@@ -97,7 +98,8 @@ func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
 	}
 	return
 }
-func newFilePart(fullPathFilename string) (ret FilePart, err error) {
+func newFilePart(fullPathFilename string) (ret *FilePart, err error) {
+	ret = &FilePart{}
 	fh, openErr := os.Open(fullPathFilename)
 	if openErr != nil {
 		glog.V(0).Info("Failed to open file: ", fullPathFilename)
@@ -121,7 +123,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
 	return ret, nil
 }
 
-func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
+func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
 	fileUrl := "http://" + fi.Server + "/" + fi.Fid
 	if fi.ModTime != 0 {
 		fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -145,13 +147,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
 
 	var ret *AssignResult
 	var id string
-	if fi.DataCenter != "" {
+	if fi.Pref.DataCenter != "" {
 		ar := &VolumeAssignRequest{
 			Count:       uint64(chunks),
-			Replication: fi.Replication,
-			Collection:  fi.Collection,
-			Ttl:         fi.Ttl,
-			DiskType:    fi.DiskType,
+			Replication: fi.Pref.Replication,
+			Collection:  fi.Pref.Collection,
+			Ttl:         fi.Pref.Ttl,
+			DiskType:    fi.Pref.DiskType,
 		}
 		ret, err = Assign(masterFn, grpcDialOption, ar)
 		if err != nil {
@@ -159,13 +161,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
 		}
 	}
 	for i := int64(0); i < chunks; i++ {
-		if fi.DataCenter == "" {
+		if fi.Pref.DataCenter == "" {
 			ar := &VolumeAssignRequest{
 				Count:       1,
-				Replication: fi.Replication,
-				Collection:  fi.Collection,
-				Ttl:         fi.Ttl,
-				DiskType:    fi.DiskType,
+				Replication: fi.Pref.Replication,
+				Collection:  fi.Pref.Collection,
+				Ttl:         fi.Pref.Ttl,
+				DiskType:    fi.Pref.DiskType,
 			}
 			ret, err = Assign(masterFn, grpcDialOption, ar)
 			if err != nil {
diff --git a/weed/shell/command_mq_topic_compact.go b/weed/shell/command_mq_topic_compact.go
new file mode 100644
index 000000000..f1dee8662
--- /dev/null
+++ b/weed/shell/command_mq_topic_compact.go
@@ -0,0 +1,94 @@
+package shell
+
+import (
+	"flag"
+	"github.com/seaweedfs/seaweedfs/weed/filer_client"
+	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
+	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+	"github.com/seaweedfs/seaweedfs/weed/operation"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"google.golang.org/grpc"
+	"io"
+	"time"
+)
+
+func init() {
+	Commands = append(Commands, &commandMqTopicCompact{})
+}
+
+type commandMqTopicCompact struct {
+}
+
+func (c *commandMqTopicCompact) Name() string {
+	return "mq.topic.compact"
+}
+
+func (c *commandMqTopicCompact) Help() string {
+	return `compact the topic storage into parquet format
+
+	Example:
+		mq.topic.compact -namespace <namespace> -topic <topic_name> -timeAgo <time_ago>
+
+`
+}
+
+func (c *commandMqTopicCompact) HasTag(tag CommandTag) bool {
+	return ResourceHeavy == tag
+}
+
+func (c *commandMqTopicCompact) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
+
+	// parse parameters
+	mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	namespace := mqCommand.String("namespace", "", "namespace name")
+	topicName := mqCommand.String("topic", "", "topic name")
+	timeAgo := mqCommand.Duration("timeAgo", 2*time.Minute, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+	replication := mqCommand.String("replication", "", "replication type")
+	collection := mqCommand.String("collection", "", "optional collection name")
+	dataCenter := mqCommand.String("dataCenter", "", "optional data center name")
+	diskType := mqCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
+	maxMB := mqCommand.Int("maxMB", 4, "split files larger than the limit")
+
+	if err := mqCommand.Parse(args); err != nil {
+		return err
+	}
+
+	storagePreference := &operation.StoragePreference{
+		Replication: *replication,
+		Collection:  *collection,
+		DataCenter:  *dataCenter,
+		DiskType:    *diskType,
+		MaxMB:       *maxMB,
+	}
+
+	// read topic configuration
+	fca := &filer_client.FilerClientAccessor{
+		GetFiler: func() pb.ServerAddress {
+			return commandEnv.option.FilerAddress
+		},
+		GetGrpcDialOption: func() grpc.DialOption {
+			return commandEnv.option.GrpcDialOption
+		},
+	}
+	t := topic.NewTopic(*namespace, *topicName)
+	topicConf, err := fca.ReadTopicConfFromFiler(t)
+	if err != nil {
+		return err
+	}
+
+	// get record type
+	recordType := topicConf.GetRecordType()
+	recordType = schema.NewRecordTypeBuilder(recordType).
+		WithField(logstore.SW_COLUMN_NAME_TS, schema.TypeInt64).
+		WithField(logstore.SW_COLUMN_NAME_KEY, schema.TypeBytes).
+		RecordTypeEnd()
+
+	// compact the topic partition versions
+	if err = logstore.CompactTopicPartitions(commandEnv, t, *timeAgo, recordType, storagePreference); err != nil {
+		return err
+	}
+
+	return nil
+
+}
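A hypothetical invocation from the weed shell (namespace, topic, and collection names are examples; flags as defined above), which should roll message log files written before now minus timeAgo into parquet files placed according to the StoragePreference flags:

    > mq.topic.compact -namespace test -topic events -timeAgo 2h -collection demo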
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
index 2982d0979..dd101996e 100644
--- a/weed/util/chunk_cache/chunk_cache_in_memory.go
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -5,11 +5,31 @@ import (
 	"time"
 )
 
+var (
+	_ ChunkCache = &ChunkCacheInMemory{}
+)
+
 // a global cache for recently accessed file chunks
 type ChunkCacheInMemory struct {
 	cache *ccache.Cache
}
 
+func (c *ChunkCacheInMemory) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
+	return c.readChunkAt(data, fileId, offset)
+}
+
+func (c *ChunkCacheInMemory) IsInCache(fileId string, lockNeeded bool) (answer bool) {
+	item := c.cache.Get(fileId)
+	if item == nil {
+		return false
+	}
+	return true
+}
+
+func (c *ChunkCacheInMemory) GetMaxFilePartSizeInCache() (answer uint64) {
+	return 8 * 1024 * 1024
+}
+
 func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory {
 	pruneCount := maxEntries >> 3
 	if pruneCount <= 0 {