mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-12-12 00:19:14 +08:00
split into server and clients
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@9 282b0af5-e82d-9cf1-ede4-77906d7719d0
This commit is contained in:
parent
02f4e7b82c
commit
23ecd7bb33
@ -1,145 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"store"
|
|
||||||
"directory"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"http"
|
|
||||||
"json"
|
|
||||||
"log"
|
|
||||||
"mime"
|
|
||||||
"rand"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
port = flag.Int("port", 8080, "http listen port")
|
|
||||||
chunkFolder = flag.String("dir", "/tmp", "data directory to store files")
|
|
||||||
chunkCount = flag.Int("chunks", 5, "data chunks to store files")
|
|
||||||
chunkEnabled = flag.Bool("data", false, "act as a store server")
|
|
||||||
chunkServer = flag.String("cserver", "localhost:8080", "chunk server to store data")
|
|
||||||
publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read")
|
|
||||||
metaServer = flag.String("mserver", "localhost:8080", "metadata server to store mappings")
|
|
||||||
|
|
||||||
metaEnabled = flag.Bool("meta", false, "act as a directory server")
|
|
||||||
metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings")
|
|
||||||
)
|
|
||||||
|
|
||||||
type Haystack struct {
|
|
||||||
store *store.Store
|
|
||||||
directory *directory.Mapper
|
|
||||||
}
|
|
||||||
|
|
||||||
func storeHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
switch r.Method {
|
|
||||||
case "GET":
|
|
||||||
server.GetHandler(w, r)
|
|
||||||
case "DELETE":
|
|
||||||
server.DeleteHandler(w, r)
|
|
||||||
case "POST":
|
|
||||||
server.PostHandler(w, r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
n := new(store.Needle)
|
|
||||||
path := r.URL.Path
|
|
||||||
sepIndex := strings.Index(path[1:], "/") + 1
|
|
||||||
volumeId, _ := strconv.Atoui64(path[1:sepIndex])
|
|
||||||
dotIndex := strings.LastIndex(path, ".")
|
|
||||||
n.ParsePath(path[sepIndex+1 : dotIndex])
|
|
||||||
ext := path[dotIndex:]
|
|
||||||
|
|
||||||
s.store.Read(volumeId, n)
|
|
||||||
w.Header().Set("Content-Type", mime.TypeByExtension(ext))
|
|
||||||
w.Write(n.Data)
|
|
||||||
}
|
|
||||||
func (s *Haystack) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
|
|
||||||
s.store.Write(volumeId, store.NewNeedle(r))
|
|
||||||
w.Header().Set("Content-Type", "text/plain")
|
|
||||||
fmt.Fprint(w, "volumeId=", volumeId, "\n")
|
|
||||||
}
|
|
||||||
func (s *Haystack) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
|
|
||||||
}
|
|
||||||
func dirReadHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
|
|
||||||
machineList := server.directory.Get((uint32)(volumeId))
|
|
||||||
x := rand.Intn(len(machineList))
|
|
||||||
machine := machineList[x]
|
|
||||||
bytes, _ := json.Marshal(machine)
|
|
||||||
callback := r.FormValue("callback")
|
|
||||||
w.Header().Set("Content-Type", "application/javascript")
|
|
||||||
if callback == "" {
|
|
||||||
w.Write(bytes)
|
|
||||||
} else {
|
|
||||||
w.Write([]uint8(callback))
|
|
||||||
w.Write([]uint8("("))
|
|
||||||
w.Write(bytes)
|
|
||||||
w.Write([]uint8(")"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
machineList := server.directory.PickForWrite()
|
|
||||||
bytes, _ := json.Marshal(machineList)
|
|
||||||
callback := r.FormValue("callback")
|
|
||||||
w.Header().Set("Content-Type", "application/javascript")
|
|
||||||
if callback == "" {
|
|
||||||
w.Write(bytes)
|
|
||||||
} else {
|
|
||||||
w.Write([]uint8(callback))
|
|
||||||
w.Write([]uint8("("))
|
|
||||||
w.Write(bytes)
|
|
||||||
w.Write([]uint8(")"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
s := r.FormValue("server")
|
|
||||||
publicServer := r.FormValue("publicServer")
|
|
||||||
volumes := make([]store.VolumeStat, 0)
|
|
||||||
json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
|
|
||||||
server.directory.Add(directory.NewMachine(s, publicServer), volumes)
|
|
||||||
}
|
|
||||||
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "text/plain")
|
|
||||||
bytes, _ := json.Marshal(server.directory)
|
|
||||||
fmt.Fprint(w, bytes)
|
|
||||||
}
|
|
||||||
|
|
||||||
var server *Haystack
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
flag.Parse()
|
|
||||||
bothEnabled := false
|
|
||||||
if !*chunkEnabled && !*metaEnabled {
|
|
||||||
bothEnabled = true
|
|
||||||
log.Println("Act as both a store server and a directory server")
|
|
||||||
}
|
|
||||||
server = new(Haystack)
|
|
||||||
if *chunkEnabled || bothEnabled {
|
|
||||||
log.Println("data stored in", *chunkFolder)
|
|
||||||
server.store = store.NewStore(*chunkServer, *publicServer, *chunkFolder)
|
|
||||||
defer server.store.Close()
|
|
||||||
http.HandleFunc("/", storeHandler)
|
|
||||||
}
|
|
||||||
if *metaEnabled || bothEnabled {
|
|
||||||
server.directory = directory.NewMapper(*metaFolder, "directory")
|
|
||||||
defer server.directory.Save()
|
|
||||||
http.HandleFunc("/dir/read", dirReadHandler)
|
|
||||||
http.HandleFunc("/dir/write", dirWriteHandler)
|
|
||||||
http.HandleFunc("/dir/join", dirJoinHandler)
|
|
||||||
http.HandleFunc("/dir/status", dirStatusHandler)
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
time.Sleep(3000 * 1000)
|
|
||||||
server.store.Join(*metaServer)
|
|
||||||
log.Println("store joined at", *metaServer)
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Println("Serving at http://127.0.0.1:" + strconv.Itoa(*port))
|
|
||||||
http.ListenAndServe(":"+strconv.Itoa(*port), nil)
|
|
||||||
|
|
||||||
}
|
|
73
weed-fs/src/cmd/weedc.go
Normal file
73
weed-fs/src/cmd/weedc.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"storage"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"http"
|
||||||
|
"log"
|
||||||
|
"mime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
port = flag.Int("port", 8080, "http listen port")
|
||||||
|
chunkFolder = flag.String("dir", "/tmp", "data directory to store files")
|
||||||
|
chunkCount = flag.Int("chunks", 5, "data chunks to store files")
|
||||||
|
publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read")
|
||||||
|
metaServer = flag.String("mserver", "localhost:9333", "metadata server to store mappings")
|
||||||
|
|
||||||
|
store *storage.Store
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
func storeHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.Method {
|
||||||
|
case "GET":
|
||||||
|
GetHandler(w, r)
|
||||||
|
case "DELETE":
|
||||||
|
DeleteHandler(w, r)
|
||||||
|
case "POST":
|
||||||
|
PostHandler(w, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func GetHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
n := new(storage.Needle)
|
||||||
|
path := r.URL.Path
|
||||||
|
sepIndex := strings.Index(path[1:], "/") + 1
|
||||||
|
volumeId, _ := strconv.Atoui64(path[1:sepIndex])
|
||||||
|
dotIndex := strings.LastIndex(path, ".")
|
||||||
|
n.ParsePath(path[sepIndex+1 : dotIndex])
|
||||||
|
ext := path[dotIndex:]
|
||||||
|
|
||||||
|
store.Read(volumeId, n)
|
||||||
|
w.Header().Set("Content-Type", mime.TypeByExtension(ext))
|
||||||
|
w.Write(n.Data)
|
||||||
|
}
|
||||||
|
func PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
|
||||||
|
store.Write(volumeId, storage.NewNeedle(r))
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
fmt.Fprint(w, "volumeId=", volumeId, "\n")
|
||||||
|
}
|
||||||
|
func DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
store = storage.NewStore(*port, *publicServer, *chunkFolder)
|
||||||
|
defer store.Close()
|
||||||
|
http.HandleFunc("/", storeHandler)
|
||||||
|
|
||||||
|
store.Join(*metaServer)
|
||||||
|
log.Println("store joined at", *metaServer)
|
||||||
|
|
||||||
|
log.Println("Start storage service at http://127.0.0.1:" + strconv.Itoa(*port))
|
||||||
|
e := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
|
||||||
|
if e!=nil {
|
||||||
|
log.Fatalf("Fail to start:",e.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
80
weed-fs/src/cmd/weeds.go
Normal file
80
weed-fs/src/cmd/weeds.go
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"storage"
|
||||||
|
"directory"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"http"
|
||||||
|
"json"
|
||||||
|
"log"
|
||||||
|
"rand"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
port = flag.Int("port", 9333, "http listen port")
|
||||||
|
metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings")
|
||||||
|
mapper *directory.Mapper
|
||||||
|
)
|
||||||
|
|
||||||
|
func dirReadHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
|
||||||
|
machineList := mapper.Get((uint32)(volumeId))
|
||||||
|
x := rand.Intn(len(machineList))
|
||||||
|
machine := machineList[x]
|
||||||
|
bytes, _ := json.Marshal(machine)
|
||||||
|
callback := r.FormValue("callback")
|
||||||
|
w.Header().Set("Content-Type", "application/javascript")
|
||||||
|
if callback == "" {
|
||||||
|
w.Write(bytes)
|
||||||
|
} else {
|
||||||
|
w.Write([]uint8(callback))
|
||||||
|
w.Write([]uint8("("))
|
||||||
|
w.Write(bytes)
|
||||||
|
w.Write([]uint8(")"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
machineList := mapper.PickForWrite()
|
||||||
|
bytes, _ := json.Marshal(machineList)
|
||||||
|
callback := r.FormValue("callback")
|
||||||
|
w.Header().Set("Content-Type", "application/javascript")
|
||||||
|
if callback == "" {
|
||||||
|
w.Write(bytes)
|
||||||
|
} else {
|
||||||
|
w.Write([]uint8(callback))
|
||||||
|
w.Write([]uint8("("))
|
||||||
|
w.Write(bytes)
|
||||||
|
w.Write([]uint8(")"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s := r.FormValue("server")
|
||||||
|
publicServer := r.FormValue("publicServer")
|
||||||
|
volumes := make([]storage.VolumeStat, 0)
|
||||||
|
json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
|
||||||
|
mapper.Add(directory.NewMachine(s, publicServer), volumes)
|
||||||
|
}
|
||||||
|
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
bytes, _ := json.Marshal(mapper)
|
||||||
|
fmt.Fprint(w, bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
mapper = directory.NewMapper(*metaFolder, "directory")
|
||||||
|
defer mapper.Save()
|
||||||
|
http.HandleFunc("/dir/read", dirReadHandler)
|
||||||
|
http.HandleFunc("/dir/write", dirWriteHandler)
|
||||||
|
http.HandleFunc("/dir/join", dirJoinHandler)
|
||||||
|
http.HandleFunc("/dir/status", dirStatusHandler)
|
||||||
|
|
||||||
|
log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port))
|
||||||
|
e := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
|
||||||
|
if e!=nil {
|
||||||
|
log.Fatalf("Fail to start:",e.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -6,20 +6,13 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"rand"
|
"rand"
|
||||||
"log"
|
"log"
|
||||||
"store"
|
"storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Machine struct {
|
type Machine struct {
|
||||||
Server string //<server name/ip>[:port]
|
Server string //<server name/ip>[:port]
|
||||||
PublicServer string
|
PublicServer string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMachine(server, publicServer string) (m *Machine) {
|
|
||||||
m = new(Machine)
|
|
||||||
m.Server, m.PublicServer = server, publicServer
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
type Mapper struct {
|
type Mapper struct {
|
||||||
dir string
|
dir string
|
||||||
FileName string
|
FileName string
|
||||||
@ -27,6 +20,12 @@ type Mapper struct {
|
|||||||
LastId uint32
|
LastId uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewMachine(server, publicServer string) (m *Machine) {
|
||||||
|
m = new(Machine)
|
||||||
|
m.Server, m.PublicServer = server, publicServer
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func NewMapper(dirname string, filename string) (m *Mapper) {
|
func NewMapper(dirname string, filename string) (m *Mapper) {
|
||||||
m = new(Mapper)
|
m = new(Mapper)
|
||||||
m.dir = dirname
|
m.dir = dirname
|
||||||
@ -51,7 +50,7 @@ func (m *Mapper) PickForWrite() []*Machine {
|
|||||||
func (m *Mapper) Get(vid uint32) []*Machine {
|
func (m *Mapper) Get(vid uint32) []*Machine {
|
||||||
return m.Id2Machine[vid]
|
return m.Id2Machine[vid]
|
||||||
}
|
}
|
||||||
func (m *Mapper) Add(machine *Machine, volumes []store.VolumeStat) {
|
func (m *Mapper) Add(machine *Machine, volumes []storage.VolumeStat) {
|
||||||
log.Println("Adding store node", machine.Server)
|
log.Println("Adding store node", machine.Server)
|
||||||
for _, v := range volumes {
|
for _, v := range volumes {
|
||||||
existing := m.Id2Machine[uint32(v.Id)]
|
existing := m.Id2Machine[uint32(v.Id)]
|
||||||
|
67
weed-fs/src/pkg/storage/needle.go
Normal file
67
weed-fs/src/pkg/storage/needle.go
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"http"
|
||||||
|
"log"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Needle struct{
|
||||||
|
Cookie uint8 "random number to mitigate brute force lookups"
|
||||||
|
Key uint64 "file id"
|
||||||
|
AlternateKey uint32 "supplemental id"
|
||||||
|
Size uint32 "Data size"
|
||||||
|
Data []byte "The actual file data"
|
||||||
|
Checksum int32 "CRC32 to check integrity"
|
||||||
|
Padding []byte "Aligned to 8 bytes"
|
||||||
|
}
|
||||||
|
func NewNeedle(r *http.Request)(n *Needle){
|
||||||
|
n = new(Needle)
|
||||||
|
form,fe:=r.MultipartReader()
|
||||||
|
if fe!=nil {
|
||||||
|
log.Fatalf("MultipartReader [ERROR] %s\n", fe)
|
||||||
|
}
|
||||||
|
part,_:=form.NextPart()
|
||||||
|
data,_:=ioutil.ReadAll(part)
|
||||||
|
n.Data = data
|
||||||
|
|
||||||
|
n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path,".")])
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
func (n *Needle) ParsePath(path string){
|
||||||
|
a := strings.Split(path,"_")
|
||||||
|
log.Println("cookie",a[0],"key",a[1],"altKey",a[2])
|
||||||
|
cookie,_ := strconv.Atoi(a[0])
|
||||||
|
n.Cookie = uint8(cookie)
|
||||||
|
n.Key,_ = strconv.Atoui64(a[1])
|
||||||
|
altKey,_ := strconv.Atoui64(a[2])
|
||||||
|
n.AlternateKey = uint32(altKey)
|
||||||
|
}
|
||||||
|
func (n *Needle) Append(w io.Writer){
|
||||||
|
header := make([]byte,17)
|
||||||
|
header[0] = n.Cookie
|
||||||
|
uint64toBytes(header[1:9],n.Key)
|
||||||
|
uint32toBytes(header[9:13],n.AlternateKey)
|
||||||
|
n.Size = uint32(len(n.Data))
|
||||||
|
uint32toBytes(header[13:17],n.Size)
|
||||||
|
w.Write(header)
|
||||||
|
w.Write(n.Data)
|
||||||
|
rest := 8-((n.Size+17+4)%8)
|
||||||
|
uint32toBytes(header[0:4],uint32(n.Checksum))
|
||||||
|
w.Write(header[0:rest+4])
|
||||||
|
}
|
||||||
|
func (n *Needle) Read(r io.Reader, size uint32){
|
||||||
|
bytes := make([]byte,size+17+4)
|
||||||
|
r.Read(bytes)
|
||||||
|
n.Cookie = bytes[0]
|
||||||
|
n.Key = bytesToUint64(bytes[1:9])
|
||||||
|
n.AlternateKey = bytesToUint32(bytes[9:13])
|
||||||
|
n.Size = bytesToUint32(bytes[13:17])
|
||||||
|
n.Data = bytes[17:17+size]
|
||||||
|
n.Checksum = int32(bytesToUint32(bytes[17+size:17+size+4]))
|
||||||
|
}
|
||||||
|
|
53
weed-fs/src/pkg/storage/needle_map.go
Normal file
53
weed-fs/src/pkg/storage/needle_map.go
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NeedleKey struct{
|
||||||
|
Key uint64 "file id"
|
||||||
|
AlternateKey uint32 "supplemental id"
|
||||||
|
}
|
||||||
|
func (k *NeedleKey) String() string {
|
||||||
|
var tmp [12]byte
|
||||||
|
for i :=uint(0);i<8;i++{
|
||||||
|
tmp[i] = byte(k.Key >> (8*i));
|
||||||
|
}
|
||||||
|
for i :=uint(0);i<4;i++{
|
||||||
|
tmp[i+8] = byte(k.AlternateKey >> (8*i));
|
||||||
|
}
|
||||||
|
return string(tmp[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
type NeedleValue struct{
|
||||||
|
Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G
|
||||||
|
Size uint32 "Size of the data portion"
|
||||||
|
}
|
||||||
|
|
||||||
|
type NeedleMap struct{
|
||||||
|
m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue
|
||||||
|
}
|
||||||
|
func NewNeedleMap() (nm *NeedleMap){
|
||||||
|
nm = new(NeedleMap)
|
||||||
|
nm.m = make(map[string]*NeedleValue)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
func (nm *NeedleMap) load(file *os.File){
|
||||||
|
}
|
||||||
|
func makeKey(key uint64, altKey uint32) string {
|
||||||
|
var tmp [12]byte
|
||||||
|
for i :=uint(0);i<8;i++{
|
||||||
|
tmp[i] = byte(key >> (8*i));
|
||||||
|
}
|
||||||
|
for i :=uint(0);i<4;i++{
|
||||||
|
tmp[i+8] = byte(altKey >> (8*i));
|
||||||
|
}
|
||||||
|
return string(tmp[:])
|
||||||
|
}
|
||||||
|
func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32){
|
||||||
|
nm.m[makeKey(key,altKey)] = &NeedleValue{Offset:offset, Size:size}
|
||||||
|
}
|
||||||
|
func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool){
|
||||||
|
element, ok = nm.m[makeKey(key,altKey)]
|
||||||
|
return
|
||||||
|
}
|
69
weed-fs/src/pkg/storage/store.go
Normal file
69
weed-fs/src/pkg/storage/store.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"io/ioutil"
|
||||||
|
"json"
|
||||||
|
"strings"
|
||||||
|
"strconv"
|
||||||
|
"url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Store struct {
|
||||||
|
volumes map[uint64]*Volume
|
||||||
|
dir string
|
||||||
|
Port int
|
||||||
|
PublicServer string
|
||||||
|
}
|
||||||
|
type VolumeStat struct {
|
||||||
|
Id uint64 "id"
|
||||||
|
Status int "status" //0:read, 1:write
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStore(port int, publicServer, dirname string) (s *Store) {
|
||||||
|
s = new(Store)
|
||||||
|
s.Port, s.PublicServer, s.dir = port, publicServer, dirname
|
||||||
|
s.volumes = make(map[uint64]*Volume)
|
||||||
|
|
||||||
|
counter := uint64(0)
|
||||||
|
files, _ := ioutil.ReadDir(dirname)
|
||||||
|
for _, f := range files {
|
||||||
|
if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
id, err := strconv.Atoui64(f.Name[:-4])
|
||||||
|
if err == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
s.volumes[counter] = NewVolume(s.dir, id)
|
||||||
|
counter++
|
||||||
|
}
|
||||||
|
log.Println("Store started on dir:", dirname, "with", counter, "existing volumes")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Join(mserver string) {
|
||||||
|
stats := make([]*VolumeStat, len(s.volumes))
|
||||||
|
for k, _ := range s.volumes {
|
||||||
|
s := new(VolumeStat)
|
||||||
|
s.Id, s.Status = k, 1
|
||||||
|
stats = append(stats, s)
|
||||||
|
}
|
||||||
|
bytes, _ := json.Marshal(stats)
|
||||||
|
values := make(url.Values)
|
||||||
|
values.Add("port", strconv.Itoa(s.Port))
|
||||||
|
values.Add("publicServer", s.PublicServer)
|
||||||
|
values.Add("volumes", string(bytes))
|
||||||
|
post("http://"+mserver+"/dir/join", values)
|
||||||
|
}
|
||||||
|
func (s *Store) Close() {
|
||||||
|
for _, v := range s.volumes {
|
||||||
|
v.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (s *Store) Write(i uint64, n *Needle) {
|
||||||
|
s.volumes[i].write(n)
|
||||||
|
}
|
||||||
|
func (s *Store) Read(i uint64, n *Needle) {
|
||||||
|
s.volumes[i].read(n)
|
||||||
|
}
|
50
weed-fs/src/pkg/storage/util.go
Normal file
50
weed-fs/src/pkg/storage/util.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"http"
|
||||||
|
"io/ioutil"
|
||||||
|
"url"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func bytesToUint64(b []byte)(v uint64){
|
||||||
|
for i :=uint(7);i>0;i-- {
|
||||||
|
v += uint64(b[i])
|
||||||
|
v <<= 8
|
||||||
|
}
|
||||||
|
v+=uint64(b[0])
|
||||||
|
return
|
||||||
|
}
|
||||||
|
func bytesToUint32(b []byte)(v uint32){
|
||||||
|
for i :=uint(3);i>0;i-- {
|
||||||
|
v += uint32(b[i])
|
||||||
|
v <<= 8
|
||||||
|
}
|
||||||
|
v+=uint32(b[0])
|
||||||
|
return
|
||||||
|
}
|
||||||
|
func uint64toBytes(b []byte, v uint64){
|
||||||
|
for i :=uint(0);i<8;i++ {
|
||||||
|
b[i] = byte(v>>(i*8))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func uint32toBytes(b []byte, v uint32){
|
||||||
|
for i :=uint(0);i<4;i++ {
|
||||||
|
b[i] = byte(v>>(i*8))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func post(url string, values url.Values)string{
|
||||||
|
r, err := http.PostForm(url, values)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("post:", err)
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
defer r.Body.Close()
|
||||||
|
b, err := ioutil.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("post:", err)
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return string(b)
|
||||||
|
}
|
66
weed-fs/src/pkg/storage/volume.go
Normal file
66
weed-fs/src/pkg/storage/volume.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Volume struct {
|
||||||
|
Id uint64
|
||||||
|
dir string
|
||||||
|
dataFile, indexFile *os.File
|
||||||
|
nm *NeedleMap
|
||||||
|
|
||||||
|
accessChannel chan int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewVolume(dirname string, id uint64) (v *Volume) {
|
||||||
|
var e os.Error
|
||||||
|
v = new(Volume)
|
||||||
|
v.dir = dirname
|
||||||
|
v.Id = id
|
||||||
|
fileName := strconv.Uitoa64(v.Id)
|
||||||
|
log.Println("file", v.dir, "/", fileName)
|
||||||
|
v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
|
||||||
|
if e != nil {
|
||||||
|
log.Fatalf("New Volume [ERROR] %s\n", e)
|
||||||
|
}
|
||||||
|
v.indexFile, e = os.OpenFile(path.Join(v.dir,fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644)
|
||||||
|
if e != nil {
|
||||||
|
log.Fatalf("New Volume [ERROR] %s\n", e)
|
||||||
|
}
|
||||||
|
v.nm = NewNeedleMap()
|
||||||
|
v.nm.load(v.indexFile)
|
||||||
|
|
||||||
|
v.accessChannel = make(chan int, 1)
|
||||||
|
v.accessChannel <- 0
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
func (v *Volume) Close() {
|
||||||
|
close(v.accessChannel)
|
||||||
|
v.dataFile.Close()
|
||||||
|
v.indexFile.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *Volume) write(n *Needle) {
|
||||||
|
counter := <-v.accessChannel
|
||||||
|
offset, _ := v.dataFile.Seek(0, 2)
|
||||||
|
n.Append(v.dataFile)
|
||||||
|
nv, ok := v.nm.get(n.Key, n.AlternateKey)
|
||||||
|
if !ok || int64(nv.Offset)*8 < offset {
|
||||||
|
v.nm.put(n.Key, n.AlternateKey, uint32(offset/8), n.Size)
|
||||||
|
}
|
||||||
|
v.accessChannel <- counter + 1
|
||||||
|
}
|
||||||
|
func (v *Volume) read(n *Needle) {
|
||||||
|
counter := <-v.accessChannel
|
||||||
|
nv, ok := v.nm.get(n.Key, n.AlternateKey)
|
||||||
|
if ok && nv.Offset > 0 {
|
||||||
|
v.dataFile.Seek(int64(nv.Offset)*8, 0)
|
||||||
|
n.Read(v.dataFile, nv.Size)
|
||||||
|
}
|
||||||
|
v.accessChannel <- counter + 1
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user