package shell

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/http"
	"sort"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)

func init() {
	Commands = append(Commands, &commandFsMergeVolumes{})
}

// commandFsMergeVolumes implements the fs.mergeVolumes shell command. Its
// fields cache the volume information loaded by reloadVolumesInfo so that the
// planning helpers can look volumes up by id.
type commandFsMergeVolumes struct {
	// volumes maps a volume id to the volume information reported by the master.
	volumes         map[needle.VolumeId]*master_pb.VolumeInformationMessage
	// volumeSizeLimit is the master's volume size limit, stored in bytes.
	volumeSizeLimit uint64
}

// Name returns the name under which this command is invoked in the shell.
func (c *commandFsMergeVolumes) Name() string {
	const commandName = "fs.mergeVolumes"
	return commandName
}

// Help returns the usage text shown for the fs.mergeVolumes command.
func (c *commandFsMergeVolumes) Help() string {
	const usage = `re-locate chunks into target volumes and try to clear lighter volumes.
	
	This would help clear half-full volumes and let vacuum system to delete them later.

	fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-dir=/] [-apply]
`
	return usage
}

// Do runs the fs.mergeVolumes command.
//
// It parses the flags in args, loads the current volume list from the master,
// validates an explicitly requested fromVolumeId/toVolumeId pair, builds and
// prints a merge plan, and then walks the filer tree below -dir. For every
// non-manifest chunk whose volume id is a source in the plan it prints the
// intended move; when -apply is set it also copies the chunk to the planned
// destination volume and updates the filer entry.
//
// Failures to move a single chunk or to update a single entry are printed and
// skipped so the traversal continues. Errors from flag parsing, loading the
// volume list, validation, planning, or the traversal itself are returned.
func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	dirArg := fsMergeVolumesCommand.String("dir", "/", "base directory to find and update files")
	fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
	toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
	collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge")
	apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
	if err = fsMergeVolumesCommand.Parse(args); err != nil {
		return err
	}

	// Strip trailing slashes from every directory except the root itself.
	dir := *dirArg
	if dir != "/" {
		dir = strings.TrimRight(dir, "/")
	}

	fromVolumeId := needle.VolumeId(*fromVolumeArg)
	toVolumeId := needle.VolumeId(*toVolumeArg)

	// Without the volume list no meaningful plan can be built, so a failure
	// here is reported instead of silently producing an empty plan.
	if err = c.reloadVolumesInfo(commandEnv.MasterClient); err != nil {
		return fmt.Errorf("load volume list from master: %w", err)
	}

	// When both ends are given explicitly, validate the pair up front so the
	// user gets a precise reason rather than an empty plan.
	if fromVolumeId != 0 && toVolumeId != 0 {
		if fromVolumeId == toVolumeId {
			return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
		}
		compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
		if err != nil {
			return fmt.Errorf("cannot determine volumes are compatible: %d and %d: %w", fromVolumeId, toVolumeId, err)
		}
		if !compatible {
			return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
		}
		fromSize := c.getVolumeSizeById(fromVolumeId)
		toSize := c.getVolumeSizeById(toVolumeId)
		if fromSize+toSize > c.volumeSizeLimit {
			return fmt.Errorf(
				"volume %d (%d MB) cannot merge into volume %d (%d MB) due to volume size limit (%d MB)",
				fromVolumeId, fromSize/1024/1024,
				toVolumeId, toSize/1024/1024,
				c.volumeSizeLimit/1024/1024,
			)
		}
	}

	plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId)
	if err != nil {
		return err
	}
	c.printPlan(plan)

	if len(plan) == 0 {
		return nil
	}

	defer util_http.GetGlobalHttpClient().CloseIdleConnections()

	return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
		return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
			if entry.IsDirectory {
				return
			}
			for _, chunk := range entry.Chunks {
				if chunk.IsChunkManifest {
					fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
					continue
				}
				chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
				targetVolumeId, found := plan[chunkVolumeId]
				if !found {
					continue
				}
				path := parentPath.Child(entry.Name)

				fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
				if !*apply {
					continue
				}
				// Per-chunk failures are reported and skipped; they use local
				// error variables so they never leak into Do's named result.
				if err := moveChunk(chunk, targetVolumeId, commandEnv.MasterClient); err != nil {
					fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
					continue
				}

				// Persist the entry right after each successful move.
				if err := filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
					Directory: string(parentPath),
					Entry:     entry,
				}); err != nil {
					fmt.Printf("failed to update %s: %v\n", path, err)
				}
			}
		})
	})
}

// getVolumeInfoById looks up vid in the cached volume map. It returns the
// stored information, or a nil pointer and an error when no non-nil entry
// exists for that id.
func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
	if info := c.volumes[vid]; info != nil {
		return info, nil
	}
	return nil, errors.New("cannot find volume")
}

// volumesAreCompatible reports whether src and dest share the same collection,
// TTL and replica placement. It returns an error when either volume is not
// present in the cached volume map.
func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
	from, err := c.getVolumeInfoById(src)
	if err != nil {
		return false, err
	}
	to, err := c.getVolumeInfoById(dest)
	if err != nil {
		return false, err
	}

	// Any single mismatch makes the pair incompatible.
	switch {
	case from.Collection != to.Collection,
		from.Ttl != to.Ttl,
		from.ReplicaPlacement != to.ReplicaPlacement:
		return false, nil
	}
	return true, nil
}

// reloadVolumesInfo resets the cached volume map and refills it from the
// master's VolumeList response. It also stores the reported volume size limit,
// converted from MB to bytes. When the same volume id appears more than once in
// the topology, the first entry encountered is kept.
func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
	c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)

	load := func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		c.volumeSizeLimit = resp.GetVolumeSizeLimitMb() * 1024 * 1024

		// Walk data center -> rack -> data node -> disk -> volume.
		for _, dataCenter := range resp.TopologyInfo.DataCenterInfos {
			for _, rackInfo := range dataCenter.RackInfos {
				for _, dataNode := range rackInfo.DataNodeInfos {
					for _, diskInfo := range dataNode.DiskInfos {
						for _, info := range diskInfo.VolumeInfos {
							id := needle.VolumeId(info.Id)
							if c.volumes[id] != nil {
								// Already recorded from an earlier entry.
								continue
							}
							c.volumes[id] = info
						}
					}
				}
			}
		}
		return nil
	}

	return masterClient.WithClient(false, load)
}

// createMergePlan builds a map from source volume id to destination volume id.
//
// Volumes are ordered by raw Size from largest to smallest, and volumes that
// are read-only, have zero size according to getVolumeSize, or belong to a
// different collection (unless collection is "*") are dropped. Sources are then
// visited from the end of that order towards the front, and each one is
// assigned the first earlier volume that is compatible and still fits under
// the volume size limit, taking already planned merges into account.
//
// A non-zero fromVolumeId restricts sources to that id; a non-zero toVolumeId
// restricts destinations to that id. An error from the compatibility check
// aborts planning.
func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) {
	ids := maps.Keys(c.volumes)
	sort.Slice(ids, func(a, b int) bool {
		return c.volumes[ids[a]].Size > c.volumes[ids[b]].Size
	})

	// Drop ineligible volumes. Walking backwards keeps the indexes of the
	// not-yet-visited elements stable while deleting.
	for i := len(ids) - 1; i >= 0; i-- {
		info := c.volumes[ids[i]]
		otherCollection := collection != "*" && collection != info.GetCollection()
		if info.GetReadOnly() || c.getVolumeSize(info) == 0 || otherCollection {
			ids = slices.Delete(ids, i, i+1)
		}
	}

	plan := make(map[needle.VolumeId]needle.VolumeId)
	for i := len(ids) - 1; i >= 0; i-- {
		src := ids[i]
		if fromVolumeId != 0 && src != fromVolumeId {
			continue
		}
		// Only volumes placed before src in the ordering are candidates.
		for _, candidate := range ids[:i] {
			if toVolumeId != 0 && candidate != toVolumeId {
				continue
			}
			// A volume that is itself planned to be moved is not a destination.
			if _, moving := plan[candidate]; moving {
				continue
			}
			compatible, err := c.volumesAreCompatible(src, candidate)
			if err != nil {
				return nil, err
			}
			if !compatible || c.getVolumeSizeBasedOnPlan(plan, candidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit {
				continue
			}
			plan[src] = candidate
			break
		}
	}

	return plan, nil
}

// getVolumeSizeBasedOnPlan returns the size of volume vid plus the sizes of
// all volumes that plan already maps onto vid.
func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 {
	total := c.getVolumeSizeById(vid)
	for from, to := range plan {
		if to != vid {
			continue
		}
		total += c.getVolumeSizeById(from)
	}
	return total
}

// getVolumeSize returns the volume's Size minus its DeletedByteCount.
func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 {
	remaining := volume.Size
	remaining -= volume.DeletedByteCount
	return remaining
}

// getVolumeSizeById returns getVolumeSize for the cached entry of vid. The
// entry is used without checking that it exists.
func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 {
	info := c.volumes[vid]
	return c.getVolumeSize(info)
}

// printPlan writes the volume size limit and the merge plan to standard
// output. The plan is grouped by destination volume; for each destination the
// running size after every merged source is shown.
//
// Destinations and their sources are printed in ascending volume id order so
// that the output is the same on every run for the same plan, independent of
// Go's randomized map iteration order.
func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) {
	fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024)

	// Invert the plan: destination -> list of sources merged into it.
	reversePlan := make(map[needle.VolumeId][]needle.VolumeId)
	for src, dest := range plan {
		reversePlan[dest] = append(reversePlan[dest], src)
	}

	dests := make([]needle.VolumeId, 0, len(reversePlan))
	for dest := range reversePlan {
		dests = append(dests, dest)
	}
	sort.Slice(dests, func(a, b int) bool { return dests[a] < dests[b] })

	for _, dest := range dests {
		srcs := reversePlan[dest]
		sort.Slice(srcs, func(a, b int) bool { return srcs[a] < srcs[b] })

		currentSize := c.getVolumeSizeById(dest)
		for _, src := range srcs {
			srcSize := c.getVolumeSizeById(src)
			newSize := currentSize + srcSize
			fmt.Printf(
				"volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n",
				src, srcSize/1024/1024,
				dest, currentSize/1024/1024, newSize/1024/1024,
			)
			currentSize = newSize
		}
		fmt.Println()
	}
}

// moveChunk copies the needle referenced by chunk to volume toVolumeId, keeping
// the same file key and cookie, and then rewrites chunk in place to point at
// the new volume.
//
// The content is downloaded from the first server returned for the source
// volume and uploaded to the first server returned for the destination volume.
// The file name, MIME type, gzip content encoding and Content-MD5 reported by
// the download response are forwarded to the upload.
//
// chunk is modified only after a successful upload; on any error it is left
// untouched and the error is returned.
func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
	fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
	toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)

	downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
	if err != nil {
		return fmt.Errorf("lookup source volume %d: %w", fromFid.VolumeId, err)
	}
	// Guard the [0] access below: an empty result would otherwise panic.
	if len(downloadURLs) == 0 {
		return fmt.Errorf("no volume server found for source volume %d", fromFid.VolumeId)
	}
	downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String())

	uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
	if err != nil {
		return fmt.Errorf("lookup destination volume %d: %w", toVolumeId, err)
	}
	if len(uploadURLs) == 0 {
		return fmt.Errorf("no volume server found for destination volume %d", toVolumeId)
	}
	uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())

	resp, reader, err := readUrl(downloadURL)
	if err != nil {
		return err
	}
	defer util_http.CloseResponse(resp)
	defer reader.Close()

	// Take everything after "filename=" in Content-Disposition and strip the
	// surrounding double quotes.
	var filename string
	contentDisposition := resp.Header.Get("Content-Disposition")
	if len(contentDisposition) > 0 {
		idx := strings.Index(contentDisposition, "filename=")
		if idx != -1 {
			filename = contentDisposition[idx+len("filename="):]
			filename = strings.Trim(filename, "\"")
		}
	}

	contentType := resp.Header.Get("Content-Type")
	isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
	md5 := resp.Header.Get("Content-MD5")

	uploader, err := operation.NewUploader()
	if err != nil {
		return err
	}

	_, err, _ = uploader.Upload(reader, &operation.UploadOption{
		UploadUrl:         uploadURL,
		Filename:          filename,
		IsInputCompressed: isCompressed,
		Cipher:            false,
		MimeType:          contentType,
		PairMap:           nil,
		Md5:               md5,
	})
	if err != nil {
		return fmt.Errorf("upload to %s: %w", uploadURL, err)
	}

	// Point the chunk at the new volume and clear the string form of the file
	// id, which still names the old volume.
	chunk.Fid.VolumeId = uint32(toVolumeId)
	chunk.FileId = ""

	return nil
}

// readUrl issues a GET request for fileUrl with an "Accept-Encoding: gzip"
// header through the global HTTP client. On success it returns the response
// together with its body; the caller is responsible for closing them. A
// response status of 400 or above is closed here and reported as an error that
// contains the URL and the status text.
func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
	request, err := http.NewRequest(http.MethodGet, fileUrl, nil)
	if err != nil {
		return nil, nil, err
	}
	request.Header.Add("Accept-Encoding", "gzip")

	response, err := util_http.GetGlobalHttpClient().Do(request)
	switch {
	case err != nil:
		return nil, nil, err
	case response.StatusCode >= 400:
		util_http.CloseResponse(response)
		return nil, nil, fmt.Errorf("%s: %s", fileUrl, response.Status)
	}

	return response, response.Body, nil
}