seaweedfs/weed/util/log_buffer/log_buffer_test.go

66 lines
1.6 KiB
Go
Raw Normal View History

package log_buffer
import (
2023-08-21 03:46:15 +08:00
"crypto/rand"
2020-04-20 18:08:10 +08:00
"fmt"
2024-04-03 07:25:43 +08:00
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
2023-08-21 03:46:15 +08:00
"io"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
2020-04-20 17:54:21 +08:00
func TestNewLogBufferFirstBuffer(t *testing.T) {
2023-08-21 03:46:15 +08:00
flushInterval := time.Second
2024-03-17 14:48:31 +08:00
lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) {
2023-08-21 03:46:15 +08:00
fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
2024-01-15 16:20:12 +08:00
}, nil, func() {
})
2024-05-21 02:03:56 +08:00
startTime := MessagePosition{Time: time.Now()}
messageSize := 1024
messageCount := 5000
2023-08-21 03:46:15 +08:00
2023-11-10 00:02:39 +08:00
receivedMessageCount := 0
2023-08-21 03:46:15 +08:00
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
2024-01-15 16:20:12 +08:00
lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
2023-08-21 03:46:15 +08:00
// stop if no more messages
2023-11-10 00:02:39 +08:00
return receivedMessageCount < messageCount
2024-03-17 14:48:31 +08:00
}, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
2023-11-10 00:02:39 +08:00
receivedMessageCount++
if receivedMessageCount >= messageCount {
2023-08-21 03:46:15 +08:00
println("processed all messages")
2024-03-17 14:48:31 +08:00
return true, io.EOF
2023-08-21 03:46:15 +08:00
}
2024-05-21 02:03:56 +08:00
return false, nil
2023-08-21 03:46:15 +08:00
})
2023-11-10 00:02:39 +08:00
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)
2023-08-21 03:46:15 +08:00
fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err)
if err != nil && err != io.EOF {
t.Errorf("unexpected error %v", err)
}
}()
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
2024-04-03 07:25:43 +08:00
lb.AddToBuffer(&mq_pb.DataMessage{
Key: nil,
Value: buf,
TsNs: 0,
})
}
2023-08-21 03:46:15 +08:00
wg.Wait()
2023-11-10 00:02:39 +08:00
if receivedMessageCount != messageCount {
t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount)
}
}