diff options
| author | Jakob Unterwurzacher | 2018-07-22 22:29:22 +0200 | 
|---|---|---|
| committer | Jakob Unterwurzacher | 2018-07-22 22:29:22 +0200 | 
| commit | f316f1b2df47dca651174e574ab072f6b46c0b01 (patch) | |
| tree | 16176cb053718ba6b2815d08df44040f75d3fdae /internal/fusefrontend | |
| parent | c70df522d2a78f3152fa61511bed9fafa7c495a3 (diff) | |
fusefronted: disallow writes running concurrently with reads
As uncovered by xfstests generic/465, concurrent reads and writes
could lead to this,
  doRead 3015532: corrupt block #1039: stupidgcm: message authentication failed,
as the read could pick up a block that has not yet been completely written -
write() is not atomic!
Now writes take ContentLock exclusively, while reads take it shared,
meaning that multiple reads can run in parallel with each other, but
not with a write.
This also simplifies the file header locking.
Diffstat (limited to 'internal/fusefrontend')
| -rw-r--r-- | internal/fusefrontend/file.go | 86 | ||||
| -rw-r--r-- | internal/fusefrontend/file_allocate_truncate.go | 6 | 
2 files changed, 37 insertions, 55 deletions
| diff --git a/internal/fusefrontend/file.go b/internal/fusefrontend/file.go index ffc41c6..32f615b 100644 --- a/internal/fusefrontend/file.go +++ b/internal/fusefrontend/file.go @@ -145,39 +145,34 @@ func (f *File) createHeader() (fileID []byte, err error) {  //  // Called by Read() for normal reading,  // by Write() and Truncate() via doWrite() for Read-Modify-Write. -// -// doWrite() uses nolock=true because it makes sure the ID is in the cache and -// HeaderLock is locked before calling doRead. -func (f *File) doRead(dst []byte, off uint64, length uint64, nolock bool) ([]byte, fuse.Status) { -	if !nolock { -		// Make sure we have the file ID. -		f.fileTableEntry.HeaderLock.RLock() -		if f.fileTableEntry.ID == nil { -			f.fileTableEntry.HeaderLock.RUnlock() -			// Yes, somebody else may take the lock before we can. This will get -			// the header read twice, but causes no harm otherwise. -			f.fileTableEntry.HeaderLock.Lock() -			tmpID, err := f.readFileID() +func (f *File) doRead(dst []byte, off uint64, length uint64) ([]byte, fuse.Status) { +	// Get the file ID, either from the open file table, or from disk. +	var fileID []byte +	f.fileTableEntry.IDLock.Lock() +	if f.fileTableEntry.ID != nil { +		// Use the cached value in the file table +		fileID = f.fileTableEntry.ID +	} else { +		// Not cached, we have to read it from disk. +		var err error +		fileID, err = f.readFileID() +		if err != nil { +			f.fileTableEntry.IDLock.Unlock()  			if err == io.EOF { -				f.fileTableEntry.HeaderLock.Unlock() +				// Empty file  				return nil, fuse.OK  			}  			if err != nil { -				f.fileTableEntry.HeaderLock.Unlock()  				tlog.Warn.Printf("doRead %d: corrupt header: %v", f.qIno.Ino, err)  				return nil, fuse.EIO  			} -			f.fileTableEntry.ID = tmpID -			// Downgrade the lock. -			f.fileTableEntry.HeaderLock.Unlock() -			// The file ID may change in here. This does no harm because we -			// re-read it after the RLock(). -			f.fileTableEntry.HeaderLock.RLock()  		} +		// Save into the file table +		f.fileTableEntry.ID = fileID  	} -	fileID := f.fileTableEntry.ID +	f.fileTableEntry.IDLock.Unlock()  	if fileID == nil { -		log.Panicf("filedID=%v, nolock=%v", fileID, nolock) +		log.Panicf("fileID=%v", fileID)  	}  	// Read the backing ciphertext in one go  	blocks := f.contentEnc.ExplodePlainRange(off, length) @@ -189,10 +184,6 @@ func (f *File) doRead(dst []byte, off uint64, length uint64, nolock bool) ([]byt  	ciphertext := f.fs.contentEnc.CReqPool.Get()  	ciphertext = ciphertext[:int(alignedLength)]  	n, err := f.fd.ReadAt(ciphertext, int64(alignedOffset)) -	// We don't care if the file ID changes after we have read the data. Drop the lock. -	if !nolock { -		f.fileTableEntry.HeaderLock.RUnlock() -	}  	if err != nil && err != io.EOF {  		tlog.Warn.Printf("read: ReadAt: %s", err.Error())  		return nil, fuse.ToStatus(err) @@ -251,11 +242,14 @@ func (f *File) Read(buf []byte, off int64) (resultData fuse.ReadResult, code fus  	f.fdLock.RLock()  	defer f.fdLock.RUnlock() +	f.fileTableEntry.ContentLock.RLock() +	defer f.fileTableEntry.ContentLock.RUnlock() +  	tlog.Debug.Printf("ino%d: FUSE Read: offset=%d length=%d", f.qIno.Ino, len(buf), off)  	if f.fs.args.SerializeReads {  		serialize_reads.Wait(off, len(buf))  	} -	out, status := f.doRead(buf[:0], uint64(off), uint64(len(buf)), false) +	out, status := f.doRead(buf[:0], uint64(off), uint64(len(buf)))  	if f.fs.args.SerializeReads {  		serialize_reads.Done()  	} @@ -277,31 +271,25 @@ func (f *File) Read(buf []byte, off int64) (resultData fuse.ReadResult, code fus  // Empty writes do nothing and are allowed.  func (f *File) doWrite(data []byte, off int64) (uint32, fuse.Status) {  	fileWasEmpty := false -	// If the file ID is not cached, read it from disk -	if f.fileTableEntry.ID == nil { -		// Block other readers while we mess with the file header. Other writers -		// are blocked by ContentLock already. -		f.fileTableEntry.HeaderLock.Lock() -		tmpID, err := f.readFileID() +	// Get the file ID, create a new one if it does not exist yet. +	var fileID []byte +	// The caller has exclusively locked ContentLock, which blocks all other +	// readers and writers. No need to take IDLock. +	if f.fileTableEntry.ID != nil { +		fileID = f.fileTableEntry.ID +	} else { +		// If the file ID is not cached, read it from disk +		var err error +		fileID, err = f.readFileID()  		// Write a new file header if the file is empty  		if err == io.EOF { -			tmpID, err = f.createHeader() +			fileID, err = f.createHeader()  			fileWasEmpty = true  		}  		if err != nil { -			f.fileTableEntry.HeaderLock.Unlock()  			return 0, fuse.ToStatus(err)  		} -		f.fileTableEntry.ID = tmpID -		if fileWasEmpty { -			// The file was empty and we wrote a new file header. Keep the lock -			// as we might have to kill the file header again if the data write -			// fails. -			defer f.fileTableEntry.HeaderLock.Unlock() -		} else { -			// We won't touch the header again, drop the lock immediately. -			f.fileTableEntry.HeaderLock.Unlock() -		} +		f.fileTableEntry.ID = fileID  	}  	// Handle payload data  	dataBuf := bytes.NewBuffer(data) @@ -312,7 +300,7 @@ func (f *File) doWrite(data []byte, off int64) (uint32, fuse.Status) {  		// Incomplete block -> Read-Modify-Write  		if b.IsPartial() {  			// Read -			oldData, status := f.doRead(nil, b.BlockPlainOff(), f.contentEnc.PlainBS(), true) +			oldData, status := f.doRead(nil, b.BlockPlainOff(), f.contentEnc.PlainBS())  			if status != fuse.OK {  				tlog.Warn.Printf("ino%d fh%d: RMW read failed: %s", f.qIno.Ino, f.intFd(), status.String())  				return 0, status @@ -382,9 +370,7 @@ func (f *File) Write(data []byte, off int64) (uint32, fuse.Status) {  	f.fdLock.RLock()  	defer f.fdLock.RUnlock()  	if f.released { -		// The file descriptor has been closed concurrently, which also means -		// the wlock has been freed. Exit here so we don't crash trying to access -		// it. +		// The file descriptor has been closed concurrently  		tlog.Warn.Printf("ino%d fh%d: Write on released file", f.qIno.Ino, f.intFd())  		return 0, fuse.EBADF  	} diff --git a/internal/fusefrontend/file_allocate_truncate.go b/internal/fusefrontend/file_allocate_truncate.go index 81ac8d3..34fe2c0 100644 --- a/internal/fusefrontend/file_allocate_truncate.go +++ b/internal/fusefrontend/file_allocate_truncate.go @@ -111,9 +111,7 @@ func (f *File) Truncate(newSize uint64) fuse.Status {  			return fuse.ToStatus(err)  		}  		// Truncate to zero kills the file header -		f.fileTableEntry.HeaderLock.Lock()  		f.fileTableEntry.ID = nil -		f.fileTableEntry.HeaderLock.Unlock()  		return fuse.OK  	}  	// We need the old file size to determine if we are growing or shrinking @@ -144,7 +142,7 @@ func (f *File) Truncate(newSize uint64) fuse.Status {  	var data []byte  	if lastBlockLen > 0 {  		var status fuse.Status -		data, status = f.doRead(nil, plainOff, lastBlockLen, false) +		data, status = f.doRead(nil, plainOff, lastBlockLen)  		if status != fuse.OK {  			tlog.Warn.Printf("Truncate: shrink doRead returned error: %v", err)  			return status @@ -206,8 +204,6 @@ func (f *File) truncateGrowFile(oldPlainSz uint64, newPlainSz uint64) fuse.Statu  	if newPlainSz%f.contentEnc.PlainBS() == 0 {  		// The file was empty, so it did not have a header. Create one.  		if oldPlainSz == 0 { -			f.fileTableEntry.HeaderLock.Lock() -			defer f.fileTableEntry.HeaderLock.Unlock()  			id, err := f.createHeader()  			if err != nil {  				return fuse.ToStatus(err) | 
