mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-18 06:30:07 +08:00
add name list
This commit is contained in:
parent
a481c4a45e
commit
e6196cdc50
102
weed/util/skiplist/name_batch.go
Normal file
102
weed/util/skiplist/name_batch.go
Normal file
@ -0,0 +1,102 @@
|
||||
package skiplist
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type NameBatch struct {
|
||||
key string
|
||||
names map[string]struct{}
|
||||
}
|
||||
|
||||
func (nb *NameBatch) ContainsName(name string) (found bool) {
|
||||
_, found = nb.names[name]
|
||||
return
|
||||
}
|
||||
func (nb *NameBatch) WriteName(name string) {
|
||||
if nb.key == "" || strings.Compare(nb.key, name) > 0 {
|
||||
nb.key = name
|
||||
}
|
||||
nb.names[name] = struct{}{}
|
||||
}
|
||||
func (nb *NameBatch) DeleteName(name string) {
|
||||
delete(nb.names, name)
|
||||
if nb.key == name {
|
||||
nb.key = ""
|
||||
for n := range nb.names {
|
||||
if nb.key == "" || strings.Compare(nb.key, n) > 0 {
|
||||
nb.key = n
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool {
|
||||
var names []string
|
||||
needFilter := startFrom == ""
|
||||
for n := range nb.names {
|
||||
if !needFilter || strings.Compare(n, startFrom) >= 0 {
|
||||
names = append(names, n)
|
||||
}
|
||||
}
|
||||
sort.Slice(names, func(i, j int) bool {
|
||||
return strings.Compare(names[i], names[j]) < 0
|
||||
})
|
||||
for _, n := range names {
|
||||
if !visitNamesFn(n) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func NewNameBatch() *NameBatch {
|
||||
return &NameBatch{
|
||||
names: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func LoadNameBatch(data []byte) *NameBatch {
|
||||
t := &NameBatchData{}
|
||||
if len(data) > 0 {
|
||||
err := proto.Unmarshal(data, t)
|
||||
if err != nil {
|
||||
glog.Errorf("unmarshal into NameBatchData{} : %v", err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
nb := NewNameBatch()
|
||||
for _, n := range t.Names {
|
||||
name := string(n)
|
||||
if nb.key == "" || strings.Compare(nb.key, name) > 0 {
|
||||
nb.key = name
|
||||
}
|
||||
nb.names[name] = struct{}{}
|
||||
}
|
||||
return nb
|
||||
}
|
||||
|
||||
func (nb *NameBatch) ToBytes() []byte {
|
||||
t := &NameBatchData{}
|
||||
for n := range nb.names {
|
||||
t.Names = append(t.Names, []byte(n))
|
||||
}
|
||||
data, _ := proto.Marshal(t)
|
||||
return data
|
||||
}
|
||||
|
||||
func (nb *NameBatch) SplitBy(name string) (x, y *NameBatch) {
|
||||
x, y = NewNameBatch(), NewNameBatch()
|
||||
|
||||
for n := range nb.names {
|
||||
// there should be no equal case though
|
||||
if strings.Compare(n, name) <= 0 {
|
||||
x.WriteName(n)
|
||||
} else {
|
||||
y.WriteName(n)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
303
weed/util/skiplist/name_list.go
Normal file
303
weed/util/skiplist/name_list.go
Normal file
@ -0,0 +1,303 @@
|
||||
package skiplist
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
)
|
||||
|
||||
type NameList struct {
|
||||
skipList *SkipList
|
||||
batchSize int
|
||||
}
|
||||
|
||||
func NewNameList(store ListStore, batchSize int) *NameList {
|
||||
return &NameList{
|
||||
skipList: New(store),
|
||||
batchSize: batchSize,
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Be reluctant to create new nodes. Try to fit into either previous node or next node.
|
||||
Prefer to add to previous node.
|
||||
|
||||
There are multiple cases after finding the name for greater or equal node
|
||||
1. found and node.Key == name
|
||||
The node contains a batch with leading key the same as the name
|
||||
nothing to do
|
||||
2. no such node found or node.Key > name
|
||||
|
||||
if no such node found
|
||||
prevNode = list.LargestNode
|
||||
|
||||
// case 2.1
|
||||
if previousNode contains name
|
||||
nothing to do
|
||||
|
||||
// prefer to add to previous node
|
||||
if prevNode != nil {
|
||||
// case 2.2
|
||||
if prevNode has capacity
|
||||
prevNode.add name, and save
|
||||
return
|
||||
// case 2.3
|
||||
split prevNode by name
|
||||
}
|
||||
|
||||
// case 2.4
|
||||
// merge into next node. Avoid too many nodes if adding data in reverse order.
|
||||
if nextNode is not nil and nextNode has capacity
|
||||
delete nextNode.Key
|
||||
nextNode.Key = name
|
||||
nextNode.batch.add name
|
||||
insert nodeNode.Key
|
||||
return
|
||||
|
||||
// case 2.5
|
||||
if prevNode is nil
|
||||
insert new node with key = name, value = batch{name}
|
||||
return
|
||||
|
||||
*/
|
||||
func (nl *NameList) WriteName(name string) error {
|
||||
lookupKey := []byte(name)
|
||||
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// case 1: the name already exists as one leading key in the batch
|
||||
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !found {
|
||||
prevNode, err = nl.skipList.GetLargestNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if nextNode != nil && prevNode == nil {
|
||||
prevNode, err = nl.skipList.loadElement(nextNode.Prev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if prevNode != nil {
|
||||
prevNameBatch := LoadNameBatch(prevNode.Value)
|
||||
// case 2.1
|
||||
if prevNameBatch.ContainsName(name) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// case 2.2
|
||||
if len(prevNameBatch.names) < nl.batchSize {
|
||||
prevNameBatch.WriteName(name)
|
||||
return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
|
||||
}
|
||||
|
||||
// case 2.3
|
||||
x, y := prevNameBatch.SplitBy(name)
|
||||
addToX := len(x.names) <= len(y.names)
|
||||
if len(x.names) != len(prevNameBatch.names) {
|
||||
if addToX {
|
||||
x.WriteName(name)
|
||||
}
|
||||
if x.key == prevNameBatch.key {
|
||||
if err := nl.skipList.ChangeValue(prevNode, x.ToBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := nl.skipList.Insert([]byte(x.key), x.ToBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(y.names) != len(prevNameBatch.names) {
|
||||
if !addToX {
|
||||
y.WriteName(name)
|
||||
}
|
||||
if y.key == prevNameBatch.key {
|
||||
if err := nl.skipList.ChangeValue(prevNode, y.ToBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := nl.skipList.Insert([]byte(y.key), y.ToBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// case 2.4
|
||||
if nextNode != nil {
|
||||
nextNameBatch := LoadNameBatch(nextNode.Value)
|
||||
if len(nextNameBatch.names) < nl.batchSize {
|
||||
if err := nl.skipList.Delete(nextNode.Key); err != nil {
|
||||
return err
|
||||
}
|
||||
nextNameBatch.WriteName(name)
|
||||
if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// case 2.5
|
||||
// now prevNode is nil
|
||||
newNameBatch := NewNameBatch()
|
||||
newNameBatch.WriteName(name)
|
||||
if err := nl.skipList.Insert([]byte(newNameBatch.key), newNameBatch.ToBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
// case 1: exists in nextNode
|
||||
if nextNode != nil && nextNode.Key == name {
|
||||
remove from nextNode, update nextNode
|
||||
// TODO: merge with prevNode if possible?
|
||||
return
|
||||
}
|
||||
if nextNode is nil
|
||||
prevNode = list.Largestnode
|
||||
if prevNode == nil and nextNode.Prev != nil
|
||||
prevNode = load(nextNode.Prev)
|
||||
|
||||
// case 2: does not exist
|
||||
// case 2.1
|
||||
if prevNode == nil {
|
||||
return
|
||||
}
|
||||
// case 2.2
|
||||
if prevNameBatch does not contain name {
|
||||
return
|
||||
}
|
||||
|
||||
// case 3
|
||||
delete from prevNameBatch
|
||||
if prevNameBatch + nextNode < capacityList
|
||||
// case 3.1
|
||||
merge
|
||||
else
|
||||
// case 3.2
|
||||
update prevNode
|
||||
|
||||
|
||||
*/
|
||||
func (nl *NameList) DeleteName(name string) error {
|
||||
lookupKey := []byte(name)
|
||||
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// case 1
|
||||
var nextNameBatch *NameBatch
|
||||
if nextNode != nil {
|
||||
nextNameBatch = LoadNameBatch(nextNode.Value)
|
||||
}
|
||||
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
|
||||
if err := nl.skipList.Delete(nextNode.Key); err != nil {
|
||||
return err
|
||||
}
|
||||
nextNameBatch.DeleteName(name)
|
||||
if len(nextNameBatch.names) > 0 {
|
||||
if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if !found {
|
||||
prevNode, err = nl.skipList.GetLargestNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if nextNode != nil && prevNode == nil {
|
||||
prevNode, err = nl.skipList.loadElement(nextNode.Prev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// case 2
|
||||
if prevNode == nil {
|
||||
// case 2.1
|
||||
return nil
|
||||
}
|
||||
prevNameBatch := LoadNameBatch(prevNode.Value)
|
||||
if !prevNameBatch.ContainsName(name) {
|
||||
// case 2.2
|
||||
return nil
|
||||
}
|
||||
|
||||
// case 3
|
||||
prevNameBatch.DeleteName(name)
|
||||
if len(prevNameBatch.names) == 0 {
|
||||
if err := nl.skipList.Delete(prevNode.Key); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if nextNameBatch != nil && len(nextNameBatch.names) + len(prevNameBatch.names) < nl.batchSize {
|
||||
// case 3.1 merge nextNode and prevNode
|
||||
if err := nl.skipList.Delete(nextNode.Key); err != nil {
|
||||
return err
|
||||
}
|
||||
for nextName := range nextNameBatch.names {
|
||||
prevNameBatch.WriteName(nextName)
|
||||
}
|
||||
return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
|
||||
} else {
|
||||
// case 3.2 update prevNode
|
||||
return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
|
||||
lookupKey := []byte(startFrom)
|
||||
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
|
||||
prevNode = nil
|
||||
}
|
||||
if !found {
|
||||
prevNode, err = nl.skipList.GetLargestNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if prevNode != nil {
|
||||
prevNameBatch := LoadNameBatch(prevNode.Value)
|
||||
if !prevNameBatch.ListNames(startFrom, visitNamesFn) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
for nextNode != nil {
|
||||
nextNameBatch := LoadNameBatch(nextNode.Value)
|
||||
if !nextNameBatch.ListNames(startFrom, visitNamesFn) {
|
||||
return nil
|
||||
}
|
||||
nextNode, err = nl.skipList.loadElement(nextNode.Next[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
73
weed/util/skiplist/name_list_test.go
Normal file
73
weed/util/skiplist/name_list_test.go
Normal file
@ -0,0 +1,73 @@
|
||||
package skiplist
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"testing"
|
||||
)
|
||||
|
||||
const (
|
||||
maxNameCount = 100
|
||||
)
|
||||
|
||||
func String(x int) string {
|
||||
return strconv.Itoa(x)
|
||||
}
|
||||
|
||||
func TestNameList(t *testing.T) {
|
||||
list := NewNameList(memStore, 7)
|
||||
|
||||
for i := 0; i < maxNameCount; i++ {
|
||||
list.WriteName(String(i))
|
||||
}
|
||||
|
||||
counter := 0
|
||||
list.ListNames("", func(name string) bool {
|
||||
counter++
|
||||
print(name, " ")
|
||||
return true
|
||||
})
|
||||
if counter != maxNameCount {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
// list.skipList.println()
|
||||
|
||||
deleteBase := 5
|
||||
deleteCount := maxNameCount - 3 * deleteBase
|
||||
|
||||
for i := deleteBase; i < deleteBase+deleteCount; i++ {
|
||||
list.DeleteName(String(i))
|
||||
}
|
||||
|
||||
counter = 0
|
||||
list.ListNames("", func(name string) bool {
|
||||
counter++
|
||||
return true
|
||||
})
|
||||
// list.skipList.println()
|
||||
if counter != maxNameCount-deleteCount {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
// randomized deletion
|
||||
list = NewNameList(memStore, 7)
|
||||
// Delete elements at random positions in the list.
|
||||
rList := rand.Perm(maxN)
|
||||
for _, i := range rList {
|
||||
list.WriteName(String(i))
|
||||
}
|
||||
for _, i := range rList {
|
||||
list.DeleteName(String(i))
|
||||
}
|
||||
counter = 0
|
||||
list.ListNames("", func(name string) bool {
|
||||
counter++
|
||||
print(name, " ")
|
||||
return true
|
||||
})
|
||||
if counter != 0 {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
}
|
@ -238,6 +238,53 @@ func (x *SkipListElement) GetPrev() *SkipListElementReference {
|
||||
return nil
|
||||
}
|
||||
|
||||
type NameBatchData struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
Names [][]byte `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"`
|
||||
}
|
||||
|
||||
func (x *NameBatchData) Reset() {
|
||||
*x = NameBatchData{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_skiplist_proto_msgTypes[3]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *NameBatchData) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*NameBatchData) ProtoMessage() {}
|
||||
|
||||
func (x *NameBatchData) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_skiplist_proto_msgTypes[3]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use NameBatchData.ProtoReflect.Descriptor instead.
|
||||
func (*NameBatchData) Descriptor() ([]byte, []int) {
|
||||
return file_skiplist_proto_rawDescGZIP(), []int{3}
|
||||
}
|
||||
|
||||
func (x *NameBatchData) GetNames() [][]byte {
|
||||
if x != nil {
|
||||
return x.Names
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var File_skiplist_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_skiplist_proto_rawDesc = []byte{
|
||||
@ -275,10 +322,13 @@ var file_skiplist_proto_rawDesc = []byte{
|
||||
0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73,
|
||||
0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
|
||||
0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x70, 0x72, 0x65, 0x76,
|
||||
0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63,
|
||||
0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64,
|
||||
0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75, 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69,
|
||||
0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74,
|
||||
0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c,
|
||||
0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75,
|
||||
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f,
|
||||
0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75,
|
||||
0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
@ -293,11 +343,12 @@ func file_skiplist_proto_rawDescGZIP() []byte {
|
||||
return file_skiplist_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
|
||||
var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
|
||||
var file_skiplist_proto_goTypes = []interface{}{
|
||||
(*SkipListProto)(nil), // 0: skiplist.SkipListProto
|
||||
(*SkipListElementReference)(nil), // 1: skiplist.SkipListElementReference
|
||||
(*SkipListElement)(nil), // 2: skiplist.SkipListElement
|
||||
(*NameBatchData)(nil), // 3: skiplist.NameBatchData
|
||||
}
|
||||
var file_skiplist_proto_depIdxs = []int32{
|
||||
1, // 0: skiplist.SkipListProto.start_levels:type_name -> skiplist.SkipListElementReference
|
||||
@ -353,6 +404,18 @@ func file_skiplist_proto_init() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_skiplist_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*NameBatchData); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
@ -360,7 +423,7 @@ func file_skiplist_proto_init() {
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_skiplist_proto_rawDesc,
|
||||
NumEnums: 0,
|
||||
NumMessages: 3,
|
||||
NumMessages: 4,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
|
@ -24,3 +24,7 @@ message SkipListElement {
|
||||
bytes value = 5;
|
||||
SkipListElementReference prev = 6;
|
||||
}
|
||||
|
||||
message NameBatchData {
|
||||
repeated bytes names = 1;
|
||||
}
|
Loading…
Reference in New Issue
Block a user