diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index c1bc80c42..af0793c70 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -41,6 +41,7 @@ type BenchmarkOptions struct { grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient fsync *bool + useTcp *bool } var ( @@ -67,6 +68,7 @@ func init() { b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") + b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp") sharedBytes = make([]byte, 1024) } @@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { random := rand.New(rand.NewSource(time.Now().UnixNano())) + volumeTcpClient := wdclient.NewVolumeTcpClient() + for id := range idChan { start := time.Now() fileSize := int64(*b.fileSize + random.Intn(64)) @@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if !isSecure && assignResult.Auth != "" { isSecure = true } - if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { + if *b.useTcp { + if uploadByTcp(volumeTcpClient, fp) { + fileIdLineChan <- fp.Fid + s.completed++ + s.transferred += fileSize + } else { + s.failed++ + } + } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b } } +func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool { + + err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader) + if err != nil { + glog.Errorf("upload chunk err: %v", err) + return false + } + + return true +} + func readFileIds(fileName string, fileIdLineChan chan string) { file, err := os.Open(fileName) // For read access. if err != nil { diff --git a/weed/wdclient/volume_tcp_client.go b/weed/wdclient/volume_tcp_client.go new file mode 100644 index 000000000..afebd71eb --- /dev/null +++ b/weed/wdclient/volume_tcp_client.go @@ -0,0 +1,97 @@ +package wdclient + +import ( + "bufio" + "bytes" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient/net2" + "io" + "net" + "time" +) + +// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication +type VolumeTcpClient struct { + cp net2.ConnectionPool +} + +type VolumeTcpConn struct { + net.Conn + bufWriter *bufio.Writer + bufReader *bufio.Reader +} + +func NewVolumeTcpClient() *VolumeTcpClient { + MaxIdleTime := 10 * time.Second + return &VolumeTcpClient{ + cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{ + MaxActiveConnections: 16, + MaxIdleConnections: 1, + MaxIdleTime: &MaxIdleTime, + DialMaxConcurrency: 0, + Dial: func(network string, address string) (net.Conn, error) { + conn, err := net.Dial(network, address) + return &VolumeTcpConn{ + conn, + bufio.NewWriter(conn), + bufio.NewReader(conn), + }, err + }, + NowFunc: nil, + ReadTimeout: 0, + WriteTimeout: 0, + }), + } +} +func (c *VolumeTcpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) { + + tcpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20000) + if parseErr != nil { + return parseErr + } + + c.cp.Register("tcp", tcpAddress) + tcpConn, getErr := c.cp.Get("tcp", tcpAddress) + if getErr != nil { + return fmt.Errorf("get connection to %s: %v", tcpAddress, getErr) + } + conn := tcpConn.RawConn().(*VolumeTcpConn) + defer func() { + if err != nil { + tcpConn.DiscardConnection() + } else { + tcpConn.ReleaseConnection() + } + }() + + buf := []byte("+" + fileId + "\n") + _, err = conn.bufWriter.Write([]byte(buf)) + if err != nil { + return + } + util.Uint32toBytes(buf[0:4], fileSize) + _, err = conn.bufWriter.Write(buf[0:4]) + if err != nil { + return + } + _, err = io.Copy(conn.bufWriter, fileReader) + if err != nil { + return + } + conn.bufWriter.Write([]byte("!\n")) + conn.bufWriter.Flush() + + ret, _, err := conn.bufReader.ReadLine() + if err != nil { + glog.V(0).Infof("upload by tcp: %v", err) + return + } + if !bytes.HasPrefix(ret, []byte("+OK")) { + glog.V(0).Infof("upload by tcp: %v", string(ret)) + } + + return nil +}