fix: fs verify error counter (#5261)

This commit is contained in:
Konstantin Lebedev 2024-02-15 14:31:51 +05:00 committed by GitHub
parent 3b5d8ffb70
commit 6181aa7594
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/util"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"io"
"math"
@ -137,7 +138,7 @@ type ItemEntry struct {
path util.FullPath
}
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) {
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) {
timeNowAtSec := time.Now().Unix()
return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
@ -160,19 +161,24 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo
return nil
},
func(outputChan chan interface{}) {
var wg sync.WaitGroup
itemErrCount := atomic.NewUint64(0)
for itemEntry := range outputChan {
i := itemEntry.(*ItemEntry)
itemPath := string(i.path)
fileMsg := fmt.Sprintf("file:%s", itemPath)
errItem := make(map[string]error)
errItemLock := sync.RWMutex{}
itemIsVerifed := atomic.NewBool(true)
for _, chunk := range i.chunks {
if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
for _, volumeServer := range volumeIds {
if *c.concurrency == 0 {
if err = c.verifyEntry(volumeServer, chunk.Fid); err != nil {
fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n",
fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
fileMsg, chunk.GetFileIdString(), err)
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
itemErrCount.Add(1)
}
}
continue
}
@ -180,43 +186,48 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo
waitChan, ok := c.waitChan[string(volumeServer)]
c.waitChanLock.RUnlock()
if !ok {
fmt.Fprintf(c.writer, "%s failed to get channel for %s chunk: %d:%d: %+v\n",
string(volumeServer), fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s: %+v\n",
string(volumeServer), fileMsg, chunk.GetFileIdString(), err)
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
itemErrCount.Add(1)
}
continue
}
wg.Add(1)
waitChan <- struct{}{}
go func(fId *filer_pb.FileId, path string, volumeServer pb.ServerAddress, msg string) {
if err = c.verifyEntry(volumeServer, fId); err != nil {
errItemLock.Lock()
errItem[path] = err
fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n",
msg, fId.VolumeId, fId.FileKey, err)
errItemLock.Unlock()
go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
defer wg.Done()
if err = c.verifyEntry(volumeServer, fChunk.Fid); err != nil {
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
msg, fChunk.GetFileIdString(), err)
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
itemErrCount.Add(1)
}
}
<-waitChan
}(chunk.Fid, itemPath, volumeServer, fileMsg)
}(chunk, itemPath, volumeServer, fileMsg)
}
} else {
err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
fmt.Fprintf(c.writer, "%s %d:%d: %+v\n",
fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
fileMsg, chunk.GetFileIdString(), err)
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
itemErrCount.Add(1)
}
break
}
}
errItemLock.RLock()
err, _ = errItem[itemPath]
errItemLock.RUnlock()
if err != nil {
errCount++
continue
if itemIsVerifed.Load() {
if *c.verbose {
fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks))
}
fileCount++
}
if *c.verbose {
fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks))
}
fileCount++
}
wg.Wait()
errCount = itemErrCount.Load()
})
}