Conversation
Pull Request Overview
Implements standalone purger support by introducing a new event listener system that tracks file numbers and updates S3 to prevent premature deletion by the purger.
- Replaces the previous RocksDBEventListener with a new PurgerEventListener that monitors file operations
- Implements a sliding window mechanism to track file numbers and periodically update S3 with the smallest file number
- Adds S3 file updater functionality to write file numbers to cloud storage for purger coordination
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| rocksdb_cloud_data_store.cpp | Replaces old event listener with PurgerEventListener and adds initialization logic for purger blocking |
| purger_sliding_window.h | Defines classes for S3 file number updates and time-based sliding window tracking |
| purger_sliding_window.cpp | Implementation of sliding window and S3 updater functionality |
| purger_event_listener.h | Header for new event listener that tracks RocksDB operations for purger safety |
| purger_event_listener.cpp | Implementation of event listener with flush/compaction monitoring |
| build_eloq_store.cmake | Adds new source files to build configuration |
| CMakeLists.txt | Adds new source files to CMake build |
@coderabbitai review

✅ Actions performed: Review triggered.
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a sliding-window purger (S3FileNumberUpdater + SlidingWindow), a PurgerEventListener to track RocksDB flush/compaction file numbers and persist the smallest file number per epoch to S3, integrates the listener into the cloud data store open flow, and updates build files to compile the new sources.

Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
participant App as CloudDataStore (open)
participant CFS as CloudFileSystemImpl
participant ROpts as rocksdb::Options
participant L as PurgerEventListener
participant SW as SlidingWindow
participant S3P as CloudStorageProvider
rect rgb(245,248,255)
note over App: DB open path with purger integration
App->>CFS: dynamic_cast cloud_fs_ -> CloudFileSystemImpl
alt cast ok
CFS-->>App: storage_provider
App->>L: construct(epoch="", bucket, path, provider, timings)
L->>SW: initialize sliding window & S3 updater
App->>ROpts: options.listeners.push_back(L)
App->>App: Pause background flush/compaction
App->>App: GetCurrentEpoch()
App->>L: SetEpoch(epoch)
App->>L: BlockPurger()
App->>App: Resume background work
App->>App: rocksdb::DB::Open(...)
else cast fail
CFS-->>App: null -> log error, abort open
end
end
sequenceDiagram
autonumber
participant DB as rocksdb::DB
participant L as PurgerEventListener
participant SW as SlidingWindow
participant Upd as S3FileNumberUpdater
participant S3 as S3 / CloudStorage
rect rgb(245,255,245)
note over DB,L: Flush/Compaction begin
DB-->>L: OnFlushBegin/OnCompactionBegin(info)
L->>DB: determine max_file = GetNextFileNumber()-1
L->>SW: AddFileNumber(max_file, thread_id, job_id)
alt new smallest discovered
SW->>Upd: UpdateSmallestFileNumber(epoch, file#)
Upd->>S3: upload temp file -> putObject
end
end
rect rgb(255,248,240)
note over DB,L: Flush/Compaction completed
DB-->>L: OnFlushCompleted/OnCompactionCompleted(info)
L->>SW: RemoveFileNumber(thread_id, job_id)
end
rect rgb(248,248,255)
note over SW,Upd: Periodic flush
SW-->>SW: TimerWorker wakes (interval)
SW->>SW: GetSmallestFileNumber()
SW->>Upd: UpdateSmallestFileNumber(epoch, smallest)
Upd->>S3: upload
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 6
♻️ Duplicate comments (3)
eloq_data_store_service/rocksdb_cloud_data_store.cpp (1)
581-586: Style: move the inline block comment out of the arg list. Keeps the callsite clean and satisfies a previous nit.

```diff
-    auto db_event_listener =
-        std::make_shared<PurgerEventListener>("", /*We still don't know the epoch now*/
+    // We still don't know the epoch now, so pass an empty string.
+    auto db_event_listener =
+        std::make_shared<PurgerEventListener>("",
             bucket_name, cloud_config_.object_path_, cfs_impl->GetStorageProvider());
```

eloq_data_store_service/purger_sliding_window.cpp (1)
230-266: GetSmallestFileNumber() is O(n); cache the minimum. Frequent scans plus tombstones can add up. Maintain a cached_min and a dirty flag; recompute only when the removed entry was the current min.
Happy to sketch a minimal intrusive refactor if you want it in this PR.
eloq_data_store_service/purger_event_listener.cpp (1)
223-229: Guard GetNextFileNumber() − 1 underflow. If RocksDB returns 0, subtracting 1 wraps to UINT64_MAX and could corrupt purger thresholds.

```diff
-    uint64_t max_file_number = db->GetNextFileNumber() - 1;
+    const uint64_t next_file_number = db->GetNextFileNumber();
+    const uint64_t max_file_number =
+        next_file_number == 0 ? 0 : next_file_number - 1;
```
🧹 Nitpick comments (13)
eloq_data_store_service/purger_sliding_window.h (4)
80-87: Param name mismatch in docs vs signature. Docs say window_duration but the ctor param is entry_duration. Align to avoid confusion.
Apply:
```diff
- * @brief Constructor for sliding window
- * @param window_duration Duration to keep entries in the window
+ * @brief Constructor for sliding window
+ * @param entry_duration Duration to keep entries in the window
```
101-104: Make getters const-correct. GetEpoch should be const; it doesn't mutate state.

```diff
-    std::string GetEpoch();
+    std::string GetEpoch() const;
```
121-124: Avoid macro constant in API contract. Header mentions UINT64_MAX; prefer std::numeric_limits<uint64_t>::max(). Also include <limits>.

```diff
+#include <limits>
 ...
- * @return The smallest file number, or UINT64_MAX if window is empty
+ * @return The smallest file number, or std::numeric_limits<uint64_t>::max() if window is empty
```
42-70: Make S3FileNumberUpdater non-copyable; tiny API polish. These members shouldn't be copied. Also consider making GetS3ObjectKey a static/private helper with string_view args to avoid copies (optional).

```diff
 class S3FileNumberUpdater {
 public:
@@
     ~S3FileNumberUpdater() = default;
+    S3FileNumberUpdater(const S3FileNumberUpdater&) = delete;
+    S3FileNumberUpdater& operator=(const S3FileNumberUpdater&) = delete;
+    S3FileNumberUpdater(S3FileNumberUpdater&&) = delete;
+    S3FileNumberUpdater& operator=(S3FileNumberUpdater&&) = delete;
```

eloq_data_store_service/rocksdb_cloud_data_store.cpp (1)
571-577: Guard against impl type drift; add fallback. If CloudFileSystemImpl changes, dynamic_cast may fail. Consider retrieving the StorageProvider earlier (you already have it in StartDB) and threading it through, removing this cast.

```diff
-    rocksdb::CloudFileSystemImpl *cfs_impl =
-        dynamic_cast<rocksdb::CloudFileSystemImpl *>(cloud_fs_.get());
-    if (cfs_impl == nullptr)
+    rocksdb::CloudFileSystemImpl *cfs_impl =
+        dynamic_cast<rocksdb::CloudFileSystemImpl *>(cloud_fs_.get());
+    if (cfs_impl == nullptr)
     {
         LOG(ERROR) << "Fail to get CloudFileSystemImpl from cloud_fs_";
         return false;
     }
```

Follow-up: pass the storage_provider captured in StartDB into OpenCloudDB to drop the cast.
eloq_data_store_service/purger_event_listener.h (4)
63-71: Constructor defaults: ensure relation to purger cadence is documented. Note explicitly that entry_duration must be less than purger_periodicity_millis, and document the bounds on s3_update_interval, to avoid stale windows.
77-85: Expose clear lifecycle: Stop() idempotent, DTOR invokes Stop(). State these guarantees in comments; helpful under RocksDB shutdown.
127-128: Utility should be static and noexcept. GetFlushReason can be static (no member access) and noexcept to avoid surprises in event paths.

```diff
-    std::string GetFlushReason(rocksdb::FlushReason flush_reason);
+    static std::string GetFlushReason(rocksdb::FlushReason flush_reason) noexcept;
```
129-144: Make listener non-copyable/non-movable. Prevents accidental copies from API users; also typical for EventListener.

```diff
 class PurgerEventListener : public rocksdb::EventListener {
 public:
@@
     ~PurgerEventListener();
+    PurgerEventListener(const PurgerEventListener&) = delete;
+    PurgerEventListener& operator=(const PurgerEventListener&) = delete;
+    PurgerEventListener(PurgerEventListener&&) = delete;
+    PurgerEventListener& operator=(PurgerEventListener&&) = delete;
```

eloq_data_store_service/purger_event_listener.cpp (2)
23-28: Include glog (and own deps) explicitly in this TU. LOG/DLOG are used but <glog/logging.h> isn't included here. Also consider including <memory> if not already pulled via the header.

```diff
 #include <rocksdb/db.h>
+#include <glog/logging.h>
+#include <memory>
 #include <string>

 #include "purger_event_listener.h"
```
55-59: Confirm listener lifetime vs. callbacks. Destructor resets sliding_window_ after Stop(); ensure the listener is detached or DB is closed before any RocksDB thread can invoke callbacks, or guard accesses with a local raw ptr snapshot.

```diff
 void PurgerEventListener::Stop()
 {
     if (sliding_window_)
     {
-        sliding_window_->Stop();
-        sliding_window_.reset();
+        auto* sw = sliding_window_.get();
+        sw->Stop();
+        sliding_window_.reset();
     }
 }
```

Also applies to: 170-177
eloq_data_store_service/purger_sliding_window.cpp (2)
23-41: Add <cstdio> for std::remove. std::remove is used but <cstdio> isn't included in this TU.

```diff
 #include <chrono>
 #include <condition_variable>
 #include <fstream>
 #include <limits>
+#include <cstdio>
 #include <memory>
 #include <mutex>
 #include <sstream>
```
167-186: S3 write under mutex; consider deferring "immediate" updates. UpdateSmallestFileNumber() performs I/O while holding window_mutex_. It enforces ordering but can stall all producers if the PUT is slow.
Idea: record the new smallest and set a “priority flush” flag; release the lock; notify TimerWorker to perform the immediate flush (TimerWorker already unlocks around S3). This preserves ordering (no larger smallest is published first) without blocking producers on I/O. If you keep current design, add a brief comment stating why I/O under lock is intentional.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- eloq_data_store_service/CMakeLists.txt (1 hunks)
- eloq_data_store_service/build_eloq_store.cmake (1 hunks)
- eloq_data_store_service/purger_event_listener.cpp (1 hunks)
- eloq_data_store_service/purger_event_listener.h (1 hunks)
- eloq_data_store_service/purger_sliding_window.cpp (1 hunks)
- eloq_data_store_service/purger_sliding_window.h (1 hunks)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp (3 hunks)
🔇 Additional comments (3)
eloq_data_store_service/CMakeLists.txt (1)
382-390: Build integration looks correct — verify GCS build compiles.
CMakeLists adds rocksdb_config.cpp, purger_event_listener.cpp, and purger_sliding_window.cpp under the ELOQDSS_ROCKSDB_CLOUD_* block (lines 387-390). Run a GCS-configured build to confirm compilation.

eloq_data_store_service/purger_sliding_window.cpp (2)
106-110: BlockPurger sentinel value: 0 looks correct; please confirm. Given your contract ("smallest in-flight" written to S3), writing 0 blocks purging (threshold at 0). Using UINT64_MAX would instead allow purging almost everything. Keep 0 unless the purger interprets the value differently.
167-170: Propagate uint64_t for thread_id across the API. Avoid narrowing; aligns with RocksDB job/thread id types and prevents key collisions.

```diff
-void SlidingWindow::AddFileNumber(uint64_t file_number,
-                                  int thread_id,
-                                  uint64_t job_id)
+void SlidingWindow::AddFileNumber(uint64_t file_number,
+                                  uint64_t thread_id,
+                                  uint64_t job_id)

-void SlidingWindow::RemoveFileNumber(int thread_id, uint64_t job_id)
+void SlidingWindow::RemoveFileNumber(uint64_t thread_id, uint64_t job_id)

-std::string SlidingWindow::GenerateKey(int thread_id, uint64_t job_id) const
+std::string SlidingWindow::GenerateKey(uint64_t thread_id, uint64_t job_id) const
```

File: eloq_data_store_service/purger_sliding_window.cpp (lines ~167-170; also applies at ~194-197 and ~344-349). Mirror these changes in purger_sliding_window.h and update the listener's UpdateSlidingWindow signature and all call sites.
Actionable comments posted: 3
♻️ Duplicate comments (2)
eloq_data_store_service/purger_sliding_window.cpp (2)
240-248: Cache the smallest value to avoid O(n) scans. GetSmallestFileNumber() scans the whole map and is called on every add and timer tick. Maintain a cached min (updated on add/remove/expiry) or use a min-heap keyed by file_number.
If preferred, I can sketch a min-heap + tombstone approach that keeps amortized O(log n) updates and O(1) min queries.
91-96: Null-check CloudStorageProvider before S3 PUT. Dereferencing a null provider will segfault.

```diff
-    rocksdb::IOStatus s = storage_provider_->PutCloudObject(
+    if (!storage_provider_)
+    {
+        LOG(ERROR) << "CloudStorageProvider is null; cannot upload " << temp_file_path;
+        if (std::remove(temp_file_path.c_str()) != 0)
+        {
+            LOG(WARNING) << "Failed to remove temp file: " << temp_file_path;
+        }
+        return;
+    }
+    rocksdb::IOStatus s = storage_provider_->PutCloudObject(
         temp_file_path, bucket_name_, object_key);
```
🧹 Nitpick comments (5)
eloq_data_store_service/purger_sliding_window.cpp (5)
29-40: Include required POSIX/C headers for mkstemp/write/close/remove. Prevents implicit declaration/undefined symbol issues across toolchains.
Apply this diff:
```diff
 #include <chrono>
 #include <condition_variable>
 #include <fstream>
 #include <limits>
 #include <memory>
 #include <mutex>
 #include <sstream>
 #include <string>
 #include <thread>
 #include <unordered_map>
 #include <utility>
+
+// POSIX/C headers for temp file + I/O
+#include <cstdlib>   // mkstemp
+#include <unistd.h>  // write, close, fsync
+#include <cstdio>    // std::remove
+#include <cerrno>    // errno
```
59-66: Guard against empty epoch before building the S3 key. Avoids uploading to a malformed key like ".../smallest_new_file_number-".

```diff
 void S3FileNumberUpdater::UpdateSmallestFileNumber(uint64_t file_number,
                                                    const std::string &epoch)
 {
+    if (epoch.empty())
+    {
+        DLOG(INFO) << "Skip S3 update: epoch is empty";
+        return;
+    }
```
77-90: Handle partial writes and fsync the temp file before upload. write() can be short or EINTR; fsync reduces risk of a race with subsequent reads.

```diff
-    // write content to the temp file
-    if (write(fd, content.c_str(), content.size()) == -1)
-    {
-        LOG(ERROR) << "Failed to write to temp file: " << temp_file_path;
-        close(fd);
-        // Remove the temp file
-        if (std::remove(temp_file_path.c_str()) != 0)
-        {
-            LOG(WARNING) << "Failed to remove temp file: " << temp_file_path;
-        }
-        return;
-    }
-    close(fd);  // We will open it later for reading
+    // write content to the temp file (robust against partial writes/EINTR)
+    size_t off = 0;
+    while (off < content.size())
+    {
+        ssize_t n = ::write(fd, content.data() + off, content.size() - off);
+        if (n < 0)
+        {
+            if (errno == EINTR) continue;
+            LOG(ERROR) << "Failed to write to temp file: " << temp_file_path
+                       << " errno=" << errno;
+            close(fd);
+            if (std::remove(temp_file_path.c_str()) != 0)
+            {
+                LOG(WARNING) << "Failed to remove temp file: " << temp_file_path;
+            }
+            return;
+        }
+        off += static_cast<size_t>(n);
+    }
+    if (fsync(fd) == -1)
+    {
+        LOG(WARNING) << "fsync failed for temp file: " << temp_file_path
+                     << " errno=" << errno;
+    }
+    close(fd);  // We will open it later for reading
```
116-121: BlockPurger sentinel is fine; clarify intent in log. Using 0 (min) blocks purging if the purger deletes files < smallest_new_file_number. Add a clarifying comment to prevent future confusion.

```diff
-    DLOG(INFO) << "Wrote 0 as file number to S3 to block purger";
+    // Setting smallest_new_file_number=0 means "nothing is < 0", so purger deletes nothing.
+    DLOG(INFO) << "Blocking purger by writing 0 as smallest_new_file_number";
```
198-202: Log level nit: use INFO instead of WARNING for normal control flow. This message isn't an anomaly; demote to INFO to reduce noise.

```diff
-        DLOG(WARNING) << "New file number " << file_number
+        DLOG(INFO) << "New file number " << file_number
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- eloq_data_store_service/purger_sliding_window.cpp (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
eloq_data_store_service/purger_sliding_window.cpp (2)
- eloq_data_store_service/purger_sliding_window.h (2): S3FileNumberUpdater (42-70), SlidingWindow (76-184)
- eloq_data_store_service/purger_event_listener.cpp (6): BlockPurger (73-79), BlockPurger (73-73), Stop (170-177), Stop (170-170), SetEpoch (60-71), SetEpoch (60-60)
🔇 Additional comments (1)
eloq_data_store_service/purger_sliding_window.cpp (1)
347-352: Guard against empty epoch here too (belt-and-suspenders). TimerWorker already skips when epoch is empty, but FlushToS3 can be called from other contexts later.

```diff
-    s3_updater_->UpdateSmallestFileNumber(smallest, epoch_);
+    if (epoch_.empty())
+    {
+        DLOG(INFO) << "Skip S3 update in FlushToS3: epoch is empty";
+        return;
+    }
+    s3_updater_->UpdateSmallestFileNumber(smallest, epoch_);
```
Actionable comments posted: 5
♻️ Duplicate comments (2)
eloq_data_store_service/purger_event_listener.cpp (1)
223-225: Guard GetNextFileNumber() underflow. If RocksDB returns 0, subtracting 1 wraps to UINT64_MAX.

```diff
-    // Get current max file number from RocksDB
-    uint64_t max_file_number = db->GetNextFileNumber() - 1;
+    // Get current max file number from RocksDB
+    uint64_t next_file_number = db->GetNextFileNumber();
+    uint64_t max_file_number =
+        (next_file_number == 0) ? 0 : (next_file_number - 1);
```

Would you prefer we skip adding to the window when next_file_number == 0 to avoid writing a blocking 0 to S3 on a brand-new DB?
eloq_data_store_service/purger_sliding_window.cpp (1)
354-356: Timer thread: unlock around S3 and avoid overriding a smaller value; skip empty window (UINT64_MAX). Prevents widening purge scope and removes long I/O under lock.

```diff
-        uint64_t smallest = GetSmallestFileNumber();
-        FlushToS3(smallest);
+        uint64_t smallest = GetSmallestFileNumber();
+        if (smallest == std::numeric_limits<uint64_t>::max()) {
+            // Nothing to publish
+            continue;
+        }
+        if (smallest > last_published_smallest_) {
+            DLOG(INFO) << "Skip S3 flush to avoid overriding smaller published value: "
+                       << "pending=" << smallest
+                       << " last_published=" << last_published_smallest_;
+            continue;
+        }
+        // Capture epoch and commit the value under lock, then upload outside.
+        std::string epoch_copy = epoch_;
+        last_published_smallest_ = smallest;
+        lock.unlock();
+        s3_updater_->UpdateSmallestFileNumber(smallest, epoch_copy);
+        lock.lock();
```
🧹 Nitpick comments (2)
eloq_data_store_service/purger_event_listener.h (1)
56-61: Clarify entry_duration semantics to match actual behavior. Active entries do not auto-expire; entry_duration only delays erasure of entries marked deleted. Tighten the doc to prevent misuse.

```diff
- * @param entry_duration Duration to keep entries in sliding window even it
- * is deleted (default: 15 seconds, should be less than purger interval,
- * indicating the minimum update frequency, it prevents too frequent
- * updates)
+ * @param entry_duration Duration to retain entries that have been marked
+ * deleted before actually erasing them (default: 15 seconds). Active
+ * entries do NOT auto-expire; they must be explicitly removed. This
+ * reduces S3 churn by batching updates.
```

eloq_data_store_service/purger_sliding_window.cpp (1)
361-367: FlushToS3 reads epoch_ without synchronization. If you keep FlushToS3, avoid touching shared state here; use GetEpoch() or pass epoch explicitly.

```diff
-    s3_updater_->UpdateSmallestFileNumber(smallest, epoch_);
-    last_published_smallest_ = smallest;
+    auto epoch_copy = GetEpoch();  // GetEpoch() locks
+    s3_updater_->UpdateSmallestFileNumber(smallest, epoch_copy);
+    // last_published_smallest_ should be set by the caller under lock
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- eloq_data_store_service/build_eloq_store.cmake (1 hunks)
- eloq_data_store_service/purger_event_listener.cpp (1 hunks)
- eloq_data_store_service/purger_event_listener.h (1 hunks)
- eloq_data_store_service/purger_sliding_window.cpp (1 hunks)
- eloq_data_store_service/purger_sliding_window.h (1 hunks)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- eloq_data_store_service/build_eloq_store.cmake
- eloq_data_store_service/rocksdb_cloud_data_store.cpp
- eloq_data_store_service/purger_sliding_window.h
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-17T08:10:28.774Z
Learnt from: githubzilla
PR: eloqdata/store_handler#81
File: eloq_data_store_service/purger_sliding_window.cpp:240-276
Timestamp: 2025-09-17T08:10:28.774Z
Learning: In the EloqDS purger sliding window implementation (purger_sliding_window.cpp), entries in the sliding window must be explicitly removed via RemoveFileNumber() and do not expire automatically based on time. The entry_duration_ parameter only applies to deleted entries to prevent frequent S3 updates, not for automatic expiration of active entries.
Applied to files:
- eloq_data_store_service/purger_event_listener.h
- eloq_data_store_service/purger_event_listener.cpp
- eloq_data_store_service/purger_sliding_window.cpp
🧬 Code graph analysis (3)
eloq_data_store_service/purger_event_listener.h (2)
- eloq_data_store_service/purger_sliding_window.h (2): EloqDS (36-192), SlidingWindow (76-192)
- eloq_data_store_service/purger_event_listener.cpp (20): PurgerEventListener (32-53), PurgerEventListener (55-58), SetEpoch (60-71), SetEpoch (60-60), BlockPurger (73-79), BlockPurger (73-73), OnFlushBegin (81-105), OnFlushBegin (81-82), OnFlushCompleted (107-135), OnFlushCompleted (107-108), OnCompactionBegin (137-149), OnCompactionBegin (137-138), OnCompactionCompleted (151-168), OnCompactionCompleted (151-152), Stop (170-177), Stop (170-170), GetFlushReason (179-211), GetFlushReason (179-180), UpdateSlidingWindow (213-230), UpdateSlidingWindow (213-215)
eloq_data_store_service/purger_event_listener.cpp (2)
- eloq_data_store_service/purger_event_listener.h (1): PurgerEventListener (47-144)
- eloq_data_store_service/purger_sliding_window.cpp (8): Stop (307-328), Stop (307-307), SetEpoch (179-183), SetEpoch (179-179), BlockPurger (127-133), BlockPurger (127-127), BlockPurger (295-305), BlockPurger (295-295)
eloq_data_store_service/purger_sliding_window.cpp (2)
- eloq_data_store_service/purger_sliding_window.h (2): S3FileNumberUpdater (42-70), SlidingWindow (76-192)
- eloq_data_store_service/purger_event_listener.cpp (6): BlockPurger (73-79), BlockPurger (73-73), Stop (170-177), Stop (170-170), SetEpoch (60-71), SetEpoch (60-60)
Actionable comments posted: 0
♻️ Duplicate comments (5)
eloq_data_store_service/purger_sliding_window.cpp (5)
29-41: Compile break: missing headers for mkstemp/write/close/remove. mkstemp, write, and close need POSIX headers; std::remove needs <cstdio>. Add these to avoid implicit declarations/UB.

```diff
 #include <condition_variable>
 #include <fstream>
+#include <cstdio>   // std::remove
+#include <cstdlib>  // mkstemp
 #include <limits>
 #include <memory>
 #include <mutex>
 #include <sstream>
 #include <string>
 #include <thread>
 #include <unordered_map>
 #include <utility>
+#include <unistd.h>  // write, close
```
191-210: Avoid S3 upload under lock; also guard against empty epoch. Network I/O while holding window_mutex_ stalls producers and Stop(). Also, the immediate publish should be skipped when epoch_ is empty (the timer path already does this).

```diff
 void SlidingWindow::AddFileNumber(uint64_t file_number,
                                   int thread_id,
                                   uint64_t job_id)
 {
-    std::lock_guard<std::mutex> lock(window_mutex_);
-    if (file_number < last_published_smallest_)
-    {
-        DLOG(INFO) << "Immediate S3 update with smaller file number: "
-                   << file_number << ", thread_id: " << thread_id
-                   << ", job_id: " << job_id << ", epoch: " << epoch_;
-        s3_updater_->UpdateSmallestFileNumber(file_number, epoch_);
-        last_published_smallest_ = file_number;
-    }
-    std::string key = GenerateKey(thread_id, job_id);
-    window_entries_.emplace(key, WindowEntry(file_number));
-    DLOG(INFO) << "Added file number to sliding window: " << file_number
-               << ", thread_id: " << thread_id << ", job_id: " << job_id
-               << ", epoch: " << epoch_
-               << ", window size: " << window_entries_.size();
+    std::string epoch_copy;
+    bool do_immediate_update = false;
+    {
+        std::lock_guard<std::mutex> lock(window_mutex_);
+        do_immediate_update =
+            (file_number < last_published_smallest_) && !epoch_.empty();
+        if (do_immediate_update) {
+            // Pre-commit to prevent timer from publishing a larger value.
+            last_published_smallest_ = file_number;
+            epoch_copy = epoch_;
+        }
+        std::string key = GenerateKey(thread_id, job_id);
+        window_entries_.emplace(key, WindowEntry(file_number));
+        DLOG(INFO) << "Added file number to sliding window: " << file_number
+                   << ", thread_id: " << thread_id << ", job_id: " << job_id
+                   << ", epoch: " << epoch_
+                   << ", window size: " << window_entries_.size();
+    }
+    if (do_immediate_update) {
+        DLOG(INFO) << "Immediate S3 update with smaller file number: "
+                   << file_number << ", thread_id: " << thread_id
+                   << ", job_id: " << job_id << ", epoch: " << epoch_copy;
+        s3_updater_->UpdateSmallestFileNumber(file_number, epoch_copy);
+    }
 }
```
321-347: Timer holds mutex across S3 upload and can overwrite a smaller immediate publish. Release the lock before the upload, and gate so the timer never publishes a value larger than last_published_smallest_. Also, update last_published_smallest_ under the lock; don't update it inside FlushToS3() to avoid a data race.

```diff
 void SlidingWindow::TimerWorker()
 {
     std::unique_lock<std::mutex> lock(window_mutex_);
@@
-        uint64_t smallest = GetSmallestFileNumber();
-        FlushToS3(smallest);
+        uint64_t smallest = GetSmallestFileNumber();
+        // Avoid overriding a more-conservative value.
+        if (smallest > last_published_smallest_) {
+            DLOG(INFO) << "Skip S3 flush (pending=" << smallest
+                       << " > last_published=" << last_published_smallest_
+                       << "), epoch: " << epoch_;
+            continue;
+        }
+        // Capture epoch and pre-commit while holding the lock.
+        std::string epoch_copy = epoch_;
+        last_published_smallest_ = smallest;
+        // Release lock during network I/O.
+        lock.unlock();
+        s3_updater_->UpdateSmallestFileNumber(smallest, epoch_copy);
+        DLOG(INFO) << "Updated S3 with smallest file number: " << smallest
+                   << ", epoch: " << epoch_copy;
+        lock.lock();
@@
 void SlidingWindow::FlushToS3(uint64_t smallest)
 {
-    s3_updater_->UpdateSmallestFileNumber(smallest, epoch_);
-    last_published_smallest_ = smallest;
-    DLOG(INFO) << "Updated S3 with smallest file number: " << smallest
-               << ", epoch: " << epoch_;
+    // Consider inlining this call as shown in TimerWorker to avoid using
+    // epoch_ without synchronization.
+    s3_updater_->UpdateSmallestFileNumber(smallest, epoch_);
 }
```

Also applies to: 352-358
286-296: Move BlockPurger S3 call out of the lock. Avoid holding window_mutex_ during network I/O.

```diff
 void SlidingWindow::BlockPurger()
 {
-    std::lock_guard<std::mutex> lock(window_mutex_);
-    if (epoch_.empty())
-    {
-        LOG(WARNING)
-            << "Cannot block purger, epoch is not set in sliding window";
-        return;
-    }
-    s3_updater_->BlockPurger(epoch_);
+    std::string epoch_copy;
+    {
+        std::lock_guard<std::mutex> lock(window_mutex_);
+        if (epoch_.empty()) {
+            LOG(WARNING) << "Cannot block purger, epoch is not set in sliding window";
+            return;
+        }
+        epoch_copy = epoch_;
+    }
+    s3_updater_->BlockPurger(epoch_copy);
 }
```
191-194: Use uint64_t for thread_id end-to-end (match RocksDB types, avoid truncation/collisions). RocksDB job/flush/compaction thread_id is uint64_t; using int risks collisions in the key.

```diff
-void SlidingWindow::AddFileNumber(uint64_t file_number,
-                                  int thread_id,
+void SlidingWindow::AddFileNumber(uint64_t file_number,
+                                  uint64_t thread_id,
                                   uint64_t job_id)
@@
-void SlidingWindow::RemoveFileNumber(int thread_id, uint64_t job_id)
+void SlidingWindow::RemoveFileNumber(uint64_t thread_id, uint64_t job_id)
@@
-std::string SlidingWindow::GenerateKey(int thread_id, uint64_t job_id) const
+std::string SlidingWindow::GenerateKey(uint64_t thread_id, uint64_t job_id) const
```

Also update the declarations in purger_sliding_window.h and all call sites.

To verify call sites:

```shell
#!/bin/bash
# Find SlidingWindow::Add/Remove/GenerateKey usages and signatures
rg -n -C2 --type=cpp 'AddFileNumber\s*\(|RemoveFileNumber\s*\(|GenerateKey\s*\('
rg -n -C3 --type=cpp 'class\s+SlidingWindow|struct\s+SlidingWindow'
```

Also applies to: 212-213, 360-364
🧹 Nitpick comments (3)
eloq_data_store_service/purger_sliding_window.cpp (3)
221-227: Deleted entry expiry starts at add-time, not delete-time. If an entry lived longer than entry_duration_ before deletion, it's erased immediately, defeating the cooldown. Reset the timestamp when marking deleted.

```diff
-            // Entry is still within the duration window, only mark as deleted
-            it->second.deleted_ = true;
+            // Start expiry window from deletion time to avoid immediate purge
+            it->second.deleted_ = true;
+            it->second.timestamp_ = now;
```
77-89: Handle short writes; use RAII cleanup. write can return a short count. Also, consolidate temp-file cleanup with RAII to avoid duplication.

```diff
-    // write content to the temp file
-    if (write(fd, content.c_str(), content.size()) == -1)
+    // write content to the temp file
+    ssize_t written = write(fd, content.c_str(), content.size());
+    if (written < 0 || static_cast<size_t>(written) != content.size())
     {
         LOG(ERROR) << "Failed to write to temp file: " << temp_file_path;
         close(fd);
         // Remove the temp file
         if (std::remove(temp_file_path.c_str()) != 0)
         {
             LOG(WARNING) << "Failed to remove temp file: " << temp_file_path;
         }
         return;
     }
```

Optional: introduce a small scope guard for std::remove(temp_file_path.c_str()) on all exit paths.
248-284: GetSmallestFileNumber is public and mutates state without locking; make it private or thread-safe. GetSmallestFileNumber (purger_sliding_window.h:129; purger_sliding_window.cpp:248-284) erases entries from window_entries_ but does not take window_mutex_. It's currently only called from TimerWorker while holding window_mutex_ (call at purger_sliding_window.cpp:345), yet leaving it public invites misuse. Either make it private and document "caller holds lock", or acquire window_mutex_ inside the method / provide a thread-safe wrapper.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- eloq_data_store_service/purger_sliding_window.cpp (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-17T08:10:28.774Z
Learnt from: githubzilla
PR: eloqdata/store_handler#81
File: eloq_data_store_service/purger_sliding_window.cpp:240-276
Timestamp: 2025-09-17T08:10:28.774Z
Learning: In the EloqDS purger sliding window implementation (purger_sliding_window.cpp), entries in the sliding window must be explicitly removed via RemoveFileNumber() and do not expire automatically based on time. The entry_duration_ parameter only applies to deleted entries to prevent frequent S3 updates, not for automatic expiration of active entries.
Applied to files:
eloq_data_store_service/purger_sliding_window.cpp
🧬 Code graph analysis (1)
eloq_data_store_service/purger_sliding_window.cpp (2)
- eloq_data_store_service/purger_sliding_window.h (2): S3FileNumberUpdater (42-70), SlidingWindow (76-192)
- eloq_data_store_service/purger_event_listener.cpp (6): BlockPurger (73-79), BlockPurger (73-73), Stop (170-177), Stop (170-170), SetEpoch (60-71), SetEpoch (60-60)
Actionable comments posted: 2
♻️ Duplicate comments (4)
eloq_data_store_service/purger_sliding_window.cpp (4)
29-41: Add required POSIX/C headers for mkstemp/write/close/remove. Prevents implicit declarations/UB on some toolchains.

```diff
 #include <condition_variable>
 #include <fstream>
+#include <cstdio>   // std::remove
+#include <cstdlib>  // mkstemp
 #include <limits>
 #include <memory>
 #include <mutex>
 #include <sstream>
 #include <string>
 #include <thread>
 #include <unordered_map>
 #include <utility>
+#include <unistd.h>  // write, close
```
191-203: Avoid S3 upload while holding window_mutex_ in AddFileNumber. Network I/O under the lock can stall producers and the timer thread.

```diff
-{
-    std::lock_guard<std::mutex> lock(window_mutex_);
-    if (file_number < last_published_smallest_)
-    {
-        DLOG(INFO) << "Immediate S3 update with smaller file number: "
-                   << file_number << ", thread_id: " << thread_id
-                   << ", job_id: " << job_id << ", epoch: " << epoch_;
-        s3_updater_->UpdateSmallestFileNumber(file_number, epoch_);
-        last_published_smallest_ = file_number;
-    }
-    std::string key = GenerateKey(thread_id, job_id);
-    window_entries_.emplace(key, WindowEntry(file_number));
-    DLOG(INFO) << "Added file number to sliding window: " << file_number
-               << ", thread_id: " << thread_id << ", job_id: " << job_id
-               << ", epoch: " << epoch_
-               << ", window size: " << window_entries_.size();
-}
+{
+    std::string epoch_copy;
+    bool do_immediate_update = false;
+    {
+        std::lock_guard<std::mutex> lock(window_mutex_);
+        if (file_number < last_published_smallest_ && !epoch_.empty()) {
+            last_published_smallest_ = file_number;  // pre-commit under lock
+            epoch_copy = epoch_;
+            do_immediate_update = true;
+        }
+        std::string key = GenerateKey(thread_id, job_id);
+        window_entries_.emplace(key, WindowEntry(file_number));
+        DLOG(INFO) << "Added file number to sliding window: " << file_number
+                   << ", thread_id: " << thread_id << ", job_id: " << job_id
+                   << ", epoch: " << epoch_
+                   << ", window size: " << window_entries_.size();
+    }
+    if (do_immediate_update) {
+        DLOG(INFO) << "Immediate S3 update with smaller file number: "
+                   << file_number << ", thread_id: " << thread_id
+                   << ", job_id: " << job_id << ", epoch: " << epoch_copy;
+        s3_updater_->UpdateSmallestFileNumber(file_number, epoch_copy);
+    }
+}
```
286-296: Move the BlockPurger S3 call out of the lock. Avoid blocking producers on network I/O.

```diff
 void SlidingWindow::BlockPurger()
 {
-    std::lock_guard<std::mutex> lock(window_mutex_);
-    if (epoch_.empty())
-    {
-        LOG(WARNING)
-            << "Cannot block purger, epoch is not set in sliding window";
-        return;
-    }
-    s3_updater_->BlockPurger(epoch_);
+    std::string epoch_copy;
+    {
+        std::lock_guard<std::mutex> lock(window_mutex_);
+        if (epoch_.empty()) {
+            LOG(WARNING) << "Cannot block purger, epoch is not set in sliding window";
+            return;
+        }
+        epoch_copy = epoch_;
+    }
+    s3_updater_->BlockPurger(epoch_copy);
 }
```
212-218: Align the RemoveFileNumber signature with the header (uint64_t). Prevents narrowing and keeps the API consistent.

```diff
-void SlidingWindow::RemoveFileNumber(int thread_id, uint64_t job_id)
+void SlidingWindow::RemoveFileNumber(uint64_t thread_id, uint64_t job_id)
```
🧹 Nitpick comments (4)
eloq_data_store_service/purger_sliding_window.h (2)
72-76: Clarify semantics: not a time-expiring "sliding window." Per project semantics, entries persist until RemoveFileNumber(); entry_duration_ only applies to deleted entries. Adjust the class doc to prevent future regressions.

```diff
- * @brief Time-based sliding window for tracking file numbers with automatic S3
- * updates
+ * @brief Sliding window for tracking file numbers with periodic S3 updates.
+ *
+ * Entries persist until explicitly removed via RemoveFileNumber(). They do NOT
+ * expire by time. entry_duration_ is applied only to deleted entries to delay
+ * their actual erase and avoid frequent S3 updates.
```
125-131: Document thread-safety for GetSmallestFileNumber or reduce visibility. It is currently public but assumes the caller holds window_mutex_. Either make it private or note the locking contract.

```diff
-    /**
-     * @brief Get the smallest file number in the current window
-     * @return The smallest file number, or UINT64_MAX if window is empty
-     */
+    /**
+     * @brief Get the smallest file number in the current window
+     * @return The smallest file number, or UINT64_MAX if window is empty
+     * Thread-safety: must be called with window_mutex_ held.
+     */
```

eloq_data_store_service/purger_sliding_window.cpp (2)
75-90: Handle partial writes to the temp file. write() can return short counts; loop until all bytes are written.

```diff
-    // write content to the temp file
-    if (write(fd, content.c_str(), content.size()) == -1)
-    {
-        LOG(ERROR) << "Failed to write to temp file: " << temp_file_path;
-        close(fd);
-        // Remove the temp file
-        if (std::remove(temp_file_path.c_str()) != 0)
-        {
-            LOG(WARNING) << "Failed to remove temp file: " << temp_file_path;
-        }
-        return;
-    }
+    // write content to the temp file (handle short writes)
+    const char* buf = content.c_str();
+    size_t n = content.size();
+    while (n > 0) {
+        ssize_t w = write(fd, buf, n);
+        if (w <= 0) {
+            LOG(ERROR) << "Failed to write to temp file: " << temp_file_path;
+            close(fd);
+            if (std::remove(temp_file_path.c_str()) != 0) {
+                LOG(WARNING) << "Failed to remove temp file: " << temp_file_path;
+            }
+            return;
+        }
+        buf += w;
+        n -= static_cast<size_t>(w);
+    }
```
321-347: Optional: unlock before S3 PUT and revalidate under lock to avoid holding window_mutex_ during I/O.
Copy epoch and smallest, release window_mutex_, perform the S3 I/O, then briefly re-lock to revalidate and pre-commit last_published_smallest_. Ensure revalidation is done without attempting to lock the same mutex while a different guard still holds it (use a scoped revalidation and a flag to avoid deadlock). Location: eloq_data_store_service/purger_sliding_window.cpp (TimerWorker).
Verification note: the prior helper script assumed RemoveFileNumber used uint64_t for thread_id — actual signature is void RemoveFileNumber(int thread_id, uint64_t job_id) with call sites at purger_event_listener.cpp:132 and 166. Also verify s3_updater_->UpdateSmallestFileNumber(smallest, epoch_copy) exists and last_published_smallest_ is present/initialized before applying the refactor.
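The "copy under lock, do I/O outside, pre-commit the published value" pattern recommended in these comments can be isolated from the networking; a minimal sketch under stated assumptions (`WindowState` and `prepare_publish` are illustrative names; the real class couples this with the S3 uploader):

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Illustrative stand-in for the sliding window's published state. Field names
// mirror the PR (epoch_, last_published_smallest_) but the struct is a sketch.
struct WindowState
{
    std::mutex mu;
    std::string epoch;
    uint64_t last_published_smallest = UINT64_MAX;
};

// Decide under the lock whether an immediate publish is needed and pre-commit
// last_published_smallest; the returned (value, epoch) pair is what the caller
// uploads AFTER releasing the lock. nullopt means no upload is required.
std::optional<std::pair<uint64_t, std::string>>
prepare_publish(WindowState& s, uint64_t file_number)
{
    std::lock_guard<std::mutex> lock(s.mu);
    if (s.epoch.empty() || file_number >= s.last_published_smallest)
    {
        return std::nullopt;  // epoch unset, or nothing smaller to publish
    }
    s.last_published_smallest = file_number;  // pre-commit under lock
    return std::make_pair(file_number, s.epoch);
}
```

Pre-committing under the lock is what prevents a later timer tick from publishing a larger number while the slow S3 PUT is still in flight.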
📜 Review details
📒 Files selected for processing (2)
- eloq_data_store_service/purger_sliding_window.cpp (1 hunks)
- eloq_data_store_service/purger_sliding_window.h (1 hunks)
Actionable comments posted: 0
♻️ Duplicate comments (4)
eloq_data_store_service/purger_sliding_window.h (2)
28-31: Resolved: include added. This addresses the missing-header compile issue for std::numeric_limits.
97-106: Thread lifecycle docs: destructor/Stop idempotence; non-copyable resolved. Copy/move deleted: good. Please document that ~SlidingWindow calls Stop(), Stop is idempotent, signals cv_, and joins timer_thread_.

Apply:

```diff
-    /**
-     * @brief Destructor - stops the timer thread
-     */
+    /**
+     * @brief Destructor - calls Stop(). Safe to call multiple times (idempotent).
+     * Stop() signals cv_, sets should_stop_, and joins timer_thread_.
+     */
```

Also applies to: 168-173
eloq_data_store_service/purger_event_listener.cpp (2)
212-214: Type width aligned to RocksDB (uint64_t). Good correction from int to uint64_t for thread_id/job_id.
222-224: Fix underflow when GetNextFileNumber() == 0 (data-loss risk). Subtracting 1 from 0 wraps to UINT64_MAX and could publish an unsafe threshold to S3.

Apply:

```diff
-    uint64_t max_file_number = db->GetNextFileNumber() - 1;
+    uint64_t next_file_number = db->GetNextFileNumber();
+    uint64_t max_file_number =
+        (next_file_number == 0) ? 0 : (next_file_number - 1);
```
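The guard above is small enough to check in isolation; a minimal sketch (`prev_file_number` is an illustrative name, not the PR's code):

```cpp
#include <cassert>
#include <cstdint>

// Derive "largest existing file number" from a next-file-number counter
// without wrapping to UINT64_MAX when the counter is still 0.
uint64_t prev_file_number(uint64_t next_file_number)
{
    return next_file_number == 0 ? 0 : next_file_number - 1;
}
```

Publishing 0 in the empty case is the safe direction here: the purger only deletes files strictly below the threshold, so 0 deletes nothing.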
🧹 Nitpick comments (4)
eloq_data_store_service/purger_sliding_window.h (4)
73-77: Docs mismatch: the window is not time-expiring for active entries; clarify param name/semantics. Active entries persist until RemoveFileNumber(); entry_duration applies to deleted entries only (rate-limiting S3 updates). Also the param is entry_duration, not window_duration.

Apply:

```diff
-/**
- * @brief Time-based sliding window for tracking file numbers with automatic S3
- * updates
- */
+/**
+ * @brief Sliding window tracking in-flight file numbers with periodic S3 updates.
+ *
+ * Notes:
+ * - Active entries do NOT expire by time; they remain until RemoveFileNumber().
+ * - entry_duration_ only applies to DELETED entries to rate-limit S3 updates.
+ */
@@
- * @param window_duration Duration to keep entries in the window
+ * @param entry_duration Retention for DELETED entries before physical removal
+ *                       from the window (active entries persist until RemoveFileNumber()).
  * @param s3_update_interval Interval for updating S3 file
```

Also applies to: 81-88
126-131: Make GetSmallestFileNumber internal or self-synchronized. As written, callers may read/erase window_entries_ without holding window_mutex_, risking data races. Either document and enforce lock ownership (private API) or add internal locking.

Apply:

```diff
-    /**
-     * @brief Get the smallest file number in the current window
-     * @return The smallest file number, or UINT64_MAX if window is empty
-     */
-    uint64_t GetSmallestFileNumber();
+    // (internal)
```

And add under private:

```diff
+    /**
+     * @brief Compute current smallest file number.
+     * @note Caller must hold window_mutex_.
+     * @return The smallest file number, or UINT64_MAX if window is empty.
+     */
+    uint64_t GetSmallestFileNumber();
```

Also applies to: 179-183
109-110: Optional: make GetEpoch const. If useful in const contexts, mark GetEpoch() const and make window_mutex_ mutable.

```diff
-    std::string GetEpoch();
+    std::string GetEpoch() const;
@@
-    std::mutex window_mutex_;
+    mutable std::mutex window_mutex_;
```

Also applies to: 170-171
132-136: Clarify BlockPurger behavior in docs. Document that it writes 0 (UINT64_MIN) to S3 as a sentinel to block the purger.

```diff
-    /**
-     * @brief Block purger temporarily
-     */
+    /**
+     * @brief Block purger temporarily.
+     * Writes 0 (UINT64_MIN) as "smallest_new_file_number" to S3 for current epoch.
+     */
```
📜 Review details
📒 Files selected for processing (3)
- eloq_data_store_service/purger_event_listener.cpp (1 hunks)
- eloq_data_store_service/purger_sliding_window.cpp (1 hunks)
- eloq_data_store_service/purger_sliding_window.h (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- eloq_data_store_service/purger_sliding_window.cpp
🔇 Additional comments (3)
eloq_data_store_service/purger_event_listener.cpp (2)
41-47: BlockPurger already handled during DB open — no change needed. rocksdb_cloud_data_store.cpp calls db_event_listener->SetEpoch(current_epoch) then db_event_listener->BlockPurger() (≈ lines 657-660), and PurgerEventListener::BlockPurger() forwards to sliding_window_->BlockPurger() (purger_event_listener.cpp, ≈ lines 73-79), so inserting sliding_window_->BlockPurger() after construction is redundant.
Likely an incorrect or invalid review comment.
169-176: Shutdown ordering — OK: DB::Close quiesces background work before destroying listeners. PurgerEventListener is registered into options.listeners (eloq_data_store_service/rocksdb_cloud_data_store.cpp: ~580-586) and the DB is closed/deleted in RocksDBCloudDataStore::~RocksDBCloudDataStore (db_->Close(); delete db_; ~lines 177-183). RocksDB waits for background work / notification callbacks to finish and clears listeners before destroying them, so PurgerEventListener::~PurgerEventListener (which calls Stop()) runs after background threads are quiesced. (github.com)
eloq_data_store_service/purger_sliding_window.h (1)
157-159: Ensure GenerateKey uses an explicit delimiter to avoid key collisions. Verify SlidingWindow::GenerateKey constructs keys with an unambiguous delimiter (e.g. "<thread_id>:<job_id>") rather than simple concatenation; update the implementation in eloq_data_store_service/purger_sliding_window.h (lines 157-159) and the related area at 185-191 if necessary.
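The collision this comment warns about is concrete: plain concatenation maps (12, 3) and (1, 23) to the same string. A minimal sketch of the delimiter fix (`generate_key` is an illustrative free function, not the PR's member):

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Join the ids with a character that cannot appear in a decimal number, so
// distinct (thread_id, job_id) pairs always produce distinct keys.
std::string generate_key(uint64_t thread_id, uint64_t job_id)
{
    return std::to_string(thread_id) + ":" + std::to_string(job_id);
}
```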
Actionable comments posted: 0
♻️ Duplicate comments (2)
eloq_data_store_service/purger_sliding_window.cpp (2)
285-295: Move BlockPurger's S3 call outside the mutex. Avoid holding window_mutex_ during network I/O; copy epoch under lock, then upload.

```diff
 void SlidingWindow::BlockPurger()
 {
-    std::lock_guard<std::mutex> lock(window_mutex_);
-    if (epoch_.empty())
-    {
-        LOG(WARNING)
-            << "Cannot block purger, epoch is not set in sliding window";
-        return;
-    }
-    s3_updater_->BlockPurger(epoch_);
+    std::string epoch_copy;
+    {
+        std::lock_guard<std::mutex> lock(window_mutex_);
+        if (epoch_.empty())
+        {
+            LOG(WARNING)
+                << "Cannot block purger, epoch is not set in sliding window";
+            return;
+        }
+        epoch_copy = epoch_;
+    }
+    s3_updater_->BlockPurger(epoch_copy);
 }
```
193-212: Guard against empty epoch and avoid S3 I/O under the mutex in AddFileNumber (precommit + upload outside). The immediate S3 update runs under window_mutex_ and does not check epoch_.empty(). This can block producers and may write to an invalid S3 key when the epoch is not set yet.
Apply this refactor:
```diff
 void SlidingWindow::AddFileNumber(uint64_t file_number,
                                   uint64_t thread_id,
                                   uint64_t job_id)
 {
-    std::lock_guard<std::mutex> lock(window_mutex_);
-    if (file_number < last_published_smallest_)
-    {
-        DLOG(INFO) << "Immediate S3 update with smaller file number: "
-                   << file_number << ", thread_id: " << thread_id
-                   << ", job_id: " << job_id << ", epoch: " << epoch_;
-        s3_updater_->UpdateSmallestFileNumber(file_number, epoch_);
-        last_published_smallest_ = file_number;
-    }
-    std::string key = GenerateKey(thread_id, job_id);
-    window_entries_.emplace(key, WindowEntry(file_number));
-    DLOG(INFO) << "Added file number to sliding window: " << file_number
-               << ", thread_id: " << thread_id << ", job_id: " << job_id
-               << ", epoch: " << epoch_
-               << ", window size: " << window_entries_.size();
+    std::string epoch_copy;
+    bool do_immediate_update = false;
+    {
+        std::lock_guard<std::mutex> lock(window_mutex_);
+        do_immediate_update =
+            (file_number < last_published_smallest_) && !epoch_.empty();
+        if (do_immediate_update)
+        {
+            // Pre-commit to prevent a later timer tick from publishing larger.
+            last_published_smallest_ = file_number;
+            epoch_copy = epoch_;
+        }
+        std::string key = GenerateKey(thread_id, job_id);
+        window_entries_.emplace(key, WindowEntry(file_number));
+        DLOG(INFO) << "Added file number to sliding window: " << file_number
+                   << ", thread_id: " << thread_id << ", job_id: " << job_id
+                   << ", epoch: " << epoch_
+                   << ", window size: " << window_entries_.size();
+    }
+    if (do_immediate_update)
+    {
+        DLOG(INFO) << "Immediate S3 update with smaller file number: "
+                   << file_number << ", thread_id: " << thread_id
+                   << ", job_id: " << job_id << ", epoch: " << epoch_copy;
+        s3_updater_->UpdateSmallestFileNumber(file_number, epoch_copy);
+    }
 }
```
🧹 Nitpick comments (2)
eloq_data_store_service/purger_sliding_window.cpp (2)
79-91: Handle partial writes to the temp file (robustness). write() can legally write fewer bytes than requested; the current code treats only -1 as failure.

```diff
-    // write content to the temp file
-    if (write(fd, content.c_str(), content.size()) == -1)
-    {
-        LOG(ERROR) << "Failed to write to temp file: " << temp_file_path;
-        close(fd);
-        // Remove the temp file
-        if (std::remove(temp_file_path.c_str()) != 0)
-        {
-            LOG(WARNING) << "Failed to remove temp file: " << temp_file_path;
-        }
-        return;
-    }
+    // write content to the temp file (handle partial writes)
+    const char* buf = content.c_str();
+    size_t total = content.size();
+    size_t off = 0;
+    while (off < total)
+    {
+        ssize_t n = write(fd, buf + off, total - off);
+        if (n < 0)
+        {
+            LOG(ERROR) << "Failed to write to temp file: " << temp_file_path;
+            close(fd);
+            if (std::remove(temp_file_path.c_str()) != 0)
+            {
+                LOG(WARNING) << "Failed to remove temp file: " << temp_file_path;
+            }
+            return;
+        }
+        off += static_cast<size_t>(n);
+    }
+    (void)fsync(fd);  // best-effort durability before reader re-opens
```
247-283: Confirm the locking discipline for GetSmallestFileNumber. This method reads/modifies window_entries_ without acquiring window_mutex_. Callers currently hold the lock, but this is easy to misuse.
Would you prefer:
- Make it private and document “must be called with window_mutex_ held”, or
- Take the lock internally and add a GetSmallestFileNumberLocked() helper for internal use?
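The second option above is a common pattern: a public method that takes the lock and a private `*Locked` variant for callers (like the timer worker) that already hold it. A minimal sketch under stated assumptions (`TrackedWindow` and its members are illustrative, not the PR's class):

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <map>
#include <mutex>
#include <string>

class TrackedWindow
{
public:
    void Add(const std::string& key, uint64_t file_number)
    {
        std::lock_guard<std::mutex> lock(mu_);
        entries_[key] = file_number;
    }

    // Thread-safe public entry point: acquires the lock itself.
    uint64_t SmallestFileNumber()
    {
        std::lock_guard<std::mutex> lock(mu_);
        return SmallestFileNumberLocked();
    }

private:
    // Precondition: mu_ is held by the caller (e.g. a timer worker that
    // already locked it for a larger critical section).
    uint64_t SmallestFileNumberLocked() const
    {
        uint64_t smallest = std::numeric_limits<uint64_t>::max();
        for (const auto& kv : entries_)
        {
            if (kv.second < smallest)
            {
                smallest = kv.second;
            }
        }
        return smallest;  // UINT64_MAX when the window is empty
    }

    std::mutex mu_;
    std::map<std::string, uint64_t> entries_;
};
```

Making the lock-free variant private keeps the "caller holds lock" contract out of the public API, which is the misuse the review is worried about.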
📜 Review details
📒 Files selected for processing (1)
- eloq_data_store_service/purger_sliding_window.cpp (1 hunks)
Force-pushed from e839675 to 1b01a1e (compare)
Actionable comments posted: 1
📜 Review details
📒 Files selected for processing (7)
- eloq_data_store_service/CMakeLists.txt (1 hunks)
- eloq_data_store_service/build_eloq_store.cmake (1 hunks)
- eloq_data_store_service/purger_event_listener.cpp (1 hunks)
- eloq_data_store_service/purger_event_listener.h (1 hunks)
- eloq_data_store_service/purger_sliding_window.cpp (1 hunks)
- eloq_data_store_service/purger_sliding_window.h (1 hunks)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- eloq_data_store_service/build_eloq_store.cmake
- eloq_data_store_service/purger_sliding_window.h
- eloq_data_store_service/CMakeLists.txt
🔇 Additional comments (31)
eloq_data_store_service/purger_event_listener.cpp (11)
32-53: LGTM on PurgerEventListener constructor! The constructor implementation correctly:
- Initializes member variables from parameters
- Creates the SlidingWindow with proper parameters
- Logs informative initialization details with all key configuration values
55-58: LGTM on destructor implementation! Properly calls Stop() to ensure clean shutdown of the sliding window.

60-71: LGTM on SetEpoch implementation! The method correctly:
- Guards against null sliding_window_
- Logs the epoch transition with appropriate handling of empty values
- Delegates to the SlidingWindow's SetEpoch method
73-79: LGTM on BlockPurger implementation! Simple and correct delegation to the sliding window with a null guard.

81-105: LGTM on OnFlushBegin implementation! The method correctly:
- Logs detailed flush information only when performance is impacted
- Updates the sliding window with current database state
- Uses appropriate logging levels (LOG vs DLOG)
107-135: LGTM on OnFlushCompleted implementation! The method correctly:
- Mirrors the OnFlushBegin logging pattern
- Removes the completed operation from the sliding window
- Guards against null sliding_window_
137-149: LGTM on OnCompactionBegin implementation! Correctly logs compaction details and updates the sliding window with database state.

151-167: LGTM on OnCompactionCompleted implementation! Correctly logs completion details and removes the operation from the sliding window.

169-176: LGTM on Stop implementation! Properly stops and resets the sliding window to ensure clean shutdown.

178-210: LGTM on GetFlushReason implementation! Comprehensive mapping of all RocksDB flush reasons to human-readable strings for logging purposes.
212-229: Check for underflow in the file number calculation. The calculation db->GetNextFileNumber() - 1 could underflow if GetNextFileNumber() returns 0, though this is likely rare in practice.

Consider adding a safety check:

```diff
 // Get current max file number from RocksDB
-uint64_t max_file_number = db->GetNextFileNumber() - 1;
+uint64_t next_file_number = db->GetNextFileNumber();
+uint64_t max_file_number = (next_file_number == 0) ? 0 : (next_file_number - 1);
```
23-34: LGTM on includes and dependencies! The header includes are appropriate for the class functionality:
- Standard library headers for core functionality
- RocksDB headers for the EventListener interface and cloud storage
- glog for logging
- The purger_sliding_window.h for the SlidingWindow dependency
38-146: LGTM on PurgerEventListener class definition! The class is well-designed with:
- Clear documentation for the class purpose and all public methods
- Appropriate inheritance from rocksdb::EventListener
- Proper default parameter values in the constructor
- Clean separation of public interface and private implementation details
- Consistent use of uint64_t for thread_id and job_id to match RocksDB types
eloq_data_store_service/purger_sliding_window.cpp (15)
23-42: LGTM on includes! All necessary headers are properly included, including the POSIX headers (unistd.h, cstdio, cstdlib) for the system calls used in the implementation.

51-59: LGTM on S3FileNumberUpdater constructor! Simple and correct initialization of member variables.
129-135: LGTM on BlockPurger implementation! The use of std::numeric_limits<uint64_t>::min() (which is 0) is correct for blocking the purger, as the purger only removes files with numbers smaller than the threshold.
137-147: LGTM on GetS3ObjectKey implementation! Correctly constructs the S3 object key with proper path separator handling and epoch suffix.

151-174: LGTM on SlidingWindow constructor! The constructor correctly:
- Initializes all member variables
- Creates the S3FileNumberUpdater
- Starts the timer thread
- Logs initialization details
176-179: LGTM on SlidingWindow destructor! Ensures proper cleanup by calling Stop().

181-191: LGTM on SetEpoch and GetEpoch implementations! Both methods properly use mutex protection for thread-safe epoch access.

214-245: LGTM on RemoveFileNumber implementation! The method correctly:
- Marks entries as deleted rather than removing immediately
- Updates the timestamp for deletion tracking
- Provides appropriate logging for both success and failure cases
Based on the retrieved learning, this "mark as deleted" approach is intentional to prevent frequent S3 updates.
247-283: LGTM on GetSmallestFileNumber implementation! The method correctly:
- Handles empty window case
- Removes expired deleted entries during iteration
- Finds the minimum file number among active entries
- Provides detailed logging
The behavior of only removing deleted entries after expiration (not all entries) is intentional based on the retrieved learning.
297-318: LGTM on Stop implementation! The method correctly:
- Uses proper synchronization to signal stopping
- Waits for timer thread to complete
- Handles the case where Stop() is called multiple times
- Provides appropriate logging
320-349: LGTM on TimerWorker implementation! The timer thread correctly:
- Uses condition variable for efficient waiting
- Handles stop signal properly
- Skips processing when epoch is empty
- Calls appropriate methods for periodic S3 updates
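The timer-worker shape praised above (condition-variable wait with a timeout, so Stop() wakes the thread instead of sleeping out the interval) can be sketched in isolation; `PeriodicWorker` is an illustrative name, not the PR's class:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

class PeriodicWorker
{
public:
    explicit PeriodicWorker(std::chrono::milliseconds interval)
        : interval_(interval), ticks_(0), worker_([this] { Run(); })
    {
    }

    ~PeriodicWorker() { Stop(); }

    // Idempotent: signals the cv, sets the stop flag, joins the thread.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            if (stop_)
            {
                return;  // already stopped
            }
            stop_ = true;
        }
        cv_.notify_all();
        if (worker_.joinable())
        {
            worker_.join();
        }
    }

    int ticks() const { return ticks_.load(); }

private:
    void Run()
    {
        std::unique_lock<std::mutex> lock(mu_);
        while (!stop_)
        {
            // wait_for returns true when the predicate (stop_) became true,
            // i.e. Stop() woke us early; false means the interval elapsed.
            if (cv_.wait_for(lock, interval_, [this] { return stop_; }))
            {
                break;
            }
            ticks_.fetch_add(1);  // periodic work (e.g. an S3 update) goes here
        }
    }

    std::chrono::milliseconds interval_;
    std::atomic<int> ticks_;
    std::mutex mu_;
    std::condition_variable cv_;
    bool stop_ = false;
    std::thread worker_;  // declared last so it starts after the other members
};
```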
359-365: LGTM on GenerateKey implementation! Simple and correct key generation using thread_id and job_id.
61-127: Network I/O under the lock could block other operations. The AddFileNumber method calls s3_updater_->UpdateSmallestFileNumber() while holding the window_mutex_, which can stall other operations during network I/O.

Consider moving the S3 upload outside the lock:

```diff
 void SlidingWindow::AddFileNumber(uint64_t file_number,
                                   uint64_t thread_id,
                                   uint64_t job_id)
 {
+    std::string epoch_copy;
+    bool do_immediate_update = false;
     {
         std::lock_guard<std::mutex> lock(window_mutex_);
-        if (file_number < last_published_smallest_)
-        {
-            DLOG(INFO) << "Immediate S3 update with smaller file number: "
-                       << file_number << ", thread_id: " << thread_id
-                       << ", job_id: " << job_id << ", epoch: " << epoch_;
-            s3_updater_->UpdateSmallestFileNumber(file_number, epoch_);
-            last_published_smallest_ = file_number;
-        }
+        do_immediate_update =
+            (file_number < last_published_smallest_) && !epoch_.empty();
+        if (do_immediate_update) {
+            epoch_copy = epoch_;
+            last_published_smallest_ = file_number;
+        }
         std::string key = GenerateKey(thread_id, job_id);
         window_entries_.emplace(key, WindowEntry(file_number));
         // ... logging
     }
+    if (do_immediate_update) {
+        DLOG(INFO) << "Immediate S3 update with smaller file number: "
+                   << file_number << ", thread_id: " << thread_id
+                   << ", job_id: " << job_id << ", epoch: " << epoch_copy;
+        s3_updater_->UpdateSmallestFileNumber(file_number, epoch_copy);
+    }
 }
```
193-212: Network I/O under lock blocks other operations.

The current implementation performs S3 upload while holding `window_mutex_`, which can stall `Add`/`Remove` operations and the timer thread. Apply the fix described in the earlier comment to move network I/O outside the lock.
285-295: Network I/O under lock blocks other operations.

`BlockPurger()` performs S3 upload while holding `window_mutex_`, which can stall other operations. Move the S3 call outside the lock:
```diff
 void SlidingWindow::BlockPurger()
 {
-    std::lock_guard<std::mutex> lock(window_mutex_);
-    if (epoch_.empty())
-    {
-        LOG(WARNING) << "Cannot block purger, epoch is not set in sliding window";
-        return;
-    }
-    s3_updater_->BlockPurger(epoch_);
+    std::string epoch_copy;
+    {
+        std::lock_guard<std::mutex> lock(window_mutex_);
+        if (epoch_.empty())
+        {
+            LOG(WARNING) << "Cannot block purger, epoch is not set in sliding window";
+            return;
+        }
+        epoch_copy = epoch_;
+    }
+    s3_updater_->BlockPurger(epoch_copy);
 }
```

eloq_data_store_service/rocksdb_cloud_data_store.cpp (3)
47-48: LGTM! Necessary includes for the purger integration.

The addition of the PurgerEventListener header and CloudFileSystemImpl header is appropriate for implementing the purger functionality.
570-585: LGTM! Proper casting and error handling for the purger event listener setup.

The implementation correctly:
- Casts the cloud filesystem to CloudFileSystemImpl and handles failure
- Constructs the bucket name by combining prefix and name
- Creates the PurgerEventListener with appropriate parameters
- Adds it to the listeners collection
The comment explaining the empty epoch parameter is clear and justified.
638-670: Approve — purger blocking sequence verified

SetEpoch and BlockPurger are implemented and delegate to SlidingWindow (see eloq_data_store_service/purger_event_listener.{h,cpp} and purger_sliding_window.{h,cpp}). The PauseBackgroundWork -> GetCurrentEpoch -> SetEpoch -> BlockPurger -> ContinueBackgroundWork flow is correct, and ContinueBackgroundWork is invoked on error paths. No changes required.
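One way to guarantee the resume call runs on every error path is an RAII guard. The sketch below uses a stub in place of `rocksdb::DB` (which provides the real `PauseBackgroundWork`/`ContinueBackgroundWork`), and the surrounding flow is illustrative, not the actual open-flow code:

```cpp
#include <string>

// Minimal stand-in for the DB interface; real code would call
// rocksdb::DB::PauseBackgroundWork() / ContinueBackgroundWork().
struct FakeDb
{
    int paused = 0;
    void PauseBackgroundWork() { ++paused; }
    void ContinueBackgroundWork() { --paused; }
};

// RAII guard: background work resumes on every exit path, including
// early returns, without repeating the call by hand.
class BackgroundWorkPause
{
public:
    explicit BackgroundWorkPause(FakeDb &db) : db_(db)
    {
        db_.PauseBackgroundWork();
    }
    ~BackgroundWorkPause() { db_.ContinueBackgroundWork(); }

    BackgroundWorkPause(const BackgroundWorkPause &) = delete;
    BackgroundWorkPause &operator=(const BackgroundWorkPause &) = delete;

private:
    FakeDb &db_;
};

// Illustrative open-flow step: pause, publish epoch, block purger.
bool BlockPurgerSafely(FakeDb &db, const std::string &epoch)
{
    BackgroundWorkPause pause(db);
    if (epoch.empty())
    {
        return false;  // error path: the guard still resumes background work
    }
    // ... SetEpoch(epoch); BlockPurger(); ...
    return true;
}
```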
```cpp
void SlidingWindow::FlushToS3(uint64_t smallest)
{
    s3_updater_->UpdateSmallestFileNumber(smallest, epoch_);
    last_published_smallest_ = smallest;
    DLOG(INFO) << "Updated S3 with smallest file number: " << smallest
               << ", epoch: " << epoch_;
}
```
🧩 Analysis chain
Potential race condition in S3 updates.
FlushToS3 updates last_published_smallest_ without holding the lock, which could race with concurrent AddFileNumber calls and potentially allow a larger value to overwrite a smaller one.
Verify that the timer thread's FlushToS3 calls won't race with AddFileNumber by checking if proper synchronization is in place:
🏁 Script executed:
```shell
#!/bin/bash
# Description: Check for proper synchronization in FlushToS3 and related S3 update paths
# Look for FlushToS3 usage and last_published_smallest_ updates
rg -n -A5 -B5 "FlushToS3|last_published_smallest_" --type=cpp
```

Length of output: 7361
Race: FlushToS3 updates shared state without locking — fix required
FlushToS3 (eloq_data_store_service/purger_sliding_window.cpp:351-357) assigns last_published_smallest_ without holding window_mutex_, while AddFileNumber (lines ~193-205) reads/updates last_published_smallest_ under window_mutex_. This is a data race/UB.
Fix: acquire the same window_mutex_ in FlushToS3 around s3_updater_->UpdateSmallestFileNumber(...) and the last_published_smallest_ assignment, or make last_published_smallest_ a std::atomic<uint64_t> and ensure all accesses use proper atomic/synchronization.
🤖 Prompt for AI Agents
In eloq_data_store_service/purger_sliding_window.cpp around lines 351 to 357,
FlushToS3 mutates shared state last_published_smallest_ and calls
s3_updater_->UpdateSmallestFileNumber(...) without holding window_mutex_,
causing a data race with AddFileNumber which accesses last_published_smallest_
under window_mutex_. Fix by acquiring window_mutex_ at the start of FlushToS3
and release it after both UpdateSmallestFileNumber(...) and the assignment to
last_published_smallest_ complete; alternatively, if you prefer atomics, change
last_published_smallest_ to std::atomic<uint64_t> and update all read/write
sites to use atomic operations consistently (choose one approach and apply it
across the class).
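The prompt offers two fixes; the atomic alternative would additionally want a compare-and-swap loop so a stale larger value can never overwrite a smaller one. A sketch of that option (names illustrative; note the atomic protects only the counter, not the ordering of the S3 uploads themselves, which is one reason the mutex route is simpler):

```cpp
#include <atomic>
#include <cstdint>

// Published minimum; starts at "nothing published yet".
std::atomic<uint64_t> last_published_smallest{UINT64_MAX};

// CAS loop that only ever lowers the published value. Returns true
// when `candidate` became the new minimum (caller would then push
// `candidate` to S3).
bool TryPublishSmaller(uint64_t candidate)
{
    uint64_t current = last_published_smallest.load(std::memory_order_relaxed);
    while (candidate < current)
    {
        if (last_published_smallest.compare_exchange_weak(
                current, candidate, std::memory_order_acq_rel))
        {
            return true;
        }
        // `current` was reloaded by the failed CAS; retry while still smaller.
    }
    return false;
}
```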
A lock has been added in the TimerWorker() loop
🧩 Analysis chain
@githubzilla, you're absolutely right! I can confirm that the race condition has been properly addressed.
Looking at the current implementation, FlushToS3 is called from TimerWorker() on Line 343 while window_mutex_ is already held (acquired on Line 318). Since FlushToS3 is a private method only called from this locked context, both the S3 update and the last_published_smallest_ assignment are properly synchronized with AddFileNumber.
The fix is correct - my concern has been resolved. Thank you for the clarification!
🏁 Script executed:
```shell
#!/bin/bash
# Check the current implementation of FlushToS3 and TimerWorker methods to verify the lock usage
rg -n -A10 -B5 "void SlidingWindow::FlushToS3|void SlidingWindow::TimerWorker" eloq_data_store_service/purger_sliding_window.cpp
```
Length of output: 1030
🏁 Script executed:
```shell
#!/bin/bash
# Check all calls to FlushToS3 to ensure it's only called from TimerWorker where the lock is held
rg -n "FlushToS3" eloq_data_store_service/purger_sliding_window.cpp eloq_data_store_service/purger_sliding_window.h
```
Length of output: 276
@githubzilla, you're absolutely right! I can confirm that the race condition has been properly addressed.
Looking at the current implementation, FlushToS3 is called from TimerWorker() on Line 345 while window_mutex_ is already held (acquired on Line 322). Since FlushToS3 is a private method only called from this locked context, both the S3 update and the last_published_smallest_ assignment are properly synchronized with AddFileNumber.
The fix is correct - my concern has been resolved. Thank you for the clarification!
🧠 Learnings used
Learnt from: githubzilla
PR: eloqdata/store_handler#81
File: eloq_data_store_service/purger_sliding_window.cpp:240-276
Timestamp: 2025-09-17T08:10:28.801Z
Learning: In the EloqDS purger sliding window implementation (purger_sliding_window.cpp), entries in the sliding window must be explicitly removed via RemoveFileNumber() and do not expire automatically based on time. The entry_duration_ parameter only applies to deleted entries to prevent frequent S3 updates, not for automatic expiration of active entries.
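The learning above can be modeled concretely: active entries never expire, and a removed entry keeps pinning the minimum for `entry_duration_` so the published value does not jump (and hit S3) on every removal. A sketch under those assumptions — the types and names here are illustrative, not the EloqDS implementation:

```cpp
#include <chrono>
#include <cstdint>
#include <map>
#include <string>

using Clock = std::chrono::steady_clock;

// Illustrative window entry: deletion is a soft flag with a timestamp.
struct WindowEntry
{
    uint64_t file_number;
    bool deleted = false;
    Clock::time_point deleted_at{};
};

// Smallest file number still considered live at `now`: active entries
// always count; deleted entries count until the grace period elapses.
uint64_t SmallestAt(const std::map<std::string, WindowEntry> &entries,
                    Clock::time_point now,
                    Clock::duration entry_duration)
{
    uint64_t smallest = UINT64_MAX;
    for (const auto &kv : entries)
    {
        const WindowEntry &e = kv.second;
        if (e.deleted && now - e.deleted_at >= entry_duration)
        {
            continue;  // grace period over: drop the deleted entry
        }
        if (e.file_number < smallest)
        {
            smallest = e.file_number;
        }
    }
    return smallest;
}
```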
Summary by CodeRabbit
New Features
Improvements
Chores