diff options
-rw-r--r-- | internal/fusefrontend/file.go | 8 | ||||
-rw-r--r-- | internal/fusefrontend/root_node.go | 4 | ||||
-rw-r--r-- | internal/serialize_reads/sr.go | 150 |
3 files changed, 1 insertions, 161 deletions
diff --git a/internal/fusefrontend/file.go b/internal/fusefrontend/file.go index 9481abf..6f33a60 100644 --- a/internal/fusefrontend/file.go +++ b/internal/fusefrontend/file.go @@ -20,7 +20,6 @@ import ( "github.com/rfjakob/gocryptfs/v2/internal/contentenc" "github.com/rfjakob/gocryptfs/v2/internal/inomap" "github.com/rfjakob/gocryptfs/v2/internal/openfiletable" - "github.com/rfjakob/gocryptfs/v2/internal/serialize_reads" "github.com/rfjakob/gocryptfs/v2/internal/stupidgcm" "github.com/rfjakob/gocryptfs/v2/internal/syscallcompat" "github.com/rfjakob/gocryptfs/v2/internal/tlog" @@ -252,13 +251,7 @@ func (f *File) Read(ctx context.Context, buf []byte, off int64) (resultData fuse defer f.fileTableEntry.ContentLock.RUnlock() tlog.Debug.Printf("ino%d: FUSE Read: offset=%d length=%d", f.qIno.Ino, off, len(buf)) - if f.rootNode.args.SerializeReads { - serialize_reads.Wait(off, len(buf)) - } out, errno := f.doRead(buf[:0], uint64(off), uint64(len(buf))) - if f.rootNode.args.SerializeReads { - serialize_reads.Done() - } if errno != 0 { return nil, errno } @@ -389,6 +382,7 @@ func (f *File) Write(ctx context.Context, data []byte, off int64) (uint32, sysca // But if the write directly follows an earlier write, it cannot create a // hole, and we can save one Stat() call. if !f.isConsecutiveWrite(off) { + fmt.Printf("isConsecutiveWrite=false, off=%d\n", off) errno := f.writePadHole(off) if errno != 0 { return 0, errno diff --git a/internal/fusefrontend/root_node.go b/internal/fusefrontend/root_node.go index 34b084b..7d37520 100644 --- a/internal/fusefrontend/root_node.go +++ b/internal/fusefrontend/root_node.go @@ -11,7 +11,6 @@ import ( "github.com/rfjakob/gocryptfs/v2/internal/contentenc" "github.com/rfjakob/gocryptfs/v2/internal/inomap" "github.com/rfjakob/gocryptfs/v2/internal/nametransform" - "github.com/rfjakob/gocryptfs/v2/internal/serialize_reads" "github.com/rfjakob/gocryptfs/v2/internal/syscallcompat" "github.com/rfjakob/gocryptfs/v2/internal/tlog" ) @@ -63,9 +62,6 @@ type RootNode struct { } func NewRootNode(args Args, c *contentenc.ContentEnc, n *nametransform.NameTransform) *RootNode { - if args.SerializeReads { - serialize_reads.InitSerializer() - } if len(args.Exclude) > 0 { tlog.Warn.Printf("Forward mode does not support -exclude") } diff --git a/internal/serialize_reads/sr.go b/internal/serialize_reads/sr.go deleted file mode 100644 index 88115f9..0000000 --- a/internal/serialize_reads/sr.go +++ /dev/null @@ -1,150 +0,0 @@ -package serialize_reads - -import ( - "log" - "sync" - "time" - - "github.com/rfjakob/gocryptfs/v2/internal/tlog" -) - -// serializerState is used by the Wait and Done functions -type serializerState struct { - // we get submissions through the "input" channel - input chan *submission - // q = Queue - q []*submission - // wg is used to wait for the read to complete before unblocking the next - wg sync.WaitGroup -} - -// Wait places the caller into a queue and blocks -func Wait(offset int64, size int) { - serializer.wait(offset, size) -} - -// Done signals that the read operation has finished -func Done() { - serializer.wg.Done() -} - -type submission struct { - // "ch" is closed by "eventLoop" once it wants to unblock the caller - ch chan struct{} - // submissions are prioritized by offset (lowest offset gets unblocked first) - offset int64 - // size will be used in the future to detect consecutive read requests. These - // can be unblocked immediately. - size int -} - -func (sr *serializerState) wait(offset int64, size int) { - ch := make(chan struct{}) - sb := &submission{ - ch: ch, - offset: offset, - size: size, - } - // Send our submission - sr.input <- sb - // Wait till we get unblocked - <-ch -} - -// push returns true if the queue is full after the element has been stored. -// It panics if it did not have space to store the element. -func (sr *serializerState) push(sb *submission) (full bool) { - free := 0 - stored := false - for i, v := range sr.q { - if v != nil { - continue - } - if !stored { - sr.q[i] = sb - stored = true - continue - } - free++ - } - if !stored { - // This should never happen because eventLoop checks if the queue got full - log.Panic("BUG: unhandled queue overflow") - } - if free == 0 { - return true - } - return false -} - -// pop the submission with the lowest offset off the queue -func (sr *serializerState) pop() *submission { - var winner *submission - var winnerIndex int - for i, v := range sr.q { - if v == nil { - continue - } - if winner == nil { - winner = v - winnerIndex = i - continue - } - if v.offset < winner.offset { - winner = v - winnerIndex = i - } - } - if winner == nil { - return nil - } - sr.q[winnerIndex] = nil - return winner -} - -func (sr *serializerState) eventLoop() { - sr.input = make(chan *submission) - empty := true - for { - if empty { - // If the queue is empty we block on the channel to conserve CPU - sb := <-sr.input - sr.push(sb) - empty = false - } - select { - case sb := <-sr.input: - full := sr.push(sb) - if full { - // Queue is full, unblock the new request immediately - tlog.Warn.Printf("serialize_reads: queue full, forcing unblock") - sr.unblockOne() - } - case <-time.After(time.Microsecond * 500): - // Looks like we have waited out all concurrent requests. - empty = sr.unblockOne() - } - } -} - -// Unblock a submission and wait for completion -func (sr *serializerState) unblockOne() (empty bool) { - winner := sr.pop() - if winner == nil { - return true - } - sr.wg.Add(1) - close(winner.ch) - sr.wg.Wait() - return false -} - -var serializer serializerState - -// InitSerializer sets up the internal serializer state and starts the event loop. -// Called by fusefrontend.NewFS. -func InitSerializer() { - serializer.input = make(chan *submission) - serializer.q = make([]*submission, 10) - go serializer.eventLoop() -} |