diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go new file mode 100644 index 000000000..a6680cbb8 --- /dev/null +++ b/go/operation/assign_file_id.go @@ -0,0 +1,40 @@ +package operation + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/util" + "encoding/json" + "errors" + "net/url" + "strconv" +) + +type AssignResult struct { + Fid string `json:"fid"` + Url string `json:"url"` + PublicUrl string `json:"publicUrl"` + Count int + Error string `json:"error"` +} + +func Assign(server string, count int, replication string) (*AssignResult, error) { + values := make(url.Values) + values.Add("count", strconv.Itoa(count)) + if replication != "" { + values.Add("replication", replication) + } + jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) + glog.V(2).Info("assign result :", string(jsonBlob)) + if err != nil { + return nil, err + } + var ret AssignResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Count <= 0 { + return nil, errors.New(ret.Error) + } + return &ret, nil +} diff --git a/go/operation/submit.go b/go/operation/submit.go new file mode 100644 index 000000000..b429ca02a --- /dev/null +++ b/go/operation/submit.go @@ -0,0 +1,91 @@ +package operation + +import ( + "code.google.com/p/weed-fs/go/glog" + "io" + "mime" + "os" + "path" + "path/filepath" + "strconv" + "strings" +) + +type SubmitResult struct { + FileName string `json:"fileName"` + FileUrl string `json:"fileUrl"` + Fid string `json:"fid"` + Size int `json:"size"` + Error string `json:"error"` +} + +func Submit(master string, reader io.Reader, replication string) (result SubmitResult, err error) { + assignResult, assignError := Assign(master, 1, replication) + if assignError != nil { + result.Error = assignError.Error() + return + } + url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid + uploadResult, uploadError := Upload(url, "", reader, false, "") + if uploadError != nil { + result.Error = uploadError.Error() + return + } + result.Size = uploadResult.Size + result.FileUrl = url + result.Fid = assignResult.Fid + return result, nil +} + +func SubmitFiles(master string, files []string, replication string) ([]SubmitResult, error) { + results := make([]SubmitResult, len(files)) + for index, file := range files { + results[index].FileName = file + } + ret, err := Assign(master, len(files), replication) + if err != nil { + for index, _ := range files { + results[index].Error = err.Error() + } + return results, err + } + for index, file := range files { + fid := ret.Fid + if index > 0 { + fid = fid + "_" + strconv.Itoa(index) + } + results[index].Size, err = upload(file, ret.PublicUrl, fid) + if err != nil { + fid = "" + results[index].Error = err.Error() + } + results[index].Fid = fid + results[index].FileUrl = ret.PublicUrl + "/" + fid + } + return results, nil +} + +func upload(filename string, server string, fid string) (int, error) { + glog.V(2).Info("Start uploading file:", filename) + fh, err := os.Open(filename) + if err != nil { + glog.V(0).Info("Failed to open file: ", filename) + return 0, err + } + fi, fiErr := fh.Stat() + if fiErr != nil { + glog.V(0).Info("Failed to stat file:", filename) + return 0, fiErr + } + filename = path.Base(filename) + isGzipped := path.Ext(filename) == ".gz" + if isGzipped { + filename = filename[0 : len(filename)-3] + } + mtype := mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) + ret, e := Upload("http://"+server+"/"+fid+"?ts="+strconv.Itoa(int(fi.ModTime().Unix())), filename, fh, isGzipped, mtype) + if e != nil { + return 0, e + } + return ret.Size, e +} diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go index bed5e6239..ae4cf0cc3 100644 --- a/go/operation/upload_content.go +++ b/go/operation/upload_content.go @@ -2,12 +2,12 @@ package operation import ( "bytes" + "code.google.com/p/weed-fs/go/glog" "encoding/json" "errors" "fmt" "io" "io/ioutil" - "code.google.com/p/weed-fs/go/glog" "mime" "mime/multipart" "net/http" @@ -24,6 +24,12 @@ type UploadResult struct { var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string) (*UploadResult, error) { + return upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = io.Copy(w, reader) + return + }, filename, isGzipped, mtype) +} +func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string) (*UploadResult, error) { body_buf := bytes.NewBufferString("") body_writer := multipart.NewWriter(body_buf) h := make(textproto.MIMEHeader) @@ -31,7 +37,9 @@ func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, if mtype == "" { mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) } - h.Set("Content-Type", mtype) + if mtype != "" { + h.Set("Content-Type", mtype) + } if isGzipped { h.Set("Content-Encoding", "gzip") } @@ -40,7 +48,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, glog.V(0).Infoln("error creating form file", err) return nil, err } - if _, err = io.Copy(file_writer, reader); err != nil { + if err = fillBufferFunction(file_writer); err != nil { glog.V(0).Infoln("error copying data", err) return nil, err } diff --git a/go/weed/master.go b/go/weed/master.go index 3cec1e5d3..6d01ef3bc 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -257,7 +257,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("assigning file id for", fname) - assignResult, ae := Assign(masterUrl, 1) + assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication")) if ae != nil { writeJsonError(w, r, ae) return diff --git a/go/weed/upload.go b/go/weed/upload.go index 54c566ca7..8c0469595 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -2,17 +2,10 @@ package main import ( "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/util" "encoding/json" - "errors" "fmt" - "mime" - "net/url" "os" - "path" "path/filepath" - "strconv" - "strings" ) var ( @@ -50,97 +43,6 @@ var cmdUpload = &Command{ `, } -type AssignResult struct { - Fid string `json:"fid"` - Url string `json:"url"` - PublicUrl string `json:"publicUrl"` - Count int - Error string `json:"error"` -} - -func Assign(server string, count int) (*AssignResult, error) { - values := make(url.Values) - values.Add("count", strconv.Itoa(count)) - if *uploadReplication != "" { - values.Add("replication", *uploadReplication) - } - jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) - debug("assign result :", string(jsonBlob)) - if err != nil { - return nil, err - } - var ret AssignResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Count <= 0 { - return nil, errors.New(ret.Error) - } - return &ret, nil -} - -func upload(filename string, server string, fid string) (int, error) { - debug("Start uploading file:", filename) - fh, err := os.Open(filename) - if err != nil { - debug("Failed to open file:", filename) - return 0, err - } - fi, fiErr := fh.Stat() - if fiErr != nil { - debug("Failed to stat file:", filename) - return 0, fiErr - } - filename = path.Base(filename) - isGzipped := path.Ext(filename) == ".gz" - if isGzipped { - filename = filename[0 : len(filename)-3] - } - mtype := mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) - ret, e := operation.Upload("http://"+server+"/"+fid+"?ts="+strconv.Itoa(int(fi.ModTime().Unix())), filename, fh, isGzipped, mtype) - if e != nil { - return 0, e - } - return ret.Size, e -} - -type SubmitResult struct { - FileName string `json:"fileName"` - FileUrl string `json:"fileUrl"` - Fid string `json:"fid"` - Size int `json:"size"` - Error string `json:"error"` -} - -func submit(files []string) ([]SubmitResult, error) { - results := make([]SubmitResult, len(files)) - for index, file := range files { - results[index].FileName = file - } - ret, err := Assign(*server, len(files)) - if err != nil { - for index, _ := range files { - results[index].Error = err.Error() - } - return results, err - } - for index, file := range files { - fid := ret.Fid - if index > 0 { - fid = fid + "_" + strconv.Itoa(index) - } - results[index].Size, err = upload(file, ret.PublicUrl, fid) - if err != nil { - fid = "" - results[index].Error = err.Error() - } - results[index].Fid = fid - results[index].FileUrl = ret.PublicUrl + "/" + fid - } - return results, nil -} - func runUpload(cmd *Command, args []string) bool { if len(cmdUpload.Flag.Args()) == 0 { if *uploadDir == "" { @@ -154,7 +56,7 @@ func runUpload(cmd *Command, args []string) bool { return nil } } - results, e := submit([]string{path}) + results, e := operation.SubmitFiles(*server, []string{path}, *uploadReplication) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -167,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool { return err }) } else { - results, _ := submit(args) + results, _ := operation.SubmitFiles(*server, args, *uploadReplication) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) }