2012-08-07 16:29:22 +08:00
package main
import (
2012-09-04 10:18:02 +08:00
"encoding/json"
"log"
"net/http"
"pkg/directory"
2012-09-17 08:31:15 +08:00
"pkg/replication"
2012-09-04 10:18:02 +08:00
"pkg/storage"
2012-09-17 08:31:15 +08:00
"pkg/topology"
2012-09-04 10:18:02 +08:00
"strconv"
"strings"
2012-09-04 11:40:38 +08:00
"time"
2012-08-07 16:29:22 +08:00
)
func init ( ) {
2012-09-04 10:18:02 +08:00
cmdMaster . Run = runMaster // break init cycle
IsDebug = cmdMaster . Flag . Bool ( "debug" , false , "enable debug mode" )
2012-08-07 16:29:22 +08:00
}
var cmdMaster = & Command {
2012-09-04 10:18:02 +08:00
UsageLine : "master -port=9333" ,
Short : "start a master server" ,
Long : ` start a master server to provide volume = > location mapping service
2012-08-07 16:29:22 +08:00
and sequence number of file ids
` ,
}
var (
2012-09-04 10:18:02 +08:00
mport = cmdMaster . Flag . Int ( "port" , 9333 , "http listen port" )
metaFolder = cmdMaster . Flag . String ( "mdir" , "/tmp" , "data directory to store mappings" )
capacity = cmdMaster . Flag . Int ( "capacity" , 100 , "maximum number of volumes to hold" )
volumeSizeLimitMB = cmdMaster . Flag . Uint ( "volumeSizeLimitMB" , 32 * 1024 , "Default Volume Size in MegaBytes" )
2012-09-04 11:40:38 +08:00
mpulse = cmdMaster . Flag . Int ( "pulseSeconds" , 5 , "number of seconds between heartbeats" )
2012-08-07 16:29:22 +08:00
)
2012-09-17 08:31:15 +08:00
// State shared by the HTTP handlers in this file. All three values are
// created in runMaster before the server starts listening.
var (
	// mapper backs the /dir/assign, /dir/lookup, /dir/join and /dir/status handlers.
	mapper *directory.Mapper
	// topo backs the /dir/assign2, /dir/join, /dir/status2 and /vol/grow handlers.
	topo *topology.Topology
	// vg is used to grow volumes inside topo.
	vg *replication.VolumeGrowth
)
2012-08-07 16:29:22 +08:00
func dirLookupHandler ( w http . ResponseWriter , r * http . Request ) {
2012-09-04 10:18:02 +08:00
vid := r . FormValue ( "volumeId" )
commaSep := strings . Index ( vid , "," )
if commaSep > 0 {
vid = vid [ 0 : commaSep ]
}
volumeId , _ := storage . NewVolumeId ( vid )
2012-09-04 15:26:38 +08:00
machines , e := mapper . Get ( volumeId )
2012-09-04 10:18:02 +08:00
if e == nil {
2012-09-17 08:31:15 +08:00
ret := [ ] map [ string ] string { }
for _ , machine := range machines {
ret = append ( ret , map [ string ] string { "url" : machine . Url , "publicUrl" : machine . PublicUrl } )
}
2012-09-04 15:26:38 +08:00
writeJson ( w , r , ret )
2012-09-04 10:18:02 +08:00
} else {
log . Println ( "Invalid volume id" , volumeId )
2012-09-04 15:26:38 +08:00
writeJson ( w , r , map [ string ] string { "error" : "volume id " + volumeId . String ( ) + " not found. " + e . Error ( ) } )
2012-09-04 10:18:02 +08:00
}
2012-08-07 16:29:22 +08:00
}
func dirAssignHandler ( w http . ResponseWriter , r * http . Request ) {
2012-09-04 10:18:02 +08:00
c := r . FormValue ( "count" )
fid , count , machine , err := mapper . PickForWrite ( c )
if err == nil {
2012-09-04 10:36:35 +08:00
writeJson ( w , r , map [ string ] interface { } { "fid" : fid , "url" : machine . Url , "publicUrl" : machine . PublicUrl , "count" : count } )
2012-09-04 10:18:02 +08:00
} else {
2012-09-04 12:31:13 +08:00
writeJson ( w , r , map [ string ] string { "error" : err . Error ( ) } )
2012-09-04 10:18:02 +08:00
}
2012-08-07 16:29:22 +08:00
}
2012-09-17 08:31:15 +08:00
func dirAssign2Handler ( w http . ResponseWriter , r * http . Request ) {
c , _ := strconv . Atoi ( r . FormValue ( "count" ) )
2012-09-17 14:18:47 +08:00
rt , err := storage . NewReplicationType ( r . FormValue ( "replication" ) )
if err != nil {
writeJson ( w , r , map [ string ] string { "error" : err . Error ( ) } )
return
}
2012-09-17 08:31:15 +08:00
if topo . GetVolumeLayout ( rt ) . GetActiveVolumeCount ( ) <= 0 {
if topo . FreeSpace ( ) <= 0 {
writeJson ( w , r , map [ string ] string { "error" : "No free volumes left!" } )
} else {
vg . GrowByType ( rt , topo )
}
}
fid , count , dn , err := topo . PickForWrite ( rt , c )
if err == nil {
writeJson ( w , r , map [ string ] interface { } { "fid" : fid , "url" : dn . Ip + ":" + strconv . Itoa ( dn . Port ) , "publicUrl" : dn . PublicUrl , "count" : count } )
} else {
writeJson ( w , r , map [ string ] string { "error" : err . Error ( ) } )
}
}
2012-08-07 16:29:22 +08:00
func dirJoinHandler ( w http . ResponseWriter , r * http . Request ) {
2012-09-17 08:31:15 +08:00
ip := r . RemoteAddr [ 0 : strings . Index ( r . RemoteAddr , ":" ) ]
port , _ := strconv . Atoi ( r . FormValue ( "port" ) )
maxVolumeCount , _ := strconv . Atoi ( r . FormValue ( "maxVolumeCount" ) )
2012-09-04 10:18:02 +08:00
s := r . RemoteAddr [ 0 : strings . Index ( r . RemoteAddr , ":" ) + 1 ] + r . FormValue ( "port" )
publicUrl := r . FormValue ( "publicUrl" )
volumes := new ( [ ] storage . VolumeInfo )
json . Unmarshal ( [ ] byte ( r . FormValue ( "volumes" ) ) , volumes )
if * IsDebug {
log . Println ( s , "volumes" , r . FormValue ( "volumes" ) )
}
2012-09-04 11:40:38 +08:00
mapper . Add ( directory . NewMachine ( s , publicUrl , * volumes , time . Now ( ) . Unix ( ) ) )
2012-09-17 08:31:15 +08:00
//new ways
topo . RegisterVolumes ( * volumes , ip , port , publicUrl , maxVolumeCount )
2012-08-07 16:29:22 +08:00
}
2012-09-17 08:31:15 +08:00
func dirOldStatusHandler ( w http . ResponseWriter , r * http . Request ) {
2012-09-04 10:18:02 +08:00
writeJson ( w , r , mapper )
2012-08-07 16:29:22 +08:00
}
2012-09-17 08:31:15 +08:00
func dirNewStatusHandler ( w http . ResponseWriter , r * http . Request ) {
writeJson ( w , r , topo . ToMap ( ) )
}
func volumeGrowHandler ( w http . ResponseWriter , r * http . Request ) {
2012-09-17 14:18:47 +08:00
rt , err := storage . NewReplicationType ( r . FormValue ( "replication" ) )
if err != nil {
writeJson ( w , r , map [ string ] string { "error" : err . Error ( ) } )
return
}
2012-09-17 08:31:15 +08:00
count , err := strconv . Atoi ( r . FormValue ( "count" ) )
2012-09-17 14:18:47 +08:00
if topo . FreeSpace ( ) < count * rt . GetCopyCount ( ) {
writeJson ( w , r , map [ string ] string { "error" : "Only " + strconv . Itoa ( topo . FreeSpace ( ) ) + " volumes left! Not enough for " + strconv . Itoa ( count * rt . GetCopyCount ( ) ) } )
return
}
2012-09-17 08:31:15 +08:00
if err != nil {
2012-09-17 14:18:47 +08:00
count , err := vg . GrowByType ( rt , topo )
writeJson ( w , r , map [ string ] interface { } { "count" : count , "error" : err } )
2012-09-17 08:31:15 +08:00
} else {
2012-09-17 14:18:47 +08:00
count , err := vg . GrowByCountAndType ( count , rt , topo )
writeJson ( w , r , map [ string ] interface { } { "count" : count , "error" : err } )
2012-09-17 08:31:15 +08:00
}
}
2012-08-07 16:29:22 +08:00
func runMaster ( cmd * Command , args [ ] string ) bool {
2012-09-17 08:31:15 +08:00
topo = topology . NewTopology ( "topo" , * metaFolder , "toposequence" , uint64 ( * volumeSizeLimitMB ) * 1024 * 1024 , * mpulse )
vg = replication . NewDefaultVolumeGrowth ( )
2012-09-04 10:18:02 +08:00
log . Println ( "Volume Size Limit is" , * volumeSizeLimitMB , "MB" )
2012-09-04 11:40:38 +08:00
mapper = directory . NewMapper ( * metaFolder , "directory" , uint64 ( * volumeSizeLimitMB ) * 1024 * 1024 , * mpulse )
2012-09-04 10:18:02 +08:00
http . HandleFunc ( "/dir/assign" , dirAssignHandler )
2012-09-17 08:31:15 +08:00
http . HandleFunc ( "/dir/assign2" , dirAssign2Handler )
2012-09-04 10:18:02 +08:00
http . HandleFunc ( "/dir/lookup" , dirLookupHandler )
http . HandleFunc ( "/dir/join" , dirJoinHandler )
2012-09-17 08:31:15 +08:00
http . HandleFunc ( "/dir/status" , dirOldStatusHandler )
http . HandleFunc ( "/dir/status2" , dirNewStatusHandler ) //temporary
http . HandleFunc ( "/vol/grow" , volumeGrowHandler )
2012-08-07 16:29:22 +08:00
2012-09-17 08:31:15 +08:00
mapper . StartRefreshWritableVolumes ( )
2012-09-04 12:31:13 +08:00
2012-09-04 10:18:02 +08:00
log . Println ( "Start directory service at http://127.0.0.1:" + strconv . Itoa ( * mport ) )
e := http . ListenAndServe ( ":" + strconv . Itoa ( * mport ) , nil )
if e != nil {
log . Fatal ( "Fail to start:" , e )
}
return true
2012-08-07 16:29:22 +08:00
}