diff options
Diffstat (limited to 'internal/serialize_reads/sr.go')
-rw-r--r-- | internal/serialize_reads/sr.go | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/internal/serialize_reads/sr.go b/internal/serialize_reads/sr.go new file mode 100644 index 0000000..0f623d3 --- /dev/null +++ b/internal/serialize_reads/sr.go @@ -0,0 +1,149 @@ +package serialize_reads + +import ( + "log" + "sync" + "time" + + "github.com/rfjakob/gocryptfs/internal/tlog" +) + +type SerializerStruct struct { + // we get submissions through the "input" channel + input chan *submission + // q = Queue + q []*submission + // wg is used to wait for the read to complete before unblocking the next + wg sync.WaitGroup +} + +// Wait places the caller into a queue and blocks +func Wait(offset int64, size int) { + serializer.wait(offset, size) +} + +// Done signals that the read operation has finished +func Done() { + serializer.wg.Done() +} + +type submission struct { + // "ch" is closed by "eventLoop" once it wants to unblock the caller + ch chan struct{} + // submissions are prioritized by offset (lowest offset gets unblocked first) + offset int64 + // size will be used in the future to detect consecutive read requests. These + // can be unblocked immediately. + size int +} + +func (sr *SerializerStruct) wait(offset int64, size int) { + ch := make(chan struct{}) + sb := &submission{ + ch: ch, + offset: offset, + size: size, + } + // Send our submission + sr.input <- sb + // Wait till we get unblocked + <-ch +} + +// push returns true if the queue is full after the element has been stored. +// It panics if it did not have space to store the element. +func (sr *SerializerStruct) push(sb *submission) (full bool) { + free := 0 + stored := false + for i, v := range sr.q { + if v != nil { + continue + } + if !stored { + sr.q[i] = sb + stored = true + continue + } + free++ + } + if !stored { + // This should never happen because eventLoop checks if the queue got full + log.Panic("BUG: unhandled queue overflow") + } + if free == 0 { + return true + } + return false +} + +// pop the submission with the lowest offset off the queue +func (sr *SerializerStruct) pop() *submission { + var winner *submission + var winnerIndex int + for i, v := range sr.q { + if v == nil { + continue + } + if winner == nil { + winner = v + winnerIndex = i + continue + } + if v.offset < winner.offset { + winner = v + winnerIndex = i + } + } + if winner == nil { + return nil + } + sr.q[winnerIndex] = nil + return winner +} + +func (sr *SerializerStruct) eventLoop() { + sr.input = make(chan *submission) + empty := true + for { + if empty { + // If the queue is empty we block on the channel to conserve CPU + sb := <-sr.input + sr.push(sb) + empty = false + } + select { + case sb := <-sr.input: + full := sr.push(sb) + if full { + // Queue is full, unblock the new request immediately + tlog.Warn.Printf("serialize_reads: queue full, forcing unblock") + sr.unblockOne() + } + continue + case <-time.After(time.Microsecond * 500): + // Looks like we have waited out all concurrent requests. + empty = sr.unblockOne() + } + } +} + +// Unblock a submission and wait for completion +func (sr *SerializerStruct) unblockOne() (empty bool) { + winner := sr.pop() + if winner == nil { + return true + } + sr.wg.Add(1) + close(winner.ch) + sr.wg.Wait() + return false +} + +var serializer SerializerStruct + +// Called by fusefrontend.NewFS +func Init() { + serializer.input = make(chan *submission) + serializer.q = make([]*submission, 10) + go serializer.eventLoop() +} |