Merge branch 'master' into dependabot/go_modules/github.com/fluent/fluent-logger-golang-1.10.0

This commit is contained in:
Chris Lu 2025-06-02 23:43:51 -07:00 committed by GitHub
commit 3863805998
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 55 additions and 16 deletions

View File

@ -116,7 +116,13 @@ connection_max_open = 100
connection_max_lifetime_seconds = 0
# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
enableUpsert = true
upsertQuery = """UPSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"""
upsertQuery = """
INSERT INTO "%[1]s" (dirhash, name, directory, meta)
VALUES($1, $2, $3, $4)
ON CONFLICT (dirhash, name) DO UPDATE SET
directory=EXCLUDED.directory,
meta=EXCLUDED.meta
"""
[postgres2]
enabled = false
@ -141,7 +147,13 @@ connection_max_open = 100
connection_max_lifetime_seconds = 0
# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
enableUpsert = true
upsertQuery = """UPSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"""
upsertQuery = """
INSERT INTO "%[1]s" (dirhash, name, directory, meta)
VALUES($1, $2, $3, $4)
ON CONFLICT (dirhash, name) DO UPDATE SET
directory=EXCLUDED.directory,
meta=EXCLUDED.meta
"""
[cassandra2]
# CREATE TABLE filemeta (

View File

@ -8,7 +8,8 @@ import (
)
const (
MaxSectionBucketSize = 10000
MaxSectionBucketSize = 1024 * 8
LookBackWindowSize = 1024 // how many entries to look back when inserting into a section
)
type SectionalNeedleId uint32
@ -61,6 +62,7 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset
lkey = cs.values[len(cs.values)-1].Key
}
hasAdded := false
switch {
case len(cs.values) < MaxSectionBucketSize && lkey <= skey:
// non-overflow insert
@ -70,28 +72,33 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset
Size: size,
OffsetHigher: offset.OffsetHigher,
})
hasAdded = true
case len(cs.values) < MaxSectionBucketSize:
// still has capacity and only partially out of order
lookBackIndex := len(cs.values) - 128
lookBackIndex := len(cs.values) - LookBackWindowSize
if lookBackIndex < 0 {
lookBackIndex = 0
}
for ; lookBackIndex < len(cs.values); lookBackIndex++ {
if cs.values[lookBackIndex].Key >= skey {
break
if cs.values[lookBackIndex].Key <= skey {
for ; lookBackIndex < len(cs.values); lookBackIndex++ {
if cs.values[lookBackIndex].Key >= skey {
break
}
}
cs.values = append(cs.values, SectionalNeedleValue{})
copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:])
cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size
cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher
hasAdded = true
}
cs.values = append(cs.values, SectionalNeedleValue{})
copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:])
cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size
cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher
default:
// overflow insert
}
// overflow insert
if !hasAdded {
if oldValue, found := cs.findOverflowEntry(skey); found {
oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size
}
cs.setOverflowEntry(skey, offset, size)
return
}
// if we maxed out our values bucket, pin its capacity to minimize memory usage

View File

@ -16,7 +16,7 @@ import (
To see the memory usage:
go test -run TestMemoryUsage
The TotalAlloc section shows the memory increase for each iteration.
The Alloc section shows the in-use memory increase for each iteration.
go test -run TestMemoryUsage -memprofile=mem.out
go tool pprof --alloc_space needle.test mem.out
@ -81,7 +81,7 @@ func PrintMemUsage(totalRowCount uint64) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// For info on each, see: https://golang.org/pkg/runtime/#MemStats
fmt.Printf("Each %.2f Bytes", float64(m.TotalAlloc)/float64(totalRowCount))
fmt.Printf("Each %.02f Bytes", float64(m.Alloc)/float64(totalRowCount))
fmt.Printf("\tAlloc = %v MiB", bToMb(m.Alloc))
fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))

View File

@ -219,3 +219,23 @@ func TestCompactSection_Get(t *testing.T) {
t.Error(uint64(nv3.Size))
}
}
// Test after putting 1 ~ LookBackWindowSize*3 items in sequential order, but missing item LookBackWindowSize
// insert the item LookBackWindowSize in the middle of the sequence
func TestCompactSection_PutOutOfOrderItemBeyondLookBackWindow(t *testing.T) {
m := NewCompactMap()
// put 1 ~ 10
for i := 1; i <= LookBackWindowSize*3; i++ {
if i != LookBackWindowSize {
m.Set(NeedleId(i), ToOffset(int64(i)), Size(i))
}
}
m.Set(NeedleId(LookBackWindowSize), ToOffset(int64(LookBackWindowSize)), Size(LookBackWindowSize))
// check if 8 is in the right place
if v, ok := m.Get(NeedleId(LookBackWindowSize)); !ok || v.Offset != ToOffset(LookBackWindowSize) || v.Size != Size(LookBackWindowSize) {
t.Fatalf("expected to find LookBackWindowSize at offset %d with size %d, but got %v", LookBackWindowSize, LookBackWindowSize, v)
}
}