From 04c615303c7435ca99435c88d1049a20fd0dfe4c Mon Sep 17 00:00:00 2001 From: lokax Date: Fri, 26 Dec 2025 10:49:50 +0800 Subject: [PATCH 01/16] phase 2 --- .../write_commands/write_commands.cpp | 46 +++++++++++++++++- .../db/modules/eloq/src/eloq_record_store.cpp | 9 ++++ src/mongo/db/operation_context.cpp | 4 ++ src/mongo/db/operation_context.h | 7 +++ src/mongo/db/ops/write_ops_exec.cpp | 47 +++++++++++++++---- 5 files changed, 103 insertions(+), 10 deletions(-) diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index aa2f727a03..34f326a90b 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage +// #include "mongo/platform/basic.h" #include "mongo/base/init.h" #include "mongo/bson/mutable/document.h" #include "mongo/bson/mutable/element.h" @@ -53,6 +55,7 @@ #include "mongo/db/stats/counters.h" #include "mongo/db/write_concern.h" #include "mongo/s/stale_exception.h" +#include "mongo/util/log.h" namespace mongo { namespace { @@ -87,8 +90,14 @@ void serializeReply(OperationContext* opCtx, size_t opsInBatch, WriteResult result, BSONObjBuilder* out) { - if (shouldSkipOutput(opCtx)) + // Phase 1: Log entry point + MONGO_LOG(0) << "[Phase1] serializeReply called: opsInBatch=" << opsInBatch + << ", result.results.size()=" << result.results.size(); + + if (shouldSkipOutput(opCtx)) { + MONGO_LOG(0) << "[Phase1] serializeReply: skipping output due to write concern"; return; + } if (continueOnError && !result.results.empty()) { const auto& lastResult = result.results.back(); @@ -165,6 +174,15 @@ void serializeReply(OperationContext* opCtx, if (!errors.empty()) { out->append("writeErrors", errors); + // Phase 1: Logging to understand current response format + BSONObj tempObj = out->asTempObj(); + bool 
hasOk = tempObj.hasField("ok"); + bool okValue = hasOk ? tempObj["ok"].trueValue() : false; + // Use warning() to ensure log output, and LOG(0) for highest verbosity + warning() << "[Phase1] Bulk write errors: " << errors.size() << " errors, ok field: " + << (hasOk ? (okValue ? "1" : "0") : "missing"); + MONGO_LOG(0) << "[Phase1] Bulk write errors: " << errors.size() << " errors, ok field: " + << (hasOk ? (okValue ? "1" : "0") : "missing"); } // writeConcernError field is handled by command processor. @@ -232,6 +250,22 @@ class WriteCommand::InvocationBase : public CommandInvocation { BSONObjBuilder bob = result->getBodyBuilder(); runImpl(opCtx, bob); CommandHelpers::extractOrAppendOk(bob); + // Phase 1: Logging to understand final response format + BSONObj response = bob.asTempObj(); + if (response.hasField("writeErrors")) { + warning() << "[Phase1] Final bulk write response: ok=" + << (response.hasField("ok") ? (response["ok"].trueValue() ? "1" : "0") + : "missing") + << ", writeErrors=" << response["writeErrors"].Array().size() + << ", n=" << (response.hasField("n") ? response["n"].numberLong() : -1); + MONGO_LOG(0) << "[Phase1] Final bulk write response: ok=" + << (response.hasField("ok") ? (response["ok"].trueValue() ? "1" : "0") + : "missing") + << ", writeErrors=" << response["writeErrors"].Array().size() + << ", n=" << (response.hasField("n") ? 
response["n"].numberLong() : -1); + } + warning() << "[Phase1] Final bulk write response (full): " << response.toString(); + MONGO_LOG(0) << "[Phase1] Final bulk write response (full): " << response.toString(); } catch (const DBException& ex) { LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.reason()); throw; @@ -300,7 +334,17 @@ class CmdInsert final : public WriteCommand { } void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override { + // Phase 1: Log entry point for insert command + warning() << "[Phase1] CmdInsert::runImpl called: documents=" << _batch.getDocuments().size() + << ", ordered=" << _batch.getWriteCommandBase().getOrdered(); + MONGO_LOG(0) << "[Phase1] CmdInsert::runImpl called: documents=" << _batch.getDocuments().size() + << ", ordered=" << _batch.getWriteCommandBase().getOrdered(); + auto reply = performInserts(opCtx, _batch); + + warning() << "[Phase1] performInserts returned: results.size()=" << reply.results.size(); + MONGO_LOG(0) << "[Phase1] performInserts returned: results.size()=" << reply.results.size(); + serializeReply(opCtx, ReplyStyle::kNotUpdate, !_batch.getWriteCommandBase().getOrdered(), diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index c0d4fbc947..a77de5aaef 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -1092,6 +1092,10 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, MONGO_LOG(1) << "Insert into a Data Table."; const EloqRecoveryUnit::DiscoveredTable& table = ru->discoveredTable(_tableName); uint64_t pkeySchemaVersion = table._schema->KeySchema()->SchemaTs(); + + // First, check all keys for duplicates before inserting any records. + // This prevents partial inserts that would cause false duplicate key errors + // when falling back to one-at-a-time insertion. 
txservice::TxErrorCode err = ru->batchGetKV(opCtx, _tableName, pkeySchemaVersion, batchTuples, true); if (err != txservice::TxErrorCode::NO_ERROR) { @@ -1101,14 +1105,19 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, return TxErrorCodeToMongoStatus(err); } + // Check all keys for duplicates first, before inserting anything. for (size_t i = 0; i < nRecords; i++) { const txservice::ScanBatchTuple& tuple = batchTuples[i]; if (tuple.status_ == txservice::RecordStatus::Normal) { + MONGO_LOG(0) << "yf: insertRecords, duplicate key = " << batchEntries[i].keyString.toString() << ", txn: " << ru->getTxm()->TxNumber() << "i = " << i << ", n record = " << nRecords; return {ErrorCodes::DuplicateKey, "DuplicateKey"}; } else { invariant(tuple.status_ == txservice::RecordStatus::Deleted); } + } + // All keys are valid (no duplicates found). Now insert all records. + for (size_t i = 0; i < nRecords; i++) { std::unique_ptr& mongoKey = batchEntries[i].mongoKey; std::unique_ptr mongoRecord = std::make_unique(); const RecordData& data = records[i].data; diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 60f146ffb3..f7fc4ce5f6 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -482,6 +482,10 @@ WriteUnitOfWork::RecoveryUnitState OperationContext::getRecoveryUnitState() cons return _ruState; } +void OperationContext::setRecoveryUnitState(WriteUnitOfWork::RecoveryUnitState state) { + _ruState = state; +} + void OperationContext::resetLockState() { _locker->reset(); } diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index b1c51935bf..14dc2b6ad0 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -117,6 +117,13 @@ class OperationContext : public Decorable { WriteUnitOfWork::RecoveryUnitState getRecoveryUnitState() const; + /** + * Sets the RecoveryUnitState without aborting the transaction. 
+ * This is used when we need to reset state after a nested WriteUnitOfWork failure + * but want to keep the outer transaction active (e.g., command-level transactions). + */ + void setRecoveryUnitState(WriteUnitOfWork::RecoveryUnitState state); + // WriteUnitOfWork::RecoveryUnitState setRecoveryUnit(RecoveryUnit::UPtr unit, // WriteUnitOfWork::RecoveryUnitState state); /** diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 783d0d3e7a..72daf286c4 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -226,7 +226,8 @@ bool handleError(OperationContext* opCtx, const DBException& ex, const NamespaceString& nss, const write_ops::WriteCommandBase& wholeOp, - WriteResult* out) { + WriteResult* out, + bool allowCommandLevelTransaction = false) { LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.reason()); auto& curOp = *CurOp::get(opCtx); curOp.debug().errInfo = ex.toStatus(); @@ -242,7 +243,9 @@ bool handleError(OperationContext* opCtx, } // EloqDoc enables command level transaction. - if (opCtx->lockState()->inAWriteUnitOfWork()) { + // For insert operations, allow error handling in command-level transactions. + // For update/delete operations, keep the original behavior (throw). + if (!allowCommandLevelTransaction && opCtx->lockState()->inAWriteUnitOfWork()) { throw; } @@ -365,6 +368,7 @@ void insertDocuments(OperationContext* opCtx, uassertStatusOK(collection->insertDocuments( opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true, fromMigrate)); + MONGO_LOG(0) << "yf: insertDocuments, commit"; wuow.commit(); } @@ -456,17 +460,29 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, curOp.debug().additiveMetrics.incrementNinserted(batch.size()); return true; } - } catch (const DBException&) { + } catch (const DBException&e) { // If we cannot abandon the current snapshot, we give up and rethrow the exception. - // No WCE retrying is attempted. 
This code path is intended for snapshot read concern. - if (opCtx->lockState()->inAWriteUnitOfWork()) { + // No WCE retrying is attempted. This code path is intended for snapshot read concern, + // which is only used in multi-document transactions. + auto session = OperationContextSession::get(opCtx); + if (session && session->inMultiDocumentTransaction()) { throw; } - + MONGO_LOG(0) << "yf: catach, exception = " << e.toString() << ", code = " << e.code(); // Otherwise, ignore this failure and behave as-if we never tried to do the combined batch // insert. The loop below will handle reporting any non-transient errors. collection.reset(); + + // Reset RecoveryUnit state if it was set to kFailedUnitOfWork by the nested WriteUnitOfWork + // in insertDocuments. This is necessary because the nested WriteUnitOfWork's destructor + // sets the state to kFailedUnitOfWork when it aborts, but we need kActiveUnitOfWork for + // the fallback single-insert operations. Since we're in a command-level transaction (outer + // WriteUnitOfWork still exists), we only reset the state without aborting the transaction. + if (opCtx->getRecoveryUnitState() == WriteUnitOfWork::RecoveryUnitState::kFailedUnitOfWork) { + //opCtx->recoveryUnit()->abandonSnapshot(); // Clear any failed snapshot state + opCtx->setRecoveryUnitState(WriteUnitOfWork::RecoveryUnitState::kActiveUnitOfWork); + } } // Try to insert the batch one-at-a-time. 
This path is executed both for singular batches, and @@ -479,6 +495,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, if (!collection) acquireCollection(); lastOpFixer->startingOp(); + MONGO_LOG(0) << "yf: insertDocuments, key = " << it->doc.firstElement().fieldNameStringData() << ", value = " << it->doc.firstElement().toString(); insertDocuments(opCtx, collection->getCollection(), it, it + 1, fromMigrate); lastOpFixer->finishedOpSuccessfully(); SingleWriteResult result; @@ -496,8 +513,20 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, } }); } catch (const DBException& ex) { - bool canContinue = - handleError(opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out); + bool canContinue = handleError( + opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out, true); + + MONGO_LOG(0) << "yf: handleError, exception = " << ex.toString() << ", code = " << ex.code() << ", key = " << it->doc.firstElement().fieldNameStringData() << ", value = " << it->doc.firstElement().toString() ; + + // Reset RecoveryUnit state if it was set to kFailedUnitOfWork by the nested WriteUnitOfWork + // in insertDocuments. This is necessary when continuing to the next iteration after an error. + // We only reset the state without aborting the transaction, since the entire insertMany operation + // should be within a single command-level transaction. 
+ if (opCtx->getRecoveryUnitState() == WriteUnitOfWork::RecoveryUnitState::kFailedUnitOfWork) { + // opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->setRecoveryUnitState(WriteUnitOfWork::RecoveryUnitState::kActiveUnitOfWork); + } + if (!canContinue) return false; } @@ -616,7 +645,7 @@ WriteResult performInserts(OperationContext* opCtx, MONGO_UNREACHABLE; } catch (const DBException& ex) { canContinue = handleError( - opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out); + opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out, true); } } From 057876922e86df6b4c10625151a1445123e8e116 Mon Sep 17 00:00:00 2001 From: lokax Date: Fri, 26 Dec 2025 14:31:17 +0800 Subject: [PATCH 02/16] phase3 add test --- .../core/batch_write_command_insert.js | 111 ++++++++++++------ tests/jstests/core/bulk_write_error_format.js | 1 + 2 files changed, 78 insertions(+), 34 deletions(-) create mode 100644 tests/jstests/core/bulk_write_error_format.js diff --git a/tests/jstests/core/batch_write_command_insert.js b/tests/jstests/core/batch_write_command_insert.js index fbe531bb94..4d19dd6886 100644 --- a/tests/jstests/core/batch_write_command_insert.js +++ b/tests/jstests/core/batch_write_command_insert.js @@ -112,11 +112,12 @@ request = { ordered: false }; result = coll.runCommand(request); -assert(!result.ok, tojson(result)); -// assert(result.writeErrors != null); -// assert.eq(1, result.writeErrors.length); -// assert.eq(0, result.n); -// assert.eq(coll.count(), 0); +// assert(!result.ok, tojson(result)); +assert(result.ok, tojson(result)); +assert(result.writeErrors != null); +assert.eq(1, result.writeErrors.length); +assert.eq(0, result.n); +assert.eq(coll.count(), 0); // // Document with valid nested key should insert (op log format) @@ -195,10 +196,9 @@ request = { documents: [{a: 1}] }; result = coll.runCommand(request); -// assert(result.ok, tojson(result)); -assert(!result.ok, tojson(result)); -// assert.eq(1, result.writeErrors.length); -// 
assert.eq(0, result.n); +assert(result.ok, tojson(result)); +assert.eq(1, result.writeErrors.length); +assert.eq(0, result.n); assert.eq(coll.count(), 1); // @@ -211,22 +211,20 @@ request = { ordered: false }; result = coll.runCommand(request); -// assert(result.ok, tojson(result)); -assert(!result.ok, tojson(result)); -// assert.eq(1, result.n); -// assert.eq(2, result.writeErrors.length); -// assert.eq(coll.count(), 1); - -// assert.eq(1, result.writeErrors[0].index); -// assert.eq('number', typeof result.writeErrors[0].code); -// assert.eq('string', typeof result.writeErrors[0].errmsg); -// -// assert.eq(2, result.writeErrors[1].index); -// assert.eq('number', typeof result.writeErrors[1].code); -// assert.eq('string', typeof result.writeErrors[1].errmsg); - -// assert.eq(coll.count(), 1); -assert.eq(coll.count(), 0); +assert(result.ok, tojson(result)); +assert.eq(1, result.n); +assert.eq(2, result.writeErrors.length); +assert.eq(coll.count(), 1); + +assert.eq(1, result.writeErrors[0].index); +assert.eq('number', typeof result.writeErrors[0].code); +assert.eq('string', typeof result.writeErrors[0].errmsg); + +assert.eq(2, result.writeErrors[1].index); +assert.eq('number', typeof result.writeErrors[1].code); +assert.eq('string', typeof result.writeErrors[1].errmsg); + +assert.eq(coll.count(), 1); // // Fail with duplicate key error on multiple document inserts, ordered true @@ -238,17 +236,15 @@ request = { ordered: true }; result = coll.runCommand(request); -// assert(result.ok, tojson(result)); -assert(!result.ok, tojson(result)); -// assert.eq(1, result.n); -// assert.eq(1, result.writeErrors.length); +assert(result.ok, tojson(result)); +assert.eq(1, result.n); +assert.eq(1, result.writeErrors.length); -// assert.eq(1, result.writeErrors[0].index); -// assert.eq('number', typeof result.writeErrors[0].code); -// assert.eq('string', typeof result.writeErrors[0].errmsg); +assert.eq(1, result.writeErrors[0].index); +assert.eq('number', typeof 
result.writeErrors[0].code); +assert.eq('string', typeof result.writeErrors[0].errmsg); -// assert.eq(coll.count(), 1); -assert.eq(coll.count(), 0); +assert.eq(coll.count(), 1); // // Ensure _id is the first field in all documents @@ -428,3 +424,50 @@ allIndexes = coll.getIndexes(); spec = GetIndexHelpers.findByName(allIndexes, "x_1"); assert.neq(null, spec, "Index with name 'x_1' not found: " + tojson(allIndexes)); assert.lte(2, spec.v, tojson(spec)); + +// +// Additional tests for bulk write error format (ok: 1 with writeErrors) +// These tests verify that DuplicateKey errors return ok: 1 with writeErrors array, +// matching MongoDB's BulkWriteException format for dsync compatibility. +// + +// +// Test ordered batch with DuplicateKey error - stops at first error +coll.drop(); +coll.ensureIndex({a: 1}, {unique: true}); +coll.insert({a: 1}); // Insert initial document +request = { + insert: coll.getName(), + documents: [{a: 1}, {a: 2}, {a: 3}], + writeConcern: {w: 1}, + ordered: true +}; +result = coll.runCommand(request); +assert(result.ok, tojson(result)); +assert.eq(0, result.n, "Expected n: 0 (ordered stops at first error)"); +assert.eq(1, result.writeErrors.length, "Expected one write error"); +assert.eq(result.writeErrors[0].code, ErrorCodes.DuplicateKey, "Expected DuplicateKey error"); +assert.eq(result.writeErrors[0].index, 0, "Expected error at index 0"); +assert.eq(coll.count(), 1, "Expected only initial document"); + +// +// Test unordered batch with multiple DuplicateKey errors - continues after errors +coll.remove({}); +coll.insert({a: 1}); // Insert initial document +request = { + insert: coll.getName(), + documents: [{a: 1}, {a: 2}, {a: 1}, {a: 3}], + writeConcern: {w: 1}, + ordered: false +}; +result = coll.runCommand(request); +assert(result.ok, tojson(result)); +assert.eq(2, result.n, "Expected 2 successful inserts ({a: 2} and {a: 3})"); +assert.gte(result.writeErrors.length, 1, "Expected at least one write error"); +// Verify all writeErrors are 
DuplicateKey +result.writeErrors.forEach(function(err) { + assert.eq(err.code, ErrorCodes.DuplicateKey, "Expected DuplicateKey error: " + tojson(err)); + assert.eq('number', typeof err.code); + assert.eq('string', typeof err.errmsg); +}); +assert.eq(coll.count(), 3, "Expected 3 documents (initial + 2 successful inserts)"); diff --git a/tests/jstests/core/bulk_write_error_format.js b/tests/jstests/core/bulk_write_error_format.js new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/tests/jstests/core/bulk_write_error_format.js @@ -0,0 +1 @@ + From fa141fc0951996e79b9a46ce160bd7af29bdb6b8 Mon Sep 17 00:00:00 2001 From: lokax Date: Fri, 26 Dec 2025 14:45:10 +0800 Subject: [PATCH 03/16] failed build --- src/mongo/db/catalog/collection_impl.cpp | 107 +++++++++++++++++- src/mongo/db/catalog/collection_impl.h | 10 ++ .../db/modules/eloq/src/eloq_record_store.cpp | 1 + 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 06dcd6f9a8..4eb8e2c1eb 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -74,6 +74,9 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/db/modules/eloq/src/eloq_recovery_unit.h" +#include "mongo/db/storage/key_string.h" + namespace mongo { MONGO_REGISTER_SHIM(Collection::makeImpl) @@ -502,6 +505,12 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, opCtx->lockState(), ResourceId(RESOURCE_METADATA, _ns.ns()), MODE_X}; } + // Pre-validate all unique index constraints before inserting into recordStore. + // This prevents data inconsistency where recordStore has data but uniqueIndex doesn't. 
+ Status status = _validateUniqueIndexConstraints(opCtx, begin, end); + if (!status.isOK()) + return status; + std::vector records; records.reserve(count); std::vector timestamps; @@ -513,8 +522,7 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, Timestamp timestamp = Timestamp(it->oplogSlot.opTime.getTimestamp()); timestamps.push_back(timestamp); } - Status status = - _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); + status = _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); if (!status.isOK()) return status; @@ -539,6 +547,101 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, return status; } +Status CollectionImpl::_validateUniqueIndexConstraints( + OperationContext* opCtx, + std::vector::const_iterator begin, + std::vector::const_iterator end) { + // Only validate for Eloq storage engine + auto* eloqRU = dynamic_cast(opCtx->recoveryUnit()); + if (!eloqRU) { + // Not Eloq storage engine, skip validation + return Status::OK(); + } + + // Get all unique indexes + IndexCatalog::IndexIterator indexIter = _indexCatalog.getIndexIterator(opCtx, false); + std::vector uniqueIndexes; + while (indexIter.more()) { + IndexCatalogEntry* entry = indexIter.next(); + const IndexDescriptor* desc = entry->descriptor(); + if (desc->unique() && entry->isReady(opCtx)) { + uniqueIndexes.push_back(entry); + } + } + + // If no unique indexes, nothing to validate + if (uniqueIndexes.empty()) { + return Status::OK(); + } + + // Construct table name for the collection + txservice::TableName tableName{Eloq::MongoTableToTxServiceTableName(_ns.ns(), true)}; + + // For each unique index, track keys seen in the current batch to detect duplicates within batch + std::map batchKeys; + + // For each document to be inserted + for (auto docIt = begin; docIt != end; ++docIt) { + const BSONObj& doc = docIt->doc; + + // For each unique index + for (IndexCatalogEntry* entry : uniqueIndexes) { + const 
IndexDescriptor* desc = entry->descriptor(); + const IndexAccessMethod* accessMethod = entry->accessMethod(); + + // Extract keys for this index + BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + MultikeyPaths multikeyPaths; + accessMethod->getKeys(doc, IndexAccessMethod::GetKeysMode::kEnforceConstraints, &keys, &multikeyPaths); + + // Construct index table name + txservice::TableName indexName{desc->isIdIndex() + ? tableName + : Eloq::MongoIndexToTxServiceTableName( + desc->parentNS(), desc->indexName(), true)}; + + // Get index schema version + uint64_t keySchemaVersion = eloqRU->getIndexSchema(tableName, indexName)->SchemaTs(); + + // Check each key for duplicates + for (const BSONObj& key : keys) { + // First, check if this key was already seen in the current batch + auto& seenKeys = batchKeys[desc]; + if (seenKeys.count(key) > 0) { + // Duplicate key within the batch + return {ErrorCodes::DuplicateKey, + "Duplicate Key: " + indexName.String() + " key: " + key.toString()}; + } + + // Convert BSON key to MongoKey + KeyString keyString{KeyString::kLatestVersion, key, desc->ordering()}; + auto mongoKey = std::make_unique(keyString.getBuffer(), + keyString.getSize()); + + // Check if key exists in database or writeset (getKV checks both) + Eloq::MongoRecord mongoRecord; + auto [exists, err] = eloqRU->getKV( + opCtx, indexName, keySchemaVersion, mongoKey.get(), &mongoRecord, true); + + if (err != txservice::TxErrorCode::NO_ERROR) { + return TxErrorCodeToMongoStatus(err); + } + + if (exists) { + // Duplicate key found in database or writeset + return {ErrorCodes::DuplicateKey, + "Duplicate Key: " + indexName.String() + " key: " + key.toString()}; + } + + // Add key to batch keys set for duplicate detection within batch + seenKeys.insert(key.getOwned()); + } + } + } + + return Status::OK(); +} + bool CollectionImpl::haveCappedWaiters() { // Waiters keep a shared_ptr to '_cappedNotifier', so there are waiters if this CollectionImpl's // 
shared_ptr is not unique (use_count > 1). diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index 47ba8e165f..f0b900ff9b 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -416,6 +416,16 @@ class CollectionImpl final : virtual public Collection::Impl, bool enforceQuota, OpDebug* opDebug); + /** + * Validates all unique index constraints before inserting into recordStore. + * This prevents data inconsistency where recordStore has data but uniqueIndex doesn't. + * Returns an error if any duplicate key is found in any unique index. + */ + Status _validateUniqueIndexConstraints( + OperationContext* opCtx, + std::vector::const_iterator begin, + std::vector::const_iterator end); + /** * Perform update when document move will be required. diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index a77de5aaef..b2edd60bf7 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -1128,6 +1128,7 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, mongoRecord->SetUnpackInfo(typeBits.getBuffer(), typeBits.getSize()); } bool checkUnique = true; + MONGO_LOG(1) << "EloqRecordStore::_insertRecords setKV, table: " << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() << ", key: " << ks.toString() << ", checkUnique: " << checkUnique; err = ru->setKV(_tableName, pkeySchemaVersion, std::move(mongoKey), From f66f1e1c13d7b2509995609a78574f08e72f6614 Mon Sep 17 00:00:00 2001 From: lokax Date: Fri, 26 Dec 2025 16:45:35 +0800 Subject: [PATCH 04/16] a --- src/mongo/db/catalog/collection_impl.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 4eb8e2c1eb..b022fe0970 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ 
b/src/mongo/db/catalog/collection_impl.cpp @@ -74,6 +74,7 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/db/modules/eloq/src/base/eloq_util.h" #include "mongo/db/modules/eloq/src/eloq_recovery_unit.h" #include "mongo/db/storage/key_string.h" From 287160538113f12e3734b6f5e883543b9097b2cf Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 29 Dec 2025 14:06:02 +0800 Subject: [PATCH 05/16] check duplicate within record store --- src/mongo/db/catalog/collection_impl.cpp | 105 +------------- src/mongo/db/catalog/collection_impl.h | 10 -- .../eloq/src/eloq_collection_helpers.cpp | 136 ++++++++++++++++++ .../eloq/src/eloq_collection_helpers.h | 48 +++++++ src/mongo/db/modules/eloq/src/eloq_index.cpp | 1 + .../db/modules/eloq/src/eloq_record_store.cpp | 96 +++++++++++++ 6 files changed, 282 insertions(+), 114 deletions(-) create mode 100644 src/mongo/db/modules/eloq/src/eloq_collection_helpers.cpp create mode 100644 src/mongo/db/modules/eloq/src/eloq_collection_helpers.h diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index b022fe0970..ba3fa50fef 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -74,8 +74,6 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" -#include "mongo/db/modules/eloq/src/base/eloq_util.h" -#include "mongo/db/modules/eloq/src/eloq_recovery_unit.h" #include "mongo/db/storage/key_string.h" namespace mongo { @@ -506,12 +504,6 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, opCtx->lockState(), ResourceId(RESOURCE_METADATA, _ns.ns()), MODE_X}; } - // Pre-validate all unique index constraints before inserting into recordStore. - // This prevents data inconsistency where recordStore has data but uniqueIndex doesn't. 
- Status status = _validateUniqueIndexConstraints(opCtx, begin, end); - if (!status.isOK()) - return status; - std::vector records; records.reserve(count); std::vector timestamps; @@ -523,7 +515,7 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, Timestamp timestamp = Timestamp(it->oplogSlot.opTime.getTimestamp()); timestamps.push_back(timestamp); } - status = _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); + Status status = _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); if (!status.isOK()) return status; @@ -548,101 +540,6 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, return status; } -Status CollectionImpl::_validateUniqueIndexConstraints( - OperationContext* opCtx, - std::vector::const_iterator begin, - std::vector::const_iterator end) { - // Only validate for Eloq storage engine - auto* eloqRU = dynamic_cast(opCtx->recoveryUnit()); - if (!eloqRU) { - // Not Eloq storage engine, skip validation - return Status::OK(); - } - - // Get all unique indexes - IndexCatalog::IndexIterator indexIter = _indexCatalog.getIndexIterator(opCtx, false); - std::vector uniqueIndexes; - while (indexIter.more()) { - IndexCatalogEntry* entry = indexIter.next(); - const IndexDescriptor* desc = entry->descriptor(); - if (desc->unique() && entry->isReady(opCtx)) { - uniqueIndexes.push_back(entry); - } - } - - // If no unique indexes, nothing to validate - if (uniqueIndexes.empty()) { - return Status::OK(); - } - - // Construct table name for the collection - txservice::TableName tableName{Eloq::MongoTableToTxServiceTableName(_ns.ns(), true)}; - - // For each unique index, track keys seen in the current batch to detect duplicates within batch - std::map batchKeys; - - // For each document to be inserted - for (auto docIt = begin; docIt != end; ++docIt) { - const BSONObj& doc = docIt->doc; - - // For each unique index - for (IndexCatalogEntry* entry : uniqueIndexes) { - const 
IndexDescriptor* desc = entry->descriptor(); - const IndexAccessMethod* accessMethod = entry->accessMethod(); - - // Extract keys for this index - BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - MultikeyPaths multikeyPaths; - accessMethod->getKeys(doc, IndexAccessMethod::GetKeysMode::kEnforceConstraints, &keys, &multikeyPaths); - - // Construct index table name - txservice::TableName indexName{desc->isIdIndex() - ? tableName - : Eloq::MongoIndexToTxServiceTableName( - desc->parentNS(), desc->indexName(), true)}; - - // Get index schema version - uint64_t keySchemaVersion = eloqRU->getIndexSchema(tableName, indexName)->SchemaTs(); - - // Check each key for duplicates - for (const BSONObj& key : keys) { - // First, check if this key was already seen in the current batch - auto& seenKeys = batchKeys[desc]; - if (seenKeys.count(key) > 0) { - // Duplicate key within the batch - return {ErrorCodes::DuplicateKey, - "Duplicate Key: " + indexName.String() + " key: " + key.toString()}; - } - - // Convert BSON key to MongoKey - KeyString keyString{KeyString::kLatestVersion, key, desc->ordering()}; - auto mongoKey = std::make_unique(keyString.getBuffer(), - keyString.getSize()); - - // Check if key exists in database or writeset (getKV checks both) - Eloq::MongoRecord mongoRecord; - auto [exists, err] = eloqRU->getKV( - opCtx, indexName, keySchemaVersion, mongoKey.get(), &mongoRecord, true); - - if (err != txservice::TxErrorCode::NO_ERROR) { - return TxErrorCodeToMongoStatus(err); - } - - if (exists) { - // Duplicate key found in database or writeset - return {ErrorCodes::DuplicateKey, - "Duplicate Key: " + indexName.String() + " key: " + key.toString()}; - } - - // Add key to batch keys set for duplicate detection within batch - seenKeys.insert(key.getOwned()); - } - } - } - - return Status::OK(); -} - bool CollectionImpl::haveCappedWaiters() { // Waiters keep a shared_ptr to '_cappedNotifier', so there are waiters if this CollectionImpl's // 
shared_ptr is not unique (use_count > 1). diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index f0b900ff9b..47ba8e165f 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -416,16 +416,6 @@ class CollectionImpl final : virtual public Collection::Impl, bool enforceQuota, OpDebug* opDebug); - /** - * Validates all unique index constraints before inserting into recordStore. - * This prevents data inconsistency where recordStore has data but uniqueIndex doesn't. - * Returns an error if any duplicate key is found in any unique index. - */ - Status _validateUniqueIndexConstraints( - OperationContext* opCtx, - std::vector::const_iterator begin, - std::vector::const_iterator end); - /** * Perform update when document move will be required. diff --git a/src/mongo/db/modules/eloq/src/eloq_collection_helpers.cpp b/src/mongo/db/modules/eloq/src/eloq_collection_helpers.cpp new file mode 100644 index 0000000000..7e3c031603 --- /dev/null +++ b/src/mongo/db/modules/eloq/src/eloq_collection_helpers.cpp @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2025 EloqData Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the license: + * 1. GNU Affero General Public License, version 3, as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +#include "mongo/db/modules/eloq/src/eloq_collection_helpers.h" + +#include "mongo/base/error_codes.h" +#include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/index/index_access_method.h" +#include "mongo/db/index/index_descriptor.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/ops/insert.h" +#include "mongo/db/storage/key_string.h" + +#include "mongo/db/modules/eloq/src/base/eloq_util.h" +#include "mongo/db/modules/eloq/src/eloq_recovery_unit.h" + +namespace mongo { +namespace eloq_collection_helpers { + +Status validateUniqueIndexConstraints( + OperationContext* opCtx, + const NamespaceString& ns, + IndexCatalog* indexCatalog, + std::vector::const_iterator begin, + std::vector::const_iterator end) { + // Only validate for Eloq storage engine + auto* eloqRU = dynamic_cast(opCtx->recoveryUnit()); + if (!eloqRU) { + // Not Eloq storage engine, skip validation + return Status::OK(); + } + + // Get all unique indexes + IndexCatalog::IndexIterator indexIter = indexCatalog->getIndexIterator(opCtx, false); + std::vector uniqueIndexes; + while (indexIter.more()) { + IndexCatalogEntry* entry = indexIter.next(); + const IndexDescriptor* desc = entry->descriptor(); + if (desc->unique() && entry->isReady(opCtx)) { + uniqueIndexes.push_back(entry); + } + } + + // If no unique indexes, nothing to validate + if (uniqueIndexes.empty()) { + return Status::OK(); + } + + // Construct table name for the collection + txservice::TableName tableName{Eloq::MongoTableToTxServiceTableName(ns.ns(), true)}; + + // For each unique index, track keys seen in the current batch to detect duplicates within batch + std::map batchKeys; + + // For each document to be inserted + for (auto docIt = begin; docIt != end; ++docIt) { + const BSONObj& doc = docIt->doc; + + // For each unique index + for (IndexCatalogEntry* entry : uniqueIndexes) { + const IndexDescriptor* desc = entry->descriptor(); + const 
IndexAccessMethod* accessMethod = entry->accessMethod(); + + // Extract keys for this index + BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + MultikeyPaths multikeyPaths; + accessMethod->getKeys( + doc, IndexAccessMethod::GetKeysMode::kEnforceConstraints, &keys, &multikeyPaths); + + // Construct index table name + txservice::TableName indexName{desc->isIdIndex() + ? tableName + : Eloq::MongoIndexToTxServiceTableName( + desc->parentNS(), desc->indexName(), true)}; + + // Get index schema version + uint64_t keySchemaVersion = eloqRU->getIndexSchema(tableName, indexName)->SchemaTs(); + + // Check each key for duplicates + for (const BSONObj& key : keys) { + // First, check if this key was already seen in the current batch + auto& seenKeys = batchKeys[desc]; + if (seenKeys.count(key) > 0) { + // Duplicate key within the batch + return {ErrorCodes::DuplicateKey, + "Duplicate Key: " + indexName.String() + " key: " + key.toString()}; + } + + // Convert BSON key to MongoKey + KeyString keyString{KeyString::kLatestVersion, key, desc->ordering()}; + auto mongoKey = std::make_unique(keyString.getBuffer(), + keyString.getSize()); + + // Check if key exists in database or writeset (getKV checks both) + Eloq::MongoRecord mongoRecord; + auto [exists, err] = eloqRU->getKV( + opCtx, indexName, keySchemaVersion, mongoKey.get(), &mongoRecord, true); + + if (err != txservice::TxErrorCode::NO_ERROR) { + return Eloq::TxErrorCodeToMongoStatus(err); + } + + if (exists) { + // Duplicate key found in database or writeset + return {ErrorCodes::DuplicateKey, + "Duplicate Key: " + indexName.String() + " key: " + key.toString()}; + } + + // Add key to batch keys set for duplicate detection within batch + seenKeys.insert(key.getOwned()); + } + } + } + + return Status::OK(); +} + +} // namespace eloq_collection_helpers +} // namespace mongo + diff --git a/src/mongo/db/modules/eloq/src/eloq_collection_helpers.h b/src/mongo/db/modules/eloq/src/eloq_collection_helpers.h new 
file mode 100644 index 0000000000..81a0144c70 --- /dev/null +++ b/src/mongo/db/modules/eloq/src/eloq_collection_helpers.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2025 EloqData Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the license: + * 1. GNU Affero General Public License, version 3, as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ +#pragma once + +#include "mongo/base/status.h" +#include "mongo/db/operation_context.h" + +namespace mongo { + +class IndexCatalog; +class NamespaceString; +struct InsertStatement; + +namespace eloq_collection_helpers { + +/** + * Validates all unique index constraints before inserting into recordStore. + * This prevents data inconsistency where recordStore has data but uniqueIndex doesn't. + * Returns an error if any duplicate key is found in any unique index. + * + * This function only performs validation for Eloq storage engine. + * For other storage engines, it returns Status::OK() immediately. 
+ */ +Status validateUniqueIndexConstraints( + OperationContext* opCtx, + const NamespaceString& ns, + IndexCatalog* indexCatalog, + std::vector::const_iterator begin, + std::vector::const_iterator end); + +} // namespace eloq_collection_helpers +} // namespace mongo + diff --git a/src/mongo/db/modules/eloq/src/eloq_index.cpp b/src/mongo/db/modules/eloq/src/eloq_index.cpp index bad826e240..e3f86c71fe 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_index.cpp @@ -674,6 +674,7 @@ Status EloqUniqueIndex::insert(OperationContext* opCtx, } if (exists) { + MONGO_LOG(0) << "yf: duplicate key found, index: " << _indexName.StringView() << ", key: " << key.toString() << ", mongo key = " << mongoKey->ToString(); return {ErrorCodes::Error::DuplicateKey, "Duplicate Key: " + _indexName.String()}; } diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index b2edd60bf7..fab49e62d3 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -1116,6 +1116,102 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, } } + // Check unique index constraints for all records before inserting. + // Use dirtySchema if it exists (reflects ongoing index create/drop operations), + // otherwise use schema (committed state). + // This ensures we check the same indexes that indexRecords will insert into. + const Eloq::MongoTableSchema* tableSchemaToUse = table._dirtySchema + ? 
static_cast(table._dirtySchema.get()) + : static_cast(table._schema.get()); + + const std::unordered_map>* allIndexes = tableSchemaToUse->GetIndexes(); + if (allIndexes && !allIndexes->empty()) { + // Collect unique indexes + std::vector> uniqueIndexes; + for (const auto& [kid, index] : *allIndexes) { + const auto* keySchema = static_cast(index.second.sk_schema_.get()); + if (keySchema && keySchema->Unique()) { + uniqueIndexes.emplace_back(index.first, keySchema); + } + } + + // If there are unique indexes, validate all records using batchGetKV + if (!uniqueIndexes.empty()) { + // For each unique index, collect all keys to check + for (const auto& [indexName, keySchema] : uniqueIndexes) { + // Build batch for this unique index + // Use vectors to store KeyString buffers and MongoKey objects + std::vector keyStringBuffers; // Store KeyString buffer data + std::vector> mongoKeys; // Store MongoKey objects + std::vector mongoRecords; // Store MongoRecord objects + std::vector indexBatchTuples; + // std::vector> recordKeyMapping; // Maps batch index to (record index, BSON key) + + // For each document to be inserted + for (size_t i = 0; i < nRecords; i++) { + BSONObj obj{records[i].data.data()}; + + // Extract keys for this index + BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + MultikeyPaths multikeyPaths; + keySchema->GetKeys(obj, &keys, &multikeyPaths); + + // Check each key for duplicates + for (const BSONObj& key : keys) { + // Convert BSON key to KeyString + KeyString keyString{KeyString::kLatestVersion, key, keySchema->Ordering()}; + // keyString.resetToKey(key, keySchema->Ordering(), records[i].id); + + // Store KeyString buffer data + keyStringBuffers.emplace_back(keyString.getBuffer(), keyString.getSize()); + + // Create MongoKey from buffer + auto mongoKey = std::make_unique( + keyStringBuffers.back().data(), keyStringBuffers.back().size()); + mongoKeys.push_back(std::move(mongoKey)); + + // Create MongoRecord + 
mongoRecords.emplace_back(); + + // Store mapping for error reporting + // recordKeyMapping.emplace_back(i, key.getOwned()); + + MONGO_LOG(0) << "yf: mongo key = " << mongoKeys.back()->ToString() << ", key = " << key.toString(); + + // Add to batch (mongoRecord is not needed, just pass nullptr or empty record) + indexBatchTuples.emplace_back(txservice::TxKey(mongoKeys.back().get()), + &mongoRecords.back()); + } + } + + if (!indexBatchTuples.empty()) { + MONGO_LOG(0) << "yf: index name = " << indexName.StringView() << ", n records = " << nRecords << ", check unique index"; + // Use batchGetKV to check all keys + txservice::TxErrorCode err = ru->batchGetKV( + opCtx, indexName, keySchema->SchemaTs(), indexBatchTuples, true); + if (err != txservice::TxErrorCode::NO_ERROR) { + MONGO_LOG(1) << "EloqRecordStore::_insertRecords batchGetKV failed for unique index, index: " + << indexName.StringView() << ", error: " << txservice::TxErrorMessage(err); + return TxErrorCodeToMongoStatus(err); + } + + // Check results for duplicates + for (size_t batchIdx = 0; batchIdx < indexBatchTuples.size(); batchIdx++) { + const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; + if (tuple.status_ == txservice::RecordStatus::Normal) { + // Duplicate key found in database or writeset + MONGO_LOG(0) << "yf: duplicate key found, index: " << indexName.StringView() << ", key: " << keyStringBuffers[batchIdx]; + return {ErrorCodes::DuplicateKey, + "DuplicateKey"}; + } else { + invariant(tuple.status_ == txservice::RecordStatus::Deleted); + } + } + } + } + } + } + // All keys are valid (no duplicates found). Now insert all records. 
for (size_t i = 0; i < nRecords; i++) { std::unique_ptr& mongoKey = batchEntries[i].mongoKey; From bded4ddd6f0837a560ae9405b94a556f995b3ca6 Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 29 Dec 2025 15:41:27 +0800 Subject: [PATCH 06/16] add new interface to check duplicate key --- src/mongo/db/catalog/collection_impl.cpp | 35 +++- src/mongo/db/catalog/index_catalog.h | 15 ++ src/mongo/db/catalog/index_catalog_impl.cpp | 35 +++- src/mongo/db/catalog/index_catalog_impl.h | 3 + src/mongo/db/index/index_access_method.cpp | 11 ++ src/mongo/db/index/index_access_method.h | 20 +++ src/mongo/db/modules/eloq/src/eloq_index.cpp | 105 +++++++++++- src/mongo/db/modules/eloq/src/eloq_index.h | 14 ++ .../db/modules/eloq/src/eloq_record_store.cpp | 154 +++++++++++------- .../db/modules/eloq/src/eloq_record_store.h | 3 + src/mongo/db/storage/record_store.h | 14 ++ src/mongo/db/storage/sorted_data_interface.h | 13 ++ .../core/batch_write_command_insert.js | 4 +- 13 files changed, 359 insertions(+), 67 deletions(-) diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index ba3fa50fef..c49d1bcb63 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -515,20 +515,45 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, Timestamp timestamp = Timestamp(it->oplogSlot.opTime.getTimestamp()); timestamps.push_back(timestamp); } - Status status = _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); + + // Batch check for duplicate keys before inserting + Status status = _recordStore->batchCheckDuplicateKey(opCtx, &records); if (!status.isOK()) return status; + // Prepare BsonRecords for index duplicate check and later indexRecords call std::vector bsonRecords; bsonRecords.reserve(count); int recordIndex = 0; for (auto it = begin; it != end; it++) { - RecordId loc = records[recordIndex++].id; - // invariant(RecordId::min() < loc); - // invariant(loc < 
RecordId::max()); - + RecordId loc = records[recordIndex].id; BsonRecord bsonRecord = {loc, Timestamp(it->oplogSlot.opTime.getTimestamp()), &(it->doc)}; bsonRecords.push_back(bsonRecord); + recordIndex++; + } + + // Extract BSONObj pointers from BsonRecords for batchCheckDuplicateKey + std::vector bsonObjPtrs; + bsonObjPtrs.reserve(count); + for (const auto& bsonRecord : bsonRecords) { + bsonObjPtrs.push_back(bsonRecord.docPtr); + } + + // Batch check for duplicate keys in unique indexes + status = _indexCatalog.batchCheckDuplicateKey(opCtx, bsonObjPtrs); + if (!status.isOK()) { + return status; +} + + // Now insert records (primary keys already checked) + status = _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); + if (!status.isOK()) { + return status; +} + + // Update RecordIds in bsonRecords after insertRecords sets them + for (size_t i = 0; i < bsonRecords.size(); i++) { + bsonRecords[i].id = records[i].id; } int64_t keysInserted; diff --git a/src/mongo/db/catalog/index_catalog.h b/src/mongo/db/catalog/index_catalog.h index d8c30a27f6..f0dea75c76 100644 --- a/src/mongo/db/catalog/index_catalog.h +++ b/src/mongo/db/catalog/index_catalog.h @@ -231,6 +231,9 @@ class IndexCatalog { const std::vector& bsonRecords, int64_t* keysInsertedOut) = 0; + virtual Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) = 0; + virtual void unindexRecord(OperationContext* opCtx, const BSONObj& obj, const RecordId& loc, @@ -527,6 +530,18 @@ class IndexCatalog { return this->_impl().unindexRecord(opCtx, obj, loc, noWarn, keysDeletedOut); } + /** + * Batch check for duplicate keys in unique indexes before inserting records. 
+ * + * @param opCtx - Operation context + * @param bsonObjPtrs - Vector of pointers to BSON objects to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + inline Status batchCheckDuplicateKey(OperationContext* const opCtx, + const std::vector& bsonObjPtrs) { + return this->_impl().batchCheckDuplicateKey(opCtx, bsonObjPtrs); + } + // ------- temp internal ------- inline std::string getAccessMethodName(OperationContext* const opCtx, diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp index 629d6a3999..ac476cf7df 100644 --- a/src/mongo/db/catalog/index_catalog_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_impl.cpp @@ -1446,9 +1446,40 @@ Status IndexCatalogImpl::_unindexRecord(OperationContext* opCtx, } +Status IndexCatalogImpl::batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) { + // Only check unique indexes for duplicates + for (IndexCatalogEntryContainer::const_iterator i = _entries.begin(); i != _entries.end(); + ++i) { + IndexCatalogEntry* entry = i->get(); + if (!entry->isReady(opCtx)) { + continue; // Skip unfinished indexes + } + + IndexDescriptor* desc = entry->descriptor(); + if (!desc->unique()) { + continue; // Only check unique indexes + } + + IndexAccessMethod* accessMethod = entry->accessMethod(); + if (!accessMethod) { + continue; + } + + // Call batchCheckDuplicateKey on the IndexAccessMethod + // This will delegate to EloqIndex for Eloq storage engine + Status s = accessMethod->batchCheckDuplicateKey(opCtx, bsonObjPtrs); + if (!s.isOK()) { + return s; + } + } + + return Status::OK(); +} + Status IndexCatalogImpl::indexRecords(OperationContext* opCtx, - const std::vector& bsonRecords, - int64_t* keysInsertedOut) { + const std::vector& bsonRecords, + int64_t* keysInsertedOut) { if (keysInsertedOut) { *keysInsertedOut = 0; } diff --git a/src/mongo/db/catalog/index_catalog_impl.h 
b/src/mongo/db/catalog/index_catalog_impl.h index 9c751c1769..325102a37c 100644 --- a/src/mongo/db/catalog/index_catalog_impl.h +++ b/src/mongo/db/catalog/index_catalog_impl.h @@ -341,6 +341,9 @@ class IndexCatalogImpl : public IndexCatalog::Impl { const std::vector& bsonRecords, int64_t* keysInsertedOut) override; + Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs); + /** * When 'keysDeletedOut' is not null, it will be set to the number of index keys removed by * this operation. diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 7dbaa346da..1c3cb4d6b1 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -618,6 +618,17 @@ bool IndexAccessMethod::BulkBuilder::isMultikey() const { return _everGeneratedMultipleKeys || isMultikeyFromPaths(_indexMultikeyPaths); } +Status IndexAccessMethod::batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) { + // Delegate to the underlying SortedDataInterface + SortedDataInterface* sdi = getSortedDataInterface(); + if (sdi) { + return sdi->batchCheckDuplicateKey(opCtx, bsonObjPtrs); + } + // If SortedDataInterface is not available, return OK + return Status::OK(); +} + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index bbf651bed3..478312a84d 100644 --- a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -156,6 +156,26 @@ class IndexAccessMethod { */ Status touch(OperationContext* opCtx, const BSONObj& obj); + /** + * Get the underlying SortedDataInterface for this index. + * This allows storage engine specific implementations (like EloqIndex) to be accessed. 
+ * + * @return Pointer to the SortedDataInterface, or nullptr if not available + */ + SortedDataInterface* getSortedDataInterface() const { + return _newInterface.get(); + } + + /** + * Batch check for duplicate keys before inserting records. + * + * @param opCtx - Operation context + * @param bsonObjPtrs - Vector of pointers to BSON objects to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs); + /** * this pages in the entire index */ diff --git a/src/mongo/db/modules/eloq/src/eloq_index.cpp b/src/mongo/db/modules/eloq/src/eloq_index.cpp index e3f86c71fe..df51378612 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_index.cpp @@ -573,6 +573,100 @@ Status EloqIndex::initAsEmpty(OperationContext* opCtx) { return Status::OK(); } +Status EloqIndex::batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) { + // Default implementation: only check for unique indexes + if (!unique()) { + return Status::OK(); + } + + if (bsonObjPtrs.empty()) { + return Status::OK(); + } + + auto ru = EloqRecoveryUnit::get(opCtx); + const Eloq::MongoKeySchema* keySchema = ru->getIndexSchema(_tableName, _indexName); + if (!keySchema) { + return Status::OK(); // No schema available, skip check + } + + // Build batch for this unique index + // Use vectors to store KeyString buffers and MongoKey objects + std::vector keyStringBuffers; // Store KeyString buffer data + std::vector> mongoKeys; // Store MongoKey objects + std::vector mongoRecords; // Store MongoRecord objects + std::vector indexBatchTuples; + + // Use a set to track keys within this batch to detect duplicates within the batch + BSONObjSet batchKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + + // For each document to be inserted + for (const BSONObj* objPtr : bsonObjPtrs) { + const BSONObj& obj = 
*objPtr; + + // Extract keys for this index + BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + MultikeyPaths multikeyPaths; + keySchema->GetKeys(obj, &keys, &multikeyPaths); + + // Check each key for duplicates within this batch + for (const BSONObj& key : keys) { + // Check if this key already exists in the batch + if (batchKeys.find(key) != batchKeys.end()) { + MONGO_LOG(0) << "EloqIndex::batchCheckDuplicateKey duplicate key found within batch, index: " + << _indexName.StringView() << ", key: " << key.toString(); + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } + batchKeys.insert(key.getOwned()); + + // Convert BSON key to KeyString + KeyString keyString{KeyString::kLatestVersion, key, keySchema->Ordering()}; + + // Store KeyString buffer data + keyStringBuffers.emplace_back(keyString.getBuffer(), keyString.getSize()); + + // Create MongoKey from buffer + auto mongoKey = std::make_unique( + keyStringBuffers.back().data(), keyStringBuffers.back().size()); + mongoKeys.push_back(std::move(mongoKey)); + + // Create MongoRecord + mongoRecords.emplace_back(); + + // Add to batch + indexBatchTuples.emplace_back(txservice::TxKey(mongoKeys.back().get()), + &mongoRecords.back()); + } + } + + if (!indexBatchTuples.empty()) { + // Use batchGetKV to check all keys + uint64_t keySchemaVersion = keySchema->SchemaTs(); + txservice::TxErrorCode err = ru->batchGetKV( + opCtx, _indexName, keySchemaVersion, indexBatchTuples, true); + if (err != txservice::TxErrorCode::NO_ERROR) { + MONGO_LOG(1) << "EloqIndex::batchCheckDuplicateKey batchGetKV failed for index: " + << _indexName.StringView() << ", error: " << txservice::TxErrorMessage(err); + return TxErrorCodeToMongoStatus(err); + } + + // Check results for duplicates + for (size_t batchIdx = 0; batchIdx < indexBatchTuples.size(); batchIdx++) { + const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; + if (tuple.status_ == txservice::RecordStatus::Normal) { + // Duplicate key found in 
database or writeset + MONGO_LOG(0) << "EloqIndex::batchCheckDuplicateKey duplicate key found, index: " + << _indexName.StringView() << ", key buffer size: " << keyStringBuffers[batchIdx].size(); + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } else { + invariant(tuple.status_ == txservice::RecordStatus::Deleted); + } + } + } + + return Status::OK(); +} + // EloqIdIndex std::unique_ptr EloqIdIndex::newCursor(OperationContext* opCtx, bool isForward) const { @@ -666,7 +760,8 @@ Status EloqUniqueIndex::insert(OperationContext* opCtx, auto mongoKey = std::make_unique(keyString.getBuffer(), keyString.getSize()); auto mongoRecord = std::make_unique(); uint64_t keySchemaVersion = ru->getIndexSchema(_tableName, _indexName)->SchemaTs(); - + + /* auto [exists, err] = ru->getKV(opCtx, _indexName, keySchemaVersion, mongoKey.get(), mongoRecord.get(), true); if (err != txservice::TxErrorCode::NO_ERROR) { @@ -677,18 +772,22 @@ Status EloqUniqueIndex::insert(OperationContext* opCtx, MONGO_LOG(0) << "yf: duplicate key found, index: " << _indexName.StringView() << ", key: " << key.toString() << ", mongo key = " << mongoKey->ToString(); return {ErrorCodes::Error::DuplicateKey, "Duplicate Key: " + _indexName.String()}; } + */ mongoRecord->SetEncodedBlob(valueItem); if (const auto& typeBits = keyString.getTypeBits(); !typeBits.isAllZeros()) { mongoRecord->SetUnpackInfo(typeBits.getBuffer(), typeBits.getSize()); } - err = ru->setKV(_indexName, + auto err = ru->setKV(_indexName, keySchemaVersion, std::move(mongoKey), std::move(mongoRecord), txservice::OperationType::Insert, true); - + if (err != txservice::TxErrorCode::NO_ERROR) { + MONGO_LOG(1) << "yf: insert setKV failed, index: " << _indexName.StringView() << ", key: " << key.toString() << ", error: " << txservice::TxErrorMessage(err); + } + return TxErrorCodeToMongoStatus(err); } diff --git a/src/mongo/db/modules/eloq/src/eloq_index.h b/src/mongo/db/modules/eloq/src/eloq_index.h index 8ca8945b0c..854f2f8437 100644 --- 
a/src/mongo/db/modules/eloq/src/eloq_index.h +++ b/src/mongo/db/modules/eloq/src/eloq_index.h @@ -22,6 +22,9 @@ #include "mongo/db/record_id.h" #include "mongo/db/storage/key_string.h" #include "mongo/db/storage/sorted_data_interface.h" +#include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/index/multikey_paths.h" +#include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/modules/eloq/data_substrate/tx_service/include/type.h" @@ -77,6 +80,17 @@ class EloqIndex : public SortedDataInterface { virtual bool unique() const = 0; + /** + * Batch check for duplicate keys before inserting records. + * Override from SortedDataInterface. + * + * @param opCtx - Operation context + * @param bsonObjPtrs - Vector of pointers to BSON objects to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) override; + protected: class BulkBuilder; class IdBulkBuilder; diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index fab49e62d3..59a6e76b86 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -57,6 +57,18 @@ bvar::LatencyRecorder bVarUpdateRecord("update_record"); namespace mongo { +struct BatchReadEntry { + BatchReadEntry() : keyString(KeyString::kLatestVersion) {} + void resetToKey(const BSONObj& idObj) { + keyString.resetToKey(idObj, kIdOrdering); + mongoKey = std::make_unique(keyString); + } + + KeyString keyString; + std::unique_ptr mongoKey; + Eloq::MongoRecord mongoRecord; +}; + class EloqCatalogRecordStoreCursor : public SeekableRecordCursor { public: explicit EloqCatalogRecordStoreCursor(OperationContext* opCtx) @@ -796,10 +808,66 @@ StatusWith EloqRecordStore::insertRecord( return {record.id}; } +Status EloqRecordStore::batchCheckDuplicateKey(OperationContext* opCtx, + 
std::vector* records) { + MONGO_LOG(1) << "EloqRecordStore::batchCheckDuplicateKey, nRecords: " << records->size(); + + if (records->empty()) { + return Status::OK(); + } + + size_t nRecords = records->size(); + auto batchEntries = std::make_unique(nRecords); + std::vector batchTuples; + batchTuples.reserve(nRecords); + + for (size_t i = 0; i < nRecords; i++) { + Record& record = (*records)[i]; + BSONObj obj{record.data.data()}; + const BSONObj idObj = getIdBSONObjWithoutFieldName(obj); + Status s = checkKeySize(idObj, "RecordStore"); + if (!s.isOK()) { + return s; + } + + BatchReadEntry& entry = batchEntries[i]; + entry.resetToKey(idObj); + record.id = RecordId{entry.keyString.getBuffer(), entry.keyString.getSize()}; + batchTuples.emplace_back(txservice::TxKey(entry.mongoKey.get()), &entry.mongoRecord); + } + + auto ru = EloqRecoveryUnit::get(opCtx); + const EloqRecoveryUnit::DiscoveredTable& table = ru->discoveredTable(_tableName); + uint64_t pkeySchemaVersion = table._schema->KeySchema()->SchemaTs(); + + // Check all keys for duplicates + txservice::TxErrorCode err = + ru->batchGetKV(opCtx, _tableName, pkeySchemaVersion, batchTuples, true); + if (err != txservice::TxErrorCode::NO_ERROR) { + MONGO_LOG(1) << "EloqRecordStore::batchCheckDuplicateKey batchGetKV failed, table: " + << _tableName.StringView() << ", error: " << txservice::TxErrorMessage(err); + return TxErrorCodeToMongoStatus(err); + } + + // Check results for duplicates + for (size_t i = 0; i < nRecords; i++) { + const txservice::ScanBatchTuple& tuple = batchTuples[i]; + if (tuple.status_ == txservice::RecordStatus::Normal) { + MONGO_LOG(0) << "EloqRecordStore::batchCheckDuplicateKey duplicate key found, key: " + << batchEntries[i].keyString.toString(); + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } else { + invariant(tuple.status_ == txservice::RecordStatus::Deleted); + } + } + + return Status::OK(); +} + Status EloqRecordStore::insertRecords(OperationContext* opCtx, - std::vector* records, 
- std::vector* timestamps, - bool enforceQuota) { + std::vector* records, + std::vector* timestamps, + bool enforceQuota) { MONGO_LOG(1) << "EloqRecordStore::insertRecords"; return _insertRecords(opCtx, records->data(), timestamps->data(), records->size()); } @@ -1035,18 +1103,6 @@ void EloqRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* // } -struct BatchReadEntry { - BatchReadEntry() : keyString(KeyString::kLatestVersion) {} - void resetToKey(const BSONObj& idObj) { - keyString.resetToKey(idObj, kIdOrdering); - mongoKey = std::make_unique(keyString); - } - - KeyString keyString; - std::unique_ptr mongoKey; - Eloq::MongoRecord mongoRecord; -}; - Status EloqRecordStore::_insertRecords(OperationContext* opCtx, Record* records, const Timestamp* timestamps, @@ -1069,57 +1125,33 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, return {ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"}; } - auto batchEntries = std::make_unique(nRecords); - std::vector batchTuples; - batchTuples.reserve(nRecords); + // Convert Record* to std::vector for batchCheckDuplicateKey + std::vector recordsVec(records, records + nRecords); + + // Check for duplicate keys first (this also sets record.id) + /* + Status dupCheckStatus = batchCheckDuplicateKey(opCtx, &recordsVec); + if (!dupCheckStatus.isOK()) { + return dupCheckStatus; + } + */ + + // Copy back the RecordIds that were set by batchCheckDuplicateKey for (size_t i = 0; i < nRecords; i++) { - Record& record = records[i]; - BSONObj obj{record.data.data()}; - const BSONObj idObj = getIdBSONObjWithoutFieldName(obj); - MONGO_LOG(1) << idObj.jsonString(); - Status s = checkKeySize(idObj, "RecordStore"); - if (!s.isOK()) { - return s; - } - - BatchReadEntry& entry = batchEntries[i]; - entry.resetToKey(idObj); - record.id = RecordId{entry.keyString.getBuffer(), entry.keyString.getSize()}; - batchTuples.emplace_back(txservice::TxKey(entry.mongoKey.get()), &entry.mongoRecord); + records[i].id 
= recordsVec[i].id; } auto ru = EloqRecoveryUnit::get(opCtx); MONGO_LOG(1) << "Insert into a Data Table."; const EloqRecoveryUnit::DiscoveredTable& table = ru->discoveredTable(_tableName); uint64_t pkeySchemaVersion = table._schema->KeySchema()->SchemaTs(); - - // First, check all keys for duplicates before inserting any records. - // This prevents partial inserts that would cause false duplicate key errors - // when falling back to one-at-a-time insertion. - txservice::TxErrorCode err = - ru->batchGetKV(opCtx, _tableName, pkeySchemaVersion, batchTuples, true); - if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "EloqRecordStore::_insertRecords batchGetKV failed, table: " - << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() - << txservice::TxErrorMessage(err); - return TxErrorCodeToMongoStatus(err); - } - - // Check all keys for duplicates first, before inserting anything. - for (size_t i = 0; i < nRecords; i++) { - const txservice::ScanBatchTuple& tuple = batchTuples[i]; - if (tuple.status_ == txservice::RecordStatus::Normal) { - MONGO_LOG(0) << "yf: insertRecords, duplicate key = " << batchEntries[i].keyString.toString() << ", txn: " << ru->getTxm()->TxNumber() << "i = " << i << ", n record = " << nRecords; - return {ErrorCodes::DuplicateKey, "DuplicateKey"}; - } else { - invariant(tuple.status_ == txservice::RecordStatus::Deleted); - } - } // Check unique index constraints for all records before inserting. // Use dirtySchema if it exists (reflects ongoing index create/drop operations), // otherwise use schema (committed state). // This ensures we check the same indexes that indexRecords will insert into. + + /* const Eloq::MongoTableSchema* tableSchemaToUse = table._dirtySchema ? static_cast(table._dirtySchema.get()) : static_cast(table._schema.get()); @@ -1211,8 +1243,19 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, } } } + */ // All keys are valid (no duplicates found). Now insert all records. 
+ // Rebuild batchEntries for insertion + auto batchEntries = std::make_unique(nRecords); + for (size_t i = 0; i < nRecords; i++) { + Record& record = records[i]; + BSONObj obj{record.data.data()}; + const BSONObj idObj = getIdBSONObjWithoutFieldName(obj); + BatchReadEntry& entry = batchEntries[i]; + entry.resetToKey(idObj); + } + for (size_t i = 0; i < nRecords; i++) { std::unique_ptr& mongoKey = batchEntries[i].mongoKey; std::unique_ptr mongoRecord = std::make_unique(); @@ -1225,13 +1268,14 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, } bool checkUnique = true; MONGO_LOG(1) << "EloqRecordStore::_insertRecords setKV, table: " << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() << ", key: " << ks.toString() << ", checkUnique: " << checkUnique; - err = ru->setKV(_tableName, + txservice::TxErrorCode err = ru->setKV(_tableName, pkeySchemaVersion, std::move(mongoKey), std::move(mongoRecord), txservice::OperationType::Insert, checkUnique); if (err != txservice::TxErrorCode::NO_ERROR) { + MONGO_LOG(1) << "yf: _insertRecords setKV failed, table: " << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() << ", key: " << ks.toString() << ", checkUnique: " << checkUnique << ", error: " << txservice::TxErrorMessage(err); return TxErrorCodeToMongoStatus(err); } diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.h b/src/mongo/db/modules/eloq/src/eloq_record_store.h index d1227d0685..eb7f310a30 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.h +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.h @@ -237,6 +237,9 @@ class EloqRecordStore final : public RecordStore { std::vector* timestamps, bool enforceQuota) override; + Status batchCheckDuplicateKey(OperationContext* opCtx, + std::vector* records) override; + Status insertRecordsWithDocWriter(OperationContext* opCtx, const DocWriter* const* docs, diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index 
f1c6700027..cc2024d149 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -417,6 +417,20 @@ class RecordStore { size_t nDocs, RecordId* idsOut = nullptr) = 0; + /** + * Batch check for duplicate keys before inserting records. + * This checks primary keys (RecordId) for duplicates. + * + * @param opCtx - Operation context + * @param records - Vector of records to check (input/output: RecordId may be set) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status batchCheckDuplicateKey(OperationContext* opCtx, + std::vector* records) { + // Default implementation: no-op (for storage engines that don't need this) + return Status::OK(); + } + /** * A thin wrapper around insertRecordsWithDocWriter() to simplify handling of single DocWriters. */ diff --git a/src/mongo/db/storage/sorted_data_interface.h b/src/mongo/db/storage/sorted_data_interface.h index 556f6520ae..4e19973ad0 100644 --- a/src/mongo/db/storage/sorted_data_interface.h +++ b/src/mongo/db/storage/sorted_data_interface.h @@ -126,6 +126,19 @@ class SortedDataInterface { const BSONObj& key, const RecordId& loc) = 0; + /** + * Batch check for duplicate keys before inserting records. + * + * @param opCtx - Operation context + * @param bsonObjPtrs - Vector of pointers to BSON objects to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) { + // Default implementation: no-op (for storage engines that don't need batch checking) + return Status::OK(); + } + /** * Attempt to reduce the storage space used by this index via compaction. Only called if the * indexed record store supports compaction-in-place. 
diff --git a/tests/jstests/core/batch_write_command_insert.js b/tests/jstests/core/batch_write_command_insert.js index 4d19dd6886..e23ddacb37 100644 --- a/tests/jstests/core/batch_write_command_insert.js +++ b/tests/jstests/core/batch_write_command_insert.js @@ -403,8 +403,8 @@ try { bulk.execute(); assert(false, "should have failed due to duplicate key"); } catch (err) { - // assert(coll.count() == 50, "Unexpected number inserted by bulk write: " + coll.count()); - assert(coll.count() == 1, "Unexpected number inserted by bulk write: " + coll.count()); + assert(coll.count() == 50, "Unexpected number inserted by bulk write: " + coll.count()); + // assert(coll.count() == 1, "Unexpected number inserted by bulk write: " + coll.count()); } // From 9b40ae4c887ad3306522beb91f83a54c15d98c12 Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 29 Dec 2025 15:49:37 +0800 Subject: [PATCH 07/16] remove unused code --- .../write_commands/write_commands.cpp | 39 ----- .../eloq/src/eloq_collection_helpers.cpp | 136 ------------------ .../eloq/src/eloq_collection_helpers.h | 48 ------- .../db/modules/eloq/src/eloq_record_store.cpp | 116 --------------- tests/jstests/core/bulk_write_error_format.js | 1 - 5 files changed, 340 deletions(-) delete mode 100644 src/mongo/db/modules/eloq/src/eloq_collection_helpers.cpp delete mode 100644 src/mongo/db/modules/eloq/src/eloq_collection_helpers.h delete mode 100644 tests/jstests/core/bulk_write_error_format.js diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 34f326a90b..e883f4425e 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -90,12 +90,7 @@ void serializeReply(OperationContext* opCtx, size_t opsInBatch, WriteResult result, BSONObjBuilder* out) { - // Phase 1: Log entry point - MONGO_LOG(0) << "[Phase1] serializeReply called: opsInBatch=" << opsInBatch - << ", 
result.results.size()=" << result.results.size(); - if (shouldSkipOutput(opCtx)) { - MONGO_LOG(0) << "[Phase1] serializeReply: skipping output due to write concern"; return; } @@ -174,15 +169,6 @@ void serializeReply(OperationContext* opCtx, if (!errors.empty()) { out->append("writeErrors", errors); - // Phase 1: Logging to understand current response format - BSONObj tempObj = out->asTempObj(); - bool hasOk = tempObj.hasField("ok"); - bool okValue = hasOk ? tempObj["ok"].trueValue() : false; - // Use warning() to ensure log output, and LOG(0) for highest verbosity - warning() << "[Phase1] Bulk write errors: " << errors.size() << " errors, ok field: " - << (hasOk ? (okValue ? "1" : "0") : "missing"); - MONGO_LOG(0) << "[Phase1] Bulk write errors: " << errors.size() << " errors, ok field: " - << (hasOk ? (okValue ? "1" : "0") : "missing"); } // writeConcernError field is handled by command processor. @@ -250,22 +236,6 @@ class WriteCommand::InvocationBase : public CommandInvocation { BSONObjBuilder bob = result->getBodyBuilder(); runImpl(opCtx, bob); CommandHelpers::extractOrAppendOk(bob); - // Phase 1: Logging to understand final response format - BSONObj response = bob.asTempObj(); - if (response.hasField("writeErrors")) { - warning() << "[Phase1] Final bulk write response: ok=" - << (response.hasField("ok") ? (response["ok"].trueValue() ? "1" : "0") - : "missing") - << ", writeErrors=" << response["writeErrors"].Array().size() - << ", n=" << (response.hasField("n") ? response["n"].numberLong() : -1); - MONGO_LOG(0) << "[Phase1] Final bulk write response: ok=" - << (response.hasField("ok") ? (response["ok"].trueValue() ? "1" : "0") - : "missing") - << ", writeErrors=" << response["writeErrors"].Array().size() - << ", n=" << (response.hasField("n") ? 
response["n"].numberLong() : -1); - } - warning() << "[Phase1] Final bulk write response (full): " << response.toString(); - MONGO_LOG(0) << "[Phase1] Final bulk write response (full): " << response.toString(); } catch (const DBException& ex) { LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.reason()); throw; @@ -334,17 +304,8 @@ class CmdInsert final : public WriteCommand { } void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override { - // Phase 1: Log entry point for insert command - warning() << "[Phase1] CmdInsert::runImpl called: documents=" << _batch.getDocuments().size() - << ", ordered=" << _batch.getWriteCommandBase().getOrdered(); - MONGO_LOG(0) << "[Phase1] CmdInsert::runImpl called: documents=" << _batch.getDocuments().size() - << ", ordered=" << _batch.getWriteCommandBase().getOrdered(); - auto reply = performInserts(opCtx, _batch); - warning() << "[Phase1] performInserts returned: results.size()=" << reply.results.size(); - MONGO_LOG(0) << "[Phase1] performInserts returned: results.size()=" << reply.results.size(); - serializeReply(opCtx, ReplyStyle::kNotUpdate, !_batch.getWriteCommandBase().getOrdered(), diff --git a/src/mongo/db/modules/eloq/src/eloq_collection_helpers.cpp b/src/mongo/db/modules/eloq/src/eloq_collection_helpers.cpp deleted file mode 100644 index 7e3c031603..0000000000 --- a/src/mongo/db/modules/eloq/src/eloq_collection_helpers.cpp +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Copyright (C) 2025 EloqData Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the license: - * 1. GNU Affero General Public License, version 3, as published by the Free - * Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. 
- * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -#include "mongo/db/modules/eloq/src/eloq_collection_helpers.h" - -#include "mongo/base/error_codes.h" -#include "mongo/bson/simple_bsonobj_comparator.h" -#include "mongo/db/catalog/index_catalog.h" -#include "mongo/db/index/index_access_method.h" -#include "mongo/db/index/index_descriptor.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/ops/insert.h" -#include "mongo/db/storage/key_string.h" - -#include "mongo/db/modules/eloq/src/base/eloq_util.h" -#include "mongo/db/modules/eloq/src/eloq_recovery_unit.h" - -namespace mongo { -namespace eloq_collection_helpers { - -Status validateUniqueIndexConstraints( - OperationContext* opCtx, - const NamespaceString& ns, - IndexCatalog* indexCatalog, - std::vector::const_iterator begin, - std::vector::const_iterator end) { - // Only validate for Eloq storage engine - auto* eloqRU = dynamic_cast(opCtx->recoveryUnit()); - if (!eloqRU) { - // Not Eloq storage engine, skip validation - return Status::OK(); - } - - // Get all unique indexes - IndexCatalog::IndexIterator indexIter = indexCatalog->getIndexIterator(opCtx, false); - std::vector uniqueIndexes; - while (indexIter.more()) { - IndexCatalogEntry* entry = indexIter.next(); - const IndexDescriptor* desc = entry->descriptor(); - if (desc->unique() && entry->isReady(opCtx)) { - uniqueIndexes.push_back(entry); - } - } - - // If no unique indexes, nothing to validate - if (uniqueIndexes.empty()) { - return Status::OK(); - } - - // Construct table name for the collection - txservice::TableName tableName{Eloq::MongoTableToTxServiceTableName(ns.ns(), true)}; - - // For each unique index, track keys seen in the current batch to detect duplicates within batch - std::map batchKeys; - - // For each document to be inserted - for (auto docIt = begin; docIt != end; ++docIt) { - const BSONObj& doc = docIt->doc; - - // For each unique index - 
for (IndexCatalogEntry* entry : uniqueIndexes) { - const IndexDescriptor* desc = entry->descriptor(); - const IndexAccessMethod* accessMethod = entry->accessMethod(); - - // Extract keys for this index - BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - MultikeyPaths multikeyPaths; - accessMethod->getKeys( - doc, IndexAccessMethod::GetKeysMode::kEnforceConstraints, &keys, &multikeyPaths); - - // Construct index table name - txservice::TableName indexName{desc->isIdIndex() - ? tableName - : Eloq::MongoIndexToTxServiceTableName( - desc->parentNS(), desc->indexName(), true)}; - - // Get index schema version - uint64_t keySchemaVersion = eloqRU->getIndexSchema(tableName, indexName)->SchemaTs(); - - // Check each key for duplicates - for (const BSONObj& key : keys) { - // First, check if this key was already seen in the current batch - auto& seenKeys = batchKeys[desc]; - if (seenKeys.count(key) > 0) { - // Duplicate key within the batch - return {ErrorCodes::DuplicateKey, - "Duplicate Key: " + indexName.String() + " key: " + key.toString()}; - } - - // Convert BSON key to MongoKey - KeyString keyString{KeyString::kLatestVersion, key, desc->ordering()}; - auto mongoKey = std::make_unique(keyString.getBuffer(), - keyString.getSize()); - - // Check if key exists in database or writeset (getKV checks both) - Eloq::MongoRecord mongoRecord; - auto [exists, err] = eloqRU->getKV( - opCtx, indexName, keySchemaVersion, mongoKey.get(), &mongoRecord, true); - - if (err != txservice::TxErrorCode::NO_ERROR) { - return Eloq::TxErrorCodeToMongoStatus(err); - } - - if (exists) { - // Duplicate key found in database or writeset - return {ErrorCodes::DuplicateKey, - "Duplicate Key: " + indexName.String() + " key: " + key.toString()}; - } - - // Add key to batch keys set for duplicate detection within batch - seenKeys.insert(key.getOwned()); - } - } - } - - return Status::OK(); -} - -} // namespace eloq_collection_helpers -} // namespace mongo - diff --git 
a/src/mongo/db/modules/eloq/src/eloq_collection_helpers.h b/src/mongo/db/modules/eloq/src/eloq_collection_helpers.h deleted file mode 100644 index 81a0144c70..0000000000 --- a/src/mongo/db/modules/eloq/src/eloq_collection_helpers.h +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright (C) 2025 EloqData Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the license: - * 1. GNU Affero General Public License, version 3, as published by the Free - * Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ -#pragma once - -#include "mongo/base/status.h" -#include "mongo/db/operation_context.h" - -namespace mongo { - -class IndexCatalog; -class NamespaceString; -struct InsertStatement; - -namespace eloq_collection_helpers { - -/** - * Validates all unique index constraints before inserting into recordStore. - * This prevents data inconsistency where recordStore has data but uniqueIndex doesn't. - * Returns an error if any duplicate key is found in any unique index. - * - * This function only performs validation for Eloq storage engine. - * For other storage engines, it returns Status::OK() immediately. 
- */ -Status validateUniqueIndexConstraints( - OperationContext* opCtx, - const NamespaceString& ns, - IndexCatalog* indexCatalog, - std::vector::const_iterator begin, - std::vector::const_iterator end); - -} // namespace eloq_collection_helpers -} // namespace mongo - diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index 59a6e76b86..aff1caa251 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -1125,126 +1125,10 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, return {ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"}; } - // Convert Record* to std::vector for batchCheckDuplicateKey - std::vector recordsVec(records, records + nRecords); - - // Check for duplicate keys first (this also sets record.id) - /* - Status dupCheckStatus = batchCheckDuplicateKey(opCtx, &recordsVec); - if (!dupCheckStatus.isOK()) { - return dupCheckStatus; - } - */ - - // Copy back the RecordIds that were set by batchCheckDuplicateKey - for (size_t i = 0; i < nRecords; i++) { - records[i].id = recordsVec[i].id; - } - auto ru = EloqRecoveryUnit::get(opCtx); MONGO_LOG(1) << "Insert into a Data Table."; const EloqRecoveryUnit::DiscoveredTable& table = ru->discoveredTable(_tableName); uint64_t pkeySchemaVersion = table._schema->KeySchema()->SchemaTs(); - - // Check unique index constraints for all records before inserting. - // Use dirtySchema if it exists (reflects ongoing index create/drop operations), - // otherwise use schema (committed state). - // This ensures we check the same indexes that indexRecords will insert into. - - /* - const Eloq::MongoTableSchema* tableSchemaToUse = table._dirtySchema - ? 
static_cast(table._dirtySchema.get()) - : static_cast(table._schema.get()); - - const std::unordered_map>* allIndexes = tableSchemaToUse->GetIndexes(); - if (allIndexes && !allIndexes->empty()) { - // Collect unique indexes - std::vector> uniqueIndexes; - for (const auto& [kid, index] : *allIndexes) { - const auto* keySchema = static_cast(index.second.sk_schema_.get()); - if (keySchema && keySchema->Unique()) { - uniqueIndexes.emplace_back(index.first, keySchema); - } - } - - // If there are unique indexes, validate all records using batchGetKV - if (!uniqueIndexes.empty()) { - // For each unique index, collect all keys to check - for (const auto& [indexName, keySchema] : uniqueIndexes) { - // Build batch for this unique index - // Use vectors to store KeyString buffers and MongoKey objects - std::vector keyStringBuffers; // Store KeyString buffer data - std::vector> mongoKeys; // Store MongoKey objects - std::vector mongoRecords; // Store MongoRecord objects - std::vector indexBatchTuples; - // std::vector> recordKeyMapping; // Maps batch index to (record index, BSON key) - - // For each document to be inserted - for (size_t i = 0; i < nRecords; i++) { - BSONObj obj{records[i].data.data()}; - - // Extract keys for this index - BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - MultikeyPaths multikeyPaths; - keySchema->GetKeys(obj, &keys, &multikeyPaths); - - // Check each key for duplicates - for (const BSONObj& key : keys) { - // Convert BSON key to KeyString - KeyString keyString{KeyString::kLatestVersion, key, keySchema->Ordering()}; - // keyString.resetToKey(key, keySchema->Ordering(), records[i].id); - - // Store KeyString buffer data - keyStringBuffers.emplace_back(keyString.getBuffer(), keyString.getSize()); - - // Create MongoKey from buffer - auto mongoKey = std::make_unique( - keyStringBuffers.back().data(), keyStringBuffers.back().size()); - mongoKeys.push_back(std::move(mongoKey)); - - // Create MongoRecord - 
mongoRecords.emplace_back(); - - // Store mapping for error reporting - // recordKeyMapping.emplace_back(i, key.getOwned()); - - MONGO_LOG(0) << "yf: mongo key = " << mongoKeys.back()->ToString() << ", key = " << key.toString(); - - // Add to batch (mongoRecord is not needed, just pass nullptr or empty record) - indexBatchTuples.emplace_back(txservice::TxKey(mongoKeys.back().get()), - &mongoRecords.back()); - } - } - - if (!indexBatchTuples.empty()) { - MONGO_LOG(0) << "yf: index name = " << indexName.StringView() << ", n records = " << nRecords << ", check unique index"; - // Use batchGetKV to check all keys - txservice::TxErrorCode err = ru->batchGetKV( - opCtx, indexName, keySchema->SchemaTs(), indexBatchTuples, true); - if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "EloqRecordStore::_insertRecords batchGetKV failed for unique index, index: " - << indexName.StringView() << ", error: " << txservice::TxErrorMessage(err); - return TxErrorCodeToMongoStatus(err); - } - - // Check results for duplicates - for (size_t batchIdx = 0; batchIdx < indexBatchTuples.size(); batchIdx++) { - const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; - if (tuple.status_ == txservice::RecordStatus::Normal) { - // Duplicate key found in database or writeset - MONGO_LOG(0) << "yf: duplicate key found, index: " << indexName.StringView() << ", key: " << keyStringBuffers[batchIdx]; - return {ErrorCodes::DuplicateKey, - "DuplicateKey"}; - } else { - invariant(tuple.status_ == txservice::RecordStatus::Deleted); - } - } - } - } - } - } - */ - // All keys are valid (no duplicates found). Now insert all records. 
// Rebuild batchEntries for insertion auto batchEntries = std::make_unique(nRecords); diff --git a/tests/jstests/core/bulk_write_error_format.js b/tests/jstests/core/bulk_write_error_format.js deleted file mode 100644 index 8b13789179..0000000000 --- a/tests/jstests/core/bulk_write_error_format.js +++ /dev/null @@ -1 +0,0 @@ - From 4b8a089a8948629ae6b3744d16161c3d7ee2d777 Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 29 Dec 2025 16:21:00 +0800 Subject: [PATCH 08/16] fix --- src/mongo/db/modules/eloq/src/eloq_index.cpp | 11 +++++------ .../db/modules/eloq/src/eloq_record_store.cpp | 15 +++++++++++++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/mongo/db/modules/eloq/src/eloq_index.cpp b/src/mongo/db/modules/eloq/src/eloq_index.cpp index df51378612..e40e94248e 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_index.cpp @@ -613,7 +613,7 @@ Status EloqIndex::batchCheckDuplicateKey(OperationContext* opCtx, for (const BSONObj& key : keys) { // Check if this key already exists in the batch if (batchKeys.find(key) != batchKeys.end()) { - MONGO_LOG(0) << "EloqIndex::batchCheckDuplicateKey duplicate key found within batch, index: " + MONGO_LOG(1) << "yf: batchCheckDuplicateKey duplicate key found within batch, index: " << _indexName.StringView() << ", key: " << key.toString(); return {ErrorCodes::DuplicateKey, "DuplicateKey"}; } @@ -655,9 +655,9 @@ Status EloqIndex::batchCheckDuplicateKey(OperationContext* opCtx, const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; if (tuple.status_ == txservice::RecordStatus::Normal) { // Duplicate key found in database or writeset - MONGO_LOG(0) << "EloqIndex::batchCheckDuplicateKey duplicate key found, index: " - << _indexName.StringView() << ", key buffer size: " << keyStringBuffers[batchIdx].size(); - return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + MONGO_LOG(1) << "yf: batchCheckDuplicateKey duplicate key found, index: " + << 
_indexName.StringView() << ", key = " << keyStringBuffers[batchIdx]; + return {ErrorCodes::Error::DuplicateKey, "DuplicateKey"}; } else { invariant(tuple.status_ == txservice::RecordStatus::Deleted); } @@ -769,7 +769,6 @@ Status EloqUniqueIndex::insert(OperationContext* opCtx, } if (exists) { - MONGO_LOG(0) << "yf: duplicate key found, index: " << _indexName.StringView() << ", key: " << key.toString() << ", mongo key = " << mongoKey->ToString(); return {ErrorCodes::Error::DuplicateKey, "Duplicate Key: " + _indexName.String()}; } */ @@ -785,7 +784,7 @@ Status EloqUniqueIndex::insert(OperationContext* opCtx, txservice::OperationType::Insert, true); if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "yf: insert setKV failed, index: " << _indexName.StringView() << ", key: " << key.toString() << ", error: " << txservice::TxErrorMessage(err); + MONGO_LOG(1) << "yf: EloqUniqueIndex::insert setKV failed, index: " << _indexName.StringView() << ", key: " << key.toString() << ", error: " << txservice::TxErrorMessage(err); } return TxErrorCodeToMongoStatus(err); diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index aff1caa251..e504aa2898 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -821,6 +821,9 @@ Status EloqRecordStore::batchCheckDuplicateKey(OperationContext* opCtx, std::vector batchTuples; batchTuples.reserve(nRecords); + // Use a set to track keys within this batch to detect duplicates within the batch + BSONObjSet batchKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + for (size_t i = 0; i < nRecords; i++) { Record& record = (*records)[i]; BSONObj obj{record.data.data()}; @@ -830,6 +833,14 @@ Status EloqRecordStore::batchCheckDuplicateKey(OperationContext* opCtx, return s; } + // Check if this key already exists in the batch + if (batchKeys.find(idObj) != batchKeys.end()) { + MONGO_LOG(1) << "yf: 
EloqRecordStore::batchCheckDuplicateKey duplicate key found within batch, key: " + << idObj.toString(); + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } + batchKeys.insert(idObj.getOwned()); + BatchReadEntry& entry = batchEntries[i]; entry.resetToKey(idObj); record.id = RecordId{entry.keyString.getBuffer(), entry.keyString.getSize()}; @@ -853,7 +864,7 @@ Status EloqRecordStore::batchCheckDuplicateKey(OperationContext* opCtx, for (size_t i = 0; i < nRecords; i++) { const txservice::ScanBatchTuple& tuple = batchTuples[i]; if (tuple.status_ == txservice::RecordStatus::Normal) { - MONGO_LOG(0) << "EloqRecordStore::batchCheckDuplicateKey duplicate key found, key: " + MONGO_LOG(1) << "yf: EloqRecordStore::batchCheckDuplicateKey duplicate key found, key: " << batchEntries[i].keyString.toString(); return {ErrorCodes::DuplicateKey, "DuplicateKey"}; } else { @@ -1151,7 +1162,7 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, mongoRecord->SetUnpackInfo(typeBits.getBuffer(), typeBits.getSize()); } bool checkUnique = true; - MONGO_LOG(1) << "EloqRecordStore::_insertRecords setKV, table: " << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() << ", key: " << ks.toString() << ", checkUnique: " << checkUnique; + MONGO_LOG(1) << "yf: EloqRecordStore::_insertRecords setKV, table: " << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() << ", key: " << ks.toString() << ", checkUnique: " << checkUnique; txservice::TxErrorCode err = ru->setKV(_tableName, pkeySchemaVersion, std::move(mongoKey), From 8f574f6075dc6f93b35ad73202f79aafb2ca39de Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 29 Dec 2025 16:53:02 +0800 Subject: [PATCH 09/16] fix --- src/mongo/db/modules/eloq/src/eloq_record_store.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index e504aa2898..e15b2f80f9 100644 --- 
a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -1128,7 +1128,8 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, int64_t totalLength = 0; for (size_t i = 0; i < nRecords; i++) { const auto& record = records[i]; - assert(record.id.isNull()); + // we have already update record id on function `batchCheckDuplicateKey` + assert(!record.id.isNull()); totalLength += record.data.size(); } From 23978df2f2102e64857093efd442db8fe6c4edf2 Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 29 Dec 2025 18:42:20 +0800 Subject: [PATCH 10/16] fix assert --- src/mongo/db/catalog/collection_impl.cpp | 34 +++++++++---------- .../db/modules/eloq/src/eloq_record_store.cpp | 6 ++-- tests/jstests/core/write_result.js | 12 +++---- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index c49d1bcb63..1432f4b92a 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -519,41 +519,39 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, // Batch check for duplicate keys before inserting Status status = _recordStore->batchCheckDuplicateKey(opCtx, &records); if (!status.isOK()) + { return status; - - // Prepare BsonRecords for index duplicate check and later indexRecords call - std::vector bsonRecords; - bsonRecords.reserve(count); - int recordIndex = 0; - for (auto it = begin; it != end; it++) { - RecordId loc = records[recordIndex].id; - BsonRecord bsonRecord = {loc, Timestamp(it->oplogSlot.opTime.getTimestamp()), &(it->doc)}; - bsonRecords.push_back(bsonRecord); - recordIndex++; } - // Extract BSONObj pointers from BsonRecords for batchCheckDuplicateKey + // Extract BSONObj pointers for batchCheckDuplicateKey std::vector bsonObjPtrs; bsonObjPtrs.reserve(count); - for (const auto& bsonRecord : bsonRecords) { - bsonObjPtrs.push_back(bsonRecord.docPtr); + for 
(auto it = begin; it != end; it++) { + bsonObjPtrs.push_back(&(it->doc)); } // Batch check for duplicate keys in unique indexes status = _indexCatalog.batchCheckDuplicateKey(opCtx, bsonObjPtrs); if (!status.isOK()) { return status; -} + } // Now insert records (primary keys already checked) status = _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); if (!status.isOK()) { return status; -} + } - // Update RecordIds in bsonRecords after insertRecords sets them - for (size_t i = 0; i < bsonRecords.size(); i++) { - bsonRecords[i].id = records[i].id; + // Prepare BsonRecords for indexRecords call (after insertRecords sets RecordIds) + std::vector bsonRecords; + bsonRecords.reserve(count); + int recordIndex = 0; + for (auto it = begin; it != end; it++) { + RecordId loc = records[recordIndex].id; + assert(!loc.isNull()); + BsonRecord bsonRecord = {loc, Timestamp(it->oplogSlot.opTime.getTimestamp()), &(it->doc)}; + bsonRecords.push_back(bsonRecord); + recordIndex++; } int64_t keysInserted; diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index e15b2f80f9..cdd78c09aa 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -843,7 +843,7 @@ Status EloqRecordStore::batchCheckDuplicateKey(OperationContext* opCtx, BatchReadEntry& entry = batchEntries[i]; entry.resetToKey(idObj); - record.id = RecordId{entry.keyString.getBuffer(), entry.keyString.getSize()}; + // record.id = RecordId{entry.keyString.getBuffer(), entry.keyString.getSize()}; batchTuples.emplace_back(txservice::TxKey(entry.mongoKey.get()), &entry.mongoRecord); } @@ -1128,8 +1128,7 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, int64_t totalLength = 0; for (size_t i = 0; i < nRecords; i++) { const auto& record = records[i]; - // we have already update record id on function `batchCheckDuplicateKey` - assert(!record.id.isNull()); 
+ assert(record.id.isNull()); totalLength += record.data.size(); } @@ -1150,6 +1149,7 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, const BSONObj idObj = getIdBSONObjWithoutFieldName(obj); BatchReadEntry& entry = batchEntries[i]; entry.resetToKey(idObj); + record.id = RecordId{entry.keyString.getBuffer(), entry.keyString.getSize()}; } for (size_t i = 0; i < nRecords; i++) { diff --git a/tests/jstests/core/write_result.js b/tests/jstests/core/write_result.js index b4dc8ce7fa..ba681a3d1d 100644 --- a/tests/jstests/core/write_result.js +++ b/tests/jstests/core/write_result.js @@ -163,12 +163,12 @@ coll.remove({}); var id = new ObjectId(); // Second insert fails with duplicate _id printjson(result = coll.insert([{_id: id, foo: "bar"}, {_id: id, foo: "baz"}])); -// assert.eq(result.nInserted, 1); -// assert(result.hasWriteErrors()); -// assert(!result.hasWriteConcernError()); -// assert.eq(coll.count(), 1); -assert.commandFailedWithCode(result, ErrorCodes.DuplicateKey, result.errmsg) -assert.eq(coll.count(), 0); +assert.eq(result.nInserted, 1); +assert(result.hasWriteErrors()); +assert(!result.hasWriteConcernError()); +assert.eq(coll.count(), 1); +// assert.commandFailedWithCode(result, ErrorCodes.DuplicateKey, result.errmsg) +// assert.eq(coll.count(), 0); // // Custom write concern From 094209146c68823129e5727fb35d1c2f0524e2fc Mon Sep 17 00:00:00 2001 From: lokax Date: Tue, 30 Dec 2025 15:14:27 +0800 Subject: [PATCH 11/16] fix update --- src/mongo/db/catalog/collection_impl.cpp | 9 ++ src/mongo/db/index/index_access_method.cpp | 18 ++++ src/mongo/db/index/index_access_method.h | 24 +++++ src/mongo/db/modules/eloq/src/eloq_index.cpp | 96 ++++++++++++++++++++ src/mongo/db/modules/eloq/src/eloq_index.h | 13 +++ src/mongo/db/ops/write_ops_exec.cpp | 9 ++ src/mongo/db/storage/sorted_data_interface.h | 17 ++++ 7 files changed, 186 insertions(+) diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 
1432f4b92a..2bae00476c 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -723,6 +723,15 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx, updateTicket, entry->getFilterExpression())); } + + // Check for duplicate keys in the added keys of each index before updating the record + ii = _indexCatalog.getIndexIterator(opCtx, true); + while (ii.more()) { + IndexDescriptor* descriptor = ii.next(); + IndexAccessMethod* iam = ii.accessMethod(descriptor); + UpdateTicket* updateTicket = updateTickets.mutableMap()[descriptor]; + uassertStatusOK(iam->checkDuplicateKeysForUpdate(opCtx, *updateTicket)); + } } args->preImageDoc = oldDoc.value().getOwned(); diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 1c3cb4d6b1..0471008294 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -629,6 +629,24 @@ Status IndexAccessMethod::batchCheckDuplicateKey(OperationContext* opCtx, return Status::OK(); } +Status IndexAccessMethod::checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId) { + // Delegate to the underlying SortedDataInterface + SortedDataInterface* sdi = getSortedDataInterface(); + if (sdi) { + return sdi->checkDuplicateKeysForUpdate(opCtx, addedKeys, currentRecordId); + } + // If SortedDataInterface is not available, return OK + return Status::OK(); +} + +Status IndexAccessMethod::checkDuplicateKeysForUpdate(OperationContext* opCtx, + const UpdateTicket& ticket) { + // Extract added keys from UpdateTicket and delegate to the main method + return checkDuplicateKeysForUpdate(opCtx, ticket.added, ticket.loc); +} + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index 478312a84d..f9cf3b62f1 100644 --- 
a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -176,6 +176,30 @@ class IndexAccessMethod { virtual Status batchCheckDuplicateKey(OperationContext* opCtx, const std::vector& bsonObjPtrs); + /** + * Check for duplicate keys in UpdateTicket's added keys. + * This is called during update operations to validate that the new keys being added + * don't conflict with existing keys in the index (excluding the current document). + * + * @param opCtx - Operation context + * @param addedKeys - Vector of BSONObj keys that will be added to the index + * @param currentRecordId - RecordId of the document being updated (to exclude from duplicate check) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId); + + /** + * Check for duplicate keys in UpdateTicket's added keys. + * Convenience wrapper that extracts added keys from UpdateTicket. 
+ * + * @param opCtx - Operation context + * @param ticket - UpdateTicket containing the added keys to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + Status checkDuplicateKeysForUpdate(OperationContext* opCtx, const UpdateTicket& ticket); + /** * this pages in the entire index */ diff --git a/src/mongo/db/modules/eloq/src/eloq_index.cpp b/src/mongo/db/modules/eloq/src/eloq_index.cpp index e40e94248e..e5dd746caf 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_index.cpp @@ -27,6 +27,7 @@ #include "mongo/util/log.h" #include "mongo/db/modules/eloq/src/base/eloq_key.h" +#include "mongo/db/modules/eloq/src/base/eloq_record.h" #include "mongo/db/modules/eloq/src/base/eloq_util.h" #include "mongo/db/modules/eloq/src/eloq_cursor.h" #include "mongo/db/modules/eloq/src/eloq_index.h" @@ -667,6 +668,101 @@ Status EloqIndex::batchCheckDuplicateKey(OperationContext* opCtx, return Status::OK(); } +Status EloqIndex::checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId) { + // Only check for unique indexes + if (!unique()) { + return Status::OK(); + } + + if (addedKeys.empty()) { + return Status::OK(); + } + + auto ru = EloqRecoveryUnit::get(opCtx); + const Eloq::MongoKeySchema* keySchema = ru->getIndexSchema(_tableName, _indexName); + if (!keySchema) { + return Status::OK(); // No schema available, skip check + } + + // Build batch for this unique index + // Use vectors to store KeyString buffers and MongoKey objects + std::vector keyStringBuffers; // Store KeyString buffer data + std::vector> mongoKeys; // Store MongoKey objects + std::vector mongoRecords; // Store MongoRecord objects + std::vector indexBatchTuples; + + // Use a set to track keys within this batch to detect duplicates within the batch + BSONObjSet batchKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + + // For each key to be added 
+ for (const BSONObj& key : addedKeys) { + // Check if this key already exists in the batch + if (batchKeys.find(key) != batchKeys.end()) { + MONGO_LOG(1) << "yf: checkDuplicateKeysForUpdate duplicate key found within batch, index: " + << _indexName.StringView() << ", key: " << key.toString(); + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } + batchKeys.insert(key.getOwned()); + + // Convert BSON key to KeyString + KeyString keyString{KeyString::kLatestVersion, key, keySchema->Ordering()}; + + // Store KeyString buffer data + keyStringBuffers.emplace_back(keyString.getBuffer(), keyString.getSize()); + + // Create MongoKey from buffer + auto mongoKey = std::make_unique( + keyStringBuffers.back().data(), keyStringBuffers.back().size()); + mongoKeys.push_back(std::move(mongoKey)); + + // Create MongoRecord + mongoRecords.emplace_back(); + + // Add to batch + indexBatchTuples.emplace_back(txservice::TxKey(mongoKeys.back().get()), + &mongoRecords.back()); + } + + if (!indexBatchTuples.empty()) { + // Use batchGetKV to check all keys + uint64_t keySchemaVersion = keySchema->SchemaTs(); + txservice::TxErrorCode err = ru->batchGetKV( + opCtx, _indexName, keySchemaVersion, indexBatchTuples, true); + if (err != txservice::TxErrorCode::NO_ERROR) { + MONGO_LOG(1) << "EloqIndex::checkDuplicateKeysForUpdate batchGetKV failed for index: " + << _indexName.StringView() << ", error: " << txservice::TxErrorMessage(err); + return TxErrorCodeToMongoStatus(err); + } + + // Check results for duplicates + for (size_t batchIdx = 0; batchIdx < indexBatchTuples.size(); batchIdx++) { + const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; + if (tuple.status_ == txservice::RecordStatus::Normal) { + // Key exists in database or writeset + // Check if it's the same RecordId as currentRecordId (which is OK for updates) + Eloq::MongoRecord& record = mongoRecords[batchIdx]; + // In index, RecordId is stored in the value (MongoRecord's EncodedBlobData) + // Extract RecordId 
from the record value + RecordId existingRecordId = record.ToRecordId(false); + if (existingRecordId != currentRecordId) { + // Duplicate key found with different RecordId + MONGO_LOG(1) << "yf: checkDuplicateKeysForUpdate duplicate key found, index: " + << _indexName.StringView() << ", key = " << keyStringBuffers[batchIdx] + << ", existingRecordId = " << existingRecordId + << ", currentRecordId = " << currentRecordId; + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } + } else { + invariant(tuple.status_ == txservice::RecordStatus::Deleted); + } + } + } + + return Status::OK(); +} + // EloqIdIndex std::unique_ptr EloqIdIndex::newCursor(OperationContext* opCtx, bool isForward) const { diff --git a/src/mongo/db/modules/eloq/src/eloq_index.h b/src/mongo/db/modules/eloq/src/eloq_index.h index 854f2f8437..21ec201627 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.h +++ b/src/mongo/db/modules/eloq/src/eloq_index.h @@ -91,6 +91,19 @@ class EloqIndex : public SortedDataInterface { Status batchCheckDuplicateKey(OperationContext* opCtx, const std::vector& bsonObjPtrs) override; + /** + * Check for duplicate keys in update operations. + * Override from SortedDataInterface. 
+ * + * @param opCtx - Operation context + * @param addedKeys - Vector of BSONObj keys that will be added to the index + * @param currentRecordId - RecordId of the document being updated (to exclude from duplicate check) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + Status checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId) override; + protected: class BulkBuilder; class IdBulkBuilder; diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 72daf286c4..ce2aec511b 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -469,6 +469,11 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, if (session && session->inMultiDocumentTransaction()) { throw; } + + if (e.code() != ErrorCodes::DuplicateKey) { + throw; + } + MONGO_LOG(0) << "yf: catach, exception = " << e.toString() << ", code = " << e.code(); // Otherwise, ignore this failure and behave as-if we never tried to do the combined batch // insert. The loop below will handle reporting any non-transient errors. 
@@ -515,6 +520,10 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, } catch (const DBException& ex) { bool canContinue = handleError( opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out, true); + + if (ex.code() != ErrorCodes::DuplicateKey) { + throw; + } MONGO_LOG(0) << "yf: handleError, exception = " << ex.toString() << ", code = " << ex.code() << ", key = " << it->doc.firstElement().fieldNameStringData() << ", value = " << it->doc.firstElement().toString() ; diff --git a/src/mongo/db/storage/sorted_data_interface.h b/src/mongo/db/storage/sorted_data_interface.h index 4e19973ad0..c0181d3fd9 100644 --- a/src/mongo/db/storage/sorted_data_interface.h +++ b/src/mongo/db/storage/sorted_data_interface.h @@ -139,6 +139,23 @@ class SortedDataInterface { return Status::OK(); } + /** + * Check for duplicate keys in update operations. + * This is called during update operations to validate that the new keys being added + * don't conflict with existing keys in the index (excluding the current document). + * + * @param opCtx - Operation context + * @param addedKeys - Vector of BSONObj keys that will be added to the index + * @param currentRecordId - RecordId of the document being updated (to exclude from duplicate check) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId) { + // Default implementation: no-op (for storage engines that don't need this) + return Status::OK(); + } + /** * Attempt to reduce the storage space used by this index via compaction. Only called if the * indexed record store supports compaction-in-place. 
From 486dfc5a9ded82f0ad23123ec038b85bafb99dcb Mon Sep 17 00:00:00 2001 From: lokax Date: Tue, 30 Dec 2025 15:47:52 +0800 Subject: [PATCH 12/16] fix update --- src/mongo/db/catalog/collection_impl.cpp | 9 + src/mongo/db/modules/eloq/src/eloq_index.cpp | 181 +++++++------------ src/mongo/db/modules/eloq/src/eloq_index.h | 14 ++ 3 files changed, 84 insertions(+), 120 deletions(-) diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 2bae00476c..a17f03b515 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -728,7 +728,16 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx, ii = _indexCatalog.getIndexIterator(opCtx, true); while (ii.more()) { IndexDescriptor* descriptor = ii.next(); + IndexCatalogEntry* entry = ii.catalogEntry(descriptor); IndexAccessMethod* iam = ii.accessMethod(descriptor); + if (!entry->isReady(opCtx)) { + continue; + } + + if (!descriptor->unique()) { + continue; + } + UpdateTicket* updateTicket = updateTickets.mutableMap()[descriptor]; uassertStatusOK(iam->checkDuplicateKeysForUpdate(opCtx, *updateTicket)); } diff --git a/src/mongo/db/modules/eloq/src/eloq_index.cpp b/src/mongo/db/modules/eloq/src/eloq_index.cpp index e5dd746caf..6b2dbcf259 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_index.cpp @@ -574,109 +574,12 @@ Status EloqIndex::initAsEmpty(OperationContext* opCtx) { return Status::OK(); } -Status EloqIndex::batchCheckDuplicateKey(OperationContext* opCtx, - const std::vector& bsonObjPtrs) { - // Default implementation: only check for unique indexes - if (!unique()) { - return Status::OK(); - } - - if (bsonObjPtrs.empty()) { - return Status::OK(); - } - - auto ru = EloqRecoveryUnit::get(opCtx); - const Eloq::MongoKeySchema* keySchema = ru->getIndexSchema(_tableName, _indexName); - if (!keySchema) { - return Status::OK(); // No schema available, skip check - } - - // Build 
batch for this unique index - // Use vectors to store KeyString buffers and MongoKey objects - std::vector keyStringBuffers; // Store KeyString buffer data - std::vector> mongoKeys; // Store MongoKey objects - std::vector mongoRecords; // Store MongoRecord objects - std::vector indexBatchTuples; - - // Use a set to track keys within this batch to detect duplicates within the batch - BSONObjSet batchKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - - // For each document to be inserted - for (const BSONObj* objPtr : bsonObjPtrs) { - const BSONObj& obj = *objPtr; - - // Extract keys for this index - BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - MultikeyPaths multikeyPaths; - keySchema->GetKeys(obj, &keys, &multikeyPaths); - - // Check each key for duplicates within this batch - for (const BSONObj& key : keys) { - // Check if this key already exists in the batch - if (batchKeys.find(key) != batchKeys.end()) { - MONGO_LOG(1) << "yf: batchCheckDuplicateKey duplicate key found within batch, index: " - << _indexName.StringView() << ", key: " << key.toString(); - return {ErrorCodes::DuplicateKey, "DuplicateKey"}; - } - batchKeys.insert(key.getOwned()); - - // Convert BSON key to KeyString - KeyString keyString{KeyString::kLatestVersion, key, keySchema->Ordering()}; - - // Store KeyString buffer data - keyStringBuffers.emplace_back(keyString.getBuffer(), keyString.getSize()); - - // Create MongoKey from buffer - auto mongoKey = std::make_unique( - keyStringBuffers.back().data(), keyStringBuffers.back().size()); - mongoKeys.push_back(std::move(mongoKey)); - - // Create MongoRecord - mongoRecords.emplace_back(); - - // Add to batch - indexBatchTuples.emplace_back(txservice::TxKey(mongoKeys.back().get()), - &mongoRecords.back()); - } - } - - if (!indexBatchTuples.empty()) { - // Use batchGetKV to check all keys - uint64_t keySchemaVersion = keySchema->SchemaTs(); - txservice::TxErrorCode err = ru->batchGetKV( - opCtx, _indexName, 
keySchemaVersion, indexBatchTuples, true); - if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "EloqIndex::batchCheckDuplicateKey batchGetKV failed for index: " - << _indexName.StringView() << ", error: " << txservice::TxErrorMessage(err); - return TxErrorCodeToMongoStatus(err); - } - - // Check results for duplicates - for (size_t batchIdx = 0; batchIdx < indexBatchTuples.size(); batchIdx++) { - const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; - if (tuple.status_ == txservice::RecordStatus::Normal) { - // Duplicate key found in database or writeset - MONGO_LOG(1) << "yf: batchCheckDuplicateKey duplicate key found, index: " - << _indexName.StringView() << ", key = " << keyStringBuffers[batchIdx]; - return {ErrorCodes::Error::DuplicateKey, "DuplicateKey"}; - } else { - invariant(tuple.status_ == txservice::RecordStatus::Deleted); - } - } - } - - return Status::OK(); -} - -Status EloqIndex::checkDuplicateKeysForUpdate(OperationContext* opCtx, - const std::vector& addedKeys, +// Internal helper method to check duplicate keys for a vector of already-extracted keys +// If currentRecordId is provided (not null), it will be excluded from duplicate check (for update operations) +Status EloqIndex::_checkDuplicateKeysInternal(OperationContext* opCtx, + const std::vector& keys, const RecordId& currentRecordId) { - // Only check for unique indexes - if (!unique()) { - return Status::OK(); - } - - if (addedKeys.empty()) { + if (keys.empty()) { return Status::OK(); } @@ -696,11 +599,11 @@ Status EloqIndex::checkDuplicateKeysForUpdate(OperationContext* opCtx, // Use a set to track keys within this batch to detect duplicates within the batch BSONObjSet batchKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - // For each key to be added - for (const BSONObj& key : addedKeys) { + // For each key to be checked + for (const BSONObj& key : keys) { // Check if this key already exists in the batch if (batchKeys.find(key) != batchKeys.end()) { 
- MONGO_LOG(1) << "yf: checkDuplicateKeysForUpdate duplicate key found within batch, index: " + MONGO_LOG(1) << "yf: _checkDuplicateKeysInternal duplicate key found within batch, index: " << _indexName.StringView() << ", key: " << key.toString(); return {ErrorCodes::DuplicateKey, "DuplicateKey"}; } @@ -731,7 +634,7 @@ Status EloqIndex::checkDuplicateKeysForUpdate(OperationContext* opCtx, txservice::TxErrorCode err = ru->batchGetKV( opCtx, _indexName, keySchemaVersion, indexBatchTuples, true); if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "EloqIndex::checkDuplicateKeysForUpdate batchGetKV failed for index: " + MONGO_LOG(1) << "EloqIndex::_checkDuplicateKeysInternal batchGetKV failed for index: " << _indexName.StringView() << ", error: " << txservice::TxErrorMessage(err); return TxErrorCodeToMongoStatus(err); } @@ -740,20 +643,11 @@ Status EloqIndex::checkDuplicateKeysForUpdate(OperationContext* opCtx, for (size_t batchIdx = 0; batchIdx < indexBatchTuples.size(); batchIdx++) { const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; if (tuple.status_ == txservice::RecordStatus::Normal) { - // Key exists in database or writeset - // Check if it's the same RecordId as currentRecordId (which is OK for updates) - Eloq::MongoRecord& record = mongoRecords[batchIdx]; - // In index, RecordId is stored in the value (MongoRecord's EncodedBlobData) - // Extract RecordId from the record value - RecordId existingRecordId = record.ToRecordId(false); - if (existingRecordId != currentRecordId) { - // Duplicate key found with different RecordId - MONGO_LOG(1) << "yf: checkDuplicateKeysForUpdate duplicate key found, index: " - << _indexName.StringView() << ", key = " << keyStringBuffers[batchIdx] - << ", existingRecordId = " << existingRecordId - << ", currentRecordId = " << currentRecordId; - return {ErrorCodes::DuplicateKey, "DuplicateKey"}; - } + // For insert operations, any existing key is a duplicate + MONGO_LOG(1) << "yf: 
_checkDuplicateKeysInternal duplicate key found, index: " + << _indexName.StringView() << ", key = " << keyStringBuffers[batchIdx]; + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } else { invariant(tuple.status_ == txservice::RecordStatus::Deleted); } @@ -763,6 +657,53 @@ Status EloqIndex::checkDuplicateKeysForUpdate(OperationContext* opCtx, return Status::OK(); } +Status EloqIndex::batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) { + // Default implementation: only check for unique indexes + if (!unique()) { + return Status::OK(); + } + + if (bsonObjPtrs.empty()) { + return Status::OK(); + } + + auto ru = EloqRecoveryUnit::get(opCtx); + const Eloq::MongoKeySchema* keySchema = ru->getIndexSchema(_tableName, _indexName); + if (!keySchema) { + return Status::OK(); // No schema available, skip check + } + + // Extract keys from documents + std::vector allKeys; + for (const BSONObj* objPtr : bsonObjPtrs) { + const BSONObj& obj = *objPtr; + BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + MultikeyPaths multikeyPaths; + keySchema->GetKeys(obj, &keys, &multikeyPaths); + + // Add all keys to the vector + for (const BSONObj& key : keys) { + allKeys.push_back(key.getOwned()); + } + } + + // Use the internal helper method with null RecordId (for insert operations) + return _checkDuplicateKeysInternal(opCtx, allKeys, RecordId()); +} + +Status EloqIndex::checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId) { + // Only check for unique indexes + if (!unique()) { + return Status::OK(); + } + + // Reuse the internal helper method with currentRecordId (for update operations) + return _checkDuplicateKeysInternal(opCtx, addedKeys, currentRecordId); +} + // EloqIdIndex std::unique_ptr EloqIdIndex::newCursor(OperationContext* opCtx, bool isForward) const { diff --git a/src/mongo/db/modules/eloq/src/eloq_index.h b/src/mongo/db/modules/eloq/src/eloq_index.h 
index 21ec201627..e1a062ef6d 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.h +++ b/src/mongo/db/modules/eloq/src/eloq_index.h @@ -104,6 +104,20 @@ class EloqIndex : public SortedDataInterface { const std::vector& addedKeys, const RecordId& currentRecordId) override; +private: + /** + * Internal helper method to check duplicate keys for a vector of already-extracted keys. + * If currentRecordId is provided (not null), it will be excluded from duplicate check (for update operations). + * + * @param opCtx - Operation context + * @param keys - Vector of BSONObj keys that have already been extracted + * @param currentRecordId - RecordId to exclude from duplicate check (null for insert operations) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + Status _checkDuplicateKeysInternal(OperationContext* opCtx, + const std::vector& keys, + const RecordId& currentRecordId); + protected: class BulkBuilder; class IdBulkBuilder; From 8a14cfdd16bb197b99c8460c2fdda4befdd205dd Mon Sep 17 00:00:00 2001 From: lokax Date: Tue, 30 Dec 2025 17:33:24 +0800 Subject: [PATCH 13/16] fix --- src/mongo/db/catalog/collection_impl.cpp | 2 -- src/mongo/db/ops/write_ops_exec.cpp | 7 ++++--- tests/jstests/core/batch_write_command_insert.js | 3 +-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index a17f03b515..6d9215ed1c 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -74,8 +74,6 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" -#include "mongo/db/storage/key_string.h" - namespace mongo { MONGO_REGISTER_SHIM(Collection::makeImpl) diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index ce2aec511b..29656dc5fe 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -518,12 +518,13 @@ bool 
insertBatchAndHandleErrors(OperationContext* opCtx, } }); } catch (const DBException& ex) { - bool canContinue = handleError( - opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out, true); - if (ex.code() != ErrorCodes::DuplicateKey) { throw; } + + bool canContinue = handleError( + opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out, true); + MONGO_LOG(0) << "yf: handleError, exception = " << ex.toString() << ", code = " << ex.code() << ", key = " << it->doc.firstElement().fieldNameStringData() << ", value = " << it->doc.firstElement().toString() ; diff --git a/tests/jstests/core/batch_write_command_insert.js b/tests/jstests/core/batch_write_command_insert.js index e23ddacb37..550a3fe233 100644 --- a/tests/jstests/core/batch_write_command_insert.js +++ b/tests/jstests/core/batch_write_command_insert.js @@ -219,7 +219,6 @@ assert.eq(coll.count(), 1); assert.eq(1, result.writeErrors[0].index); assert.eq('number', typeof result.writeErrors[0].code); assert.eq('string', typeof result.writeErrors[0].errmsg); - assert.eq(2, result.writeErrors[1].index); assert.eq('number', typeof result.writeErrors[1].code); assert.eq('string', typeof result.writeErrors[1].errmsg); @@ -470,4 +469,4 @@ result.writeErrors.forEach(function(err) { assert.eq('number', typeof err.code); assert.eq('string', typeof err.errmsg); }); -assert.eq(coll.count(), 3, "Expected 3 documents (initial + 2 successful inserts)"); +assert.eq(coll.count(), 3, "Expected 3 documents (initial + 2 successful inserts)"); \ No newline at end of file From b76e87a378296ef011b2aa2530525d31b6586bf4 Mon Sep 17 00:00:00 2001 From: lokax Date: Tue, 30 Dec 2025 17:45:45 +0800 Subject: [PATCH 14/16] fix test --- tests/jstests/core/opcounters_write_cmd.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/jstests/core/opcounters_write_cmd.js b/tests/jstests/core/opcounters_write_cmd.js index be3bd55224..29c46eca5d 100644 --- 
a/tests/jstests/core/opcounters_write_cmd.js +++ b/tests/jstests/core/opcounters_write_cmd.js @@ -49,16 +49,16 @@ if (t.getMongo().writeMode() != "compatibility") { opCounters = newdb.serverStatus().opcounters; res = t.insert([{_id: 3}, {_id: 3}, {_id: 4}]); assert.writeError(res); - // assert.eq(opCounters.insert + 2, newdb.serverStatus().opcounters.insert); - assert.eq(opCounters.insert, newdb.serverStatus().opcounters.insert); + assert.eq(opCounters.insert + 2, newdb.serverStatus().opcounters.insert); + // assert.eq(opCounters.insert, newdb.serverStatus().opcounters.insert); // Bulk insert, with error, unordered. var continueOnErrorFlag = 1; opCounters = newdb.serverStatus().opcounters; res = t.insert([{_id: 5}, {_id: 5}, {_id: 6}], continueOnErrorFlag); assert.writeError(res); - // assert.eq(opCounters.insert + 3, newdb.serverStatus().opcounters.insert); - assert.eq(opCounters.insert, newdb.serverStatus().opcounters.insert); + assert.eq(opCounters.insert + 3, newdb.serverStatus().opcounters.insert); + // assert.eq(opCounters.insert, newdb.serverStatus().opcounters.insert); } // // 2. Update. From e9ab45860ccffe4ca32f122beb90315d30c63adb Mon Sep 17 00:00:00 2001 From: lokax Date: Wed, 31 Dec 2025 11:07:14 +0800 Subject: [PATCH 15/16] remove debug log --- src/mongo/db/commands/write_commands/write_commands.cpp | 4 +--- src/mongo/db/modules/eloq/src/eloq_index.cpp | 9 --------- src/mongo/db/modules/eloq/src/eloq_record_store.cpp | 6 ------ src/mongo/db/ops/write_ops_exec.cpp | 5 ----- 4 files changed, 1 insertion(+), 23 deletions(-) diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index e883f4425e..3b46cfee46 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -26,8 +26,7 @@ * it in the license file. 
*/ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage -// #include "mongo/platform/basic.h" + #include "mongo/base/init.h" #include "mongo/bson/mutable/document.h" #include "mongo/bson/mutable/element.h" @@ -55,7 +54,6 @@ #include "mongo/db/stats/counters.h" #include "mongo/db/write_concern.h" #include "mongo/s/stale_exception.h" -#include "mongo/util/log.h" namespace mongo { namespace { diff --git a/src/mongo/db/modules/eloq/src/eloq_index.cpp b/src/mongo/db/modules/eloq/src/eloq_index.cpp index 6b2dbcf259..aed97805df 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_index.cpp @@ -603,8 +603,6 @@ Status EloqIndex::_checkDuplicateKeysInternal(OperationContext* opCtx, for (const BSONObj& key : keys) { // Check if this key already exists in the batch if (batchKeys.find(key) != batchKeys.end()) { - MONGO_LOG(1) << "yf: _checkDuplicateKeysInternal duplicate key found within batch, index: " - << _indexName.StringView() << ", key: " << key.toString(); return {ErrorCodes::DuplicateKey, "DuplicateKey"}; } batchKeys.insert(key.getOwned()); @@ -634,8 +632,6 @@ Status EloqIndex::_checkDuplicateKeysInternal(OperationContext* opCtx, txservice::TxErrorCode err = ru->batchGetKV( opCtx, _indexName, keySchemaVersion, indexBatchTuples, true); if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "EloqIndex::_checkDuplicateKeysInternal batchGetKV failed for index: " - << _indexName.StringView() << ", error: " << txservice::TxErrorMessage(err); return TxErrorCodeToMongoStatus(err); } @@ -644,8 +640,6 @@ Status EloqIndex::_checkDuplicateKeysInternal(OperationContext* opCtx, const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; if (tuple.status_ == txservice::RecordStatus::Normal) { // For insert operations, any existing key is a duplicate - MONGO_LOG(1) << "yf: _checkDuplicateKeysInternal duplicate key found, index: " - << _indexName.StringView() << ", key = " << 
keyStringBuffers[batchIdx]; return {ErrorCodes::DuplicateKey, "DuplicateKey"}; } else { @@ -820,9 +814,6 @@ Status EloqUniqueIndex::insert(OperationContext* opCtx, std::move(mongoRecord), txservice::OperationType::Insert, true); - if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "yf: EloqUniqueIndex::insert setKV failed, index: " << _indexName.StringView() << ", key: " << key.toString() << ", error: " << txservice::TxErrorMessage(err); - } return TxErrorCodeToMongoStatus(err); } diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index cdd78c09aa..9f8bb4c4bc 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -835,8 +835,6 @@ Status EloqRecordStore::batchCheckDuplicateKey(OperationContext* opCtx, // Check if this key already exists in the batch if (batchKeys.find(idObj) != batchKeys.end()) { - MONGO_LOG(1) << "yf: EloqRecordStore::batchCheckDuplicateKey duplicate key found within batch, key: " - << idObj.toString(); return {ErrorCodes::DuplicateKey, "DuplicateKey"}; } batchKeys.insert(idObj.getOwned()); @@ -864,8 +862,6 @@ Status EloqRecordStore::batchCheckDuplicateKey(OperationContext* opCtx, for (size_t i = 0; i < nRecords; i++) { const txservice::ScanBatchTuple& tuple = batchTuples[i]; if (tuple.status_ == txservice::RecordStatus::Normal) { - MONGO_LOG(1) << "yf: EloqRecordStore::batchCheckDuplicateKey duplicate key found, key: " - << batchEntries[i].keyString.toString(); return {ErrorCodes::DuplicateKey, "DuplicateKey"}; } else { invariant(tuple.status_ == txservice::RecordStatus::Deleted); @@ -1163,7 +1159,6 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, mongoRecord->SetUnpackInfo(typeBits.getBuffer(), typeBits.getSize()); } bool checkUnique = true; - MONGO_LOG(1) << "yf: EloqRecordStore::_insertRecords setKV, table: " << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() 
<< ", key: " << ks.toString() << ", checkUnique: " << checkUnique; txservice::TxErrorCode err = ru->setKV(_tableName, pkeySchemaVersion, std::move(mongoKey), @@ -1171,7 +1166,6 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, txservice::OperationType::Insert, checkUnique); if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "yf: _insertRecords setKV failed, table: " << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() << ", key: " << ks.toString() << ", checkUnique: " << checkUnique << ", error: " << txservice::TxErrorMessage(err); return TxErrorCodeToMongoStatus(err); } diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 29656dc5fe..ed141fea12 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -368,7 +368,6 @@ void insertDocuments(OperationContext* opCtx, uassertStatusOK(collection->insertDocuments( opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true, fromMigrate)); - MONGO_LOG(0) << "yf: insertDocuments, commit"; wuow.commit(); } @@ -474,7 +473,6 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, throw; } - MONGO_LOG(0) << "yf: catach, exception = " << e.toString() << ", code = " << e.code(); // Otherwise, ignore this failure and behave as-if we never tried to do the combined batch // insert. The loop below will handle reporting any non-transient errors. 
collection.reset(); @@ -500,7 +498,6 @@ if (!collection) acquireCollection(); lastOpFixer->startingOp(); - MONGO_LOG(0) << "yf: insertDocuments, key = " << it->doc.firstElement().fieldNameStringData() << ", value = " << it->doc.firstElement().toString(); insertDocuments(opCtx, collection->getCollection(), it, it + 1, fromMigrate); lastOpFixer->finishedOpSuccessfully(); SingleWriteResult result; @@ -526,8 +523,6 @@ opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out, true); - MONGO_LOG(0) << "yf: handleError, exception = " << ex.toString() << ", code = " << ex.code() << ", key = " << it->doc.firstElement().fieldNameStringData() << ", value = " << it->doc.firstElement().toString() ; - // Reset RecoveryUnit state if it was set to kFailedUnitOfWork by the nested WriteUnitOfWork // in insertDocuments. This is necessary when continuing to the next iteration after an error. // We only reset the state without aborting the transaction, since the entire insertMany operation From 383294a670e9187bad795dcf042feb1464b4add5 Mon Sep 17 00:00:00 2001 From: lokax Date: Wed, 31 Dec 2025 11:27:40 +0800 Subject: [PATCH 16/16] update submodule --- src/mongo/db/modules/eloq/data_substrate | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mongo/db/modules/eloq/data_substrate b/src/mongo/db/modules/eloq/data_substrate index ca396ca35f..755c4b7fb9 160000 --- a/src/mongo/db/modules/eloq/data_substrate +++ b/src/mongo/db/modules/eloq/data_substrate @@ -1 +1 @@ -Subproject commit ca396ca35fd5fb04088886de01958a08d97e58a9 +Subproject commit 755c4b7fb98a1411cbe0abde27bab32ffe10a54c