aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/fusefrontend/args.go2
-rw-r--r--internal/fusefrontend/file.go11
-rw-r--r--internal/fusefrontend/fs.go5
-rw-r--r--internal/serialize_reads/sr.go149
4 files changed, 167 insertions, 0 deletions
diff --git a/internal/fusefrontend/args.go b/internal/fusefrontend/args.go
index f76848d..4029913 100644
--- a/internal/fusefrontend/args.go
+++ b/internal/fusefrontend/args.go
@@ -27,4 +27,6 @@ type Args struct {
// Use HKDF key derivation.
// Corresponds to the HKDF feature flag introduced in gocryptfs v1.3.
HKDF bool
+ // Try to serialize read operations, "-serialize_reads"
+ SerializeReads bool
}
diff --git a/internal/fusefrontend/file.go b/internal/fusefrontend/file.go
index dac7510..b41f220 100644
--- a/internal/fusefrontend/file.go
+++ b/internal/fusefrontend/file.go
@@ -17,6 +17,7 @@ import (
"github.com/hanwen/go-fuse/fuse/nodefs"
"github.com/rfjakob/gocryptfs/internal/contentenc"
+ "github.com/rfjakob/gocryptfs/internal/serialize_reads"
"github.com/rfjakob/gocryptfs/internal/syscallcompat"
"github.com/rfjakob/gocryptfs/internal/tlog"
)
@@ -176,6 +177,7 @@ func (f *file) doRead(off uint64, length uint64) ([]byte, fuse.Status) {
alignedOffset, alignedLength := blocks[0].JointCiphertextRange(blocks)
skip := blocks[0].Skip
tlog.Debug.Printf("JointCiphertextRange(%d, %d) -> %d, %d, %d", off, length, alignedOffset, alignedLength, skip)
+
ciphertext := make([]byte, int(alignedLength))
n, err := f.fd.ReadAt(ciphertext, int64(alignedOffset))
// We don't care if the file ID changes after we have read the data. Drop the lock.
@@ -224,8 +226,17 @@ func (f *file) Read(buf []byte, off int64) (resultData fuse.ReadResult, code fus
return nil, fuse.EBADF
}
+ if f.fs.args.SerializeReads {
+ serialize_reads.Wait(off, len(buf))
+ }
+
+ fmt.Printf("%02d\n", off/131072)
out, status := f.doRead(uint64(off), uint64(len(buf)))
+ if f.fs.args.SerializeReads {
+ serialize_reads.Done()
+ }
+
if status == fuse.EIO {
tlog.Warn.Printf("ino%d: Read: returning EIO, offset=%d, length=%d", f.devIno.ino, len(buf), off)
}
diff --git a/internal/fusefrontend/fs.go b/internal/fusefrontend/fs.go
index 9ffcff1..28c43b6 100644
--- a/internal/fusefrontend/fs.go
+++ b/internal/fusefrontend/fs.go
@@ -17,6 +17,7 @@ import (
"github.com/rfjakob/gocryptfs/internal/contentenc"
"github.com/rfjakob/gocryptfs/internal/cryptocore"
"github.com/rfjakob/gocryptfs/internal/nametransform"
+ "github.com/rfjakob/gocryptfs/internal/serialize_reads"
"github.com/rfjakob/gocryptfs/internal/syscallcompat"
"github.com/rfjakob/gocryptfs/internal/tlog"
)
@@ -43,6 +44,10 @@ func NewFS(args Args) *FS {
contentEnc := contentenc.New(cryptoCore, contentenc.DefaultBS)
nameTransform := nametransform.New(cryptoCore.EMECipher, args.LongNames, args.Raw64)
+ if args.SerializeReads {
+ serialize_reads.Init()
+ }
+
return &FS{
FileSystem: pathfs.NewLoopbackFileSystem(args.Cipherdir),
args: args,
diff --git a/internal/serialize_reads/sr.go b/internal/serialize_reads/sr.go
new file mode 100644
index 0000000..0f623d3
--- /dev/null
+++ b/internal/serialize_reads/sr.go
@@ -0,0 +1,149 @@
+package serialize_reads
+
+import (
+ "log"
+ "sync"
+ "time"
+
+ "github.com/rfjakob/gocryptfs/internal/tlog"
+)
+
+type SerializerStruct struct {
+ // we get submissions through the "input" channel
+ input chan *submission
+ // q = Queue
+ q []*submission
+ // wg is used to wait for the read to complete before unblocking the next
+ wg sync.WaitGroup
+}
+
+// Wait places the caller into a queue and blocks
+func Wait(offset int64, size int) {
+ serializer.wait(offset, size)
+}
+
+// Done signals that the read operation has finished
+func Done() {
+ serializer.wg.Done()
+}
+
+type submission struct {
+ // "ch" is closed by "eventLoop" once it wants to unblock the caller
+ ch chan struct{}
+ // submissions are prioritized by offset (lowest offset gets unblocked first)
+ offset int64
+ // size will be used in the future to detect consecutive read requests. These
+ // can be unblocked immediately.
+ size int
+}
+
+func (sr *SerializerStruct) wait(offset int64, size int) {
+ ch := make(chan struct{})
+ sb := &submission{
+ ch: ch,
+ offset: offset,
+ size: size,
+ }
+ // Send our submission
+ sr.input <- sb
+ // Wait till we get unblocked
+ <-ch
+}
+
+// push returns true if the queue is full after the element has been stored.
+// It panics if it did not have space to store the element.
+func (sr *SerializerStruct) push(sb *submission) (full bool) {
+ free := 0
+ stored := false
+ for i, v := range sr.q {
+ if v != nil {
+ continue
+ }
+ if !stored {
+ sr.q[i] = sb
+ stored = true
+ continue
+ }
+ free++
+ }
+ if !stored {
+ // This should never happen because eventLoop checks if the queue got full
+ log.Panic("BUG: unhandled queue overflow")
+ }
+ if free == 0 {
+ return true
+ }
+ return false
+}
+
+// pop the submission with the lowest offset off the queue
+func (sr *SerializerStruct) pop() *submission {
+ var winner *submission
+ var winnerIndex int
+ for i, v := range sr.q {
+ if v == nil {
+ continue
+ }
+ if winner == nil {
+ winner = v
+ winnerIndex = i
+ continue
+ }
+ if v.offset < winner.offset {
+ winner = v
+ winnerIndex = i
+ }
+ }
+ if winner == nil {
+ return nil
+ }
+ sr.q[winnerIndex] = nil
+ return winner
+}
+
+func (sr *SerializerStruct) eventLoop() {
+ sr.input = make(chan *submission)
+ empty := true
+ for {
+ if empty {
+ // If the queue is empty we block on the channel to conserve CPU
+ sb := <-sr.input
+ sr.push(sb)
+ empty = false
+ }
+ select {
+ case sb := <-sr.input:
+ full := sr.push(sb)
+ if full {
+ // Queue is full, unblock the new request immediately
+ tlog.Warn.Printf("serialize_reads: queue full, forcing unblock")
+ sr.unblockOne()
+ }
+ continue
+ case <-time.After(time.Microsecond * 500):
+ // Looks like we have waited out all concurrent requests.
+ empty = sr.unblockOne()
+ }
+ }
+}
+
+// Unblock a submission and wait for completion
+func (sr *SerializerStruct) unblockOne() (empty bool) {
+ winner := sr.pop()
+ if winner == nil {
+ return true
+ }
+ sr.wg.Add(1)
+ close(winner.ch)
+ sr.wg.Wait()
+ return false
+}
+
+var serializer SerializerStruct
+
+// Called by fusefrontend.NewFS
+func Init() {
+ serializer.input = make(chan *submission)
+ serializer.q = make([]*submission, 10)
+ go serializer.eventLoop()
+}