2020-03-24 17:18:13 +08:00
package shell
import (
2021-06-25 14:56:24 +08:00
"bufio"
2022-09-11 06:29:17 +08:00
"bytes"
2020-03-24 17:18:13 +08:00
"context"
2022-09-11 06:29:17 +08:00
"errors"
2020-03-25 13:38:33 +08:00
"flag"
2020-03-24 17:18:13 +08:00
"fmt"
2022-07-29 15:17:28 +08:00
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
2022-11-01 02:33:04 +08:00
"github.com/seaweedfs/seaweedfs/weed/storage"
2022-09-11 06:29:17 +08:00
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
2022-07-29 15:17:28 +08:00
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
2022-04-01 00:36:10 +08:00
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
2022-11-01 02:36:26 +08:00
"strconv"
2022-11-01 02:33:04 +08:00
"strings"
2022-04-01 00:36:10 +08:00
"sync"
"time"
2020-03-24 17:18:13 +08:00
)
func init ( ) {
Commands = append ( Commands , & commandVolumeFsck { } )
}
2022-10-16 11:38:46 +08:00
// readbufferSize is the size in bytes of the fixed header that precedes every
// path in the per-volume filer file id files: an 8-byte file key, a 4-byte
// cookie and a 4-byte path length.
const readbufferSize = 16
2020-03-24 17:18:13 +08:00
type commandVolumeFsck struct {
2022-10-14 14:30:30 +08:00
env * CommandEnv
writer io . Writer
bucketsPath string
collection * string
2022-11-01 02:36:26 +08:00
volumeIds map [ uint32 ] bool
2022-10-14 14:30:30 +08:00
tempFolder string
verbose * bool
forcePurging * bool
findMissingChunksInFiler * bool
2022-10-16 11:38:46 +08:00
verifyNeedle * bool
2020-03-24 17:18:13 +08:00
}
// Name returns the command name, which Do also uses to name its flag set.
func (c *commandVolumeFsck) Name() string {
	const name = "volume.fsck"
	return name
}
// Help returns the usage text of the volume.fsck command. The text states the
// single-filer assumption and lists the steps of the two modes: the default
// one (volume file ids minus filer file ids) and the reverse one selected by
// the findMissingChunksInFiler flag (filer file ids minus volume file ids).
func ( c * commandVolumeFsck ) Help ( ) string {
return ` check all volumes to find entries not used by the filer
Important assumption ! ! !
the system is all used by one filer .
This command works this way :
1. collect all file ids from all volumes , as set A
2. collect all file ids from the filer , as set B
3. find out the set A subtract B
2021-06-25 14:56:24 +08:00
If - findMissingChunksInFiler is enabled , this works
in a reverse way :
1. collect all file ids from all volumes , as set A
2. collect all file ids from the filer , as set B
3. find out the set B subtract A
2020-03-24 17:18:13 +08:00
`
}
func ( c * commandVolumeFsck ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
2020-03-25 13:38:33 +08:00
fsckCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
2022-10-14 14:30:30 +08:00
c . verbose = fsckCommand . Bool ( "v" , false , "verbose mode" )
c . findMissingChunksInFiler = fsckCommand . Bool ( "findMissingChunksInFiler" , false , "see \"help volume.fsck\"" )
c . collection = fsckCommand . String ( "collection" , "" , "the collection name" )
2022-11-01 02:36:26 +08:00
volumeIds := fsckCommand . String ( "volumeId" , "" , "comma separated the volume id" )
2022-05-16 02:07:04 +08:00
applyPurging := fsckCommand . Bool ( "reallyDeleteFromVolume" , false , "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer. Currently this only works with default filerGroup." )
2022-04-26 02:11:56 +08:00
c . forcePurging = fsckCommand . Bool ( "forcePurging" , false , "delete missing data from volumes in one replica used together with applyPurging" )
2022-02-13 07:53:35 +08:00
purgeAbsent := fsckCommand . Bool ( "reallyDeleteFilerEntries" , false , "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler" )
2022-03-31 22:10:06 +08:00
tempPath := fsckCommand . String ( "tempPath" , path . Join ( os . TempDir ( ) ) , "path for temporary idx files" )
2022-09-11 06:29:17 +08:00
cutoffTimeAgo := fsckCommand . Duration ( "cutoffTimeAgo" , 5 * time . Minute , "only include entries on volume servers before this cutoff time to check orphan chunks" )
2022-11-01 02:33:04 +08:00
c . verifyNeedle = fsckCommand . Bool ( "verifyNeedles" , false , "check needles status from volume server" )
2022-03-31 22:10:06 +08:00
2020-03-25 13:38:33 +08:00
if err = fsckCommand . Parse ( args ) ; err != nil {
return nil
}
2021-12-11 05:24:38 +08:00
if err = commandEnv . confirmIsLocked ( args ) ; err != nil {
2021-09-14 13:13:34 +08:00
return
}
2022-11-01 02:36:26 +08:00
c . volumeIds = make ( map [ uint32 ] bool )
if * volumeIds != "" {
for _ , volumeIdStr := range strings . Split ( * volumeIds , "," ) {
if volumeIdInt , err := strconv . ParseUint ( volumeIdStr , 10 , 32 ) ; err == nil {
c . volumeIds [ uint32 ( volumeIdInt ) ] = true
} else {
return fmt . Errorf ( "parse volumeId string %s to int: %v" , volumeIdStr , err )
}
}
}
2020-03-24 17:18:13 +08:00
c . env = commandEnv
2022-10-14 14:30:30 +08:00
c . writer = writer
c . bucketsPath , err = readFilerBucketsPath ( commandEnv )
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
}
2020-03-24 17:18:13 +08:00
// create a temp folder
2022-10-14 14:30:30 +08:00
c . tempFolder , err = os . MkdirTemp ( * tempPath , "sw_fsck" )
2020-03-24 17:18:13 +08:00
if err != nil {
return fmt . Errorf ( "failed to create temp folder: %v" , err )
}
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "working directory: %s\n" , c . tempFolder )
2020-03-25 13:38:33 +08:00
}
2022-10-14 14:30:30 +08:00
defer os . RemoveAll ( c . tempFolder )
2020-03-24 17:18:13 +08:00
2020-03-25 15:56:47 +08:00
// collect all volume id locations
2022-10-14 14:30:30 +08:00
dataNodeVolumeIdToVInfo , err := c . collectVolumeIds ( )
2020-03-25 15:56:47 +08:00
if err != nil {
return fmt . Errorf ( "failed to collect all volume locations: %v" , err )
}
2022-03-31 16:35:58 +08:00
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
}
2022-11-01 02:38:12 +08:00
collectCutoffFromAtNs := time . Now ( ) . UnixNano ( )
2020-03-24 17:18:13 +08:00
// collect each volume file ids
2022-03-31 22:10:06 +08:00
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
2022-11-01 02:36:26 +08:00
if len ( c . volumeIds ) > 0 {
if _ , ok := c . volumeIds [ volumeId ] ; ! ok {
delete ( volumeIdToVInfo , volumeId )
continue
}
2022-03-31 22:10:06 +08:00
}
2022-10-25 07:50:39 +08:00
if * c . collection != "" && vinfo . collection != * c . collection {
2022-03-31 22:10:06 +08:00
delete ( volumeIdToVInfo , volumeId )
continue
}
2022-09-11 06:29:17 +08:00
cutoffFrom := time . Now ( ) . Add ( - * cutoffTimeAgo ) . UnixNano ( )
2022-10-14 14:30:30 +08:00
err = c . collectOneVolumeFileIds ( dataNodeId , volumeId , vinfo , uint64 ( cutoffFrom ) )
2022-03-31 22:10:06 +08:00
if err != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , err )
}
2020-03-24 17:18:13 +08:00
}
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "dn %+v filtred %d volumes and locations.\n" , dataNodeId , len ( dataNodeVolumeIdToVInfo [ dataNodeId ] ) )
}
2020-03-24 17:18:13 +08:00
}
2022-10-14 14:30:30 +08:00
if * c . findMissingChunksInFiler {
2021-06-25 14:56:24 +08:00
// collect all filer file ids and paths
2022-11-01 02:38:12 +08:00
if err = c . collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo , * purgeAbsent , collectCutoffFromAtNs ) ; err != nil {
2021-06-25 14:56:24 +08:00
return fmt . Errorf ( "collectFilerFileIdAndPaths: %v" , err )
}
2022-03-31 22:10:06 +08:00
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
// for each volume, check filer file ids
2022-10-14 14:30:30 +08:00
if err = c . findFilerChunksMissingInVolumeServers ( volumeIdToVInfo , dataNodeId , * applyPurging ) ; err != nil {
2022-03-31 22:10:06 +08:00
return fmt . Errorf ( "findFilerChunksMissingInVolumeServers: %v" , err )
}
2021-06-25 14:56:24 +08:00
}
} else {
// collect all filer file ids
2022-11-01 02:38:12 +08:00
if err = c . collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo , false , 0 ) ; err != nil {
2021-06-25 14:56:24 +08:00
return fmt . Errorf ( "failed to collect file ids from filer: %v" , err )
}
2022-01-21 13:38:34 +08:00
// volume file ids subtract filer file ids
2022-10-14 14:30:30 +08:00
if err = c . findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo , * applyPurging ) ; err != nil {
2021-06-25 14:56:24 +08:00
return fmt . Errorf ( "findExtraChunksInVolumeServers: %v" , err )
}
}
return nil
}
2022-11-01 02:38:12 +08:00
func ( c * commandVolumeFsck ) collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , purgeAbsent bool , cutoffFromAtNs int64 ) error {
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "checking each file from filer path %s...\n" , c . getCollectFilerFilePath ( ) )
2020-03-24 17:18:13 +08:00
}
2021-06-25 14:56:24 +08:00
files := make ( map [ uint32 ] * os . File )
2022-03-31 22:10:06 +08:00
for _ , volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
2022-04-01 13:17:09 +08:00
if _ , ok := files [ vid ] ; ok {
continue
}
2022-10-14 14:30:30 +08:00
dst , openErr := os . OpenFile ( getFilerFileIdFile ( c . tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
2022-03-31 22:10:06 +08:00
if openErr != nil {
2022-10-14 14:30:30 +08:00
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( c . tempFolder , vid ) , openErr )
2022-03-31 22:10:06 +08:00
}
2022-04-01 13:17:09 +08:00
files [ vid ] = dst
2021-06-25 14:56:24 +08:00
}
}
defer func ( ) {
for _ , f := range files {
f . Close ( )
}
} ( )
2023-01-03 15:20:45 +08:00
return doTraverseBfsAndSaving ( c . env , c . writer , c . getCollectFilerFilePath ( ) , false ,
2022-10-14 14:30:30 +08:00
func ( entry * filer_pb . FullEntry , outputChan chan interface { } ) ( err error ) {
if * c . verbose && entry . Entry . IsDirectory {
fmt . Fprintf ( c . writer , "checking directory %s\n" , util . NewFullPath ( entry . Dir , entry . Entry . Name ) )
2022-04-01 00:36:10 +08:00
}
2022-11-15 22:33:36 +08:00
dataChunks , manifestChunks , resolveErr := filer . ResolveChunkManifest ( filer . LookupFn ( c . env ) , entry . Entry . GetChunks ( ) , 0 , math . MaxInt64 )
2022-10-14 14:30:30 +08:00
if resolveErr != nil {
return fmt . Errorf ( "failed to ResolveChunkManifest: %+v" , resolveErr )
2021-06-25 14:56:24 +08:00
}
2022-10-14 14:30:30 +08:00
dataChunks = append ( dataChunks , manifestChunks ... )
for _ , chunk := range dataChunks {
2022-11-01 02:38:12 +08:00
if cutoffFromAtNs != 0 && chunk . ModifiedTsNs > cutoffFromAtNs {
2022-10-14 14:30:30 +08:00
continue
}
outputChan <- & Item {
vid : chunk . Fid . VolumeId ,
fileKey : chunk . Fid . FileKey ,
cookie : chunk . Fid . Cookie ,
path : util . NewFullPath ( entry . Dir , entry . Entry . Name ) ,
2022-02-07 06:22:04 +08:00
}
2022-01-22 04:08:58 +08:00
}
2022-10-14 14:30:30 +08:00
return nil
} ,
func ( outputChan chan interface { } ) {
2022-10-16 11:38:46 +08:00
buffer := make ( [ ] byte , readbufferSize )
2022-10-14 14:30:30 +08:00
for item := range outputChan {
i := item . ( * Item )
if f , ok := files [ i . vid ] ; ok {
util . Uint64toBytes ( buffer , i . fileKey )
util . Uint32toBytes ( buffer [ 8 : ] , i . cookie )
util . Uint32toBytes ( buffer [ 12 : ] , uint32 ( len ( i . path ) ) )
f . Write ( buffer )
f . Write ( [ ] byte ( i . path ) )
2022-11-01 02:36:26 +08:00
} else if * c . findMissingChunksInFiler && len ( c . volumeIds ) == 0 {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "%d,%x%08x %s volume not found\n" , i . vid , i . fileKey , i . cookie , i . path )
if purgeAbsent {
fmt . Printf ( "deleting path %s after volume not found" , i . path )
c . httpDelete ( i . path )
}
}
}
} )
2021-06-25 14:56:24 +08:00
}
2022-10-14 14:30:30 +08:00
func ( c * commandVolumeFsck ) findFilerChunksMissingInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , dataNodeId string , applyPurging bool ) error {
2021-06-25 08:22:53 +08:00
2021-06-25 14:56:24 +08:00
for volumeId , vinfo := range volumeIdToVInfo {
2022-10-14 14:30:30 +08:00
checkErr := c . oneVolumeFileIdsCheckOneVolume ( dataNodeId , volumeId , applyPurging )
2021-06-25 14:56:24 +08:00
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
}
return nil
2021-06-25 08:22:53 +08:00
}
2022-10-14 14:30:30 +08:00
func ( c * commandVolumeFsck ) findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , applyPurging bool ) error {
2022-01-21 13:38:34 +08:00
2020-03-25 13:38:33 +08:00
var totalInUseCount , totalOrphanChunkCount , totalOrphanDataSize uint64
2022-04-01 17:45:41 +08:00
volumeIdOrphanFileIds := make ( map [ uint32 ] map [ string ] bool )
isSeveralReplicas := make ( map [ uint32 ] bool )
isEcVolumeReplicas := make ( map [ uint32 ] bool )
isReadOnlyReplicas := make ( map [ uint32 ] bool )
serverReplicas := make ( map [ uint32 ] [ ] pb . ServerAddress )
2022-03-31 22:10:06 +08:00
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
2022-11-01 02:33:04 +08:00
inUseCount , orphanFileIds , orphanDataSize , checkErr := c . oneVolumeFileIdsSubtractFilerFileIds ( dataNodeId , volumeId , & vinfo )
2022-03-31 22:10:06 +08:00
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
2020-10-22 14:48:07 +08:00
}
2022-04-01 17:45:41 +08:00
isSeveralReplicas [ volumeId ] = false
if _ , found := volumeIdOrphanFileIds [ volumeId ] ; ! found {
volumeIdOrphanFileIds [ volumeId ] = make ( map [ string ] bool )
} else {
isSeveralReplicas [ volumeId ] = true
}
for _ , fid := range orphanFileIds {
if isSeveralReplicas [ volumeId ] {
if _ , found := volumeIdOrphanFileIds [ volumeId ] [ fid ] ; ! found {
continue
}
}
volumeIdOrphanFileIds [ volumeId ] [ fid ] = isSeveralReplicas [ volumeId ]
}
2022-03-31 22:10:06 +08:00
totalInUseCount += inUseCount
totalOrphanChunkCount += uint64 ( len ( orphanFileIds ) )
totalOrphanDataSize += orphanDataSize
2020-10-22 14:48:07 +08:00
2022-10-14 14:30:30 +08:00
if * c . verbose {
2022-03-31 22:10:06 +08:00
for _ , fid := range orphanFileIds {
2022-10-16 11:38:46 +08:00
fmt . Fprintf ( c . writer , "%s:%s\n" , vinfo . collection , fid )
2022-03-31 22:10:06 +08:00
}
2020-03-25 17:41:22 +08:00
}
2022-04-01 17:45:41 +08:00
isEcVolumeReplicas [ volumeId ] = vinfo . isEcVolume
if isReadOnly , found := isReadOnlyReplicas [ volumeId ] ; ! ( found && isReadOnly ) {
isReadOnlyReplicas [ volumeId ] = vinfo . isReadOnly
}
serverReplicas [ volumeId ] = append ( serverReplicas [ volumeId ] , vinfo . server )
}
2022-01-21 13:38:34 +08:00
2022-04-01 17:45:41 +08:00
for volumeId , orphanReplicaFileIds := range volumeIdOrphanFileIds {
if ! ( applyPurging && len ( orphanReplicaFileIds ) > 0 ) {
continue
}
orphanFileIds := [ ] string { }
for fid , foundInAllReplicas := range orphanReplicaFileIds {
2022-04-26 02:10:01 +08:00
if ! isSeveralReplicas [ volumeId ] || * c . forcePurging || ( isSeveralReplicas [ volumeId ] && foundInAllReplicas ) {
2022-04-01 17:45:41 +08:00
orphanFileIds = append ( orphanFileIds , fid )
2021-07-13 02:22:00 +08:00
}
2022-04-01 17:45:41 +08:00
}
if ! ( len ( orphanFileIds ) > 0 ) {
continue
}
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "purging process for volume %d.\n" , volumeId )
2022-04-01 17:45:41 +08:00
}
2022-01-21 13:38:34 +08:00
2022-04-01 17:45:41 +08:00
if isEcVolumeReplicas [ volumeId ] {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "skip purging for Erasure Coded volume %d.\n" , volumeId )
2022-04-01 17:45:41 +08:00
continue
}
for _ , server := range serverReplicas [ volumeId ] {
2022-03-31 22:10:06 +08:00
needleVID := needle . VolumeId ( volumeId )
2020-03-24 17:18:13 +08:00
2022-04-01 17:45:41 +08:00
if isReadOnlyReplicas [ volumeId ] {
err := markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , server , true )
2022-03-31 22:10:06 +08:00
if err != nil {
return fmt . Errorf ( "mark volume %d read/write: %v" , volumeId , err )
}
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "temporarily marked %d on server %v writable for forced purge\n" , volumeId , server )
2022-04-01 17:45:41 +08:00
defer markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , server , false )
2022-02-07 06:22:04 +08:00
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "marked %d on server %v writable for forced purge\n" , volumeId , server )
2022-04-01 17:45:41 +08:00
}
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "purging files from volume %d\n" , volumeId )
2022-03-31 22:10:06 +08:00
}
2022-10-14 14:30:30 +08:00
if err := c . purgeFileIdsForOneVolume ( volumeId , orphanFileIds ) ; err != nil {
2022-03-31 22:10:06 +08:00
return fmt . Errorf ( "purging volume %d: %v" , volumeId , err )
}
2020-03-25 17:21:15 +08:00
}
}
2020-03-24 17:18:13 +08:00
}
2022-02-07 06:22:04 +08:00
if ! applyPurging {
2020-03-25 17:21:15 +08:00
pct := float64 ( totalOrphanChunkCount * 100 ) / ( float64 ( totalOrphanChunkCount + totalInUseCount ) )
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
2020-03-25 17:21:15 +08:00
totalOrphanChunkCount + totalInUseCount , totalOrphanChunkCount , pct , totalOrphanDataSize )
2020-03-25 13:38:33 +08:00
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "This could be normal if multiple filers or no filers are used.\n" )
2020-03-25 13:38:33 +08:00
}
2022-01-21 13:38:34 +08:00
if totalOrphanChunkCount == 0 {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "no orphan data\n" )
2022-01-21 13:38:34 +08:00
}
2020-03-24 17:18:13 +08:00
return nil
}
2022-10-14 14:30:30 +08:00
func ( c * commandVolumeFsck ) collectOneVolumeFileIds ( dataNodeId string , volumeId uint32 , vinfo VInfo , cutoffFrom uint64 ) error {
2020-03-25 15:56:47 +08:00
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "collecting volume %d file ids from %s ...\n" , volumeId , vinfo . server )
2020-03-25 15:56:47 +08:00
}
2020-03-24 17:18:13 +08:00
2022-10-14 14:30:30 +08:00
return operation . WithVolumeServerClient ( false , vinfo . server , c . env . option . GrpcDialOption ,
func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
ext := ".idx"
if vinfo . isEcVolume {
ext = ".ecx"
2022-09-11 06:29:17 +08:00
}
2022-10-14 14:30:30 +08:00
copyFileClient , err := volumeServerClient . CopyFile ( context . Background ( ) , & volume_server_pb . CopyFileRequest {
VolumeId : volumeId ,
Ext : ext ,
CompactionRevision : math . MaxUint32 ,
StopOffset : math . MaxInt64 ,
Collection : vinfo . collection ,
IsEcVolume : vinfo . isEcVolume ,
IgnoreSourceFileNotFound : false ,
} )
2022-09-11 06:29:17 +08:00
if err != nil {
2022-10-14 14:30:30 +08:00
return fmt . Errorf ( "failed to start copying volume %d%s: %v" , volumeId , ext , err )
2022-09-11 06:29:17 +08:00
}
2022-10-14 14:30:30 +08:00
var buf bytes . Buffer
for {
resp , err := copyFileClient . Recv ( )
if errors . Is ( err , io . EOF ) {
break
}
2022-09-11 06:29:17 +08:00
if err != nil {
2022-10-14 14:30:30 +08:00
return err
2022-09-11 06:29:17 +08:00
}
2022-10-14 14:30:30 +08:00
buf . Write ( resp . FileContent )
}
if vinfo . isReadOnly == false {
2022-10-25 13:09:38 +08:00
index , err := idx . FirstInvalidIndex ( buf . Bytes ( ) ,
2022-10-14 14:30:30 +08:00
func ( key types . NeedleId , offset types . Offset , size types . Size ) ( bool , error ) {
resp , err := volumeServerClient . ReadNeedleMeta ( context . Background ( ) , & volume_server_pb . ReadNeedleMetaRequest {
VolumeId : volumeId ,
NeedleId : uint64 ( key ) ,
Offset : offset . ToActualOffset ( ) ,
Size : int32 ( size ) ,
} )
if err != nil {
2023-02-11 05:04:29 +08:00
return false , fmt . Errorf ( "read needle meta with id %d from volume %d: %v" , key , volumeId , err )
2022-10-14 14:30:30 +08:00
}
2022-10-25 13:09:38 +08:00
return resp . AppendAtNs <= cutoffFrom , nil
2022-10-14 14:30:30 +08:00
} )
if err != nil {
2023-02-11 04:53:43 +08:00
fmt . Fprintf ( c . writer , "Failed to search for last valid index on volume %d with error %v\n" , volumeId , err )
2022-10-14 14:30:30 +08:00
} else {
2022-10-25 13:09:38 +08:00
buf . Truncate ( index * types . NeedleMapEntrySize )
2022-10-14 14:30:30 +08:00
}
}
idxFilename := getVolumeFileIdFile ( c . tempFolder , dataNodeId , volumeId )
2022-10-25 13:09:38 +08:00
err = writeToFile ( buf . Bytes ( ) , idxFilename )
2022-09-11 06:29:17 +08:00
if err != nil {
2022-10-14 14:30:30 +08:00
return fmt . Errorf ( "failed to copy %d%s from %s: %v" , volumeId , ext , vinfo . server , err )
2022-09-11 06:29:17 +08:00
}
2020-03-24 17:18:13 +08:00
2022-10-14 14:30:30 +08:00
return nil
} )
2020-03-24 17:18:13 +08:00
}
2022-10-14 14:30:30 +08:00
type Item struct {
vid uint32
fileKey uint64
cookie uint32
path util . FullPath
2020-03-24 17:18:13 +08:00
}
2022-10-14 14:30:30 +08:00
func ( c * commandVolumeFsck ) readFilerFileIdFile ( volumeId uint32 , fn func ( needleId types . NeedleId , itemPath util . FullPath ) ) error {
fp , err := os . Open ( getFilerFileIdFile ( c . tempFolder , volumeId ) )
2021-06-25 14:56:24 +08:00
if err != nil {
2022-10-14 14:30:30 +08:00
return err
2021-06-25 14:56:24 +08:00
}
defer fp . Close ( )
br := bufio . NewReader ( fp )
2022-10-16 11:38:46 +08:00
buffer := make ( [ ] byte , readbufferSize )
2021-06-25 14:56:24 +08:00
var readSize int
2022-10-14 14:30:30 +08:00
var readErr error
item := & Item { vid : volumeId }
2021-06-25 14:56:24 +08:00
for {
2022-10-14 14:30:30 +08:00
readSize , readErr = io . ReadFull ( br , buffer )
if errors . Is ( readErr , io . EOF ) {
2022-01-21 13:38:34 +08:00
break
2021-06-25 14:56:24 +08:00
}
2022-10-14 14:30:30 +08:00
if readErr != nil {
return readErr
}
2022-10-16 11:38:46 +08:00
if readSize != readbufferSize {
2022-10-14 14:30:30 +08:00
return fmt . Errorf ( "readSize mismatch" )
}
2021-06-25 14:56:24 +08:00
item . fileKey = util . BytesToUint64 ( buffer [ : 8 ] )
item . cookie = util . BytesToUint32 ( buffer [ 8 : 12 ] )
pathSize := util . BytesToUint32 ( buffer [ 12 : 16 ] )
pathBytes := make ( [ ] byte , int ( pathSize ) )
2021-06-28 14:32:57 +08:00
n , err := io . ReadFull ( br , pathBytes )
if err != nil {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "%d,%x%08x in unexpected error: %v\n" , volumeId , item . fileKey , item . cookie , err )
2021-06-28 14:32:57 +08:00
}
if n != int ( pathSize ) {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "%d,%x%08x %d unexpected file name size %d\n" , volumeId , item . fileKey , item . cookie , pathSize , n )
2021-06-28 14:32:57 +08:00
}
2022-10-14 14:30:30 +08:00
item . path = util . FullPath ( pathBytes )
2022-01-21 13:38:34 +08:00
needleId := types . NeedleId ( item . fileKey )
2022-10-14 14:30:30 +08:00
fn ( needleId , item . path )
}
return nil
}
func ( c * commandVolumeFsck ) oneVolumeFileIdsCheckOneVolume ( dataNodeId string , volumeId uint32 , applyPurging bool ) ( err error ) {
if * c . verbose {
fmt . Fprintf ( c . writer , "find missing file chunks in dataNodeId %s volume %d ...\n" , dataNodeId , volumeId )
}
db := needle_map . NewMemDb ( )
defer db . Close ( )
2022-01-21 13:38:34 +08:00
2022-10-14 14:30:30 +08:00
if err = db . LoadFromIdx ( getVolumeFileIdFile ( c . tempFolder , dataNodeId , volumeId ) ) ; err != nil {
return
}
if err = c . readFilerFileIdFile ( volumeId , func ( needleId types . NeedleId , itemPath util . FullPath ) {
if _ , found := db . Get ( needleId ) ; ! found {
fmt . Fprintf ( c . writer , "%s\n" , itemPath )
2022-02-07 06:22:04 +08:00
if applyPurging {
2022-10-14 14:30:30 +08:00
c . httpDelete ( itemPath )
2022-02-07 06:22:04 +08:00
}
2021-06-25 14:56:24 +08:00
}
2022-10-14 14:30:30 +08:00
} ) ; err != nil {
return
2022-02-07 06:22:04 +08:00
}
return nil
}
2021-06-25 14:56:24 +08:00
2022-10-14 14:30:30 +08:00
func ( c * commandVolumeFsck ) httpDelete ( path util . FullPath ) {
2022-02-07 06:22:04 +08:00
req , err := http . NewRequest ( http . MethodDelete , "" , nil )
2021-06-25 14:56:24 +08:00
2022-02-07 06:22:04 +08:00
req . URL = & url . URL {
Scheme : "http" ,
Host : c . env . option . FilerAddress . ToHttpAddress ( ) ,
Path : string ( path ) ,
2021-06-25 14:56:24 +08:00
}
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "full HTTP delete request to be sent: %v\n" , req )
2022-02-07 06:22:04 +08:00
}
if err != nil {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "HTTP delete request error: %v\n" , err )
2022-02-07 06:22:04 +08:00
}
2022-01-21 13:38:34 +08:00
2022-02-07 06:22:04 +08:00
client := & http . Client { }
2021-06-25 14:56:24 +08:00
2022-02-07 06:22:04 +08:00
resp , err := client . Do ( req )
if err != nil {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "DELETE fetch error: %v\n" , err )
2022-02-07 06:22:04 +08:00
}
defer resp . Body . Close ( )
2021-06-25 14:56:24 +08:00
2022-02-07 06:22:04 +08:00
_ , err = ioutil . ReadAll ( resp . Body )
if err != nil {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "DELETE response error: %v\n" , err )
2022-02-07 06:22:04 +08:00
}
2021-06-25 14:56:24 +08:00
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintln ( c . writer , "delete response Status : " , resp . Status )
fmt . Fprintln ( c . writer , "delete response Headers : " , resp . Header )
2022-01-21 13:38:34 +08:00
}
2021-06-25 14:56:24 +08:00
}
2022-11-01 02:33:04 +08:00
func ( c * commandVolumeFsck ) oneVolumeFileIdsSubtractFilerFileIds ( dataNodeId string , volumeId uint32 , vinfo * VInfo ) ( inUseCount uint64 , orphanFileIds [ ] string , orphanDataSize uint64 , err error ) {
2020-03-24 17:18:13 +08:00
2022-11-01 02:33:04 +08:00
volumeFileIdDb := needle_map . NewMemDb ( )
defer volumeFileIdDb . Close ( )
2020-03-24 17:18:13 +08:00
2022-11-01 02:33:04 +08:00
if err = volumeFileIdDb . LoadFromIdx ( getVolumeFileIdFile ( c . tempFolder , dataNodeId , volumeId ) ) ; err != nil {
2022-10-14 14:30:30 +08:00
err = fmt . Errorf ( "failed to LoadFromIdx %+v" , err )
2020-03-24 17:18:13 +08:00
return
}
2022-11-01 02:33:04 +08:00
if err = c . readFilerFileIdFile ( volumeId , func ( filerNeedleId types . NeedleId , itemPath util . FullPath ) {
2022-10-16 11:38:46 +08:00
inUseCount ++
if * c . verifyNeedle {
2022-11-01 02:33:04 +08:00
if needleValue , ok := volumeFileIdDb . Get ( filerNeedleId ) ; ok && ! needleValue . Size . IsDeleted ( ) {
if _ , err := readNeedleStatus ( c . env . option . GrpcDialOption , vinfo . server , volumeId , * needleValue ) ; err != nil {
// files may be deleted during copying filesIds
if ! strings . Contains ( err . Error ( ) , storage . ErrorDeleted . Error ( ) ) {
fmt . Fprintf ( c . writer , "failed to read %d:%s needle status of file %s: %+v\n" ,
volumeId , filerNeedleId . String ( ) , itemPath , err )
if * c . forcePurging {
return
}
2022-10-16 11:38:46 +08:00
}
}
}
}
2022-11-01 02:33:04 +08:00
if err = volumeFileIdDb . Delete ( filerNeedleId ) ; err != nil && * c . verbose {
fmt . Fprintf ( c . writer , "failed to nm.delete %s(%+v): %+v" , itemPath , filerNeedleId , err )
2022-10-14 14:30:30 +08:00
}
} ) ; err != nil {
err = fmt . Errorf ( "failed to readFilerFileIdFile %+v" , err )
return
2020-03-24 17:18:13 +08:00
}
2020-03-25 17:21:15 +08:00
var orphanFileCount uint64
2022-11-01 02:33:04 +08:00
if err = volumeFileIdDb . AscendingVisit ( func ( n needle_map . NeedleValue ) error {
if n . Size . IsDeleted ( ) {
2022-10-16 11:38:46 +08:00
return nil
}
2022-10-31 11:32:46 +08:00
orphanFileIds = append ( orphanFileIds , n . Key . FileId ( volumeId ) )
2020-03-25 17:21:15 +08:00
orphanFileCount ++
2020-03-24 17:18:13 +08:00
orphanDataSize += uint64 ( n . Size )
return nil
2022-10-14 14:30:30 +08:00
} ) ; err != nil {
err = fmt . Errorf ( "failed to AscendingVisit %+v" , err )
return
}
2020-03-24 17:18:13 +08:00
2020-03-25 17:21:15 +08:00
if orphanFileCount > 0 {
pct := float64 ( orphanFileCount * 100 ) / ( float64 ( orphanFileCount + inUseCount ) )
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "dataNode:%s\tvolume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
2022-03-31 22:10:06 +08:00
dataNodeId , volumeId , orphanFileCount + inUseCount , orphanFileCount , pct , orphanDataSize )
2020-03-24 17:34:28 +08:00
}
2020-03-24 17:18:13 +08:00
return
}
type VInfo struct {
2021-09-13 13:47:52 +08:00
server pb . ServerAddress
2020-03-24 17:18:13 +08:00
collection string
isEcVolume bool
2022-01-07 01:52:28 +08:00
isReadOnly bool
2020-03-24 17:18:13 +08:00
}
2022-10-14 14:30:30 +08:00
func ( c * commandVolumeFsck ) collectVolumeIds ( ) ( volumeIdToServer map [ string ] map [ uint32 ] VInfo , err error ) {
2020-03-25 15:56:47 +08:00
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "collecting volume id and locations from master ...\n" )
2020-03-25 15:56:47 +08:00
}
2020-03-24 17:18:13 +08:00
2022-03-31 22:10:06 +08:00
volumeIdToServer = make ( map [ string ] map [ uint32 ] VInfo )
2021-02-22 16:28:42 +08:00
// collect topology information
2022-10-14 14:30:30 +08:00
topologyInfo , _ , err := collectTopologyInfo ( c . env , 0 )
2020-03-24 17:18:13 +08:00
if err != nil {
return
}
2021-02-22 16:28:42 +08:00
eachDataNode ( topologyInfo , func ( dc string , rack RackId , t * master_pb . DataNodeInfo ) {
2021-02-18 12:57:08 +08:00
for _ , diskInfo := range t . DiskInfos {
2022-03-31 22:10:06 +08:00
dataNodeId := t . GetId ( )
volumeIdToServer [ dataNodeId ] = make ( map [ uint32 ] VInfo )
2021-02-16 18:47:02 +08:00
for _ , vi := range diskInfo . VolumeInfos {
2022-03-31 22:10:06 +08:00
volumeIdToServer [ dataNodeId ] [ vi . Id ] = VInfo {
2021-09-13 13:47:52 +08:00
server : pb . NewServerAddressFromDataNode ( t ) ,
2021-02-16 18:47:02 +08:00
collection : vi . Collection ,
isEcVolume : false ,
2022-01-07 01:52:28 +08:00
isReadOnly : vi . ReadOnly ,
2021-02-16 18:47:02 +08:00
}
2020-03-24 17:18:13 +08:00
}
2021-02-16 18:47:02 +08:00
for _ , ecShardInfo := range diskInfo . EcShardInfos {
2022-03-31 22:10:06 +08:00
volumeIdToServer [ dataNodeId ] [ ecShardInfo . Id ] = VInfo {
2021-09-13 13:47:52 +08:00
server : pb . NewServerAddressFromDataNode ( t ) ,
2021-02-16 18:47:02 +08:00
collection : ecShardInfo . Collection ,
isEcVolume : true ,
2022-01-07 01:52:28 +08:00
isReadOnly : true ,
2021-02-16 18:47:02 +08:00
}
2020-03-24 17:18:13 +08:00
}
2022-10-14 14:30:30 +08:00
if * c . verbose {
fmt . Fprintf ( c . writer , "dn %+v collected %d volumes and locations.\n" , dataNodeId , len ( volumeIdToServer [ dataNodeId ] ) )
}
2020-03-24 17:18:13 +08:00
}
} )
return
}
2022-10-14 14:30:30 +08:00
func ( c * commandVolumeFsck ) purgeFileIdsForOneVolume ( volumeId uint32 , fileIds [ ] string ) ( err error ) {
fmt . Fprintf ( c . writer , "purging orphan data for volume %d...\n" , volumeId )
2020-03-25 17:21:15 +08:00
locations , found := c . env . MasterClient . GetLocations ( volumeId )
if ! found {
return fmt . Errorf ( "failed to find volume %d locations" , volumeId )
}
resultChan := make ( chan [ ] * volume_server_pb . DeleteResult , len ( locations ) )
var wg sync . WaitGroup
for _ , location := range locations {
wg . Add ( 1 )
2021-09-13 13:47:52 +08:00
go func ( server pb . ServerAddress , fidList [ ] string ) {
2020-03-25 17:21:15 +08:00
defer wg . Done ( )
if deleteResults , deleteErr := operation . DeleteFilesAtOneVolumeServer ( server , c . env . option . GrpcDialOption , fidList , false ) ; deleteErr != nil {
err = deleteErr
} else if deleteResults != nil {
resultChan <- deleteResults
}
2021-09-13 13:47:52 +08:00
} ( location . ServerAddress ( ) , fileIds )
2020-03-25 17:21:15 +08:00
}
wg . Wait ( )
close ( resultChan )
for results := range resultChan {
for _ , result := range results {
if result . Error != "" {
2022-10-14 14:30:30 +08:00
fmt . Fprintf ( c . writer , "purge error: %s\n" , result . Error )
2020-03-25 17:21:15 +08:00
}
}
}
return
}
2022-10-14 14:30:30 +08:00
// getCollectFilerFilePath returns the filer folder to traverse: the folder of
// the selected collection below the buckets path, or "/" when no collection
// was given.
func (c *commandVolumeFsck) getCollectFilerFilePath() string {
	collection := *c.collection
	if collection == "" {
		return "/"
	}
	return fmt.Sprintf("%s/%s", c.bucketsPath, collection)
}
2022-03-31 22:10:06 +08:00
// getVolumeFileIdFile returns the path inside tempFolder of the index copy
// for volume vid on data node dataNodeid: "<dataNodeid>_<vid>.idx".
func getVolumeFileIdFile(tempFolder string, dataNodeid string, vid uint32) string {
	fileName := fmt.Sprintf("%s_%d.idx", dataNodeid, vid)
	return filepath.Join(tempFolder, fileName)
}
// getFilerFileIdFile returns the path inside tempFolder of the file holding
// the filer file ids of volume vid: "<vid>.fid".
func getFilerFileIdFile(tempFolder string, vid uint32) string {
	fileName := fmt.Sprintf("%d.fid", vid)
	return filepath.Join(tempFolder, fileName)
}
2022-09-11 06:29:17 +08:00
// writeToFile writes data to fileName, creating the file with mode 0644 if it
// does not exist and truncating it otherwise. Any failure to create, write or
// close the file is returned.
func writeToFile(data []byte, fileName string) error {
	return os.WriteFile(fileName, data, 0644)
}