2020-04-20 14:37:04 +08:00
|
|
|
package log_buffer
|
|
|
|
|
|
|
|
import (
|
2023-08-21 03:46:15 +08:00
|
|
|
"crypto/rand"
|
2020-04-20 18:08:10 +08:00
|
|
|
"fmt"
|
2024-04-03 07:25:43 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
2023-08-21 03:46:15 +08:00
|
|
|
"io"
|
|
|
|
"sync"
|
2020-04-20 14:37:04 +08:00
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
2022-07-29 15:17:28 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
2020-04-20 14:37:04 +08:00
|
|
|
)
|
|
|
|
|
2020-04-20 17:54:21 +08:00
|
|
|
func TestNewLogBufferFirstBuffer(t *testing.T) {
|
2023-08-21 03:46:15 +08:00
|
|
|
flushInterval := time.Second
|
2024-03-17 14:48:31 +08:00
|
|
|
lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) {
|
2023-08-21 03:46:15 +08:00
|
|
|
fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
|
2024-01-15 16:20:12 +08:00
|
|
|
}, nil, func() {
|
2020-04-20 14:37:04 +08:00
|
|
|
})
|
|
|
|
|
2024-03-17 14:48:31 +08:00
|
|
|
startTime := MessagePosition{Time:time.Now()}
|
2020-04-20 14:37:04 +08:00
|
|
|
|
|
|
|
messageSize := 1024
|
2020-04-21 08:26:38 +08:00
|
|
|
messageCount := 5000
|
2023-08-21 03:46:15 +08:00
|
|
|
|
2023-11-10 00:02:39 +08:00
|
|
|
receivedMessageCount := 0
|
2023-08-21 03:46:15 +08:00
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
2024-01-15 16:20:12 +08:00
|
|
|
lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
|
2023-08-21 03:46:15 +08:00
|
|
|
// stop if no more messages
|
2023-11-10 00:02:39 +08:00
|
|
|
return receivedMessageCount < messageCount
|
2024-03-17 14:48:31 +08:00
|
|
|
}, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
|
2023-11-10 00:02:39 +08:00
|
|
|
receivedMessageCount++
|
|
|
|
if receivedMessageCount >= messageCount {
|
2023-08-21 03:46:15 +08:00
|
|
|
println("processed all messages")
|
2024-03-17 14:48:31 +08:00
|
|
|
return true, io.EOF
|
2023-08-21 03:46:15 +08:00
|
|
|
}
|
2024-03-17 14:48:31 +08:00
|
|
|
return false,nil
|
2023-08-21 03:46:15 +08:00
|
|
|
})
|
|
|
|
|
2023-11-10 00:02:39 +08:00
|
|
|
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)
|
2023-08-21 03:46:15 +08:00
|
|
|
fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err)
|
|
|
|
if err != nil && err != io.EOF {
|
|
|
|
t.Errorf("unexpected error %v", err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2020-04-20 14:37:04 +08:00
|
|
|
var buf = make([]byte, messageSize)
|
|
|
|
for i := 0; i < messageCount; i++ {
|
|
|
|
rand.Read(buf)
|
2024-04-03 07:25:43 +08:00
|
|
|
lb.AddToBuffer(&mq_pb.DataMessage{
|
|
|
|
Key: nil,
|
|
|
|
Value: buf,
|
|
|
|
TsNs: 0,
|
|
|
|
})
|
2020-04-20 14:37:04 +08:00
|
|
|
}
|
2023-08-21 03:46:15 +08:00
|
|
|
wg.Wait()
|
2020-04-20 14:37:04 +08:00
|
|
|
|
2023-11-10 00:02:39 +08:00
|
|
|
if receivedMessageCount != messageCount {
|
|
|
|
t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount)
|
2020-04-20 14:37:04 +08:00
|
|
|
}
|
|
|
|
}
|