2021-09-01 16:29:22 +08:00
|
|
|
package remote_storage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/util"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
SyncKeyPrefix = "remote.sync."
|
|
|
|
)
|
|
|
|
|
2021-09-13 13:47:52 +08:00
|
|
|
func GetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir string) (lastOffsetTsNs int64, readErr error) {
|
2021-09-01 16:29:22 +08:00
|
|
|
|
|
|
|
dirHash := uint32(util.HashStringToLong(dir))
|
|
|
|
|
|
|
|
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
syncKey := []byte(SyncKeyPrefix + "____")
|
|
|
|
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
|
|
|
|
|
|
|
|
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(resp.Error) != 0 {
|
|
|
|
return errors.New(resp.Error)
|
|
|
|
}
|
|
|
|
if len(resp.Value) < 8 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
|
|
|
|
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2021-09-13 13:47:52 +08:00
|
|
|
func SetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir string, offsetTsNs int64) error {
|
2021-09-01 16:29:22 +08:00
|
|
|
|
|
|
|
dirHash := uint32(util.HashStringToLong(dir))
|
|
|
|
|
|
|
|
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
|
|
|
|
syncKey := []byte(SyncKeyPrefix + "____")
|
|
|
|
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
|
|
|
|
|
|
|
|
valueBuf := make([]byte, 8)
|
|
|
|
util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
|
|
|
|
|
|
|
|
resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
|
|
|
|
Key: syncKey,
|
|
|
|
Value: valueBuf,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(resp.Error) != 0 {
|
|
|
|
return errors.New(resp.Error)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
}
|