seaweedfs/weed/shell/command_volume_fsck.go

745 lines
24 KiB
Go
Raw Normal View History

package shell
import (
"bufio"
"bytes"
"context"
"errors"
2020-03-25 13:38:33 +08:00
"flag"
"fmt"
2022-04-01 00:36:10 +08:00
"io"
"math"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"strings"
2022-04-01 00:36:10 +08:00
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"golang.org/x/sync/errgroup"
)
func init() {
Commands = append(Commands, &commandVolumeFsck{})
}
// readbufferSize is the size in bytes of the fixed header that precedes every
// path in a filer file-id file: 8 bytes of file key, 4 bytes of cookie and
// 4 bytes of path length.
const readbufferSize = 16
// commandVolumeFsck implements the "volume.fsck" shell command. Its fields are
// populated by Do on every invocation from the parsed flags and the
// environment it is given.
type commandVolumeFsck struct {
	// env is the command environment passed to Do.
	env *CommandEnv
	// writer receives all report and progress output.
	writer io.Writer
	// bucketsPath is the filer buckets path read via readFilerBucketsPath.
	bucketsPath string
	// collection is the -collection flag; empty means no collection filter.
	collection *string
	// volumeIds is the set parsed from -volumeId; empty means all volumes.
	volumeIds map[uint32]bool
	// tempFolder is the working directory holding the copied index files and
	// the per-volume filer file-id files; it is removed when Do returns.
	tempFolder string
	// verbose is the -v flag.
	verbose *bool
	// forcePurging is the -forcePurging flag.
	forcePurging *bool
	// findMissingChunksInFiler is the -findMissingChunksInFiler flag and
	// selects the reverse check (filer entries missing on volume servers).
	findMissingChunksInFiler *bool
	// verifyNeedle is the -verifyNeedles flag.
	verifyNeedle *bool
}
// Name returns the command name, "volume.fsck".
func (c *commandVolumeFsck) Name() string {
	const commandName = "volume.fsck"
	return commandName
}
// Help returns the usage text describing both modes of the command: the
// default "volume minus filer" check and the reverse check enabled by
// -findMissingChunksInFiler.
func (c *commandVolumeFsck) Help() string {
	const helpText = `check all volumes to find entries not used by the filer
Important assumption!!!
the system is all used by one filer.
This command works this way:
1. collect all file ids from all volumes, as set A
2. collect all file ids from the filer, as set B
3. find out the set A subtract B
If -findMissingChunksInFiler is enabled, this works
in a reverse way:
1. collect all file ids from all volumes, as set A
2. collect all file ids from the filer, as set B
3. find out the set B subtract A
`
	return helpText
}
func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
2020-03-25 13:38:33 +08:00
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
c.verbose = fsckCommand.Bool("v", false, "verbose mode")
c.findMissingChunksInFiler = fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
c.collection = fsckCommand.String("collection", "", "the collection name")
volumeIds := fsckCommand.String("volumeId", "", "comma separated the volume id")
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer. Currently this only works with default filerGroup.")
2022-04-26 02:11:56 +08:00
c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging")
2022-02-13 07:53:35 +08:00
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
2022-03-31 22:10:06 +08:00
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
modifyTimeAgo := fsckCommand.Duration("modifyTimeAgo", 0, "only include entries after this modify time to check orphan chunks")
c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "check needles status from volume server")
2022-03-31 22:10:06 +08:00
2020-03-25 13:38:33 +08:00
if err = fsckCommand.Parse(args); err != nil {
return nil
}
2021-12-11 05:24:38 +08:00
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
c.volumeIds = make(map[uint32]bool)
if *volumeIds != "" {
for _, volumeIdStr := range strings.Split(*volumeIds, ",") {
if volumeIdInt, err := strconv.ParseUint(volumeIdStr, 10, 32); err == nil {
c.volumeIds[uint32(volumeIdInt)] = true
} else {
return fmt.Errorf("parse volumeId string %s to int: %v", volumeIdStr, err)
}
}
}
c.env = commandEnv
c.writer = writer
c.bucketsPath, err = readFilerBucketsPath(commandEnv)
if err != nil {
return fmt.Errorf("read filer buckets path: %v", err)
}
// create a temp folder
c.tempFolder, err = os.MkdirTemp(*tempPath, "sw_fsck")
if err != nil {
return fmt.Errorf("failed to create temp folder: %v", err)
}
if *c.verbose {
fmt.Fprintf(c.writer, "working directory: %s\n", c.tempFolder)
2020-03-25 13:38:33 +08:00
}
defer os.RemoveAll(c.tempFolder)
2020-03-25 15:56:47 +08:00
// collect all volume id locations
dataNodeVolumeIdToVInfo, err := c.collectVolumeIds()
2020-03-25 15:56:47 +08:00
if err != nil {
return fmt.Errorf("failed to collect all volume locations: %v", err)
}
2022-03-31 16:35:58 +08:00
if err != nil {
return fmt.Errorf("read filer buckets path: %v", err)
}
var collectCutoffFromAtNs int64 = 0
if cutoffTimeAgo.Seconds() != 0 {
collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano()
}
var collectModifyFromAtNs int64 = 0
if modifyTimeAgo.Seconds() != 0 {
collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano()
}
// collect each volume file ids
eg, gCtx := errgroup.WithContext(context.Background())
_ = gCtx
for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo
eg.Go(func() error {
for volumeId, vinfo := range volumeIdToVInfo {
if len(c.volumeIds) > 0 {
if _, ok := c.volumeIds[volumeId]; !ok {
delete(volumeIdToVInfo, volumeId)
continue
}
}
if *c.collection != "" && vinfo.collection != *c.collection {
delete(volumeIdToVInfo, volumeId)
continue
}
err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo)
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
}
2022-03-31 22:10:06 +08:00
}
if *c.verbose {
fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
2022-03-31 22:10:06 +08:00
}
return nil
})
}
err = eg.Wait()
if err != nil {
fmt.Fprintf(c.writer, "got error: %v", err)
return err
}
if *c.findMissingChunksInFiler {
// collect all filer file ids and paths
if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, *purgeAbsent, collectModifyFromAtNs, collectCutoffFromAtNs); err != nil {
return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
}
2022-03-31 22:10:06 +08:00
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
// for each volume, check filer file ids
if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, dataNodeId, *applyPurging); err != nil {
2022-03-31 22:10:06 +08:00
return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
}
}
} else {
// collect all filer file ids
if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, false, 0, 0); err != nil {
return fmt.Errorf("failed to collect file ids from filer: %v", err)
}
// volume file ids subtract filer file ids
if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)); err != nil {
return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
}
}
return nil
}
func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, purgeAbsent bool, collectModifyFromAtNs int64, cutoffFromAtNs int64) error {
if *c.verbose {
fmt.Fprintf(c.writer, "checking each file from filer path %s...\n", c.getCollectFilerFilePath())
}
files := make(map[uint32]*os.File)
2022-03-31 22:10:06 +08:00
for _, volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
2022-04-01 13:17:09 +08:00
if _, ok := files[vid]; ok {
continue
}
dst, openErr := os.OpenFile(getFilerFileIdFile(c.tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
2022-03-31 22:10:06 +08:00
if openErr != nil {
return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(c.tempFolder, vid), openErr)
2022-03-31 22:10:06 +08:00
}
2022-04-01 13:17:09 +08:00
files[vid] = dst
}
}
defer func() {
for _, f := range files {
f.Close()
}
}()
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2023-01-03 15:20:45 +08:00
return doTraverseBfsAndSaving(c.env, c.writer, c.getCollectFilerFilePath(), false,
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
if *c.verbose && entry.Entry.IsDirectory {
fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
2022-04-01 00:36:10 +08:00
}
dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
if resolveErr != nil {
return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
}
dataChunks = append(dataChunks, manifestChunks...)
for _, chunk := range dataChunks {
if cutoffFromAtNs != 0 && chunk.ModifiedTsNs > cutoffFromAtNs {
continue
}
if collectModifyFromAtNs != 0 && chunk.ModifiedTsNs < collectModifyFromAtNs {
continue
}
outputChan <- &Item{
vid: chunk.Fid.VolumeId,
fileKey: chunk.Fid.FileKey,
cookie: chunk.Fid.Cookie,
path: util.NewFullPath(entry.Dir, entry.Entry.Name),
}
2022-01-22 04:08:58 +08:00
}
return nil
},
func(outputChan chan interface{}) {
buffer := make([]byte, readbufferSize)
for item := range outputChan {
i := item.(*Item)
if f, ok := files[i.vid]; ok {
util.Uint64toBytes(buffer, i.fileKey)
util.Uint32toBytes(buffer[8:], i.cookie)
util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
f.Write(buffer)
f.Write([]byte(i.path))
} else if *c.findMissingChunksInFiler && len(c.volumeIds) == 0 {
fmt.Fprintf(c.writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
if purgeAbsent {
fmt.Printf("deleting path %s after volume not found", i.path)
c.httpDelete(i.path)
}
}
}
})
}
func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, dataNodeId string, applyPurging bool) error {
2021-06-25 08:22:53 +08:00
for volumeId, vinfo := range volumeIdToVInfo {
checkErr := c.oneVolumeFileIdsCheckOneVolume(dataNodeId, volumeId, applyPurging)
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
}
return nil
2021-06-25 08:22:53 +08:00
}
func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool, modifyFrom, cutoffFrom uint64) error {
2020-03-25 13:38:33 +08:00
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
isSeveralReplicas := make(map[uint32]bool)
isEcVolumeReplicas := make(map[uint32]bool)
isReadOnlyReplicas := make(map[uint32]bool)
serverReplicas := make(map[uint32][]pb.ServerAddress)
2022-03-31 22:10:06 +08:00
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId, vinfo := range volumeIdToVInfo {
inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo, modifyFrom, cutoffFrom)
2022-03-31 22:10:06 +08:00
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
isSeveralReplicas[volumeId] = false
if _, found := volumeIdOrphanFileIds[volumeId]; !found {
volumeIdOrphanFileIds[volumeId] = make(map[string]bool)
} else {
isSeveralReplicas[volumeId] = true
}
for _, fid := range orphanFileIds {
if isSeveralReplicas[volumeId] {
if _, found := volumeIdOrphanFileIds[volumeId][fid]; !found {
continue
}
}
volumeIdOrphanFileIds[volumeId][fid] = isSeveralReplicas[volumeId]
}
2022-03-31 22:10:06 +08:00
totalInUseCount += inUseCount
totalOrphanChunkCount += uint64(len(orphanFileIds))
totalOrphanDataSize += orphanDataSize
if *c.verbose {
2022-03-31 22:10:06 +08:00
for _, fid := range orphanFileIds {
fmt.Fprintf(c.writer, "%s:%s\n", vinfo.collection, fid)
2022-03-31 22:10:06 +08:00
}
2020-03-25 17:41:22 +08:00
}
isEcVolumeReplicas[volumeId] = vinfo.isEcVolume
if isReadOnly, found := isReadOnlyReplicas[volumeId]; !(found && isReadOnly) {
isReadOnlyReplicas[volumeId] = vinfo.isReadOnly
}
serverReplicas[volumeId] = append(serverReplicas[volumeId], vinfo.server)
}
for volumeId, orphanReplicaFileIds := range volumeIdOrphanFileIds {
if !(applyPurging && len(orphanReplicaFileIds) > 0) {
continue
}
orphanFileIds := []string{}
for fid, foundInAllReplicas := range orphanReplicaFileIds {
2022-04-26 02:10:01 +08:00
if !isSeveralReplicas[volumeId] || *c.forcePurging || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
orphanFileIds = append(orphanFileIds, fid)
}
}
if !(len(orphanFileIds) > 0) {
continue
}
if *c.verbose {
fmt.Fprintf(c.writer, "purging process for volume %d.\n", volumeId)
}
if isEcVolumeReplicas[volumeId] {
fmt.Fprintf(c.writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
continue
}
for _, server := range serverReplicas[volumeId] {
2022-03-31 22:10:06 +08:00
needleVID := needle.VolumeId(volumeId)
if isReadOnlyReplicas[volumeId] {
err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true, false)
2022-03-31 22:10:06 +08:00
if err != nil {
return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
}
fmt.Fprintf(c.writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false, false)
fmt.Fprintf(c.writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
}
if *c.verbose {
fmt.Fprintf(c.writer, "purging files from volume %d\n", volumeId)
2022-03-31 22:10:06 +08:00
}
if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds); err != nil {
2022-03-31 22:10:06 +08:00
return fmt.Errorf("purging volume %d: %v", volumeId, err)
}
2020-03-25 17:21:15 +08:00
}
}
}
if !applyPurging {
2020-03-25 17:21:15 +08:00
pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
fmt.Fprintf(c.writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
2020-03-25 17:21:15 +08:00
totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
2020-03-25 13:38:33 +08:00
fmt.Fprintf(c.writer, "This could be normal if multiple filers or no filers are used.\n")
2020-03-25 13:38:33 +08:00
}
if totalOrphanChunkCount == 0 {
fmt.Fprintf(c.writer, "no orphan data\n")
}
return nil
}
func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo) error {
2020-03-25 15:56:47 +08:00
if *c.verbose {
fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
2020-03-25 15:56:47 +08:00
}
return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption,
func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ext := ".idx"
if vinfo.isEcVolume {
ext = ".ecx"
}
copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: volumeId,
Ext: ext,
CompactionRevision: math.MaxUint32,
StopOffset: math.MaxInt64,
Collection: vinfo.collection,
IsEcVolume: vinfo.isEcVolume,
IgnoreSourceFileNotFound: false,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
}
var buf bytes.Buffer
for {
resp, err := copyFileClient.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return err
}
buf.Write(resp.FileContent)
}
idxFilename := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)
err = writeToFile(buf.Bytes(), idxFilename)
if err != nil {
return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
}
return nil
})
}
// Item is one chunk reference found while traversing the filer: the chunk's
// file id split into its parts, plus the path of the entry that owns it.
type Item struct {
	// vid is the volume id of the chunk's file id.
	vid uint32
	// fileKey is the file key of the chunk's file id.
	fileKey uint64
	// cookie is the cookie of the chunk's file id.
	cookie uint32
	// path is the full filer path of the entry referencing the chunk.
	path util.FullPath
}
func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleId types.NeedleId, itemPath util.FullPath)) error {
fp, err := os.Open(getFilerFileIdFile(c.tempFolder, volumeId))
if err != nil {
return err
}
defer fp.Close()
br := bufio.NewReader(fp)
buffer := make([]byte, readbufferSize)
var readSize int
var readErr error
item := &Item{vid: volumeId}
for {
readSize, readErr = io.ReadFull(br, buffer)
if errors.Is(readErr, io.EOF) {
break
}
if readErr != nil {
return readErr
}
if readSize != readbufferSize {
return fmt.Errorf("readSize mismatch")
}
item.fileKey = util.BytesToUint64(buffer[:8])
item.cookie = util.BytesToUint32(buffer[8:12])
pathSize := util.BytesToUint32(buffer[12:16])
pathBytes := make([]byte, int(pathSize))
2021-06-28 14:32:57 +08:00
n, err := io.ReadFull(br, pathBytes)
if err != nil {
fmt.Fprintf(c.writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
2021-06-28 14:32:57 +08:00
}
if n != int(pathSize) {
fmt.Fprintf(c.writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
2021-06-28 14:32:57 +08:00
}
item.path = util.FullPath(pathBytes)
needleId := types.NeedleId(item.fileKey)
fn(needleId, item.path)
}
return nil
}
// oneVolumeFileIdsCheckOneVolume loads the copied index of one volume replica
// and prints the path of every filer record whose needle id is not in that
// index. With applyPurging the path is also deleted from the filer.
func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(dataNodeId string, volumeId uint32, applyPurging bool) (err error) {
	if *c.verbose {
		fmt.Fprintf(c.writer, "find missing file chunks in dataNodeId %s volume %d ...\n", dataNodeId, volumeId)
	}

	volumeNeedles := needle_map.NewMemDb()
	defer volumeNeedles.Close()

	idxFile := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)
	if err = volumeNeedles.LoadFromIdx(idxFile); err != nil {
		return err
	}

	reportMissing := func(needleId types.NeedleId, itemPath util.FullPath) {
		if _, found := volumeNeedles.Get(needleId); found {
			return
		}
		fmt.Fprintf(c.writer, "%s\n", itemPath)
		if applyPurging {
			c.httpDelete(itemPath)
		}
	}
	return c.readFilerFileIdFile(volumeId, reportMissing)
}
// httpDelete sends an HTTP DELETE for path to the filer address of the command
// environment. It is best effort: failures are written to c.writer and not
// returned.
func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
	req, err := http.NewRequest(http.MethodDelete, "", nil)
	if err != nil {
		// req is nil on error; there is nothing to send.
		fmt.Fprintf(c.writer, "HTTP delete request error: %v\n", err)
		return
	}

	// The URL is assembled from its parts rather than parsed from a string.
	req.URL = &url.URL{
		Scheme: "http",
		Host:   c.env.option.FilerAddress.ToHttpAddress(),
		Path:   string(path),
	}
	if *c.verbose {
		fmt.Fprintf(c.writer, "full HTTP delete request to be sent: %v\n", req)
	}

	resp, err := util_http.GetGlobalHttpClient().Do(req)
	if err != nil {
		// resp is nil when the request fails, so it must not be touched.
		fmt.Fprintf(c.writer, "DELETE fetch error: %v\n", err)
		return
	}
	defer resp.Body.Close()

	// Read the body to completion before closing it.
	if _, err = io.ReadAll(resp.Body); err != nil {
		fmt.Fprintf(c.writer, "DELETE response error: %v\n", err)
	}

	if *c.verbose {
		fmt.Fprintln(c.writer, "delete response Status : ", resp.Status)
		fmt.Fprintln(c.writer, "delete response Headers : ", resp.Header)
	}
}
func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo, modifyFrom, cutoffFrom uint64) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
volumeFileIdDb := needle_map.NewMemDb()
defer volumeFileIdDb.Close()
if err = volumeFileIdDb.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil {
err = fmt.Errorf("failed to LoadFromIdx %+v", err)
return
}
if err = c.readFilerFileIdFile(volumeId, func(filerNeedleId types.NeedleId, itemPath util.FullPath) {
inUseCount++
if *c.verifyNeedle {
if needleValue, ok := volumeFileIdDb.Get(filerNeedleId); ok && !needleValue.Size.IsDeleted() {
if _, err := readNeedleStatus(c.env.option.GrpcDialOption, vinfo.server, volumeId, *needleValue); err != nil {
// files may be deleted during copying filesIds
if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
fmt.Fprintf(c.writer, "failed to read %d:%s needle status of file %s: %+v\n",
volumeId, filerNeedleId.String(), itemPath, err)
if *c.forcePurging {
return
}
}
}
}
}
if err = volumeFileIdDb.Delete(filerNeedleId); err != nil && *c.verbose {
fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, filerNeedleId, err)
}
}); err != nil {
err = fmt.Errorf("failed to readFilerFileIdFile %+v", err)
return
}
2020-03-25 17:21:15 +08:00
var orphanFileCount uint64
if err = volumeFileIdDb.AscendingVisit(func(n needle_map.NeedleValue) error {
if n.Size.IsDeleted() {
return nil
}
if cutoffFrom > 0 || modifyFrom > 0 {
return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption,
func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
VolumeId: volumeId,
NeedleId: types.NeedleIdToUint64(n.Key),
Offset: n.Offset.ToActualOffset(),
Size: int32(n.Size),
})
if err != nil {
return fmt.Errorf("read needle meta with id %d from volume %d: %v", n.Key, volumeId, err)
}
if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
orphanFileCount++
orphanDataSize += uint64(n.Size)
}
return nil
})
} else {
orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
orphanFileCount++
orphanDataSize += uint64(n.Size)
}
return nil
}); err != nil {
err = fmt.Errorf("failed to AscendingVisit %+v", err)
return
}
2020-03-25 17:21:15 +08:00
if orphanFileCount > 0 {
pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
fmt.Fprintf(c.writer, "dataNode:%s\tvolume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
2022-03-31 22:10:06 +08:00
dataNodeId, volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
2020-03-24 17:34:28 +08:00
}
return
}
// VInfo describes one volume (or erasure coded volume) as found on one data
// node while reading the topology.
type VInfo struct {
	// server is the address of the data node holding the volume.
	server pb.ServerAddress
	// collection is the collection name reported for the volume.
	collection string
	// isEcVolume is true when the entry comes from the node's EC shard list.
	isEcVolume bool
	// isReadOnly is the volume's read-only flag; always true for EC volumes.
	isReadOnly bool
}
func (c *commandVolumeFsck) collectVolumeIds() (volumeIdToServer map[string]map[uint32]VInfo, err error) {
2020-03-25 15:56:47 +08:00
if *c.verbose {
fmt.Fprintf(c.writer, "collecting volume id and locations from master ...\n")
2020-03-25 15:56:47 +08:00
}
2022-03-31 22:10:06 +08:00
volumeIdToServer = make(map[string]map[uint32]VInfo)
2021-02-22 16:28:42 +08:00
// collect topology information
topologyInfo, _, err := collectTopologyInfo(c.env, 0)
if err != nil {
return
}
2021-02-22 16:28:42 +08:00
eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
var volumeCount, ecShardCount int
dataNodeId := t.GetId()
2021-02-18 12:57:08 +08:00
for _, diskInfo := range t.DiskInfos {
if _, ok := volumeIdToServer[dataNodeId]; !ok {
volumeIdToServer[dataNodeId] = make(map[uint32]VInfo)
}
2021-02-16 18:47:02 +08:00
for _, vi := range diskInfo.VolumeInfos {
2022-03-31 22:10:06 +08:00
volumeIdToServer[dataNodeId][vi.Id] = VInfo{
server: pb.NewServerAddressFromDataNode(t),
2021-02-16 18:47:02 +08:00
collection: vi.Collection,
isEcVolume: false,
isReadOnly: vi.ReadOnly,
2021-02-16 18:47:02 +08:00
}
volumeCount += 1
}
2021-02-16 18:47:02 +08:00
for _, ecShardInfo := range diskInfo.EcShardInfos {
2022-03-31 22:10:06 +08:00
volumeIdToServer[dataNodeId][ecShardInfo.Id] = VInfo{
server: pb.NewServerAddressFromDataNode(t),
2021-02-16 18:47:02 +08:00
collection: ecShardInfo.Collection,
isEcVolume: true,
isReadOnly: true,
2021-02-16 18:47:02 +08:00
}
ecShardCount += 1
}
}
if *c.verbose {
fmt.Fprintf(c.writer, "dn %+v collected %d volumes and %d ec shards.\n", dataNodeId, volumeCount, ecShardCount)
}
})
return
}
func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string) (err error) {
fmt.Fprintf(c.writer, "purging orphan data for volume %d...\n", volumeId)
2020-03-25 17:21:15 +08:00
locations, found := c.env.MasterClient.GetLocations(volumeId)
if !found {
return fmt.Errorf("failed to find volume %d locations", volumeId)
}
resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
var wg sync.WaitGroup
for _, location := range locations {
wg.Add(1)
go func(server pb.ServerAddress, fidList []string) {
2020-03-25 17:21:15 +08:00
defer wg.Done()
2024-08-02 14:54:42 +08:00
if deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
2020-03-25 17:21:15 +08:00
err = deleteErr
} else if deleteResults != nil {
resultChan <- deleteResults
}
}(location.ServerAddress(), fileIds)
2020-03-25 17:21:15 +08:00
}
wg.Wait()
close(resultChan)
for results := range resultChan {
for _, result := range results {
if result.Error != "" {
fmt.Fprintf(c.writer, "purge error: %s\n", result.Error)
2020-03-25 17:21:15 +08:00
}
}
}
return
}
// getCollectFilerFilePath returns the filer path to traverse: the collection's
// folder under the buckets path when a collection is set, otherwise "/".
func (c *commandVolumeFsck) getCollectFilerFilePath() string {
	collection := *c.collection
	if collection == "" {
		return "/"
	}
	return fmt.Sprintf("%s/%s", c.bucketsPath, collection)
}
2022-03-31 22:10:06 +08:00
// getVolumeFileIdFile returns the path, inside tempFolder, of the copied index
// file for volume vid of data node dataNodeid: "<dataNodeid>_<vid>.idx".
func getVolumeFileIdFile(tempFolder string, dataNodeid string, vid uint32) string {
	fileName := fmt.Sprintf("%s_%d.idx", dataNodeid, vid)
	return filepath.Join(tempFolder, fileName)
}
// getFilerFileIdFile returns the path, inside tempFolder, of the file that
// collects the filer-referenced file ids of volume vid: "<vid>.fid".
func getFilerFileIdFile(tempFolder string, vid uint32) string {
	fileName := fmt.Sprintf("%d.fid", vid)
	return filepath.Join(tempFolder, fileName)
}
// writeToFile writes bytes to fileName, creating the file with mode 0644 if
// needed and truncating any previous content. It returns the error from
// opening, writing or closing the file; earlier versions silently ignored
// these, which let a failed copy surface only later as a missing index file.
func writeToFile(bytes []byte, fileName string) error {
	// os.WriteFile opens with O_WRONLY|O_CREATE|O_TRUNC, the same flags used before.
	return os.WriteFile(fileName, bytes, 0644)
}