seaweedfs/weed/command/filer_meta_tail.go

130 lines
4.1 KiB
Go
Raw Normal View History

2020-04-05 15:51:16 +08:00
package command
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
2022-08-18 03:05:07 +08:00
"google.golang.org/protobuf/jsonpb"
"os"
2020-08-29 10:43:04 +08:00
"path/filepath"
"strings"
2020-04-28 14:49:46 +08:00
"time"
2020-04-05 15:51:16 +08:00
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
2020-04-05 15:51:16 +08:00
)
func init() {
cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
2020-04-05 15:51:16 +08:00
}
var cmdFilerMetaTail = &Command{
2021-03-03 15:07:29 +08:00
UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
2021-03-01 08:21:09 +08:00
Short: "see continuous changes on a filer",
Long: `See continuous changes on a filer.
2020-04-05 15:51:16 +08:00
weed filer.meta.tail -timeAgo=30h | grep truncate
weed filer.meta.tail -timeAgo=30h | jq .
weed filer.meta.tail -timeAgo=30h -untilTimeAgo=20h | jq .
weed filer.meta.tail -timeAgo=30h | jq .eventNotification.newEntry.name
weed filer.meta.tail -timeAgo=30h -es=http://<elasticSearchServerHost>:<port> -es.index=seaweedfs
2020-04-05 15:51:16 +08:00
`,
}
var (
2021-01-13 16:31:19 +08:00
tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
2021-03-03 15:07:29 +08:00
tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
2021-01-13 16:31:19 +08:00
tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
tailStop = cmdFilerMetaTail.Flag.Duration("untilTimeAgo", 0, "read until this time ago. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
2021-01-13 16:31:19 +08:00
tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
2020-04-05 15:51:16 +08:00
)
func runFilerMetaTail(cmd *Command, args []string) bool {
2020-04-05 15:51:16 +08:00
util.LoadConfiguration("security", false)
2020-04-05 15:51:16 +08:00
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
clientId := util.RandomInt32()
2020-04-05 15:51:16 +08:00
2020-08-29 10:43:04 +08:00
var filterFunc func(dir, fname string) bool
2021-01-13 16:31:19 +08:00
if *tailPattern != "" {
if strings.Contains(*tailPattern, "/") {
println("watch path pattern", *tailPattern)
2020-08-29 10:43:04 +08:00
filterFunc = func(dir, fname string) bool {
2021-01-13 16:31:19 +08:00
matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
2020-08-29 10:43:04 +08:00
if err != nil {
fmt.Printf("error: %v", err)
}
return matched
}
} else {
2021-01-13 16:31:19 +08:00
println("watch file pattern", *tailPattern)
2020-08-29 10:43:04 +08:00
filterFunc = func(dir, fname string) bool {
2021-01-13 16:31:19 +08:00
matched, err := filepath.Match(*tailPattern, fname)
2020-08-29 10:43:04 +08:00
if err != nil {
fmt.Printf("error: %v", err)
}
return matched
}
}
}
shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
2022-02-25 17:17:26 +08:00
if filer_pb.IsEmpty(resp) {
2020-08-29 10:43:04 +08:00
return false
}
2021-09-25 16:04:51 +08:00
if filterFunc == nil {
return true
}
2020-08-29 10:43:04 +08:00
if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
return true
}
if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
return true
}
return false
}
jsonpbMarshaler := jsonpb.Marshaler{
EmitDefaults: false,
}
2021-01-11 18:08:26 +08:00
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
jsonpbMarshaler.Marshal(os.Stdout, resp)
fmt.Fprintln(os.Stdout)
2021-01-11 18:08:26 +08:00
return nil
2021-01-11 16:03:13 +08:00
}
if *esServers != "" {
var err error
eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
if err != nil {
fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
return false
}
}
2021-01-11 16:03:13 +08:00
var untilTsNs int64
if *tailStop != 0 {
untilTsNs = time.Now().Add(-*tailStop).UnixNano()
}
tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, 0, *tailTarget, nil,
time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error {
2021-09-01 14:30:28 +08:00
if !shouldPrint(resp) {
return nil
}
if err := eachEntryFunc(resp); err != nil {
return err
}
2021-08-05 07:25:46 +08:00
return nil
}, pb.TrivialOnError)
2020-04-05 15:51:16 +08:00
2021-01-13 16:31:19 +08:00
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
2020-04-05 15:51:16 +08:00
}
return true
}