ARTICLE AD BOX
I have a class which is a QIODevice that wraps a background producer thread. It consumes data from a source QIODevice in a producer thread and provides it via the standard QIODevice API (read, seek, etc.) on the main thread. It reads the data into a circular buffer: the producer adds data at the end and the consumer consumes from the start. I also maintain a sub circular queue inside this buffer to allow fast seeking when the seek position is within already-read data. Note that only two threads are involved here, a producer and a consumer, i.e. this is a single-producer, single-consumer setup.
I've been able to implement a lock-free read for when the data is already in the buffer, and I am now trying to implement a lock-free seek for the fast path, i.e. when the seek position lies within the already-read buffer. This is where I'm facing some problems. Basically, I'm unable to think of a solution that correctly and safely updates the read-left variable (m_readLeft) in this fast seek path. And if I try to eliminate m_readLeft, then maintaining m_count becomes difficult inside makeSpaceForMoreReading. Can anyone with more experience help me with this?
// ---------------------------------------------------------------------------
// AsyncBufferedReader.h
// ---------------------------------------------------------------------------
#include <QIODevice>
#include <QMutex>
#include <QVector>
#include <QWaitCondition>

#include <atomic>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <utility>

/**
 * @brief A QIODevice that wraps a background producer thread.
 *
 * It consumes data from a source QIODevice and provides it via
 * the standard QIODevice API (read, seek, etc.) asynchronously.
 *
 * Threading model: exactly one producer (the worker started by openSource())
 * and one consumer (the thread calling read()/seek()). Shared state is
 * protected by m_mutex, except for the atomics, which readData() also reads
 * and updates without the lock on its fast path.
 */
class AsyncBufferedReader : public QIODevice
{
    Q_OBJECT
public:
    /**
     * @brief Move-only owner of a malloc'ed byte array.
     *
     * capacity() is non-zero only while a block of that size is owned.
     */
    class Buffer
    {
        Q_DISABLE_COPY(Buffer)
    public:
        Buffer() = default;

        /** Allocates @p capacity bytes; capacity() stays 0 if the allocation fails. */
        Buffer(size_t capacity)
            : m_buffer(static_cast<char *>(std::malloc(capacity)))
            , m_capacity(m_buffer ? capacity : 0)
        {}

        Buffer(Buffer &&buffer) noexcept
            : m_buffer(std::exchange(buffer.m_buffer, nullptr))
            , m_capacity(std::exchange(buffer.m_capacity, 0))
        {}

        Buffer &operator=(Buffer &&buffer) noexcept
        {
            if (this == &buffer)
                return *this;
            std::free(m_buffer); // free(nullptr) is a no-op
            m_buffer = std::exchange(buffer.m_buffer, nullptr);
            // Reset the source capacity as well so a moved-from Buffer is
            // consistently empty (pointer and capacity both cleared).
            m_capacity = std::exchange(buffer.m_capacity, 0);
            return *this;
        }

        ~Buffer() { std::free(m_buffer); }

        /**
         * @brief Resizes the owned block to @p cap bytes.
         *
         * On allocation failure the previous block and capacity are kept, so
         * callers must check capacity() afterwards. A @p cap of 0 releases
         * the block.
         */
        void alloc(size_t cap)
        {
            if (cap == 0) {
                std::free(m_buffer);
                m_buffer = nullptr;
                m_capacity = 0;
                return;
            }
            // realloc(nullptr, cap) behaves like malloc(cap), so one call
            // covers both the first allocation and a resize.
            char *resized = static_cast<char *>(std::realloc(m_buffer, cap));
            if (!resized)
                return; // the old block is still valid and still owned
            m_buffer = resized;
            m_capacity = cap;
        }

        size_t capacity() const { return m_capacity; }

        char &operator[](const size_t index)
        {
            assert(index < m_capacity);
            return m_buffer[index];
        }

        const char &operator[](const size_t index) const
        {
            assert(index < m_capacity);
            return m_buffer[index];
        }

    private:
        char *m_buffer = nullptr;
        size_t m_capacity = 0;
    };

    /** Suggested buffer capacity (bytes) for a source of @p sourceSize bytes. */
    static qint64 idealBufferCapacity(qint64 sourceSize);

    static constexpr size_t default_capacity = 2 * 1024 * 1024; // 2MB

    explicit AsyncBufferedReader(QObject *parent = nullptr);
    explicit AsyncBufferedReader(Buffer &&buffer, size_t capacity, QObject *parent = nullptr);
    explicit AsyncBufferedReader(size_t capacity, QObject *parent = nullptr);
    ~AsyncBufferedReader();

    /** Starts the worker thread. Takes ownership of the source device. */
    bool openSource(std::unique_ptr<QIODevice> source,
                    qint64 startPos = 0,
                    QIODevice::OpenMode openMode = QIODevice::ReadOnly);

    /** Asks the worker to stop and wakes every waiter; does not wait for the worker. */
    void abort();

    // QIODevice overrides
    bool isSequential() const override { return false; }
    qint64 size() const override;
    bool seek(qint64 pos) override;
    void close() override;

    /** Stops the worker, closes the device and hands the buffer to the caller. */
    Buffer closeAndReleaseBuffer();

protected:
    qint64 readData(char *data, qint64 maxlen) override;
    qint64 writeData(const char *, qint64) override { return -1; }

private:
    friend class AsyncBufferedReaderTest;

    // only support producer thread and once consumer thread
    void runWorker(std::unique_ptr<QIODevice> source, qint64 startPos);
    void handleSeekInWorker(QIODevice *source, qint64 &currentPos);
    void abortWorkerAndWait();

    // thread safe
    bool canMakeSpaceForMoreReading();
    // requires mutex to be locked
    bool makeSpaceForMoreReading();

    mutable QMutex m_mutex;
    QWaitCondition m_dataWait;
    QWaitCondition m_bufferSpaceWait;
    QWaitCondition m_seekFinishedWait;
    QWaitCondition m_threadFinishedWait;

    Buffer m_buffer;
    const size_t m_capacity;

    // denotes full circular queue with all the data
    size_t m_head = 0;
    size_t m_tail = 0;
    // m_buffer ptr is only valid if count>0, only reading from m_count is
    // thread safe, modification should be under lock
    std::atomic<size_t> m_count{0};

    // sub circular queue of next read
    // this is maintained to allow fast backward seeks
    std::atomic<size_t> m_readPos{0};
    std::atomic<size_t> m_readLeft{0};

    bool m_workerRunning{false};
    std::atomic<bool> m_aborted{false};
    bool m_seekRequested{false};
    std::atomic<qint64> m_seekPos{0};
    qint64 m_totalSourceSize = 0;
    bool m_seekSuccess = false;
    bool m_sourceEof = false;
};

// ---------------------------------------------------------------------------
// Implementation (includes "AsyncBufferedReader.h")
// ---------------------------------------------------------------------------
#include "AsyncBufferedReader.h"

#include <QDebug>
#include <QScopeGuard>
#include <QThreadPool>
#include <QTimer>

#include <algorithm>
#include <cstring>
#include <qelapsedtimer.h>

constexpr qint64 CHUNK_SIZE = 256 * 1024;

// use macro to not get quotes in qDebug output
#define formatMiB(bytes) (QString("%1 MiB").arg(QString::number(static_cast<double>(bytes) / (1024. * 1024), 'f')).toStdString().c_str())

qint64 AsyncBufferedReader::idealBufferCapacity(qint64 sourceSize)
{
    // 60% of the source, but at least min(sourceSize, 200 MiB) and at most 700 MiB.
    constexpr qint64 kLowerBound = 200LL * 1024 * 1024;
    constexpr qint64 kUpperBound = 700LL * 1024 * 1024;
    const auto scaled = static_cast<qint64>(sourceSize * .6);
    return std::clamp<qint64>(scaled, qMin<qint64>(sourceSize, kLowerBound), kUpperBound);
}

AsyncBufferedReader::AsyncBufferedReader(QObject *parent)
    : AsyncBufferedReader(default_capacity, parent)
{}

AsyncBufferedReader::AsyncBufferedReader(AsyncBufferedReader::Buffer &&buffer,
                                         size_t capacity,
                                         QObject *parent)
    : AsyncBufferedReader(capacity, parent)
{
    m_buffer = std::move(buffer);
}

AsyncBufferedReader::AsyncBufferedReader(size_t capacity, QObject *parent)
    : QIODevice(parent)
    , m_capacity(capacity)
{
    // Periodic diagnostics: logs the buffer fill level every 5 seconds.
    auto timer = new QTimer(this);
    connect(timer, &QTimer::timeout, this, [this]() {
        qDebug() << this << "buffer size" << formatMiB(m_count) << "read left" << formatMiB(m_readLeft);
    });
    timer->start(5000);
}

AsyncBufferedReader::~AsyncBufferedReader()
{
    abortWorkerAndWait();
}

bool AsyncBufferedReader::openSource(std::unique_ptr<QIODevice> source,
                                     qint64 startPos,
                                     QIODevice::OpenMode openMode)
{
    if (!source)
        return false;

    // Ensure source is open
    if (!source->isOpen() && !source->open(QIODevice::ReadOnly)) {
        qWarning("AsyncBufferedReader::openSource failed to open source");
        return false;
    }

    // support reopening
    abortWorkerAndWait();

    // No worker is running at this point, so the state can be reset directly.
    m_totalSourceSize = source->size();
    m_workerRunning = true;
    m_aborted = false;
    m_sourceEof = false;
    // Drop any seek request the previous worker never serviced; otherwise the
    // new worker would replay it as its very first action.
    m_seekRequested = false;
    m_seekSuccess = false;
    m_head = m_tail = m_count = m_readPos = m_readLeft = 0;

    QIODevice::open(openMode);
    QIODevice::seek(startPos);

    QThreadPool::globalInstance()->start([this, src = std::move(source), startPos]() mutable {
        runWorker(std::move(src), startPos);
    });
    return true;
}

void AsyncBufferedReader::runWorker(std::unique_ptr<QIODevice> source, qint64 startPos)
{
    // Runs on every exit path. It is declared before `locker` below, so the
    // mutex has already been released by the time this re-acquires it.
    auto cleanup = qScopeGuard([&] {
        QMutexLocker locker(&m_mutex);
        m_workerRunning = false;
        m_dataWait.notify_all();
        // A consumer blocked in seek() waits on this condition; without the
        // wake-up it would never notice that the worker is gone.
        m_seekFinishedWait.notify_all();
        m_threadFinishedWait.notify_all();
    });

    if (startPos > 0 && !source->seek(startPos))
        return;

    if (m_buffer.capacity() < m_capacity) {
        m_buffer.alloc(m_capacity);
        if (m_buffer.capacity() < m_capacity) {
            qWarning("AsyncBufferedReader::runWorker failed to allocate buffer");
            return;
        }
    }

    qint64 currentPos = startPos;
    QMutexLocker locker(&m_mutex);

    while (!m_aborted.load()) {
        if (m_seekRequested) {
            handleSeekInWorker(source.get(), currentPos);
            continue;
        }

        // Wait as long as the buffer is absolutely full
        while ((m_sourceEof || m_count == m_capacity) && !m_aborted && !m_seekRequested) {
            m_bufferSpaceWait.wait(&m_mutex);
        }
        if (m_aborted || m_seekRequested || m_sourceEof)
            continue;

        // --- 2. Calculate Contiguous Space ---
        // spaceAtTail is the linear memory available before we have to wrap
        const size_t spaceAtTail = m_capacity - m_tail;
        // totalBufferSpace is how much we can add before hitting m_head
        const size_t totalBufferSpace = m_capacity - m_count;

        // Start with small reads so the first bytes arrive quickly, then
        // grow the read size as the buffer fills.
        size_t recommendedReadSize = 0;
        if (m_count == 0)
            recommendedReadSize = 32 * 1024;
        else if (m_count < 1024 * 1024)
            recommendedReadSize = 128 * 1024;
        else
            recommendedReadSize = 1024 * 1024;
        assert(recommendedReadSize > 0);

        // We can only read the smaller of:
        // - Our desired chunk size
        // - The linear space until the end of the vector
        // - The total space remaining in the buffer
        const size_t toRead = std::min({recommendedReadSize, spaceAtTail, totalBufferSpace});
        if (toRead == 0)
            continue;

        // The (possibly slow) source read happens without the mutex held.
        locker.unlock();
        const qint64 bytesRead = source->read(&m_buffer[m_tail], static_cast<qint64>(toRead));
        locker.relock();

        if (bytesRead <= 0) {
            m_sourceEof = true;
            if (bytesRead < 0)
                qWarning() << "source failed to read" << source->errorString();
            m_dataWait.notify_all();
            continue;
        }

        // --- 3. Update Tail ---
        // Because toRead was clamped by spaceAtTail,
        // (m_tail + bytesRead) will be <= m_capacity
        const size_t produced = static_cast<size_t>(bytesRead);
        m_tail = (m_tail + produced) % m_capacity;
        m_count += produced;
        m_readLeft += produced;
        currentPos += bytesRead;

        m_dataWait.notify_all();
        QMetaObject::invokeMethod(this, &AsyncBufferedReader::readyRead, Qt::QueuedConnection);
    }
}

/**
 * Services a pending seek request on the worker thread (mutex held).
 *
 * If the target lies inside the data still held in the buffer, only the read
 * cursor is moved; otherwise the source is repositioned and the buffer is
 * discarded. @p currentPos is the source offset of the next byte to produce
 * and is updated when the source is repositioned.
 */
void AsyncBufferedReader::handleSeekInWorker(QIODevice *source, qint64 &currentPos)
{
    const qint64 target = m_seekPos.load();
    const qint64 bufferStart = currentPos - static_cast<qint64>(m_count.load());

    if (target >= bufferStart && target <= currentPos) {
        const size_t offset = static_cast<size_t>(target - bufferStart);
        m_readPos = (m_head + offset) % m_capacity;
        m_readLeft = m_count - offset;
        m_seekSuccess = true;
        m_sourceEof = false;
        makeSpaceForMoreReading();
    } else {
        qDebug() << this << "seek outside buffer, relativepos" << formatMiB(currentPos - target)
                 << "discarding - count" << formatMiB(m_count) << "readleft" << formatMiB(m_readLeft);
        m_seekSuccess = source->seek(target);
        if (m_seekSuccess) {
            m_head = m_tail = m_count = m_readPos = m_readLeft = 0;
            currentPos = target;
            m_sourceEof = false;
        }
    }

    m_seekRequested = false;
    m_seekFinishedWait.notify_all();
}

void AsyncBufferedReader::abortWorkerAndWait()
{
    abort();
    QMutexLocker locker(&m_mutex);
    while (m_workerRunning)
        m_threadFinishedWait.wait(&m_mutex);
}

bool AsyncBufferedReader::canMakeSpaceForMoreReading()
{
    // Buffer is full and less than 80% of it is still unread.
    return (m_readLeft < m_capacity / 1.25f && m_count == m_capacity);
}

bool AsyncBufferedReader::makeSpaceForMoreReading()
{
    if (canMakeSpaceForMoreReading()) {
        qDebug() << "AsyncBufferedReader::readData discarding front buffer" << m_readLeft << m_capacity << m_count;
        // Drop the already-consumed history: the queue now starts at the read cursor.
        m_head = m_readPos;
        m_count = m_readLeft.load();
        return true;
    }
    return false;
}

/**
 * Copies up to @p maxlen bytes into @p data.
 *
 * Fast path: when strictly more than @p maxlen unread bytes are buffered, the
 * copy is done without taking the mutex. Otherwise the call locks, waits for
 * the producer as needed and copies what becomes available.
 *
 * @return the number of bytes copied, or -1 when nothing could be read and
 *         the source is at EOF, the worker has stopped, or the reader was aborted.
 */
qint64 AsyncBufferedReader::readData(char *data, qint64 maxlen)
{
    qint64 target = maxlen;

    while (m_readLeft > target && target > 0) {
        const size_t availableAtTail = m_capacity - m_readPos;
        const size_t chunk = std::min<size_t>(availableAtTail, target);
        std::memcpy(data + (maxlen - target), &m_buffer[m_readPos], chunk);
        target -= chunk;
        m_readPos = (m_readPos + chunk) % m_capacity;
        m_readLeft -= chunk;
    }

    if (target == 0) {
        // Cheap lock-free pre-check; it is re-evaluated under the lock.
        if (canMakeSpaceForMoreReading()) {
            QMutexLocker locker(&m_mutex);
            if (makeSpaceForMoreReading())
                m_bufferSpaceWait.notify_all();
        }
        return maxlen;
    }

    QMutexLocker locker(&m_mutex);
    qint64 totalRead = 0;

    while (totalRead < target) {
        // 1. Wait if the buffer is empty but the worker is still producing
        while (m_readLeft == 0 && m_workerRunning && !m_sourceEof && !m_aborted) {
            m_dataWait.wait(&m_mutex);
        }

        // 2. If buffer is still empty after waiting, we hit EOF/Abort
        if (m_readLeft == 0) {
            break;
        }

        // 3. Determine how much we can pull in this specific iteration
        // We can only read what's in the buffer (m_readLeft)
        // OR what's left to fill our request (target - totalRead)
        const size_t availableToCopy = std::min<size_t>(m_readLeft, target - totalRead);
        size_t iterationCopied = 0;

        // 4. Handle Circular Buffer Wrap-around
        while (iterationCopied < availableToCopy) {
            const size_t availableAtTail = m_capacity - m_readPos;
            const size_t chunk = std::min(availableToCopy - iterationCopied, availableAtTail);

            locker.unlock();
            // Consumer is blocked (this thread), no seek request possible
            std::memcpy(data + totalRead, &m_buffer[m_readPos], chunk);
            locker.relock();

            m_readPos = (m_readPos + chunk) % m_capacity;
            m_readLeft -= chunk;
            iterationCopied += chunk;
            totalRead += chunk;
        }

        // 5. Update global count and notify producer there is now room
        if (makeSpaceForMoreReading())
            m_bufferSpaceWait.notify_all();

        // If we hit EOF or the source stopped, don't loop again even if totalRead < target
        if (m_sourceEof || m_aborted) {
            break;
        }
    }

    return (m_sourceEof || !m_workerRunning || m_aborted) && (totalRead == 0)
               ? -1
               : static_cast<qint64>(totalRead);
}

/**
 * Requests a seek from the worker and blocks until it has been serviced.
 *
 * @return true if the worker repositioned successfully; false if the base
 *         class rejected @p pos, no worker is running, the worker stopped
 *         before servicing the request, or the source seek failed.
 */
bool AsyncBufferedReader::seek(qint64 pos)
{
    if (!QIODevice::seek(pos))
        return false;

    QMutexLocker locker(&m_mutex);
    if (!m_workerRunning) {
        return false;
    }

    m_seekPos = pos;
    m_seekSuccess = false;
    m_seekRequested = true;
    m_bufferSpaceWait.notify_all(); // wake the worker if it is waiting for space

    while (m_seekRequested && m_workerRunning) {
        m_seekFinishedWait.wait(&m_mutex);
    }

    if (m_seekRequested) {
        // The worker exited without servicing the request: report failure
        // instead of a stale result, and do not leave the request pending.
        m_seekRequested = false;
        return false;
    }
    return m_seekSuccess;
}

void AsyncBufferedReader::close()
{
    QIODevice::close();
    abort();
}

AsyncBufferedReader::Buffer AsyncBufferedReader::closeAndReleaseBuffer()
{
    abortWorkerAndWait();
    QIODevice::close();
    m_count = 0;
    return std::move(m_buffer);
}

qint64 AsyncBufferedReader::size() const
{
    return m_totalSourceSize;
}

void AsyncBufferedReader::abort()
{
    // Set the flag and signal while holding the mutex. Waiters evaluate their
    // predicate under the same mutex, so a notify issued without it can land
    // between a waiter's check and its wait() and be lost.
    QMutexLocker locker(&m_mutex);
    m_aborted = true;
    m_bufferSpaceWait.notify_all();
    m_dataWait.notify_all();
    m_seekFinishedWait.notify_all();
}

// ---------------------------------------------------------------------------
// Test (the .moc include below names it test_AsyncBufferedReader)
// ---------------------------------------------------------------------------
#include <QBuffer>
#include <QtTest>

class AsyncBufferedReaderTest : public QObject
{
    Q_OBJECT
private slots:
    void testCircularBufferWrap()
    {
        const size_t smallCapacity = 10;
        QByteArray inputData = "abcdefghijklmnopqrstuvwxy"; // 25 bytes
        auto source = std::make_unique<QBuffer>(&inputData);

        AsyncBufferedReader reader(smallCapacity);
        QVERIFY(reader.openSource(std::move(source)));

        // Read in small chunks to force the producer to wait and wrap
        QByteArray result;

        // Chunk 1: Read 5 bytes.
        // Reader has 10 bytes capacity, so it should fill, wait, and we drain.
        result.append(reader.read(5));

        // Chunk 2: Read another 10 bytes.
        // This forces the 'tail' to wrap around the 10-byte boundary.
        result.append(reader.read(10));

        // Chunk 3: Read the rest
        result.append(reader.readAll());

        QCOMPARE(result, inputData);
        QCOMPARE(result.size(), 25);
    }
};

QTEST_MAIN(AsyncBufferedReaderTest)
#include "test_AsyncBufferedReader.moc"