aboutsummaryrefslogtreecommitdiff
path: root/internal/serialize_reads/sr.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/serialize_reads/sr.go')
-rw-r--r--internal/serialize_reads/sr.go150
1 files changed, 0 insertions, 150 deletions
diff --git a/internal/serialize_reads/sr.go b/internal/serialize_reads/sr.go
deleted file mode 100644
index 88115f9..0000000
--- a/internal/serialize_reads/sr.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package serialize_reads
-
-import (
- "log"
- "sync"
- "time"
-
- "github.com/rfjakob/gocryptfs/v2/internal/tlog"
-)
-
-// serializerState is used by the Wait and Done functions
-type serializerState struct {
- // we get submissions through the "input" channel
- input chan *submission
- // q = Queue
- q []*submission
- // wg is used to wait for the read to complete before unblocking the next
- wg sync.WaitGroup
-}
-
-// Wait places the caller into a queue and blocks
-func Wait(offset int64, size int) {
- serializer.wait(offset, size)
-}
-
-// Done signals that the read operation has finished
-func Done() {
- serializer.wg.Done()
-}
-
-type submission struct {
- // "ch" is closed by "eventLoop" once it wants to unblock the caller
- ch chan struct{}
- // submissions are prioritized by offset (lowest offset gets unblocked first)
- offset int64
- // size will be used in the future to detect consecutive read requests. These
- // can be unblocked immediately.
- size int
-}
-
-func (sr *serializerState) wait(offset int64, size int) {
- ch := make(chan struct{})
- sb := &submission{
- ch: ch,
- offset: offset,
- size: size,
- }
- // Send our submission
- sr.input <- sb
- // Wait till we get unblocked
- <-ch
-}
-
-// push returns true if the queue is full after the element has been stored.
-// It panics if it did not have space to store the element.
-func (sr *serializerState) push(sb *submission) (full bool) {
- free := 0
- stored := false
- for i, v := range sr.q {
- if v != nil {
- continue
- }
- if !stored {
- sr.q[i] = sb
- stored = true
- continue
- }
- free++
- }
- if !stored {
- // This should never happen because eventLoop checks if the queue got full
- log.Panic("BUG: unhandled queue overflow")
- }
- if free == 0 {
- return true
- }
- return false
-}
-
-// pop the submission with the lowest offset off the queue
-func (sr *serializerState) pop() *submission {
- var winner *submission
- var winnerIndex int
- for i, v := range sr.q {
- if v == nil {
- continue
- }
- if winner == nil {
- winner = v
- winnerIndex = i
- continue
- }
- if v.offset < winner.offset {
- winner = v
- winnerIndex = i
- }
- }
- if winner == nil {
- return nil
- }
- sr.q[winnerIndex] = nil
- return winner
-}
-
-func (sr *serializerState) eventLoop() {
- sr.input = make(chan *submission)
- empty := true
- for {
- if empty {
- // If the queue is empty we block on the channel to conserve CPU
- sb := <-sr.input
- sr.push(sb)
- empty = false
- }
- select {
- case sb := <-sr.input:
- full := sr.push(sb)
- if full {
- // Queue is full, unblock the new request immediately
- tlog.Warn.Printf("serialize_reads: queue full, forcing unblock")
- sr.unblockOne()
- }
- case <-time.After(time.Microsecond * 500):
- // Looks like we have waited out all concurrent requests.
- empty = sr.unblockOne()
- }
- }
-}
-
-// Unblock a submission and wait for completion
-func (sr *serializerState) unblockOne() (empty bool) {
- winner := sr.pop()
- if winner == nil {
- return true
- }
- sr.wg.Add(1)
- close(winner.ch)
- sr.wg.Wait()
- return false
-}
-
-var serializer serializerState
-
-// InitSerializer sets up the internal serializer state and starts the event loop.
-// Called by fusefrontend.NewFS.
-func InitSerializer() {
- serializer.input = make(chan *submission)
- serializer.q = make([]*submission, 10)
- go serializer.eventLoop()
-}