seaweedfs/weed/command/benchmark.go

603 lines
16 KiB
Go
Raw Normal View History

package command
2014-03-10 10:42:50 +08:00
import (
"bufio"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
2014-03-10 10:42:50 +08:00
"io"
2014-03-10 14:12:05 +08:00
"math"
"math/rand"
2014-03-10 10:42:50 +08:00
"os"
"runtime"
"runtime/pprof"
"sort"
2014-03-10 10:42:50 +08:00
"sync"
"time"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
2019-02-15 16:09:19 +08:00
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
2014-03-10 10:42:50 +08:00
)
type BenchmarkOptions struct {
masters *string
2014-03-19 19:44:59 +08:00
concurrency *int
numberOfFiles *int
fileSize *int
idListFile *string
write *bool
deletePercentage *int
read *bool
sequentialRead *bool
collection *string
2019-03-20 16:38:11 +08:00
replication *string
2020-12-17 01:14:05 +08:00
diskType *string
2014-03-19 19:44:59 +08:00
cpuprofile *string
maxCpu *int
2019-02-19 04:11:52 +08:00
grpcDialOption grpc.DialOption
2019-03-30 13:53:35 +08:00
masterClient *wdclient.MasterClient
2020-04-30 23:31:08 +08:00
fsync *bool
2021-03-07 06:26:24 +08:00
useTcp *bool
2014-03-10 10:42:50 +08:00
}
var (
2019-03-30 13:53:35 +08:00
b BenchmarkOptions
sharedBytes []byte
isSecure bool
2014-03-10 10:42:50 +08:00
)
func init() {
2019-01-17 09:17:19 +08:00
cmdBenchmark.Run = runBenchmark // break init cycle
2014-03-10 10:42:50 +08:00
cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
2014-03-19 19:44:59 +08:00
b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
2014-03-10 10:42:50 +08:00
b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
2014-03-19 19:44:59 +08:00
b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
2014-03-10 10:42:50 +08:00
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
2019-03-20 16:38:11 +08:00
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
2021-02-22 18:03:12 +08:00
b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
2020-04-30 23:31:08 +08:00
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
2021-03-07 06:26:24 +08:00
b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
sharedBytes = make([]byte, 1024)
2014-03-10 10:42:50 +08:00
}
var cmdBenchmark = &Command{
2019-10-14 20:23:32 +08:00
UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
2014-03-10 10:42:50 +08:00
Short: "benchmark on writing millions of files and read out",
Long: `benchmark on an empty SeaweedFS file system.
2014-03-10 10:42:50 +08:00
Two tests during benchmark:
1) write lots of small files to the system
2) read the files out
2014-03-10 10:42:50 +08:00
The file content is mostly zero, but no compression is done.
2014-03-10 10:42:50 +08:00
You can choose to only benchmark read or write.
During write, the list of uploaded file ids is stored in "-list" specified file.
You can also use your own list of file ids to run read test.
2014-03-10 10:42:50 +08:00
Write speed and read speed will be collected.
The numbers are used to get a sense of the system.
2014-03-14 03:11:26 +08:00
Usually your network or the hard drive is the real bottleneck.
2014-03-14 03:11:26 +08:00
Another thing to watch is whether the volumes are evenly distributed
to each volume server. Because the 7 more benchmark volumes are randomly distributed
to servers with free slots, it's highly possible some servers have uneven amount of
benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
2014-03-14 03:11:26 +08:00
before starting the benchmark command:
http://localhost:9333/vol/grow?collection=benchmark&count=5
2014-03-10 10:42:50 +08:00
After benchmarking, you can clean up the written data by deleting the benchmark collection
http://localhost:9333/col/delete?collection=benchmark
2014-03-10 10:42:50 +08:00
`,
}
var (
wait sync.WaitGroup
writeStats *stats
readStats *stats
2014-03-10 10:42:50 +08:00
)
2019-01-17 09:17:19 +08:00
func runBenchmark(cmd *Command, args []string) bool {
2019-02-19 04:11:52 +08:00
util.LoadConfiguration("security", false)
b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
2019-02-19 04:11:52 +08:00
2020-06-02 15:10:35 +08:00
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
*b.maxCpu = runtime.NumCPU()
}
runtime.GOMAXPROCS(*b.maxCpu)
if *b.cpuprofile != "" {
f, err := os.Create(*b.cpuprofile)
if err != nil {
glog.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
go b.masterClient.KeepConnectedToMaster()
2019-03-30 13:53:35 +08:00
b.masterClient.WaitUntilConnected()
2014-03-10 10:42:50 +08:00
if *b.write {
2019-01-17 09:17:19 +08:00
benchWrite()
2014-03-10 10:42:50 +08:00
}
if *b.read {
2019-01-17 09:17:19 +08:00
benchRead()
2014-03-10 10:42:50 +08:00
}
return true
}
2019-01-17 09:17:19 +08:00
func benchWrite() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
writeStats = newStats(*b.concurrency)
idChan := make(chan int)
go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
for i := 0; i < *b.concurrency; i++ {
wait.Add(1)
go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
}
writeStats.start = time.Now()
2014-03-19 19:44:59 +08:00
writeStats.total = *b.numberOfFiles
go writeStats.checkProgress("Writing Benchmark", finishChan)
for i := 0; i < *b.numberOfFiles; i++ {
idChan <- i
}
close(idChan)
wait.Wait()
writeStats.end = time.Now()
wait.Add(2)
finishChan <- true
finishChan <- true
wait.Wait()
close(finishChan)
writeStats.printStats()
}
2019-01-17 09:17:19 +08:00
func benchRead() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
readStats = newStats(*b.concurrency)
go readFileIds(*b.idListFile, fileIdLineChan)
readStats.start = time.Now()
2014-03-19 19:44:59 +08:00
readStats.total = *b.numberOfFiles
go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
for i := 0; i < *b.concurrency; i++ {
wait.Add(1)
go readFiles(fileIdLineChan, &readStats.localStats[i])
}
wait.Wait()
wait.Add(1)
finishChan <- true
wait.Wait()
close(finishChan)
readStats.end = time.Now()
readStats.printStats()
}
2014-03-21 04:58:56 +08:00
// delayedFile is an uploaded file queued for deletion. The deletion workers
// in writeFiles wait until enterTime has passed before deleting fp.
type delayedFile struct {
	enterTime time.Time           // earliest moment at which the file is deleted
	fp        *operation.FilePart // the uploaded file (server and file id)
}
func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
defer wait.Done()
2014-03-24 12:56:24 +08:00
delayedDeleteChan := make(chan *delayedFile, 100)
2014-03-21 04:30:34 +08:00
var waitForDeletions sync.WaitGroup
2015-02-08 07:35:28 +08:00
2014-03-21 04:30:34 +08:00
for i := 0; i < 7; i++ {
waitForDeletions.Add(1)
2014-03-21 04:30:34 +08:00
go func() {
defer waitForDeletions.Done()
2014-03-24 12:56:24 +08:00
for df := range delayedDeleteChan {
if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now()))
2014-03-21 04:58:56 +08:00
}
2019-02-15 16:09:19 +08:00
var jwtAuthorization security.EncodedJwt
if isSecure {
2021-08-13 12:40:33 +08:00
jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), b.grpcDialOption, df.fp.Fid)
2019-02-15 16:09:19 +08:00
}
if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
2014-03-21 04:30:34 +08:00
s.completed++
} else {
s.failed++
}
}
}()
}
random := rand.New(rand.NewSource(time.Now().UnixNano()))
2021-03-07 06:26:24 +08:00
volumeTcpClient := wdclient.NewVolumeTcpClient()
for id := range idChan {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
fp := &operation.FilePart{
Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
FileSize: fileSize,
MimeType: "image/bench", // prevent gzip benchmark content
2020-05-10 18:50:30 +08:00
Fsync: *b.fsync,
}
2016-06-26 10:50:18 +08:00
ar := &operation.VolumeAssignRequest{
2019-03-20 16:38:11 +08:00
Count: 1,
Collection: *b.collection,
Replication: *b.replication,
2020-12-17 01:14:05 +08:00
DiskType: *b.diskType,
2016-06-26 10:50:18 +08:00
}
if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
2019-02-15 16:09:19 +08:00
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
2021-03-07 06:26:24 +08:00
if *b.useTcp {
if uploadByTcp(volumeTcpClient, fp) {
fileIdLineChan <- fp.Fid
s.completed++
s.transferred += fileSize
} else {
s.failed++
}
} else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
} else {
fileIdLineChan <- fp.Fid
2014-03-10 10:42:50 +08:00
}
s.completed++
s.transferred += fileSize
2014-03-10 10:42:50 +08:00
} else {
2014-03-10 14:12:05 +08:00
s.failed++
fmt.Printf("Failed to write with error:%v\n", err)
}
writeStats.addSample(time.Now().Sub(start))
if *cmdBenchmark.IsDebug {
fmt.Printf("writing %d file %s\n", id, fp.Fid)
2014-03-10 10:42:50 +08:00
}
} else {
s.failed++
println("writing file error:", err.Error())
2014-03-10 10:42:50 +08:00
}
}
2014-03-24 12:56:24 +08:00
close(delayedDeleteChan)
2014-03-21 04:30:34 +08:00
waitForDeletions.Wait()
2014-03-10 10:42:50 +08:00
}
func readFiles(fileIdLineChan chan string, s *stat) {
defer wait.Done()
for fid := range fileIdLineChan {
if len(fid) == 0 {
continue
}
if fid[0] == '#' {
continue
}
if *cmdBenchmark.IsDebug {
fmt.Printf("reading file %s\n", fid)
}
start := time.Now()
var bytesRead int
var err error
urls, err := b.masterClient.LookupFileId(fid)
2020-06-20 23:00:25 +08:00
if err != nil {
s.failed++
println("!!!! ", fid, " location not found!!!!!")
continue
}
2020-06-20 23:00:25 +08:00
var bytes []byte
for _, url := range urls {
bytes, _, err = util.Get(url)
if err == nil {
break
}
}
2020-06-20 23:00:25 +08:00
bytesRead = len(bytes)
if err == nil {
s.completed++
s.transferred += int64(bytesRead)
readStats.addSample(time.Now().Sub(start))
2014-03-10 10:42:50 +08:00
} else {
s.failed++
fmt.Printf("Failed to read %s error:%v\n", fid, err)
2014-03-10 10:42:50 +08:00
}
}
}
// writeFileIds creates (or truncates) fileName and appends every file id
// received from fileIdLineChan as one line. It runs until a message arrives
// on finishChan, then signals wait.Done and returns. Failing to create the
// file is fatal; a failed write is logged and the benchmark continues.
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
	file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		glog.Fatalf("Failed to create file %s: %s\n", fileName, err)
	}
	defer file.Close()

	for {
		select {
		case <-finishChan:
			wait.Done()
			return
		case line := <-fileIdLineChan:
			// One write per id keeps the list on disk current even when the
			// benchmark is interrupted.
			if _, err := file.WriteString(line + "\n"); err != nil {
				glog.Errorf("write file id %s to %s: %v", line, fileName, err)
			}
		}
	}
}
2021-03-07 06:26:24 +08:00
// uploadByTcp sends the content of fp to its assigned server through
// volumeTcpClient. It reports whether the upload succeeded; a failure is
// logged.
func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool {
	if err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader); err != nil {
		glog.Errorf("upload chunk err: %v", err)
		return false
	}
	return true
}
2014-03-10 10:42:50 +08:00
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {
glog.Fatalf("File to read file %s: %s\n", fileName, err)
}
defer file.Close()
random := rand.New(rand.NewSource(time.Now().UnixNano()))
2014-03-10 10:42:50 +08:00
r := bufio.NewReader(file)
if *b.sequentialRead {
for {
if line, err := Readln(r); err == nil {
fileIdLineChan <- string(line)
} else {
break
}
}
} else {
2014-03-21 04:58:56 +08:00
lines := make([]string, 0, readStats.total)
for {
if line, err := Readln(r); err == nil {
lines = append(lines, string(line))
} else {
break
}
}
2014-03-19 19:44:59 +08:00
if len(lines) > 0 {
2014-03-21 04:58:56 +08:00
for i := 0; i < readStats.total; i++ {
fileIdLineChan <- lines[random.Intn(len(lines))]
2014-03-19 19:44:59 +08:00
}
2014-03-10 10:42:50 +08:00
}
}
2014-03-10 10:42:50 +08:00
close(fileIdLineChan)
}
2014-03-10 14:12:05 +08:00
const (
	// benchResolution is the number of latency buckets kept in stats.data.
	benchResolution = 10000
	// benchBucket is the width of one latency bucket in nanoseconds:
	// one second divided by benchResolution, i.e. 0.1 millisecond.
	benchBucket = 1000000000 / benchResolution
)

// stats collects the results of one benchmark phase: a latency histogram
// shared by all workers plus one counter set per worker.
type stats struct {
	data       []int  // latency histogram; data[i] counts samples of i buckets (i * 0.1 ms)
	overflow   []int  // bucket indexes of samples too slow for data (one second or more)
	localStats []stat // per-worker counters, indexed by worker
	start      time.Time
	end        time.Time
	total      int // number of requests the phase is expected to issue

	sampleMu sync.Mutex // guards data and overflow while workers add samples
}

// stat holds the counters of a single worker.
type stat struct {
	completed   int   // finished requests
	failed      int   // failed requests
	total       int   // requests added on top of stats.total (delayed deletions)
	transferred int64 // payload bytes written or read
}

// percentages lists the percentiles printed by printStats.
var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}

// newStats returns an empty stats value with counters for n workers.
func newStats(n int) *stats {
	return &stats{
		data:       make([]int, benchResolution),
		overflow:   make([]int, 0),
		localStats: make([]stat, n),
	}
}

// addSample records the latency d of one request. It may be called from
// several goroutines at once. Latencies below one second are counted in the
// histogram, slower ones are kept individually in overflow, and negative
// durations are reported and dropped.
func (s *stats) addSample(d time.Duration) {
	index := int(d / benchBucket)
	if index < 0 {
		fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
		return
	}

	s.sampleMu.Lock()
	defer s.sampleMu.Unlock()
	if index < len(s.data) {
		s.data[index]++
	} else {
		s.overflow = append(s.overflow, index)
	}
}
2014-03-10 14:12:05 +08:00
func (s *stats) checkProgress(testName string, finishChan chan bool) {
2014-03-10 14:12:05 +08:00
fmt.Printf("\n------------ %s ----------\n", testName)
ticker := time.Tick(time.Second)
lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
for {
select {
case <-finishChan:
wait.Done()
return
case t := <-ticker:
completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
for _, localStat := range s.localStats {
completed += localStat.completed
transferred += localStat.transferred
total += localStat.total
}
fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
completed, total, float64(completed)*100/float64(total),
float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
)
lastCompleted, lastTransferred, lastTime = completed, transferred, t
}
}
}
func (s *stats) printStats() {
completed, failed, transferred, total := 0, 0, int64(0), s.total
for _, localStat := range s.localStats {
completed += localStat.completed
failed += localStat.failed
transferred += localStat.transferred
total += localStat.total
}
2014-03-10 14:12:05 +08:00
timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
2014-03-11 04:22:08 +08:00
fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
2014-03-10 14:12:05 +08:00
fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
fmt.Printf("Complete requests: %d\n", completed)
fmt.Printf("Failed requests: %d\n", failed)
fmt.Printf("Total transferred: %d bytes\n", transferred)
fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
2014-03-10 14:12:05 +08:00
n, sum := 0, 0
min, max := 10000000, 0
for i := 0; i < len(s.data); i++ {
n += s.data[i]
sum += s.data[i] * i
if s.data[i] > 0 {
if min > i {
min = i
}
if max < i {
max = i
}
}
}
n += len(s.overflow)
for i := 0; i < len(s.overflow); i++ {
sum += s.overflow[i]
if min > s.overflow[i] {
min = s.overflow[i]
}
if max < s.overflow[i] {
max = s.overflow[i]
}
}
2014-03-10 14:12:05 +08:00
avg := float64(sum) / float64(n)
varianceSum := 0.0
for i := 0; i < len(s.data); i++ {
if s.data[i] > 0 {
d := float64(i) - avg
varianceSum += d * d * float64(s.data[i])
}
}
for i := 0; i < len(s.overflow); i++ {
d := float64(s.overflow[i]) - avg
varianceSum += d * d
}
2014-03-10 14:12:05 +08:00
std := math.Sqrt(varianceSum / float64(n))
fmt.Printf("\nConnection Times (ms)\n")
fmt.Printf(" min avg max std\n")
fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
2020-06-20 23:00:25 +08:00
// printing percentiles
2014-03-10 14:12:05 +08:00
fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
percentiles := make([]int, len(percentages))
for i := 0; i < len(percentages); i++ {
percentiles[i] = n * percentages[i] / 100
}
percentiles[len(percentiles)-1] = n
percentileIndex := 0
currentSum := 0
for i := 0; i < len(s.data); i++ {
currentSum += s.data[i]
if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
percentileIndex++
for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
percentileIndex++
}
2014-03-10 10:42:50 +08:00
}
}
sort.Ints(s.overflow)
for i := 0; i < len(s.overflow); i++ {
currentSum++
if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
percentileIndex++
for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
percentileIndex++
}
}
}
2014-03-10 10:42:50 +08:00
}
// FakeReader is a fake reader that generates the content to upload.
type FakeReader struct {
	id     uint64     // an id number, written at the start of every chunk of 8+ bytes
	size   int64      // number of bytes still to be produced
	random *rand.Rand // source of the bytes following the id
}

// Read produces up to len(p) of the remaining bytes and returns how many were
// produced; once size is used up it returns 0 and io.EOF. A chunk of at least
// 8 bytes starts with the id in little-endian order followed by random bytes;
// shorter chunks leave p untouched. Only the first n bytes of p are written.
func (l *FakeReader) Read(p []byte) (n int, err error) {
	if l.size <= 0 {
		return 0, io.EOF
	}
	n = len(p)
	if int64(n) > l.size {
		n = int(l.size)
	}
	if n >= 8 {
		for i := 0; i < 8; i++ {
			p[i] = byte(l.id >> uint(i*8))
		}
		// Fill only the bytes that are reported as read.
		l.random.Read(p[8:n])
	}
	l.size -= int64(n)
	return n, nil
}
// WriteTo writes l.size bytes to w, taken from the shared buffer sharedBytes
// in chunks of at most len(sharedBytes) bytes. It returns the number of bytes
// actually written and the first error returned by w. l.size itself is not
// changed.
func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
	remaining := int(l.size)
	for remaining > 0 {
		chunk := sharedBytes
		if remaining < len(chunk) {
			chunk = chunk[:remaining]
		}
		count, e := w.Write(chunk)
		n += int64(count)
		if e != nil {
			return n, e
		}
		if count <= 0 {
			// A writer that makes no progress without an error would
			// otherwise keep this loop spinning forever.
			return n, io.ErrShortWrite
		}
		remaining -= count
	}
	return n, nil
}
2014-03-10 10:42:50 +08:00
// Readln reads one complete line from r, without the line ending, no matter
// how long the line is: the pieces bufio.Reader.ReadLine returns for a line
// longer than the reader's buffer are joined together. The error is the one
// returned by the last ReadLine call (io.EOF at the end of the input).
func Readln(r *bufio.Reader) ([]byte, error) {
	var whole []byte
	for {
		piece, more, err := r.ReadLine()
		whole = append(whole, piece...)
		if err != nil || !more {
			return whole, err
		}
	}
}