Skip to content

Commit 2d64661

Browse files
committed
Add pwrite() in FdHandle to support concurrent reads and writes at specific offsets
1 parent ba95997 commit 2d64661

2 files changed

Lines changed: 103 additions & 6 deletions

File tree

include/fs/FdHandle.h

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,56 @@ class FdHandle {
301301
*/
302302
ssize_t read(void* value, size_t size) const;
303303

304+
/**
305+
* @brief Reads a value of type T from a specific file offset.
306+
*
307+
* This is a thread-safe positional read. Unlike read(), it does not
308+
* mutate or rely on the file's seek position and does not hold the file
309+
* mutex for the duration of the syscall, allowing concurrent readers.
310+
*
311+
* @tparam T Type of value.
312+
* @param value Reference to store the read value.
313+
* @param offset Offset in the file to read from.
314+
* @return Number of bytes read.
315+
*/
316+
template<class T>
317+
inline ssize_t pread(T& value, off_t offset) const {
318+
return pread(&value, sizeof(value), offset);
319+
}
320+
321+
/**
322+
* @brief Reads raw bytes from a specific file offset.
323+
* @param value Pointer to buffer.
324+
* @param size Number of bytes to read.
325+
* @param offset Offset in the file to read from.
326+
* @return Number of bytes read.
327+
*/
328+
ssize_t pread(void* value, size_t size, off_t offset) const;
329+
330+
/**
331+
* @brief Writes a value of type T to a specific file offset.
332+
*
333+
* Positional write that does not move the file's seek position.
334+
*
335+
* @tparam T Type of value.
336+
* @param value Value to write.
337+
* @param offset Offset in the file to write to.
338+
* @return Number of bytes written.
339+
*/
340+
template<class T>
341+
inline ssize_t pwrite(const T& value, off_t offset) const {
342+
return pwrite(&value, sizeof(value), offset);
343+
}
344+
345+
/**
346+
* @brief Writes raw bytes to a specific file offset.
347+
* @param value Pointer to data.
348+
* @param size Number of bytes to write.
349+
* @param offset Offset in the file to write to.
350+
* @return Number of bytes written.
351+
*/
352+
ssize_t pwrite(const void* value, size_t size, off_t offset) const;
353+
304354
/**
305355
* @brief Waits until data is available for reading.
306356
* @return true if ready.
@@ -382,7 +432,7 @@ class FdHandle {
382432
* @brief Returns a lock guard for the file's mutex.
383433
* @return lock_guard.
384434
*/
385-
std::lock_guard<std::mutex> getLock() const;
435+
std::lock_guard<std::recursive_mutex> getLock() const;
386436

387437
/**
388438
* @brief Destructor (decreases reference count).
@@ -506,7 +556,7 @@ class FdTransaction {
506556
bool isStream() const;
507557

508558
private:
509-
std::lock_guard<std::mutex> guard;
559+
std::lock_guard<std::recursive_mutex> guard;
510560
FdHandleData& handleData; /**< Underlying file handle data. */
511561
};
512562

src/fs/FdHandle.cpp

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ class FdHandleData {
231231
}
232232

233233
int16_t fd;
234-
std::mutex mutex;
234+
std::recursive_mutex mutex;
235235

236236
int numReferences() const {
237237
return refs;
@@ -471,6 +471,53 @@ ssize_t FdHandle::read(void* value, size_t size) const {
471471
return getHandleData(fd).read(value, size);
472472
}
473473

474+
ssize_t FdHandle::pread(void* value, size_t size, off_t offset) const {
475+
FdHandleData& handle = getHandleData(fd);
476+
477+
// Drain any queued positional writes so the read sees the latest data,
478+
// then release the handle mutex before issuing the syscall. Multiple
479+
// readers can then execute concurrently inside the kernel.
480+
{
481+
std::lock_guard<std::recursive_mutex> _(handle.mutex);
482+
handle.flushWrites();
483+
}
484+
485+
char* buffer = (char*)value;
486+
size_t totalRead = 0;
487+
while (totalRead < size) {
488+
ssize_t res = ::pread(handle.fd, &buffer[totalRead], size - totalRead, offset + (off_t)totalRead);
489+
if (res < 0) {
490+
if (totalRead > 0) return (ssize_t)totalRead;
491+
return res;
492+
} else if (res == 0)
493+
break;
494+
totalRead += (size_t)res;
495+
}
496+
return (ssize_t)totalRead;
497+
}
498+
499+
ssize_t FdHandle::pwrite(const void* value, size_t size, off_t offset) const {
500+
FdHandleData& handle = getHandleData(fd);
501+
502+
{
503+
std::lock_guard<std::recursive_mutex> _(handle.mutex);
504+
handle.flushWrites();
505+
}
506+
507+
const char* buffer = (const char*)value;
508+
size_t totalWritten = 0;
509+
while (totalWritten < size) {
510+
ssize_t res = ::pwrite(handle.fd, &buffer[totalWritten], size - totalWritten, offset + (off_t)totalWritten);
511+
if (res < 0) {
512+
if (totalWritten > 0) return (ssize_t)totalWritten;
513+
return res;
514+
} else if (res == 0)
515+
break;
516+
totalWritten += (size_t)res;
517+
}
518+
return (ssize_t)totalWritten;
519+
}
520+
474521
bool FdHandle::waitForRead() const {
475522
return getHandleData(fd).waitForRead();
476523
}
@@ -547,8 +594,8 @@ void FdHandle::close() {
547594
}
548595
}
549596

550-
std::lock_guard<std::mutex> FdHandle::getLock() const {
551-
return std::lock_guard<std::mutex>(getHandleData(fd).mutex);
597+
std::lock_guard<std::recursive_mutex> FdHandle::getLock() const {
598+
return std::lock_guard<std::recursive_mutex>(getHandleData(fd).mutex);
552599
}
553600

554601
void FdHandle::markToClose() const {
@@ -561,7 +608,7 @@ MmapHandle FdHandle::getMmapHandle(off_t offset, size_t size, int prot, int flag
561608
// Make sure the file is big enough, then
562609
// restore the previous cursor position
563610
{
564-
std::lock_guard<std::mutex> _(handle.mutex);
611+
std::lock_guard<std::recursive_mutex> _(handle.mutex);
565612

566613
// Get current file offset to restore later
567614
off_t previousOffset = lseek(fd, 0, SEEK_CUR);

0 commit comments

Comments
 (0)