2020-03-30 12:07:55 +08:00
package filer2
2020-03-27 19:50:51 +08:00
import (
"context"
"fmt"
"io"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2020-04-12 03:45:24 +08:00
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
2020-03-27 19:50:51 +08:00
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
type ChunkReadAt struct {
masterClient * wdclient . MasterClient
2020-03-30 12:07:55 +08:00
chunkViews [ ] * ChunkView
2020-03-27 19:50:51 +08:00
buffer [ ] byte
2020-08-17 06:16:46 +08:00
bufferFileId string
2020-03-27 19:50:51 +08:00
lookupFileId func ( fileId string ) ( targetUrl string , err error )
readerLock sync . Mutex
2020-08-16 15:49:08 +08:00
fileSize int64
2020-03-29 04:43:31 +08:00
2020-04-12 03:45:24 +08:00
chunkCache * chunk_cache . ChunkCache
2020-03-27 19:50:51 +08:00
}
// var _ = io.ReaderAt(&ChunkReadAt{})
2020-04-30 08:40:08 +08:00
// LookupFileIdFunctionType is the signature of a function that maps a file id
// to the URL the file's data can be fetched from, or returns an error when the
// file id cannot be resolved.
type LookupFileIdFunctionType func ( fileId string ) ( targetUrl string , err error )
// LookupFn returns a lookup function backed by filerClient.
//
// The returned function asks the filer for the locations of the volume that
// fileId belongs to and builds an "http://<server>/<fileId>" URL from the first
// location reported, after passing that location's URL through
// filerClient.AdjustedUrl. It returns an error when the lookup call fails or
// when no location is reported for the volume; targetUrl is empty in that case.
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
	return func(fileId string) (targetUrl string, err error) {
		resolve := func(client filer_pb.SeaweedFilerClient) error {
			volumeId := VolumeId(fileId)
			request := &filer_pb.LookupVolumeRequest{
				VolumeIds: []string{volumeId},
			}

			response, lookupErr := client.LookupVolume(context.Background(), request)
			if lookupErr != nil {
				return lookupErr
			}

			found := response.LocationsMap[volumeId]
			if found == nil || len(found.Locations) == 0 {
				glog.V(0).Infof("failed to locate %s", fileId)
				return fmt.Errorf("failed to locate %s", fileId)
			}

			// Only the first reported location is used.
			serverAddress := filerClient.AdjustedUrl(found.Locations[0].Url)
			targetUrl = fmt.Sprintf("http://%s/%s", serverAddress, fileId)
			return nil
		}

		err = filerClient.WithFilerClient(resolve)
		return targetUrl, err
	}
}
2020-08-16 15:49:08 +08:00
func NewChunkReaderAtFromClient ( filerClient filer_pb . FilerClient , chunkViews [ ] * ChunkView , chunkCache * chunk_cache . ChunkCache , fileSize int64 ) * ChunkReadAt {
2020-03-27 19:50:51 +08:00
return & ChunkReadAt {
2020-05-10 18:50:30 +08:00
chunkViews : chunkViews ,
2020-04-30 08:40:08 +08:00
lookupFileId : LookupFn ( filerClient ) ,
2020-03-29 05:07:16 +08:00
chunkCache : chunkCache ,
2020-08-16 15:49:08 +08:00
fileSize : fileSize ,
2020-03-27 19:50:51 +08:00
}
}
func ( c * ChunkReadAt ) ReadAt ( p [ ] byte , offset int64 ) ( n int , err error ) {
c . readerLock . Lock ( )
defer c . readerLock . Unlock ( )
2020-08-18 07:05:40 +08:00
glog . V ( 4 ) . Infof ( "ReadAt [%d,%d) of total file size %d bytes %d chunk views" , offset , offset + int64 ( len ( p ) ) , c . fileSize , len ( c . chunkViews ) )
2020-03-27 19:50:51 +08:00
for n < len ( p ) && err == nil {
readCount , readErr := c . doReadAt ( p [ n : ] , offset + int64 ( n ) )
n += readCount
err = readErr
}
return
}
func ( c * ChunkReadAt ) doReadAt ( p [ ] byte , offset int64 ) ( n int , err error ) {
2020-08-17 12:07:46 +08:00
var startOffset = offset
2020-08-18 07:05:40 +08:00
for i , chunk := range c . chunkViews {
2020-08-17 12:07:46 +08:00
if startOffset < min ( chunk . LogicOffset , int64 ( len ( p ) ) + offset ) {
gap := int ( min ( chunk . LogicOffset , int64 ( len ( p ) ) + offset ) - startOffset )
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , n , n + gap )
n += gap
startOffset = chunk . LogicOffset
}
2020-08-17 06:16:46 +08:00
// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
2020-08-17 12:07:46 +08:00
chunkStart , chunkStop := max ( chunk . LogicOffset , offset ) , min ( chunk . LogicOffset + int64 ( chunk . Size ) , offset + int64 ( len ( p ) ) )
if chunkStart >= chunkStop {
continue
}
2020-08-18 07:05:40 +08:00
glog . V ( 4 ) . Infof ( "read [%d,%d), %d chunk %s [%d,%d)" , chunkStart , chunkStop , i , chunk . FileId , chunk . LogicOffset - chunk . Offset , chunk . LogicOffset - chunk . Offset + int64 ( chunk . Size ) )
2020-08-17 12:07:46 +08:00
c . buffer , err = c . fetchWholeChunkData ( chunk )
if err != nil {
glog . Errorf ( "fetching chunk %+v: %v\n" , chunk , err )
return
2020-03-27 19:50:51 +08:00
}
2020-08-17 12:07:46 +08:00
c . bufferFileId = chunk . FileId
bufferOffset := chunkStart - chunk . LogicOffset + chunk . Offset
copied := copy ( p [ chunkStart - offset : chunkStop - offset ] , c . buffer [ bufferOffset : bufferOffset + chunkStop - chunkStart ] )
n += copied
startOffset += int64 ( copied )
2020-03-27 19:50:51 +08:00
}
2020-08-16 15:49:08 +08:00
2020-08-17 06:16:46 +08:00
// fmt.Printf("> doReadAt [%d,%d), buffer %s:%d, found:%v, err:%v\n", offset, offset+int64(len(p)), c.bufferFileId, int64(len(c.buffer)), found, err)
2020-08-16 15:49:08 +08:00
2020-08-17 12:07:46 +08:00
if startOffset < min ( c . fileSize , int64 ( len ( p ) ) + offset ) {
gap := int ( min ( c . fileSize , int64 ( len ( p ) ) + offset ) - startOffset )
glog . V ( 4 ) . Infof ( "zero2 [%d,%d)" , n , n + gap )
n += gap
2020-03-27 19:50:51 +08:00
}
2020-08-16 15:49:08 +08:00
if offset + int64 ( n ) >= c . fileSize {
err = io . EOF
}
2020-08-17 06:16:46 +08:00
// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
2020-03-27 19:50:51 +08:00
return
}
2020-08-17 06:16:46 +08:00
func ( c * ChunkReadAt ) fetchWholeChunkData ( chunkView * ChunkView ) ( chunkData [ ] byte , err error ) {
2020-03-27 19:50:51 +08:00
2020-08-18 07:05:40 +08:00
glog . V ( 4 ) . Infof ( "fetchWholeChunkData %s offset %d [%d,%d) size at least %d" , chunkView . FileId , chunkView . Offset , chunkView . LogicOffset , chunkView . LogicOffset + int64 ( chunkView . Size ) , chunkView . ChunkSize )
2020-03-29 04:43:31 +08:00
2020-08-17 06:16:46 +08:00
chunkData = c . chunkCache . GetChunk ( chunkView . FileId , chunkView . ChunkSize )
2020-03-29 04:43:31 +08:00
if chunkData != nil {
2020-08-17 12:07:46 +08:00
glog . V ( 5 ) . Infof ( "cache hit %s [%d,%d)" , chunkView . FileId , chunkView . LogicOffset - chunkView . Offset , chunkView . LogicOffset - chunkView . Offset + int64 ( len ( chunkData ) ) )
2020-03-29 15:54:39 +08:00
} else {
2020-08-18 07:05:40 +08:00
glog . V ( 4 ) . Infof ( "doFetchFullChunkData %s" , chunkView . FileId )
2020-03-29 15:54:39 +08:00
chunkData , err = c . doFetchFullChunkData ( chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
if err != nil {
2020-08-17 06:16:46 +08:00
return
2020-03-29 15:54:39 +08:00
}
2020-04-12 16:13:57 +08:00
c . chunkCache . SetChunk ( chunkView . FileId , chunkData )
}
2020-03-29 15:54:39 +08:00
2020-08-17 06:16:46 +08:00
return
2020-03-29 15:54:39 +08:00
}
func ( c * ChunkReadAt ) doFetchFullChunkData ( fileId string , cipherKey [ ] byte , isGzipped bool ) ( [ ] byte , error ) {
2020-07-20 08:59:43 +08:00
return fetchChunk ( c . lookupFileId , fileId , cipherKey , isGzipped )
2020-03-27 19:50:51 +08:00
}