From bdf2ddddfdccbc3d48e763457a779c8cc9ece6ac Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 2 Apr 2021 02:21:38 -0700 Subject: [PATCH] revert to same implementation as before This reverts commit 7e8edc3c4adaf1251f5c773cbeaf3a868269f97a. --- weed/filesys/wfs.go | 4 ++-- weed/util/limiter.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index f0ac6d80d..c6d9080a1 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -77,7 +77,7 @@ type WFS struct { signature int32 // throttle writers - concurrentWriters *util.LimitedOutOfOrderProcessor + concurrentWriters *util.LimitedConcurrentExecutor Server *fs.Server } type statsCache struct { @@ -135,7 +135,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.fsNodeCache = newFsCache(wfs.root) if wfs.option.ConcurrentWriters > 0 { - wfs.concurrentWriters = util.NewLimitedOutOfOrderProcessor(int32(wfs.option.ConcurrentWriters)) + wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters) } return wfs diff --git a/weed/util/limiter.go b/weed/util/limiter.go index 2e5168d3d..ccdaa701e 100644 --- a/weed/util/limiter.go +++ b/weed/util/limiter.go @@ -7,6 +7,46 @@ import ( "sync/atomic" ) +// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go + +// LimitedConcurrentExecutor object +type LimitedConcurrentExecutor struct { + limit int + tokenChan chan int +} + +func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor { + + // allocate a limiter instance + c := &LimitedConcurrentExecutor{ + limit: limit, + tokenChan: make(chan int, limit), + } + + // allocate the tokenChan: + for i := 0; i < c.limit; i++ { + c.tokenChan <- i + } + + return c +} + +// Execute adds a function to the execution queue. +// if num of go routines allocated by this instance is < limit +// launch a new go routine to execute job +// else wait until a go routine becomes available +func (c *LimitedConcurrentExecutor) Execute(job func()) { + token := <-c.tokenChan + go func() { + defer func() { + c.tokenChan <- token + }() + // run the job + job() + }() +} + +// a different implementation, but somehow more "conservative" type OperationRequest func() type LimitedOutOfOrderProcessor struct {