2018-08-23 14:54:22 +08:00
package command
import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
2018-11-04 03:43:45 +08:00
"github.com/chrislusf/seaweedfs/weed/notification"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2018-11-05 03:59:08 +08:00
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/spf13/viper"
2018-08-23 14:54:22 +08:00
)
// init attaches the command's Run handler after package-level variables are
// set up; assigning it inside the cmdFilerExport literal would create an
// initialization cycle between cmdFilerExport and runFilerExport.
func init() {
	cmdFilerExport.Run = runFilerExport // break init cycle
}
// cmdFilerExport describes the "filer.export" sub-command: its usage line and
// its short and long help text. The Run handler is assigned in init().
var cmdFilerExport = &Command{
	// The flag is registered as "targetStore"; the usage line must spell it
	// the same way or copy-pasting the example fails flag parsing.
	UsageLine: "filer.export -sourceStore=mysql -targetStore=cassandra",
	Short:     "export meta data in filer store",
	Long: `Iterate the file tree and export all metadata out

	Both source and target store:
	 * should be a store name already specified in filer.toml
	 * do not need to be enabled state

	If target store is empty, only the directory tree will be listed.

	If target store is "notification", the list of entries will be sent to notification.
	This is usually used to bootstrap filer replication to a remote system.

`,
}
var (
// filerExportOutputFile = cmdFilerExport.Flag.String("output", "", "the output file. If empty, only list out the directory tree")
2018-11-05 04:59:29 +08:00
filerExportSourceStore = cmdFilerExport . Flag . String ( "sourceStore" , "" , "the source store name in filer.toml, default to currently enabled store" )
2018-11-04 03:43:45 +08:00
filerExportTargetStore = cmdFilerExport . Flag . String ( "targetStore" , "" , "the target store name in filer.toml, or \"notification\" to export all files to message queue" )
2018-11-05 04:07:33 +08:00
dir = cmdFilerExport . Flag . String ( "dir" , "/" , "only process files under this directory" )
2018-09-17 15:42:36 +08:00
dirListLimit = cmdFilerExport . Flag . Int ( "dirListLimit" , 100000 , "limit directory list size" )
2018-11-04 03:43:45 +08:00
dryRun = cmdFilerExport . Flag . Bool ( "dryRun" , false , "not actually moving data" )
2018-08-23 14:54:22 +08:00
)
// statistics accumulates how many directories and files were visited during
// one export traversal.
type statistics struct {
	directoryCount, fileCount int
}
func runFilerExport ( cmd * Command , args [ ] string ) bool {
weed_server . LoadConfiguration ( "filer" , true )
config := viper . GetViper ( )
2018-08-23 15:02:04 +08:00
var sourceStore , targetStore filer2 . FilerStore
2018-08-23 14:54:22 +08:00
for _ , store := range filer2 . Stores {
2018-11-05 04:59:29 +08:00
if store . GetName ( ) == * filerExportSourceStore || * filerExportSourceStore == "" && config . GetBool ( store . GetName ( ) + ".enabled" ) {
2018-08-23 14:54:22 +08:00
viperSub := config . Sub ( store . GetName ( ) )
if err := store . Initialize ( viperSub ) ; err != nil {
2018-11-04 03:43:45 +08:00
glog . Fatalf ( "Failed to initialize source store for %s: %+v" ,
2018-08-23 14:54:22 +08:00
store . GetName ( ) , err )
} else {
sourceStore = store
}
break
}
}
2018-08-23 15:02:04 +08:00
for _ , store := range filer2 . Stores {
if store . GetName ( ) == * filerExportTargetStore {
viperSub := config . Sub ( store . GetName ( ) )
if err := store . Initialize ( viperSub ) ; err != nil {
2018-11-04 03:43:45 +08:00
glog . Fatalf ( "Failed to initialize target store for %s: %+v" ,
2018-08-23 15:02:04 +08:00
store . GetName ( ) , err )
} else {
targetStore = store
}
break
}
}
2018-08-23 14:54:22 +08:00
if sourceStore == nil {
glog . Errorf ( "Failed to find source store %s" , * filerExportSourceStore )
println ( "existing data sources are:" )
for _ , store := range filer2 . Stores {
println ( " " + store . GetName ( ) )
}
return false
}
2018-11-04 03:43:45 +08:00
if targetStore == nil && * filerExportTargetStore != "" && * filerExportTargetStore != "notification" {
glog . Errorf ( "Failed to find target store %s" , * filerExportTargetStore )
println ( "existing data sources are:" )
for _ , store := range filer2 . Stores {
println ( " " + store . GetName ( ) )
}
return false
}
2018-08-23 14:54:22 +08:00
stat := statistics { }
2018-08-23 15:02:04 +08:00
var fn func ( level int , entry * filer2 . Entry ) error
2018-11-04 03:43:45 +08:00
if * filerExportTargetStore == "notification" {
weed_server . LoadConfiguration ( "notification" , false )
v := viper . GetViper ( )
notification . LoadConfiguration ( v . Sub ( "notification" ) )
fn = func ( level int , entry * filer2 . Entry ) error {
printout ( level , entry )
if * dryRun {
return nil
}
return notification . Queue . SendMessage (
string ( entry . FullPath ) ,
& filer_pb . EventNotification {
NewEntry : entry . ToProtoEntry ( ) ,
} ,
)
}
} else if targetStore == nil {
2018-08-23 15:02:04 +08:00
fn = printout
} else {
fn = func ( level int , entry * filer2 . Entry ) error {
2018-11-04 03:43:45 +08:00
printout ( level , entry )
if * dryRun {
return nil
}
2018-08-23 15:02:04 +08:00
return targetStore . InsertEntry ( entry )
}
}
2018-11-05 04:07:33 +08:00
doTraverse ( & stat , sourceStore , filer2 . FullPath ( * dir ) , 0 , fn )
2018-08-23 14:54:22 +08:00
glog . Infof ( "processed %d directories, %d files" , stat . directoryCount , stat . fileCount )
return true
}
func doTraverse ( stat * statistics , filerStore filer2 . FilerStore , parentPath filer2 . FullPath , level int , fn func ( level int , entry * filer2 . Entry ) error ) {
2018-09-17 15:42:36 +08:00
limit := * dirListLimit
2018-08-23 14:54:22 +08:00
lastEntryName := ""
for {
entries , err := filerStore . ListDirectoryEntries ( parentPath , lastEntryName , false , limit )
if err != nil {
break
}
for _ , entry := range entries {
if fnErr := fn ( level , entry ) ; fnErr != nil {
glog . Errorf ( "failed to process entry: %s" , entry . FullPath )
}
if entry . IsDirectory ( ) {
stat . directoryCount ++
doTraverse ( stat , filerStore , entry . FullPath , level + 1 , fn )
} else {
stat . fileCount ++
}
}
if len ( entries ) < limit {
break
}
}
}
func printout ( level int , entry * filer2 . Entry ) error {
for i := 0 ; i < level ; i ++ {
2018-11-04 03:43:45 +08:00
if i == level - 1 {
print ( "+-" )
} else {
print ( "| " )
}
2018-08-23 14:54:22 +08:00
}
println ( entry . FullPath . Name ( ) )
return nil
}