From 00df0771e3dd9fba0992cbc9a7d347f25aff856a Mon Sep 17 00:00:00 2001 From: Jakob Unterwurzacher Date: Sat, 18 Mar 2017 16:01:50 +0100 Subject: serialize_reads: add read serialization logic Due to kernel readahead, we usually get multiple read requests at the same time. These get submitted to the backing storage in random order, which is a problem if seeking is very expensive. Details: https://github.com/rfjakob/gocryptfs/issues/92 --- cli_args.go | 3 +- internal/fusefrontend/args.go | 2 + internal/fusefrontend/file.go | 11 +++ internal/fusefrontend/fs.go | 5 ++ internal/serialize_reads/sr.go | 149 +++++++++++++++++++++++++++++++++++++++++ mount.go | 1 + 6 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 internal/serialize_reads/sr.go diff --git a/cli_args.go b/cli_args.go index 00e9324..04009dc 100644 --- a/cli_args.go +++ b/cli_args.go @@ -18,7 +18,7 @@ type argContainer struct { debug, init, zerokey, fusedebug, openssl, passwd, fg, version, plaintextnames, quiet, nosyslog, wpanic, longnames, allow_other, ro, reverse, aessiv, nonempty, raw64, - noprealloc, speed, hkdf bool + noprealloc, speed, hkdf, serialize_reads bool masterkey, mountpoint, cipherdir, cpuprofile, extpass, memprofile, ko, passfile, ctlsock, fsname string // Configuration file name override @@ -112,6 +112,7 @@ func parseCliOpts() (args argContainer) { flagSet.BoolVar(&args.noprealloc, "noprealloc", false, "Disable preallocation before writing") flagSet.BoolVar(&args.speed, "speed", false, "Run crypto speed test") flagSet.BoolVar(&args.hkdf, "hkdf", true, "Use HKDF as an additional key derivation step") + flagSet.BoolVar(&args.serialize_reads, "serialize_reads", false, "Try to serialize read operations") flagSet.StringVar(&args.masterkey, "masterkey", "", "Mount with explicit master key") flagSet.StringVar(&args.cpuprofile, "cpuprofile", "", "Write cpu profile to specified file") flagSet.StringVar(&args.memprofile, "memprofile", "", "Write memory profile to specified file") diff --git a/internal/fusefrontend/args.go b/internal/fusefrontend/args.go index f76848d..4029913 100644 --- a/internal/fusefrontend/args.go +++ b/internal/fusefrontend/args.go @@ -27,4 +27,6 @@ type Args struct { // Use HKDF key derivation. // Corresponds to the HKDF feature flag introduced in gocryptfs v1.3. HKDF bool + // Try to serialize read operations, "-serialize_reads" + SerializeReads bool } diff --git a/internal/fusefrontend/file.go b/internal/fusefrontend/file.go index dac7510..b41f220 100644 --- a/internal/fusefrontend/file.go +++ b/internal/fusefrontend/file.go @@ -17,6 +17,7 @@ import ( "github.com/hanwen/go-fuse/fuse/nodefs" "github.com/rfjakob/gocryptfs/internal/contentenc" + "github.com/rfjakob/gocryptfs/internal/serialize_reads" "github.com/rfjakob/gocryptfs/internal/syscallcompat" "github.com/rfjakob/gocryptfs/internal/tlog" ) @@ -176,6 +177,7 @@ func (f *file) doRead(off uint64, length uint64) ([]byte, fuse.Status) { alignedOffset, alignedLength := blocks[0].JointCiphertextRange(blocks) skip := blocks[0].Skip tlog.Debug.Printf("JointCiphertextRange(%d, %d) -> %d, %d, %d", off, length, alignedOffset, alignedLength, skip) + ciphertext := make([]byte, int(alignedLength)) n, err := f.fd.ReadAt(ciphertext, int64(alignedOffset)) // We don't care if the file ID changes after we have read the data. Drop the lock. @@ -224,8 +226,17 @@ func (f *file) Read(buf []byte, off int64) (resultData fuse.ReadResult, code fus return nil, fuse.EBADF } + if f.fs.args.SerializeReads { + serialize_reads.Wait(off, len(buf)) + } + + fmt.Printf("%02d\n", off/131072) out, status := f.doRead(uint64(off), uint64(len(buf))) + if f.fs.args.SerializeReads { + serialize_reads.Done() + } + if status == fuse.EIO { tlog.Warn.Printf("ino%d: Read: returning EIO, offset=%d, length=%d", f.devIno.ino, len(buf), off) } diff --git a/internal/fusefrontend/fs.go b/internal/fusefrontend/fs.go index 9ffcff1..28c43b6 100644 --- a/internal/fusefrontend/fs.go +++ b/internal/fusefrontend/fs.go @@ -17,6 +17,7 @@ import ( "github.com/rfjakob/gocryptfs/internal/contentenc" "github.com/rfjakob/gocryptfs/internal/cryptocore" "github.com/rfjakob/gocryptfs/internal/nametransform" + "github.com/rfjakob/gocryptfs/internal/serialize_reads" "github.com/rfjakob/gocryptfs/internal/syscallcompat" "github.com/rfjakob/gocryptfs/internal/tlog" ) @@ -43,6 +44,10 @@ func NewFS(args Args) *FS { contentEnc := contentenc.New(cryptoCore, contentenc.DefaultBS) nameTransform := nametransform.New(cryptoCore.EMECipher, args.LongNames, args.Raw64) + if args.SerializeReads { + serialize_reads.Init() + } + return &FS{ FileSystem: pathfs.NewLoopbackFileSystem(args.Cipherdir), args: args, diff --git a/internal/serialize_reads/sr.go b/internal/serialize_reads/sr.go new file mode 100644 index 0000000..0f623d3 --- /dev/null +++ b/internal/serialize_reads/sr.go @@ -0,0 +1,149 @@ +package serialize_reads + +import ( + "log" + "sync" + "time" + + "github.com/rfjakob/gocryptfs/internal/tlog" +) + +type SerializerStruct struct { + // we get submissions through the "input" channel + input chan *submission + // q = Queue + q []*submission + // wg is used to wait for the read to complete before unblocking the next + wg sync.WaitGroup +} + +// Wait places the caller into a queue and blocks +func Wait(offset int64, size int) { + serializer.wait(offset, size) +} + +// Done signals that the read operation has finished +func Done() { + serializer.wg.Done() +} + +type submission struct { + // "ch" is closed by "eventLoop" once it wants to unblock the caller + ch chan struct{} + // submissions are prioritized by offset (lowest offset gets unblocked first) + offset int64 + // size will be used in the future to detect consecutive read requests. These + // can be unblocked immediately. + size int +} + +func (sr *SerializerStruct) wait(offset int64, size int) { + ch := make(chan struct{}) + sb := &submission{ + ch: ch, + offset: offset, + size: size, + } + // Send our submission + sr.input <- sb + // Wait till we get unblocked + <-ch +} + +// push returns true if the queue is full after the element has been stored. +// It panics if it did not have space to store the element. +func (sr *SerializerStruct) push(sb *submission) (full bool) { + free := 0 + stored := false + for i, v := range sr.q { + if v != nil { + continue + } + if !stored { + sr.q[i] = sb + stored = true + continue + } + free++ + } + if !stored { + // This should never happen because eventLoop checks if the queue got full + log.Panic("BUG: unhandled queue overflow") + } + if free == 0 { + return true + } + return false +} + +// pop the submission with the lowest offset off the queue +func (sr *SerializerStruct) pop() *submission { + var winner *submission + var winnerIndex int + for i, v := range sr.q { + if v == nil { + continue + } + if winner == nil { + winner = v + winnerIndex = i + continue + } + if v.offset < winner.offset { + winner = v + winnerIndex = i + } + } + if winner == nil { + return nil + } + sr.q[winnerIndex] = nil + return winner +} + +func (sr *SerializerStruct) eventLoop() { + sr.input = make(chan *submission) + empty := true + for { + if empty { + // If the queue is empty we block on the channel to conserve CPU + sb := <-sr.input + sr.push(sb) + empty = false + } + select { + case sb := <-sr.input: + full := sr.push(sb) + if full { + // Queue is full, unblock the new request immediately + tlog.Warn.Printf("serialize_reads: queue full, forcing unblock") + sr.unblockOne() + } + continue + case <-time.After(time.Microsecond * 500): + // Looks like we have waited out all concurrent requests. + empty = sr.unblockOne() + } + } +} + +// Unblock a submission and wait for completion +func (sr *SerializerStruct) unblockOne() (empty bool) { + winner := sr.pop() + if winner == nil { + return true + } + sr.wg.Add(1) + close(winner.ch) + sr.wg.Wait() + return false +} + +var serializer SerializerStruct + +// Called by fusefrontend.NewFS +func Init() { + serializer.input = make(chan *submission) + serializer.q = make([]*submission, 10) + go serializer.eventLoop() +} diff --git a/mount.go b/mount.go index f4b562d..c4bdc4d 100644 --- a/mount.go +++ b/mount.go @@ -191,6 +191,7 @@ func initFuseFrontend(key []byte, args *argContainer, confFile *configfile.ConfF Raw64: args.raw64, NoPrealloc: args.noprealloc, HKDF: args.hkdf, + SerializeReads: args.serialize_reads, } // confFile is nil when "-zerokey" or "-masterkey" was used if confFile != nil { -- cgit v1.2.3