2020-09-01 15:21:19 +08:00
package filer
2020-03-27 19:50:51 +08:00
import (
"context"
"fmt"
2020-11-01 18:36:43 +08:00
"io"
"math/rand"
"sync"
2020-03-27 19:50:51 +08:00
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2020-11-01 16:58:48 +08:00
"github.com/chrislusf/seaweedfs/weed/util"
2020-04-12 03:45:24 +08:00
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
2020-03-27 19:50:51 +08:00
"github.com/chrislusf/seaweedfs/weed/wdclient"
2020-10-08 14:58:32 +08:00
"github.com/golang/groupcache/singleflight"
2020-10-11 11:09:43 +08:00
)
2020-03-27 19:50:51 +08:00
type ChunkReadAt struct {
masterClient * wdclient . MasterClient
2020-03-30 12:07:55 +08:00
chunkViews [ ] * ChunkView
2020-10-08 13:49:04 +08:00
lookupFileId LookupFileIdFunctionType
2020-03-27 19:50:51 +08:00
readerLock sync . Mutex
2020-08-16 15:49:08 +08:00
fileSize int64
2020-03-29 04:43:31 +08:00
2020-10-08 13:49:04 +08:00
fetchGroup singleflight . Group
chunkCache chunk_cache . ChunkCache
2020-03-27 19:50:51 +08:00
}
// var _ = io.ReaderAt(&ChunkReadAt{})
2020-10-08 13:49:04 +08:00
// LookupFileIdFunctionType maps a file id to the candidate URLs the file can
// be fetched from, or returns an error when the lookup fails.
type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
2020-04-30 08:40:08 +08:00
func LookupFn ( filerClient filer_pb . FilerClient ) LookupFileIdFunctionType {
2020-10-04 11:16:42 +08:00
vidCache := make ( map [ string ] * filer_pb . Locations )
2020-10-22 10:28:59 +08:00
var vicCacheLock sync . RWMutex
2020-10-08 13:49:04 +08:00
return func ( fileId string ) ( targetUrls [ ] string , err error ) {
2020-10-04 11:16:42 +08:00
vid := VolumeId ( fileId )
2020-10-22 10:28:59 +08:00
vicCacheLock . RLock ( )
2020-10-04 11:16:42 +08:00
locations , found := vidCache [ vid ]
2020-10-22 10:28:59 +08:00
vicCacheLock . RUnlock ( )
2020-10-04 11:16:42 +08:00
2020-11-01 16:58:48 +08:00
if ! found {
2020-11-01 18:36:43 +08:00
util . Retry ( "lookup volume " + vid , func ( ) error {
2020-11-01 16:58:48 +08:00
err = filerClient . WithFilerClient ( func ( client filer_pb . SeaweedFilerClient ) error {
resp , err := client . LookupVolume ( context . Background ( ) , & filer_pb . LookupVolumeRequest {
VolumeIds : [ ] string { vid } ,
} )
if err != nil {
return err
}
locations = resp . LocationsMap [ vid ]
if locations == nil || len ( locations . Locations ) == 0 {
glog . V ( 0 ) . Infof ( "failed to locate %s" , fileId )
return fmt . Errorf ( "failed to locate %s" , fileId )
}
vicCacheLock . Lock ( )
vidCache [ vid ] = locations
vicCacheLock . Unlock ( )
return nil
2020-10-04 11:16:42 +08:00
} )
2020-11-01 16:58:48 +08:00
return err
2020-04-30 08:40:08 +08:00
} )
2020-10-04 11:16:42 +08:00
}
2020-04-30 08:40:08 +08:00
2020-10-11 06:43:22 +08:00
if err != nil {
return nil , err
}
2020-10-08 13:49:04 +08:00
for _ , loc := range locations . Locations {
2020-10-12 11:15:10 +08:00
volumeServerAddress := filerClient . AdjustedUrl ( loc )
2020-10-08 13:49:04 +08:00
targetUrl := fmt . Sprintf ( "http://%s/%s" , volumeServerAddress , fileId )
targetUrls = append ( targetUrls , targetUrl )
}
2020-04-30 08:40:08 +08:00
2020-10-08 14:58:32 +08:00
for i := len ( targetUrls ) - 1 ; i > 0 ; i -- {
j := rand . Intn ( i + 1 )
targetUrls [ i ] , targetUrls [ j ] = targetUrls [ j ] , targetUrls [ i ]
}
2020-04-30 08:40:08 +08:00
return
}
}
2020-08-18 11:20:08 +08:00
func NewChunkReaderAtFromClient ( filerClient filer_pb . FilerClient , chunkViews [ ] * ChunkView , chunkCache chunk_cache . ChunkCache , fileSize int64 ) * ChunkReadAt {
2020-03-27 19:50:51 +08:00
return & ChunkReadAt {
2020-05-10 18:50:30 +08:00
chunkViews : chunkViews ,
2020-04-30 08:40:08 +08:00
lookupFileId : LookupFn ( filerClient ) ,
2020-03-29 05:07:16 +08:00
chunkCache : chunkCache ,
2020-08-16 15:49:08 +08:00
fileSize : fileSize ,
2020-03-27 19:50:51 +08:00
}
}
func ( c * ChunkReadAt ) ReadAt ( p [ ] byte , offset int64 ) ( n int , err error ) {
c . readerLock . Lock ( )
defer c . readerLock . Unlock ( )
2020-08-18 07:05:40 +08:00
glog . V ( 4 ) . Infof ( "ReadAt [%d,%d) of total file size %d bytes %d chunk views" , offset , offset + int64 ( len ( p ) ) , c . fileSize , len ( c . chunkViews ) )
2020-08-18 11:14:40 +08:00
return c . doReadAt ( p [ n : ] , offset + int64 ( n ) )
2020-03-27 19:50:51 +08:00
}
func ( c * ChunkReadAt ) doReadAt ( p [ ] byte , offset int64 ) ( n int , err error ) {
2020-08-18 11:14:40 +08:00
startOffset , remaining := offset , int64 ( len ( p ) )
2020-10-04 16:31:04 +08:00
var nextChunk * ChunkView
2020-08-18 07:05:40 +08:00
for i , chunk := range c . chunkViews {
2020-08-18 11:14:40 +08:00
if remaining <= 0 {
break
}
2020-10-04 16:31:04 +08:00
if i + 1 < len ( c . chunkViews ) {
nextChunk = c . chunkViews [ i + 1 ]
} else {
nextChunk = nil
}
2020-08-18 11:14:40 +08:00
if startOffset < chunk . LogicOffset {
gap := int ( chunk . LogicOffset - startOffset )
2020-08-18 15:32:01 +08:00
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , startOffset , startOffset + int64 ( gap ) )
n += int ( min ( int64 ( gap ) , remaining ) )
2020-08-18 11:14:40 +08:00
startOffset , remaining = chunk . LogicOffset , remaining - int64 ( gap )
if remaining <= 0 {
break
}
2020-08-17 12:07:46 +08:00
}
2020-08-17 06:16:46 +08:00
// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
2020-08-18 11:14:40 +08:00
chunkStart , chunkStop := max ( chunk . LogicOffset , startOffset ) , min ( chunk . LogicOffset + int64 ( chunk . Size ) , startOffset + remaining )
2020-08-17 12:07:46 +08:00
if chunkStart >= chunkStop {
continue
}
2020-08-18 11:14:40 +08:00
glog . V ( 4 ) . Infof ( "read [%d,%d), %d/%d chunk %s [%d,%d)" , chunkStart , chunkStop , i , len ( c . chunkViews ) , chunk . FileId , chunk . LogicOffset - chunk . Offset , chunk . LogicOffset - chunk . Offset + int64 ( chunk . Size ) )
2020-12-08 18:38:53 +08:00
var buffer [ ] byte
2020-10-04 16:31:04 +08:00
buffer , err = c . readFromWholeChunkData ( chunk , nextChunk )
2020-08-17 12:07:46 +08:00
if err != nil {
glog . Errorf ( "fetching chunk %+v: %v\n" , chunk , err )
return
2020-03-27 19:50:51 +08:00
}
2020-08-17 12:07:46 +08:00
bufferOffset := chunkStart - chunk . LogicOffset + chunk . Offset
2020-08-18 12:17:32 +08:00
copied := copy ( p [ startOffset - offset : chunkStop - chunkStart + startOffset - offset ] , buffer [ bufferOffset : bufferOffset + chunkStop - chunkStart ] )
2020-08-17 12:07:46 +08:00
n += copied
2020-08-18 11:20:08 +08:00
startOffset , remaining = startOffset + int64 ( copied ) , remaining - int64 ( copied )
2020-03-27 19:50:51 +08:00
}
2020-08-16 15:49:08 +08:00
2020-08-18 11:14:40 +08:00
glog . V ( 4 ) . Infof ( "doReadAt [%d,%d), n:%v, err:%v" , offset , offset + int64 ( len ( p ) ) , n , err )
2020-08-16 15:49:08 +08:00
2020-08-18 23:50:14 +08:00
if err == nil && remaining > 0 && c . fileSize > startOffset {
2020-08-30 13:28:33 +08:00
delta := int ( min ( remaining , c . fileSize - startOffset ) )
2020-08-18 15:34:15 +08:00
glog . V ( 4 ) . Infof ( "zero2 [%d,%d) of file size %d bytes" , startOffset , startOffset + int64 ( delta ) , c . fileSize )
2020-08-18 13:46:32 +08:00
n += delta
2020-03-27 19:50:51 +08:00
}
2020-08-18 11:14:40 +08:00
2020-10-15 03:18:24 +08:00
if err == nil && offset + int64 ( len ( p ) ) >= c . fileSize {
2020-08-16 15:49:08 +08:00
err = io . EOF
}
2020-08-17 06:16:46 +08:00
// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
2020-03-27 19:50:51 +08:00
return
}
2020-10-08 13:49:04 +08:00
func ( c * ChunkReadAt ) readFromWholeChunkData ( chunkView * ChunkView , nextChunkViews ... * ChunkView ) ( chunkData [ ] byte , err error ) {
2020-10-04 11:16:42 +08:00
2020-10-04 16:31:04 +08:00
v , doErr := c . readOneWholeChunk ( chunkView )
2020-03-29 04:43:31 +08:00
2020-10-04 16:31:04 +08:00
if doErr != nil {
2020-10-09 14:19:20 +08:00
return nil , doErr
2020-04-12 16:13:57 +08:00
}
2020-03-29 15:54:39 +08:00
2020-10-04 16:31:04 +08:00
chunkData = v . ( [ ] byte )
2020-10-06 05:06:18 +08:00
for _ , nextChunkView := range nextChunkViews {
if c . chunkCache != nil && nextChunkView != nil {
go c . readOneWholeChunk ( nextChunkView )
2020-10-04 16:31:04 +08:00
}
2020-10-06 05:06:18 +08:00
}
2020-10-04 16:31:04 +08:00
2020-08-17 06:16:46 +08:00
return
2020-03-29 15:54:39 +08:00
}
2020-10-04 16:31:04 +08:00
// readOneWholeChunk returns the whole data of chunkView as a []byte wrapped in
// an interface{}. Calls are routed through c.fetchGroup keyed by the chunk's
// file id.
//
// The chunk cache is consulted first and populated after a successful fetch.
// A nil chunk cache is tolerated: the cache steps are skipped and the data is
// fetched directly, which matches the nil check readFromWholeChunkData makes
// before prefetching.
func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {

	return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {

		glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)

		if c.chunkCache != nil {
			if data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize); data != nil {
				glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
				return data, nil
			}
		}

		data, err := c.doFetchFullChunkData(chunkView)
		if err != nil {
			return data, err
		}

		// Only successfully fetched data is stored in the cache.
		if c.chunkCache != nil {
			c.chunkCache.SetChunk(chunkView.FileId, data)
		}
		return data, nil
	})
}
2020-10-04 11:16:42 +08:00
func ( c * ChunkReadAt ) doFetchFullChunkData ( chunkView * ChunkView ) ( [ ] byte , error ) {
2020-10-04 16:31:04 +08:00
2020-10-05 07:21:43 +08:00
glog . V ( 4 ) . Infof ( "+ doFetchFullChunkData %s" , chunkView . FileId )
2020-10-04 16:31:04 +08:00
2020-10-04 11:16:42 +08:00
data , err := fetchChunk ( c . lookupFileId , chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
2020-03-29 15:54:39 +08:00
2020-10-05 07:21:43 +08:00
glog . V ( 4 ) . Infof ( "- doFetchFullChunkData %s" , chunkView . FileId )
2020-10-04 16:31:04 +08:00
2020-10-04 11:16:42 +08:00
return data , err
2020-03-27 19:50:51 +08:00
}