seaweedfs/weed/util/chunk_cache/chunk_cache.go
Eugeniy E. Mikhailov c04edeed68
bug fix in the data received from cache processing (#6002)
The patch addresses #3745.

The cache should return the exact amount of data requested by the buffer.
By construction, the cache always returns the entire requested data range,
or an error occurs.

The old use of minSize miscalculated the requested data size
when a non-zero offset was requested.
2024-09-10 13:30:18 -07:00

145 lines
3.2 KiB
Go

package chunk_cache
import (
"errors"
"sync"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
// ErrorOutOfBounds signals a cache read whose requested range falls outside
// the stored chunk. (Not referenced in this file; presumably returned by the
// tier-level readChunkAt implementations — TODO confirm against those files.)
var ErrorOutOfBounds = errors.New("attempt to read out of bounds")

// ChunkCache caches file chunk contents keyed by fileId.
type ChunkCache interface {
	// ReadChunkAt fills data with cached chunk bytes starting at offset,
	// returning the number of bytes copied.
	ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
	// SetChunk stores data as the chunk content for fileId.
	SetChunk(fileId string, data []byte)
}
// a global cache for recently accessed file chunks
//
// TieredChunkCache routes chunks to tiers by size: chunks no larger than
// onDiskCacheSizeLimit0 go to the in-memory cache and disk tier 0, chunks
// up to onDiskCacheSizeLimit1 go to disk tier 1, and everything larger goes
// to disk tier 2 (see doSetChunk / ReadChunkAt).
type TieredChunkCache struct {
	memCache   *ChunkCacheInMemory
	diskCaches []*OnDiskCacheLayer
	sync.RWMutex // read lock for lookups, write lock for SetChunk/Shutdown

	// Size thresholds (bytes) that select which tier stores a chunk.
	// Limit1 = 4 * limit0, limit2 = 2 * limit1 (set in NewTieredChunkCache).
	onDiskCacheSizeLimit0 uint64
	onDiskCacheSizeLimit1 uint64
	onDiskCacheSizeLimit2 uint64
}

// Compile-time check that *TieredChunkCache satisfies ChunkCache.
var _ ChunkCache = &TieredChunkCache{}
// NewTieredChunkCache builds a cache with one in-memory layer and three
// on-disk layers. unitSize fixes the tier-0 chunk-size limit; tier 1 takes
// chunks up to 4x that, tier 2 up to 8x. diskSizeInUnit*unitSize is the
// total disk budget, split 1/8 : 3/8 : 1/2 across the three tiers.
func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {
	limit0 := uint64(unitSize)
	limit1 := 4 * limit0
	budget := diskSizeInUnit * unitSize

	c := &TieredChunkCache{
		memCache:              NewChunkCacheInMemory(maxEntries),
		onDiskCacheSizeLimit0: limit0,
		onDiskCacheSizeLimit1: limit1,
		onDiskCacheSizeLimit2: 2 * limit1,
		diskCaches: []*OnDiskCacheLayer{
			NewOnDiskCacheLayer(dir, "c0_2", budget/8, 2),
			NewOnDiskCacheLayer(dir, "c1_3", budget/4+budget/8, 3),
			NewOnDiskCacheLayer(dir, "c2_2", budget/2, 2),
		},
	}
	return c
}
// ReadChunkAt reads len(data) bytes of the chunk identified by fileId,
// starting at offset, into data. Tiers are probed fastest-first: the
// in-memory cache, then each on-disk layer whose size limit could hold a
// chunk covering the requested range. A hit must fill the entire buffer;
// a partial read falls through to the next tier. A complete miss returns
// (0, nil) — a miss is not an error.
func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
	if c == nil {
		return 0, nil
	}

	c.RLock()
	defer c.RUnlock()

	// minSize is the smallest chunk length that could satisfy this read;
	// tiers whose size limit is below it cannot contain the chunk.
	minSize := offset + uint64(len(data))

	if minSize <= c.onDiskCacheSizeLimit0 {
		n, err = c.memCache.readChunkAt(data, fileId, offset)
		if err != nil {
			glog.Errorf("failed to read from memcache: %s", err)
		}
		// len(data) is already an int; no conversion needed.
		if n == len(data) {
			return n, nil
		}
	}

	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		glog.Errorf("failed to parse file id %s", fileId)
		return 0, nil
	}

	if minSize <= c.onDiskCacheSizeLimit0 {
		n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset)
		if n == len(data) {
			return
		}
	}
	if minSize <= c.onDiskCacheSizeLimit1 {
		n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset)
		if n == len(data) {
			return
		}
	}
	// The largest tier is always probed last, regardless of minSize.
	n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset)
	if n == len(data) {
		return
	}

	// Nothing satisfied the full read: report a miss, not an error.
	return 0, nil
}
// SetChunk stores data as the chunk content for fileId, picking the tier by
// chunk size. Safe to call on a nil receiver (no-op).
func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
	if c == nil {
		return
	}

	c.Lock()
	defer c.Unlock()

	glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))

	c.doSetChunk(fileId, data)
}
// doSetChunk performs the actual store; the caller must hold the write lock.
func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
	size := uint64(len(data))

	// Small chunks are also kept in the in-memory cache for fast hits.
	if size <= c.onDiskCacheSizeLimit0 {
		c.memCache.SetChunk(fileId, data)
	}

	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		glog.Errorf("failed to parse file id %s", fileId)
		return
	}

	// Route the chunk to the first disk tier whose limit can hold it.
	switch {
	case size <= c.onDiskCacheSizeLimit0:
		c.diskCaches[0].setChunk(fid.Key, data)
	case size <= c.onDiskCacheSizeLimit1:
		c.diskCaches[1].setChunk(fid.Key, data)
	default:
		c.diskCaches[2].setChunk(fid.Key, data)
	}
}
// Shutdown closes every on-disk cache layer. Safe to call on a nil receiver.
func (c *TieredChunkCache) Shutdown() {
	if c == nil {
		return
	}

	c.Lock()
	defer c.Unlock()

	for _, layer := range c.diskCaches {
		layer.shutdown()
	}
}
// min returns the smaller of x and y.
// NOTE(review): this shadows the Go 1.21 builtin min for this package;
// kept for compatibility with older toolchains.
func min(x, y int) int {
	if y < x {
		return y
	}
	return x
}