Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2024-11-24 19:19:11 +08:00
00707ec00f: Running mount outside the cluster no longer requires exposing all the volume servers outside the cluster; chunk reads and writes go through the filer instead.
230 lines · 6.5 KiB · Go
package filer

import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"sync"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
	"github.com/chrislusf/seaweedfs/weed/wdclient"
	"github.com/golang/groupcache/singleflight"
)
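
// ChunkReadAt implements io.ReaderAt over the chunk views of one file.
// Whole chunks are fetched through the lookup function, concurrent fetches
// are collapsed with singleflight, and the most recently read chunk is kept
// in memory for sequential reads.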
type ChunkReadAt struct {
	masterClient *wdclient.MasterClient
	chunkViews   []*ChunkView
	lookupFileId wdclient.LookupFileIdFunctionType
	readerLock   sync.Mutex
	fileSize     int64

	fetchGroup      singleflight.Group     // collapses concurrent fetches of the same chunk
	chunkCache      chunk_cache.ChunkCache // shared cache of whole chunks
	lastChunkFileId string                 // single-entry memo of the last chunk read
	lastChunkData   []byte
}

var _ = io.ReaderAt(&ChunkReadAt{})
var _ = io.Closer(&ChunkReadAt{})

// LookupFn returns a lookup function that resolves a file id to volume
// server URLs, caching volume locations and retrying failed lookups.
func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType {

	vidCache := make(map[string]*filer_pb.Locations)
	var vidCacheLock sync.RWMutex
	return func(fileId string) (targetUrls []string, err error) {
		vid := VolumeId(fileId)
		vidCacheLock.RLock()
		locations, found := vidCache[vid]
		vidCacheLock.RUnlock()

		if !found {
			util.Retry("lookup volume "+vid, func() error {
				err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
					resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
						VolumeIds: []string{vid},
					})
					if err != nil {
						return err
					}

					locations = resp.LocationsMap[vid]
					if locations == nil || len(locations.Locations) == 0 {
						glog.V(0).Infof("failed to locate %s", fileId)
						return fmt.Errorf("failed to locate %s", fileId)
					}
					vidCacheLock.Lock()
					vidCache[vid] = locations
					vidCacheLock.Unlock()

					return nil
				})
				return err
			})
		}

		if err != nil {
			return nil, err
		}

		for _, loc := range locations.Locations {
			volumeServerAddress := loc.Url
			targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
			targetUrls = append(targetUrls, targetUrl)
		}

		// shuffle the urls to spread read load across replicas
		for i := len(targetUrls) - 1; i > 0; i-- {
			j := rand.Intn(i + 1)
			targetUrls[i], targetUrls[j] = targetUrls[j], targetUrls[i]
		}

		return
	}
}
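
// NewChunkReaderAtFromClient builds a ChunkReadAt from already-resolved
// chunk views. A minimal usage sketch (illustrative only; filerClient,
// chunkViews, chunkCache and fileSize are assumed to be prepared by the
// caller, e.g. from a filer entry's chunks):
//
//	lookup := LookupFn(filerClient)
//	reader := NewChunkReaderAtFromClient(lookup, chunkViews, chunkCache, fileSize)
//	buf := make([]byte, 4096)
//	n, err := reader.ReadAt(buf, 0)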
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {

	return &ChunkReadAt{
		chunkViews:   chunkViews,
		lookupFileId: lookupFn,
		chunkCache:   chunkCache,
		fileSize:     fileSize,
	}
}
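
// Close drops the reference to the last chunk read so its memory can be
// reclaimed; the shared chunk cache itself is not touched.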
func (c *ChunkReadAt) Close() error {
	c.lastChunkData = nil
	c.lastChunkFileId = ""
	return nil
}
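
// ReadAt implements io.ReaderAt. Reads are serialized by readerLock, so a
// single ChunkReadAt is safe for concurrent callers.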
func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {

	c.readerLock.Lock()
	defer c.readerLock.Unlock()

	glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
	return c.doReadAt(p, offset)
}
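
// doReadAt walks the chunk views in order, zero-fills any sparse gap before
// a chunk, copies the overlapping byte range of each chunk into p, and
// returns io.EOF once the read reaches the end of the file.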
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {

	startOffset, remaining := offset, int64(len(p))
	var nextChunk *ChunkView
	for i, chunk := range c.chunkViews {
		if remaining <= 0 {
			break
		}
		if i+1 < len(c.chunkViews) {
			nextChunk = c.chunkViews[i+1]
		} else {
			nextChunk = nil
		}
		if startOffset < chunk.LogicOffset {
			// sparse hole before this chunk: fill with zeros so a reused
			// buffer does not leak stale bytes
			gap := int(chunk.LogicOffset - startOffset)
			glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
			zeroed := min(int64(gap), remaining)
			for k := int64(0); k < zeroed; k++ {
				p[startOffset-offset+k] = 0
			}
			n += int(zeroed)
			startOffset, remaining = chunk.LogicOffset, remaining-int64(gap)
			if remaining <= 0 {
				break
			}
		}
		// intersect [startOffset, startOffset+remaining) with this chunk's logical range
		chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
		if chunkStart >= chunkStop {
			continue
		}
		glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
		var buffer []byte
		buffer, err = c.readFromWholeChunkData(chunk, nextChunk)
		if err != nil {
			glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
			return
		}
		bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
		copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
		n += copied
		startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
	}

	glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)

	// the tail of the read may fall beyond the last chunk but within the
	// file size (a trailing sparse hole): report it as zeros
	if err == nil && remaining > 0 && c.fileSize > startOffset {
		delta := int(min(remaining, c.fileSize-startOffset))
		glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize)
		for k := 0; k < delta; k++ {
			p[startOffset-offset+int64(k)] = 0
		}
		n += delta
	}

	if err == nil && offset+int64(len(p)) >= c.fileSize {
		err = io.EOF
	}

	return

}
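
// readFromWholeChunkData returns the full data of the requested chunk,
// serving repeated reads of the same chunk from the memoized last chunk and
// kicking off background prefetches for the following chunk views.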
func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) {

	// fast path: sequential reads within the same chunk hit the memoized
	// last chunk without touching the cache or the network
	if c.lastChunkFileId == chunkView.FileId {
		return c.lastChunkData, nil
	}

	v, doErr := c.readOneWholeChunk(chunkView)

	if doErr != nil {
		return nil, doErr
	}

	chunkData = v.([]byte)

	c.lastChunkData = chunkData
	c.lastChunkFileId = chunkView.FileId

	// prefetch the next chunk(s) in the background to hide fetch latency
	// for sequential reads
	for _, nextChunkView := range nextChunkViews {
		if c.chunkCache != nil && nextChunkView != nil {
			go c.readOneWholeChunk(nextChunkView)
		}
	}

	return
}
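
// readOneWholeChunk loads one whole chunk, first from the chunk cache and
// otherwise from a volume server (via the filer lookup), using singleflight
// so concurrent readers trigger at most one fetch per chunk file id.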
func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {

	return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {

		glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)

		data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
		if data != nil {
			glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
		} else {
			var err error
			data, err = c.doFetchFullChunkData(chunkView)
			if err != nil {
				return data, err
			}
			c.chunkCache.SetChunk(chunkView.FileId, data)
		}
		// the error path returns early above, so the result is always
		// (data, nil) here; the original outer `var err error` was shadowed
		// and always nil at this point
		return data, nil
	})
}
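
// doFetchFullChunkData downloads one whole chunk from a volume server URL
// resolved by lookupFileId, decrypting and un-gzipping it according to the
// chunk view's CipherKey and IsGzipped fields.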
func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {

	glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)

	data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)

	glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)

	return data, err

}