diff options
| author | Jakob Unterwurzacher | 2017-03-18 16:01:50 +0100 | 
|---|---|---|
| committer | Jakob Unterwurzacher | 2017-03-18 16:18:00 +0100 | 
| commit | 00df0771e3dd9fba0992cbc9a7d347f25aff856a (patch) | |
| tree | 62c7a234184413ffd8250579d0c7935aa2941553 /internal | |
| parent | 14038a1644f17f50b113a05d09a2a0a3b3e973b2 (diff) | |
serialize_reads: add read serialization logic
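Due to kernel readahead, we usually get multiple read requests
at the same time. These get submitted to the backing storage in
random order, which is a problem if seeking is very expensive.
With "-serialize_reads", concurrent reads are queued for a short
time and then released one at a time, lowest offset first.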
Details: https://github.com/rfjakob/gocryptfs/issues/92
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/fusefrontend/args.go | 2 | ||||
| -rw-r--r-- | internal/fusefrontend/file.go | 11 | ||||
| -rw-r--r-- | internal/fusefrontend/fs.go | 5 | ||||
| -rw-r--r-- | internal/serialize_reads/sr.go | 149 | 
4 files changed, 167 insertions, 0 deletions
| diff --git a/internal/fusefrontend/args.go b/internal/fusefrontend/args.go index f76848d..4029913 100644 --- a/internal/fusefrontend/args.go +++ b/internal/fusefrontend/args.go @@ -27,4 +27,6 @@ type Args struct {  	// Use HKDF key derivation.  	// Corresponds to the HKDF feature flag introduced in gocryptfs v1.3.  	HKDF bool +	// Try to serialize read operations, "-serialize_reads" +	SerializeReads bool  } diff --git a/internal/fusefrontend/file.go b/internal/fusefrontend/file.go index dac7510..b41f220 100644 --- a/internal/fusefrontend/file.go +++ b/internal/fusefrontend/file.go @@ -17,6 +17,7 @@ import (  	"github.com/hanwen/go-fuse/fuse/nodefs"  	"github.com/rfjakob/gocryptfs/internal/contentenc" +	"github.com/rfjakob/gocryptfs/internal/serialize_reads"  	"github.com/rfjakob/gocryptfs/internal/syscallcompat"  	"github.com/rfjakob/gocryptfs/internal/tlog"  ) @@ -176,6 +177,7 @@ func (f *file) doRead(off uint64, length uint64) ([]byte, fuse.Status) {  	alignedOffset, alignedLength := blocks[0].JointCiphertextRange(blocks)  	skip := blocks[0].Skip  	tlog.Debug.Printf("JointCiphertextRange(%d, %d) -> %d, %d, %d", off, length, alignedOffset, alignedLength, skip) +  	ciphertext := make([]byte, int(alignedLength))  	n, err := f.fd.ReadAt(ciphertext, int64(alignedOffset))  	// We don't care if the file ID changes after we have read the data. Drop the lock. 
@@ -224,8 +226,17 @@ func (f *file) Read(buf []byte, off int64) (resultData fuse.ReadResult, code fus  		return nil, fuse.EBADF  	} +	if f.fs.args.SerializeReads { +		serialize_reads.Wait(off, len(buf)) +	} + +	fmt.Printf("%02d\n", off/131072)  	out, status := f.doRead(uint64(off), uint64(len(buf))) +	if f.fs.args.SerializeReads { +		serialize_reads.Done() +	} +  	if status == fuse.EIO {  		tlog.Warn.Printf("ino%d: Read: returning EIO, offset=%d, length=%d", f.devIno.ino, len(buf), off)  	} diff --git a/internal/fusefrontend/fs.go b/internal/fusefrontend/fs.go index 9ffcff1..28c43b6 100644 --- a/internal/fusefrontend/fs.go +++ b/internal/fusefrontend/fs.go @@ -17,6 +17,7 @@ import (  	"github.com/rfjakob/gocryptfs/internal/contentenc"  	"github.com/rfjakob/gocryptfs/internal/cryptocore"  	"github.com/rfjakob/gocryptfs/internal/nametransform" +	"github.com/rfjakob/gocryptfs/internal/serialize_reads"  	"github.com/rfjakob/gocryptfs/internal/syscallcompat"  	"github.com/rfjakob/gocryptfs/internal/tlog"  ) @@ -43,6 +44,10 @@ func NewFS(args Args) *FS {  	contentEnc := contentenc.New(cryptoCore, contentenc.DefaultBS)  	nameTransform := nametransform.New(cryptoCore.EMECipher, args.LongNames, args.Raw64) +	if args.SerializeReads { +		serialize_reads.Init() +	} +  	return &FS{  		FileSystem:    pathfs.NewLoopbackFileSystem(args.Cipherdir),  		args:          args, diff --git a/internal/serialize_reads/sr.go b/internal/serialize_reads/sr.go new file mode 100644 index 0000000..0f623d3 --- /dev/null +++ b/internal/serialize_reads/sr.go @@ -0,0 +1,149 @@ +package serialize_reads + +import ( +	"log" +	"sync" +	"time" + +	"github.com/rfjakob/gocryptfs/internal/tlog" +) + +type SerializerStruct struct { +	// we get submissions through the "input" channel +	input chan *submission +	// q = Queue +	q []*submission +	// wg is used to wait for the read to complete before unblocking the next +	wg sync.WaitGroup +} + +// Wait places the caller into a queue and blocks +func Wait(offset 
int64, size int) { +	serializer.wait(offset, size) +} + +// Done signals that the read operation has finished +func Done() { +	serializer.wg.Done() +} + +type submission struct { +	// "ch" is closed by "eventLoop" once it wants to unblock the caller +	ch chan struct{} +	// submissions are prioritized by offset (lowest offset gets unblocked first) +	offset int64 +	// size will be used in the future to detect consecutive read requests. These +	// can be unblocked immediately. +	size int +} + +func (sr *SerializerStruct) wait(offset int64, size int) { +	ch := make(chan struct{}) +	sb := &submission{ +		ch:     ch, +		offset: offset, +		size:   size, +	} +	// Send our submission +	sr.input <- sb +	// Wait till we get unblocked +	<-ch +} + +// push returns true if the queue is full after the element has been stored. +// It panics if it did not have space to store the element. +func (sr *SerializerStruct) push(sb *submission) (full bool) { +	free := 0 +	stored := false +	for i, v := range sr.q { +		if v != nil { +			continue +		} +		if !stored { +			sr.q[i] = sb +			stored = true +			continue +		} +		free++ +	} +	if !stored { +		// This should never happen because eventLoop checks if the queue got full +		log.Panic("BUG: unhandled queue overflow") +	} +	if free == 0 { +		return true +	} +	return false +} + +// pop the submission with the lowest offset off the queue +func (sr *SerializerStruct) pop() *submission { +	var winner *submission +	var winnerIndex int +	for i, v := range sr.q { +		if v == nil { +			continue +		} +		if winner == nil { +			winner = v +			winnerIndex = i +			continue +		} +		if v.offset < winner.offset { +			winner = v +			winnerIndex = i +		} +	} +	if winner == nil { +		return nil +	} +	sr.q[winnerIndex] = nil +	return winner +} + +func (sr *SerializerStruct) eventLoop() { +	sr.input = make(chan *submission) +	empty := true +	for { +		if empty { +			// If the queue is empty we block on the channel to conserve CPU +			sb := <-sr.input +			
sr.push(sb) +			empty = false +		} +		select { +		case sb := <-sr.input: +			full := sr.push(sb) +			if full { +				// Queue is full, unblock the new request immediately +				tlog.Warn.Printf("serialize_reads: queue full, forcing unblock") +				sr.unblockOne() +			} +			continue +		case <-time.After(time.Microsecond * 500): +			// Looks like we have waited out all concurrent requests. +			empty = sr.unblockOne() +		} +	} +} + +// Unblock a submission and wait for completion +func (sr *SerializerStruct) unblockOne() (empty bool) { +	winner := sr.pop() +	if winner == nil { +		return true +	} +	sr.wg.Add(1) +	close(winner.ch) +	sr.wg.Wait() +	return false +} + +var serializer SerializerStruct + +// Called by fusefrontend.NewFS +func Init() { +	serializer.input = make(chan *submission) +	serializer.q = make([]*submission, 10) +	go serializer.eventLoop() +} | 
