mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-19 06:53:32 +08:00
64 lines
1.6 KiB
Go
64 lines
1.6 KiB
Go
package page_writer
|
|
|
|
import (
|
|
"sync/atomic"
|
|
)
|
|
|
|
func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
|
|
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
|
|
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
|
|
if stopOffset%up.ChunkSize > 0 {
|
|
stopLogicChunkIndex += 1
|
|
}
|
|
up.activeReadChunksLock.Lock()
|
|
defer up.activeReadChunksLock.Unlock()
|
|
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
|
|
if count, found := up.activeReadChunks[i]; found {
|
|
up.activeReadChunks[i] = count + 1
|
|
} else {
|
|
up.activeReadChunks[i] = 1
|
|
}
|
|
}
|
|
}
|
|
|
|
func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
|
|
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
|
|
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
|
|
if stopOffset%up.ChunkSize > 0 {
|
|
stopLogicChunkIndex += 1
|
|
}
|
|
up.activeReadChunksLock.Lock()
|
|
defer up.activeReadChunksLock.Unlock()
|
|
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
|
|
if count, found := up.activeReadChunks[i]; found {
|
|
if count == 1 {
|
|
delete(up.activeReadChunks, i)
|
|
} else {
|
|
up.activeReadChunks[i] = count - 1
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
|
|
up.activeReadChunksLock.Lock()
|
|
defer up.activeReadChunksLock.Unlock()
|
|
if count, found := up.activeReadChunks[logicChunkIndex]; found {
|
|
return count > 0
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (up *UploadPipeline) waitForCurrentWritersToComplete() {
|
|
up.uploaderCountCond.L.Lock()
|
|
t := int32(100)
|
|
for {
|
|
t = atomic.LoadInt32(&up.uploaderCount)
|
|
if t <= 0 {
|
|
break
|
|
}
|
|
up.uploaderCountCond.Wait()
|
|
}
|
|
up.uploaderCountCond.L.Unlock()
|
|
}
|