package weed_server

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/protobuf/proto"
)

func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) {

	// load all mappings
	mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
	if err != nil {
		return nil, err
	}
	mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
	if err != nil {
		return nil, err
	}

	// find mapping
	var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
	var localMountedDir string
	for k, loc := range mappings.Mappings {
		if strings.HasPrefix(req.Directory, k) {
			localMountedDir, remoteStorageMountedLocation = k, loc
		}
	}
	if localMountedDir == "" {
		return nil, fmt.Errorf("%s is not mounted", req.Directory)
	}

	// find storage configuration
	storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
	if err != nil {
		return nil, err
	}
	storageConf := &remote_pb.RemoteConf{}
	if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
		return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
	}

	// find the entry
	entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
	if err == filer_pb.ErrNotFound {
		return nil, err
	}

	resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{}
	if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
		return resp, nil
	}

	// detect storage option
	so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
	if err != nil {
		return resp, err
	}
	assignRequest, altRequest := so.ToAssignRequests(1)

	// find a good chunk size
	chunkSize := int64(5 * 1024 * 1024)
	chunkCount := entry.Remote.RemoteSize/chunkSize + 1
	for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
		chunkSize *= 2
		chunkCount = entry.Remote.RemoteSize/chunkSize + 1
	}

	dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])

	var chunks []*filer_pb.FileChunk
	var fetchAndWriteErr error
	var wg sync.WaitGroup

	limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
	for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
		localOffset := offset

		wg.Add(1)
		limitedConcurrentExecutor.Execute(func() {
			defer wg.Done()
			size := chunkSize
			if localOffset+chunkSize > entry.Remote.RemoteSize {
				size = entry.Remote.RemoteSize - localOffset
			}

			// assign one volume server
			assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
			if err != nil {
				fetchAndWriteErr = err
				return
			}
			if assignResult.Error != "" {
				fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
				return
			}
			fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
			if assignResult.Error != "" {
				fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
				return
			}

			var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
			for _, r := range assignResult.Replicas {
				replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
					Url:       r.Url,
					PublicUrl: r.PublicUrl,
					GrpcPort:  int32(r.GrpcPort),
				})
			}

			// tell filer to tell volume server to download into needles
			assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
			var etag string
			err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
				resp, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
					VolumeId:   uint32(fileId.VolumeId),
					NeedleId:   uint64(fileId.Key),
					Cookie:     uint32(fileId.Cookie),
					Offset:     localOffset,
					Size:       size,
					Replicas:   replicas,
					Auth:       string(assignResult.Auth),
					RemoteConf: storageConf,
					RemoteLocation: &remote_pb.RemoteStorageLocation{
						Name:   remoteStorageMountedLocation.Name,
						Bucket: remoteStorageMountedLocation.Bucket,
						Path:   string(dest),
					},
				})
				if fetchAndWriteErr != nil {
					return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
				} else {
					etag = resp.ETag
				}
				return nil
			})

			if err != nil && fetchAndWriteErr == nil {
				fetchAndWriteErr = err
				return
			}

			chunks = append(chunks, &filer_pb.FileChunk{

				FileId:       assignResult.Fid,
				Offset:       localOffset,
				Size:         uint64(size),
				ModifiedTsNs: time.Now().UnixNano(),
				ETag:         etag,
				Fid: &filer_pb.FileId{
					VolumeId: uint32(fileId.VolumeId),
					FileKey:  uint64(fileId.Key),
					Cookie:   uint32(fileId.Cookie),
				},
			})
		})
	}

	wg.Wait()
	if fetchAndWriteErr != nil {
		return nil, fetchAndWriteErr
	}

	garbage := entry.GetChunks()

	newEntry := entry.ShallowClone()
	newEntry.Chunks = chunks
	newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
	newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano()

	// this skips meta data log events

	if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
		fs.filer.DeleteUncommittedChunks(chunks)
		return nil, err
	}
	fs.filer.DeleteChunks(entry.FullPath, garbage)

	fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)

	resp.Entry = newEntry.ToProtoEntry()

	return resp, nil

}