diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 06dcd6f9a8..6d9215ed1c 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -513,21 +513,43 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, Timestamp timestamp = Timestamp(it->oplogSlot.opTime.getTimestamp()); timestamps.push_back(timestamp); } - Status status = - _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); + + // Batch check for duplicate keys before inserting + Status status = _recordStore->batchCheckDuplicateKey(opCtx, &records); if (!status.isOK()) + { return status; + } + // Extract BSONObj pointers for batchCheckDuplicateKey + std::vector bsonObjPtrs; + bsonObjPtrs.reserve(count); + for (auto it = begin; it != end; it++) { + bsonObjPtrs.push_back(&(it->doc)); + } + + // Batch check for duplicate keys in unique indexes + status = _indexCatalog.batchCheckDuplicateKey(opCtx, bsonObjPtrs); + if (!status.isOK()) { + return status; + } + + // Now insert records (primary keys already checked) + status = _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); + if (!status.isOK()) { + return status; + } + + // Prepare BsonRecords for indexRecords call (after insertRecords sets RecordIds) std::vector bsonRecords; bsonRecords.reserve(count); int recordIndex = 0; for (auto it = begin; it != end; it++) { - RecordId loc = records[recordIndex++].id; - // invariant(RecordId::min() < loc); - // invariant(loc < RecordId::max()); - + RecordId loc = records[recordIndex].id; + assert(!loc.isNull()); BsonRecord bsonRecord = {loc, Timestamp(it->oplogSlot.opTime.getTimestamp()), &(it->doc)}; bsonRecords.push_back(bsonRecord); + recordIndex++; } int64_t keysInserted; @@ -699,6 +721,24 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx, updateTicket, entry->getFilterExpression())); } + + // Check for duplicate keys in the added keys 
of each index before updating the record + ii = _indexCatalog.getIndexIterator(opCtx, true); + while (ii.more()) { + IndexDescriptor* descriptor = ii.next(); + IndexCatalogEntry* entry = ii.catalogEntry(descriptor); + IndexAccessMethod* iam = ii.accessMethod(descriptor); + if (!entry->isReady(opCtx)) { + continue; + } + + if (!descriptor->unique()) { + continue; + } + + UpdateTicket* updateTicket = updateTickets.mutableMap()[descriptor]; + uassertStatusOK(iam->checkDuplicateKeysForUpdate(opCtx, *updateTicket)); + } } args->preImageDoc = oldDoc.value().getOwned(); diff --git a/src/mongo/db/catalog/index_catalog.h b/src/mongo/db/catalog/index_catalog.h index d8c30a27f6..f0dea75c76 100644 --- a/src/mongo/db/catalog/index_catalog.h +++ b/src/mongo/db/catalog/index_catalog.h @@ -231,6 +231,9 @@ class IndexCatalog { const std::vector& bsonRecords, int64_t* keysInsertedOut) = 0; + virtual Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) = 0; + virtual void unindexRecord(OperationContext* opCtx, const BSONObj& obj, const RecordId& loc, @@ -527,6 +530,18 @@ class IndexCatalog { return this->_impl().unindexRecord(opCtx, obj, loc, noWarn, keysDeletedOut); } + /** + * Batch check for duplicate keys in unique indexes before inserting records. 
+ * + * @param opCtx - Operation context + * @param bsonObjPtrs - Vector of pointers to BSON objects to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + inline Status batchCheckDuplicateKey(OperationContext* const opCtx, + const std::vector& bsonObjPtrs) { + return this->_impl().batchCheckDuplicateKey(opCtx, bsonObjPtrs); + } + // ------- temp internal ------- inline std::string getAccessMethodName(OperationContext* const opCtx, diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp index 629d6a3999..ac476cf7df 100644 --- a/src/mongo/db/catalog/index_catalog_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_impl.cpp @@ -1446,9 +1446,40 @@ Status IndexCatalogImpl::_unindexRecord(OperationContext* opCtx, } +Status IndexCatalogImpl::batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) { + // Only check unique indexes for duplicates + for (IndexCatalogEntryContainer::const_iterator i = _entries.begin(); i != _entries.end(); + ++i) { + IndexCatalogEntry* entry = i->get(); + if (!entry->isReady(opCtx)) { + continue; // Skip unfinished indexes + } + + IndexDescriptor* desc = entry->descriptor(); + if (!desc->unique()) { + continue; // Only check unique indexes + } + + IndexAccessMethod* accessMethod = entry->accessMethod(); + if (!accessMethod) { + continue; + } + + // Call batchCheckDuplicateKey on the IndexAccessMethod + // This will delegate to EloqIndex for Eloq storage engine + Status s = accessMethod->batchCheckDuplicateKey(opCtx, bsonObjPtrs); + if (!s.isOK()) { + return s; + } + } + + return Status::OK(); +} + Status IndexCatalogImpl::indexRecords(OperationContext* opCtx, - const std::vector& bsonRecords, - int64_t* keysInsertedOut) { + const std::vector& bsonRecords, + int64_t* keysInsertedOut) { if (keysInsertedOut) { *keysInsertedOut = 0; } diff --git a/src/mongo/db/catalog/index_catalog_impl.h 
b/src/mongo/db/catalog/index_catalog_impl.h index 9c751c1769..325102a37c 100644 --- a/src/mongo/db/catalog/index_catalog_impl.h +++ b/src/mongo/db/catalog/index_catalog_impl.h @@ -341,6 +341,9 @@ class IndexCatalogImpl : public IndexCatalog::Impl { const std::vector& bsonRecords, int64_t* keysInsertedOut) override; + Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs); + /** * When 'keysDeletedOut' is not null, it will be set to the number of index keys removed by * this operation. diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index aa2f727a03..3b46cfee46 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -26,6 +26,7 @@ * it in the license file. */ + #include "mongo/base/init.h" #include "mongo/bson/mutable/document.h" #include "mongo/bson/mutable/element.h" @@ -87,8 +88,9 @@ void serializeReply(OperationContext* opCtx, size_t opsInBatch, WriteResult result, BSONObjBuilder* out) { - if (shouldSkipOutput(opCtx)) + if (shouldSkipOutput(opCtx)) { return; + } if (continueOnError && !result.results.empty()) { const auto& lastResult = result.results.back(); @@ -301,6 +303,7 @@ class CmdInsert final : public WriteCommand { void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override { auto reply = performInserts(opCtx, _batch); + serializeReply(opCtx, ReplyStyle::kNotUpdate, !_batch.getWriteCommandBase().getOrdered(), diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 7dbaa346da..0471008294 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -618,6 +618,35 @@ bool IndexAccessMethod::BulkBuilder::isMultikey() const { return _everGeneratedMultipleKeys || isMultikeyFromPaths(_indexMultikeyPaths); } +Status 
IndexAccessMethod::batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) { + // Delegate to the underlying SortedDataInterface + SortedDataInterface* sdi = getSortedDataInterface(); + if (sdi) { + return sdi->batchCheckDuplicateKey(opCtx, bsonObjPtrs); + } + // If SortedDataInterface is not available, return OK + return Status::OK(); +} + +Status IndexAccessMethod::checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId) { + // Delegate to the underlying SortedDataInterface + SortedDataInterface* sdi = getSortedDataInterface(); + if (sdi) { + return sdi->checkDuplicateKeysForUpdate(opCtx, addedKeys, currentRecordId); + } + // If SortedDataInterface is not available, return OK + return Status::OK(); +} + +Status IndexAccessMethod::checkDuplicateKeysForUpdate(OperationContext* opCtx, + const UpdateTicket& ticket) { + // Extract added keys from UpdateTicket and delegate to the main method + return checkDuplicateKeysForUpdate(opCtx, ticket.added, ticket.loc); +} + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index bbf651bed3..f9cf3b62f1 100644 --- a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -156,6 +156,50 @@ class IndexAccessMethod { */ Status touch(OperationContext* opCtx, const BSONObj& obj); + /** + * Get the underlying SortedDataInterface for this index. + * This allows storage engine specific implementations (like EloqIndex) to be accessed. + * + * @return Pointer to the SortedDataInterface, or nullptr if not available + */ + SortedDataInterface* getSortedDataInterface() const { + return _newInterface.get(); + } + + /** + * Batch check for duplicate keys before inserting records. 
+ * + * @param opCtx - Operation context + * @param bsonObjPtrs - Vector of pointers to BSON objects to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs); + + /** + * Check for duplicate keys in UpdateTicket's added keys. + * This is called during update operations to validate that the new keys being added + * don't conflict with existing keys in the index (excluding the current document). + * + * @param opCtx - Operation context + * @param addedKeys - Vector of BSONObj keys that will be added to the index + * @param currentRecordId - RecordId of the document being updated (to exclude from duplicate check) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId); + + /** + * Check for duplicate keys in UpdateTicket's added keys. + * Convenience wrapper that extracts added keys from UpdateTicket. 
+ * + * @param opCtx - Operation context + * @param ticket - UpdateTicket containing the added keys to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + Status checkDuplicateKeysForUpdate(OperationContext* opCtx, const UpdateTicket& ticket); + /** * this pages in the entire index */ diff --git a/src/mongo/db/modules/eloq/data_substrate b/src/mongo/db/modules/eloq/data_substrate index ca396ca35f..755c4b7fb9 160000 --- a/src/mongo/db/modules/eloq/data_substrate +++ b/src/mongo/db/modules/eloq/data_substrate @@ -1 +1 @@ -Subproject commit ca396ca35fd5fb04088886de01958a08d97e58a9 +Subproject commit 755c4b7fb98a1411cbe0abde27bab32ffe10a54c diff --git a/src/mongo/db/modules/eloq/src/eloq_index.cpp b/src/mongo/db/modules/eloq/src/eloq_index.cpp index bad826e240..aed97805df 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_index.cpp @@ -27,6 +27,7 @@ #include "mongo/util/log.h" #include "mongo/db/modules/eloq/src/base/eloq_key.h" +#include "mongo/db/modules/eloq/src/base/eloq_record.h" #include "mongo/db/modules/eloq/src/base/eloq_util.h" #include "mongo/db/modules/eloq/src/eloq_cursor.h" #include "mongo/db/modules/eloq/src/eloq_index.h" @@ -573,6 +574,130 @@ Status EloqIndex::initAsEmpty(OperationContext* opCtx) { return Status::OK(); } +// Internal helper method to check duplicate keys for a vector of already-extracted keys +// If currentRecordId is provided (not null), it will be excluded from duplicate check (for update operations) +Status EloqIndex::_checkDuplicateKeysInternal(OperationContext* opCtx, + const std::vector& keys, + const RecordId& currentRecordId) { + if (keys.empty()) { + return Status::OK(); + } + + auto ru = EloqRecoveryUnit::get(opCtx); + const Eloq::MongoKeySchema* keySchema = ru->getIndexSchema(_tableName, _indexName); + if (!keySchema) { + return Status::OK(); // No schema available, skip check + } + + // Build batch for this unique index + // 
Use vectors to store KeyString buffers and MongoKey objects + std::vector keyStringBuffers; // Store KeyString buffer data + std::vector> mongoKeys; // Store MongoKey objects + std::vector mongoRecords; // Store MongoRecord objects + std::vector indexBatchTuples; + + // Use a set to track keys within this batch to detect duplicates within the batch + BSONObjSet batchKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + + // For each key to be checked + for (const BSONObj& key : keys) { + // Check if this key already exists in the batch + if (batchKeys.find(key) != batchKeys.end()) { + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } + batchKeys.insert(key.getOwned()); + + // Convert BSON key to KeyString + KeyString keyString{KeyString::kLatestVersion, key, keySchema->Ordering()}; + + // Store KeyString buffer data + keyStringBuffers.emplace_back(keyString.getBuffer(), keyString.getSize()); + + // Create MongoKey from buffer + auto mongoKey = std::make_unique( + keyStringBuffers.back().data(), keyStringBuffers.back().size()); + mongoKeys.push_back(std::move(mongoKey)); + + // Create MongoRecord + mongoRecords.emplace_back(); + + // Add to batch + indexBatchTuples.emplace_back(txservice::TxKey(mongoKeys.back().get()), + &mongoRecords.back()); + } + + if (!indexBatchTuples.empty()) { + // Use batchGetKV to check all keys + uint64_t keySchemaVersion = keySchema->SchemaTs(); + txservice::TxErrorCode err = ru->batchGetKV( + opCtx, _indexName, keySchemaVersion, indexBatchTuples, true); + if (err != txservice::TxErrorCode::NO_ERROR) { + return TxErrorCodeToMongoStatus(err); + } + + // Check results for duplicates + for (size_t batchIdx = 0; batchIdx < indexBatchTuples.size(); batchIdx++) { + const txservice::ScanBatchTuple& tuple = indexBatchTuples[batchIdx]; + if (tuple.status_ == txservice::RecordStatus::Normal) { + // For insert operations, any existing key is a duplicate + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + + } else { + 
invariant(tuple.status_ == txservice::RecordStatus::Deleted); + } + } + } + + return Status::OK(); +} + +Status EloqIndex::batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) { + // Default implementation: only check for unique indexes + if (!unique()) { + return Status::OK(); + } + + if (bsonObjPtrs.empty()) { + return Status::OK(); + } + + auto ru = EloqRecoveryUnit::get(opCtx); + const Eloq::MongoKeySchema* keySchema = ru->getIndexSchema(_tableName, _indexName); + if (!keySchema) { + return Status::OK(); // No schema available, skip check + } + + // Extract keys from documents + std::vector allKeys; + for (const BSONObj* objPtr : bsonObjPtrs) { + const BSONObj& obj = *objPtr; + BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + MultikeyPaths multikeyPaths; + keySchema->GetKeys(obj, &keys, &multikeyPaths); + + // Add all keys to the vector + for (const BSONObj& key : keys) { + allKeys.push_back(key.getOwned()); + } + } + + // Use the internal helper method with null RecordId (for insert operations) + return _checkDuplicateKeysInternal(opCtx, allKeys, RecordId()); +} + +Status EloqIndex::checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId) { + // Only check for unique indexes + if (!unique()) { + return Status::OK(); + } + + // Reuse the internal helper method with currentRecordId (for update operations) + return _checkDuplicateKeysInternal(opCtx, addedKeys, currentRecordId); +} + // EloqIdIndex std::unique_ptr EloqIdIndex::newCursor(OperationContext* opCtx, bool isForward) const { @@ -666,7 +791,8 @@ Status EloqUniqueIndex::insert(OperationContext* opCtx, auto mongoKey = std::make_unique(keyString.getBuffer(), keyString.getSize()); auto mongoRecord = std::make_unique(); uint64_t keySchemaVersion = ru->getIndexSchema(_tableName, _indexName)->SchemaTs(); - + + /* auto [exists, err] = ru->getKV(opCtx, _indexName, keySchemaVersion, mongoKey.get(), 
mongoRecord.get(), true); if (err != txservice::TxErrorCode::NO_ERROR) { @@ -676,18 +802,19 @@ Status EloqUniqueIndex::insert(OperationContext* opCtx, if (exists) { return {ErrorCodes::Error::DuplicateKey, "Duplicate Key: " + _indexName.String()}; } + */ mongoRecord->SetEncodedBlob(valueItem); if (const auto& typeBits = keyString.getTypeBits(); !typeBits.isAllZeros()) { mongoRecord->SetUnpackInfo(typeBits.getBuffer(), typeBits.getSize()); } - err = ru->setKV(_indexName, + auto err = ru->setKV(_indexName, keySchemaVersion, std::move(mongoKey), std::move(mongoRecord), txservice::OperationType::Insert, true); - + return TxErrorCodeToMongoStatus(err); } diff --git a/src/mongo/db/modules/eloq/src/eloq_index.h b/src/mongo/db/modules/eloq/src/eloq_index.h index 8ca8945b0c..e1a062ef6d 100644 --- a/src/mongo/db/modules/eloq/src/eloq_index.h +++ b/src/mongo/db/modules/eloq/src/eloq_index.h @@ -22,6 +22,9 @@ #include "mongo/db/record_id.h" #include "mongo/db/storage/key_string.h" #include "mongo/db/storage/sorted_data_interface.h" +#include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/index/multikey_paths.h" +#include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/modules/eloq/data_substrate/tx_service/include/type.h" @@ -77,6 +80,44 @@ class EloqIndex : public SortedDataInterface { virtual bool unique() const = 0; + /** + * Batch check for duplicate keys before inserting records. + * Override from SortedDataInterface. + * + * @param opCtx - Operation context + * @param bsonObjPtrs - Vector of pointers to BSON objects to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector& bsonObjPtrs) override; + + /** + * Check for duplicate keys in update operations. + * Override from SortedDataInterface. 
+ * + * @param opCtx - Operation context + * @param addedKeys - Vector of BSONObj keys that will be added to the index + * @param currentRecordId - RecordId of the document being updated (to exclude from duplicate check) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + Status checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector& addedKeys, + const RecordId& currentRecordId) override; + +private: + /** + * Internal helper method to check duplicate keys for a vector of already-extracted keys. + * If currentRecordId is provided (not null), it will be excluded from duplicate check (for update operations). + * + * @param opCtx - Operation context + * @param keys - Vector of BSONObj keys that have already been extracted + * @param currentRecordId - RecordId to exclude from duplicate check (null for insert operations) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + Status _checkDuplicateKeysInternal(OperationContext* opCtx, + const std::vector& keys, + const RecordId& currentRecordId); + protected: class BulkBuilder; class IdBulkBuilder; diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp index c0d4fbc947..9f8bb4c4bc 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.cpp +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.cpp @@ -57,6 +57,18 @@ bvar::LatencyRecorder bVarUpdateRecord("update_record"); namespace mongo { +struct BatchReadEntry { + BatchReadEntry() : keyString(KeyString::kLatestVersion) {} + void resetToKey(const BSONObj& idObj) { + keyString.resetToKey(idObj, kIdOrdering); + mongoKey = std::make_unique(keyString); + } + + KeyString keyString; + std::unique_ptr mongoKey; + Eloq::MongoRecord mongoRecord; +}; + class EloqCatalogRecordStoreCursor : public SeekableRecordCursor { public: explicit EloqCatalogRecordStoreCursor(OperationContext* opCtx) @@ -796,10 +808,73 
@@ StatusWith EloqRecordStore::insertRecord( return {record.id}; } +Status EloqRecordStore::batchCheckDuplicateKey(OperationContext* opCtx, + std::vector* records) { + MONGO_LOG(1) << "EloqRecordStore::batchCheckDuplicateKey, nRecords: " << records->size(); + + if (records->empty()) { + return Status::OK(); + } + + size_t nRecords = records->size(); + auto batchEntries = std::make_unique(nRecords); + std::vector batchTuples; + batchTuples.reserve(nRecords); + + // Use a set to track keys within this batch to detect duplicates within the batch + BSONObjSet batchKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + + for (size_t i = 0; i < nRecords; i++) { + Record& record = (*records)[i]; + BSONObj obj{record.data.data()}; + const BSONObj idObj = getIdBSONObjWithoutFieldName(obj); + Status s = checkKeySize(idObj, "RecordStore"); + if (!s.isOK()) { + return s; + } + + // Check if this key already exists in the batch + if (batchKeys.find(idObj) != batchKeys.end()) { + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } + batchKeys.insert(idObj.getOwned()); + + BatchReadEntry& entry = batchEntries[i]; + entry.resetToKey(idObj); + // record.id = RecordId{entry.keyString.getBuffer(), entry.keyString.getSize()}; + batchTuples.emplace_back(txservice::TxKey(entry.mongoKey.get()), &entry.mongoRecord); + } + + auto ru = EloqRecoveryUnit::get(opCtx); + const EloqRecoveryUnit::DiscoveredTable& table = ru->discoveredTable(_tableName); + uint64_t pkeySchemaVersion = table._schema->KeySchema()->SchemaTs(); + + // Check all keys for duplicates + txservice::TxErrorCode err = + ru->batchGetKV(opCtx, _tableName, pkeySchemaVersion, batchTuples, true); + if (err != txservice::TxErrorCode::NO_ERROR) { + MONGO_LOG(1) << "EloqRecordStore::batchCheckDuplicateKey batchGetKV failed, table: " + << _tableName.StringView() << ", error: " << txservice::TxErrorMessage(err); + return TxErrorCodeToMongoStatus(err); + } + + // Check results for duplicates + for (size_t i = 0; i < nRecords; 
i++) { + const txservice::ScanBatchTuple& tuple = batchTuples[i]; + if (tuple.status_ == txservice::RecordStatus::Normal) { + return {ErrorCodes::DuplicateKey, "DuplicateKey"}; + } else { + invariant(tuple.status_ == txservice::RecordStatus::Deleted); + } + } + + return Status::OK(); +} + Status EloqRecordStore::insertRecords(OperationContext* opCtx, - std::vector* records, - std::vector* timestamps, - bool enforceQuota) { + std::vector* records, + std::vector* timestamps, + bool enforceQuota) { MONGO_LOG(1) << "EloqRecordStore::insertRecords"; return _insertRecords(opCtx, records->data(), timestamps->data(), records->size()); } @@ -1035,18 +1110,6 @@ void EloqRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* // } -struct BatchReadEntry { - BatchReadEntry() : keyString(KeyString::kLatestVersion) {} - void resetToKey(const BSONObj& idObj) { - keyString.resetToKey(idObj, kIdOrdering); - mongoKey = std::make_unique(keyString); - } - - KeyString keyString; - std::unique_ptr mongoKey; - Eloq::MongoRecord mongoRecord; -}; - Status EloqRecordStore::_insertRecords(OperationContext* opCtx, Record* records, const Timestamp* timestamps, @@ -1069,46 +1132,23 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, return {ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"}; } + auto ru = EloqRecoveryUnit::get(opCtx); + MONGO_LOG(1) << "Insert into a Data Table."; + const EloqRecoveryUnit::DiscoveredTable& table = ru->discoveredTable(_tableName); + uint64_t pkeySchemaVersion = table._schema->KeySchema()->SchemaTs(); + // All keys are valid (no duplicates found). Now insert all records. 
+ // Rebuild batchEntries for insertion auto batchEntries = std::make_unique(nRecords); - std::vector batchTuples; - batchTuples.reserve(nRecords); for (size_t i = 0; i < nRecords; i++) { Record& record = records[i]; BSONObj obj{record.data.data()}; const BSONObj idObj = getIdBSONObjWithoutFieldName(obj); - MONGO_LOG(1) << idObj.jsonString(); - Status s = checkKeySize(idObj, "RecordStore"); - if (!s.isOK()) { - return s; - } - BatchReadEntry& entry = batchEntries[i]; entry.resetToKey(idObj); record.id = RecordId{entry.keyString.getBuffer(), entry.keyString.getSize()}; - batchTuples.emplace_back(txservice::TxKey(entry.mongoKey.get()), &entry.mongoRecord); - } - - auto ru = EloqRecoveryUnit::get(opCtx); - MONGO_LOG(1) << "Insert into a Data Table."; - const EloqRecoveryUnit::DiscoveredTable& table = ru->discoveredTable(_tableName); - uint64_t pkeySchemaVersion = table._schema->KeySchema()->SchemaTs(); - txservice::TxErrorCode err = - ru->batchGetKV(opCtx, _tableName, pkeySchemaVersion, batchTuples, true); - if (err != txservice::TxErrorCode::NO_ERROR) { - MONGO_LOG(1) << "EloqRecordStore::_insertRecords batchGetKV failed, table: " - << _tableName.StringView() << ", txn: " << ru->getTxm()->TxNumber() - << txservice::TxErrorMessage(err); - return TxErrorCodeToMongoStatus(err); } for (size_t i = 0; i < nRecords; i++) { - const txservice::ScanBatchTuple& tuple = batchTuples[i]; - if (tuple.status_ == txservice::RecordStatus::Normal) { - return {ErrorCodes::DuplicateKey, "DuplicateKey"}; - } else { - invariant(tuple.status_ == txservice::RecordStatus::Deleted); - } - std::unique_ptr& mongoKey = batchEntries[i].mongoKey; std::unique_ptr mongoRecord = std::make_unique(); const RecordData& data = records[i].data; @@ -1119,7 +1159,7 @@ Status EloqRecordStore::_insertRecords(OperationContext* opCtx, mongoRecord->SetUnpackInfo(typeBits.getBuffer(), typeBits.getSize()); } bool checkUnique = true; - err = ru->setKV(_tableName, + txservice::TxErrorCode err = ru->setKV(_tableName, 
pkeySchemaVersion, std::move(mongoKey), std::move(mongoRecord), diff --git a/src/mongo/db/modules/eloq/src/eloq_record_store.h b/src/mongo/db/modules/eloq/src/eloq_record_store.h index d1227d0685..eb7f310a30 100644 --- a/src/mongo/db/modules/eloq/src/eloq_record_store.h +++ b/src/mongo/db/modules/eloq/src/eloq_record_store.h @@ -237,6 +237,9 @@ class EloqRecordStore final : public RecordStore { std::vector* timestamps, bool enforceQuota) override; + Status batchCheckDuplicateKey(OperationContext* opCtx, + std::vector* records) override; + Status insertRecordsWithDocWriter(OperationContext* opCtx, const DocWriter* const* docs, diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 60f146ffb3..f7fc4ce5f6 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -482,6 +482,10 @@ WriteUnitOfWork::RecoveryUnitState OperationContext::getRecoveryUnitState() cons return _ruState; } +void OperationContext::setRecoveryUnitState(WriteUnitOfWork::RecoveryUnitState state) { + _ruState = state; +} + void OperationContext::resetLockState() { _locker->reset(); } diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index b1c51935bf..14dc2b6ad0 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -117,6 +117,13 @@ class OperationContext : public Decorable { WriteUnitOfWork::RecoveryUnitState getRecoveryUnitState() const; + /** + * Sets the RecoveryUnitState without aborting the transaction. + * This is used when we need to reset state after a nested WriteUnitOfWork failure + * but want to keep the outer transaction active (e.g., command-level transactions). 
+ */ + void setRecoveryUnitState(WriteUnitOfWork::RecoveryUnitState state); + // WriteUnitOfWork::RecoveryUnitState setRecoveryUnit(RecoveryUnit::UPtr unit, // WriteUnitOfWork::RecoveryUnitState state); /** diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 783d0d3e7a..ed141fea12 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -226,7 +226,8 @@ bool handleError(OperationContext* opCtx, const DBException& ex, const NamespaceString& nss, const write_ops::WriteCommandBase& wholeOp, - WriteResult* out) { + WriteResult* out, + bool allowCommandLevelTransaction = false) { LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.reason()); auto& curOp = *CurOp::get(opCtx); curOp.debug().errInfo = ex.toStatus(); @@ -242,7 +243,9 @@ bool handleError(OperationContext* opCtx, } // EloqDoc enables command level transaction. - if (opCtx->lockState()->inAWriteUnitOfWork()) { + // For insert operations, allow error handling in command-level transactions. + // For update/delete operations, keep the original behavior (throw). + if (!allowCommandLevelTransaction && opCtx->lockState()->inAWriteUnitOfWork()) { throw; } @@ -456,17 +459,33 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, curOp.debug().additiveMetrics.incrementNinserted(batch.size()); return true; } - } catch (const DBException&) { + } catch (const DBException&e) { // If we cannot abandon the current snapshot, we give up and rethrow the exception. - // No WCE retrying is attempted. This code path is intended for snapshot read concern. - if (opCtx->lockState()->inAWriteUnitOfWork()) { + // No WCE retrying is attempted. This code path is intended for snapshot read concern, + // which is only used in multi-document transactions. 
+ auto session = OperationContextSession::get(opCtx); + if (session && session->inMultiDocumentTransaction()) { + throw; + } + + if (e.code() != ErrorCodes::DuplicateKey) { throw; } // Otherwise, ignore this failure and behave as-if we never tried to do the combined batch // insert. The loop below will handle reporting any non-transient errors. collection.reset(); + + // Reset RecoveryUnit state if it was set to kFailedUnitOfWork by the nested WriteUnitOfWork + // in insertDocuments. This is necessary because the nested WriteUnitOfWork's destructor + // sets the state to kFailedUnitOfWork when it aborts, but we need kActiveUnitOfWork for + // the fallback single-insert operations. Since we're in a command-level transaction (outer + // WriteUnitOfWork still exists), we only reset the state without aborting the transaction. + if (opCtx->getRecoveryUnitState() == WriteUnitOfWork::RecoveryUnitState::kFailedUnitOfWork) { + //opCtx->recoveryUnit()->abandonSnapshot(); // Clear any failed snapshot state + opCtx->setRecoveryUnitState(WriteUnitOfWork::RecoveryUnitState::kActiveUnitOfWork); + } } // Try to insert the batch one-at-a-time. This path is executed both for singular batches, and @@ -496,8 +515,23 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, } }); } catch (const DBException& ex) { - bool canContinue = - handleError(opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out); + if (ex.code() != ErrorCodes::DuplicateKey) { + throw; + } + + bool canContinue = handleError( + opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out, true); + + + // Reset RecoveryUnit state if it was set to kFailedUnitOfWork by the nested WriteUnitOfWork + // in insertDocuments. This is necessary when continuing to the next iteration after an error. + // We only reset the state without aborting the transaction, since the entire insertMany operation + // should be within a single command-level transaction. 
+ if (opCtx->getRecoveryUnitState() == WriteUnitOfWork::RecoveryUnitState::kFailedUnitOfWork) { + // opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->setRecoveryUnitState(WriteUnitOfWork::RecoveryUnitState::kActiveUnitOfWork); + } + if (!canContinue) return false; } @@ -616,7 +650,7 @@ WriteResult performInserts(OperationContext* opCtx, MONGO_UNREACHABLE; } catch (const DBException& ex) { canContinue = handleError( - opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out); + opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out, true); } } diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index f1c6700027..cc2024d149 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -417,6 +417,20 @@ class RecordStore { size_t nDocs, RecordId* idsOut = nullptr) = 0; + /** + * Batch check for duplicate keys before inserting records. + * This checks primary keys (RecordId) for duplicates. + * + * @param opCtx - Operation context + * @param records - Vector of records to check (input/output: RecordId may be set) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status batchCheckDuplicateKey(OperationContext* opCtx, + std::vector* records) { + // Default implementation: no-op (for storage engines that don't need this) + return Status::OK(); + } + /** * A thin wrapper around insertRecordsWithDocWriter() to simplify handling of single DocWriters. */ diff --git a/src/mongo/db/storage/sorted_data_interface.h b/src/mongo/db/storage/sorted_data_interface.h index 556f6520ae..c0181d3fd9 100644 --- a/src/mongo/db/storage/sorted_data_interface.h +++ b/src/mongo/db/storage/sorted_data_interface.h @@ -126,6 +126,36 @@ class SortedDataInterface { const BSONObj& key, const RecordId& loc) = 0; + /** + * Batch check for duplicate keys before inserting records. 
+ * + * @param opCtx - Operation context + * @param bsonObjPtrs - Vector of pointers to BSON objects to check + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status batchCheckDuplicateKey(OperationContext* opCtx, + const std::vector<const BSONObj*>& bsonObjPtrs) { + // Default implementation: no-op (for storage engines that don't need batch checking) + return Status::OK(); + } + + /** + * Check for duplicate keys in update operations. + * This is called during update operations to validate that the new keys being added + * don't conflict with existing keys in the index (excluding the current document). + * + * @param opCtx - Operation context + * @param addedKeys - Vector of BSONObj keys that will be added to the index + * @param currentRecordId - RecordId of the document being updated (to exclude from duplicate check) + * @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found + */ + virtual Status checkDuplicateKeysForUpdate(OperationContext* opCtx, + const std::vector<BSONObj>& addedKeys, + const RecordId& currentRecordId) { + // Default implementation: no-op (for storage engines that don't need this) + return Status::OK(); + } + /** * Attempt to reduce the storage space used by this index via compaction. Only called if the * indexed record store supports compaction-in-place. 
diff --git a/tests/jstests/core/batch_write_command_insert.js b/tests/jstests/core/batch_write_command_insert.js index fbe531bb94..550a3fe233 100644 --- a/tests/jstests/core/batch_write_command_insert.js +++ b/tests/jstests/core/batch_write_command_insert.js @@ -112,11 +112,12 @@ request = { ordered: false }; result = coll.runCommand(request); -assert(!result.ok, tojson(result)); -// assert(result.writeErrors != null); -// assert.eq(1, result.writeErrors.length); -// assert.eq(0, result.n); -// assert.eq(coll.count(), 0); +// assert(!result.ok, tojson(result)); +assert(result.ok, tojson(result)); +assert(result.writeErrors != null); +assert.eq(1, result.writeErrors.length); +assert.eq(0, result.n); +assert.eq(coll.count(), 0); // // Document with valid nested key should insert (op log format) @@ -195,10 +196,9 @@ request = { documents: [{a: 1}] }; result = coll.runCommand(request); -// assert(result.ok, tojson(result)); -assert(!result.ok, tojson(result)); -// assert.eq(1, result.writeErrors.length); -// assert.eq(0, result.n); +assert(result.ok, tojson(result)); +assert.eq(1, result.writeErrors.length); +assert.eq(0, result.n); assert.eq(coll.count(), 1); // @@ -211,22 +211,19 @@ request = { ordered: false }; result = coll.runCommand(request); -// assert(result.ok, tojson(result)); -assert(!result.ok, tojson(result)); -// assert.eq(1, result.n); -// assert.eq(2, result.writeErrors.length); -// assert.eq(coll.count(), 1); - -// assert.eq(1, result.writeErrors[0].index); -// assert.eq('number', typeof result.writeErrors[0].code); -// assert.eq('string', typeof result.writeErrors[0].errmsg); -// -// assert.eq(2, result.writeErrors[1].index); -// assert.eq('number', typeof result.writeErrors[1].code); -// assert.eq('string', typeof result.writeErrors[1].errmsg); - -// assert.eq(coll.count(), 1); -assert.eq(coll.count(), 0); +assert(result.ok, tojson(result)); +assert.eq(1, result.n); +assert.eq(2, result.writeErrors.length); +assert.eq(coll.count(), 1); + 
+assert.eq(1, result.writeErrors[0].index); +assert.eq('number', typeof result.writeErrors[0].code); +assert.eq('string', typeof result.writeErrors[0].errmsg); +assert.eq(2, result.writeErrors[1].index); +assert.eq('number', typeof result.writeErrors[1].code); +assert.eq('string', typeof result.writeErrors[1].errmsg); + +assert.eq(coll.count(), 1); // // Fail with duplicate key error on multiple document inserts, ordered true @@ -238,17 +235,15 @@ request = { ordered: true }; result = coll.runCommand(request); -// assert(result.ok, tojson(result)); -assert(!result.ok, tojson(result)); -// assert.eq(1, result.n); -// assert.eq(1, result.writeErrors.length); +assert(result.ok, tojson(result)); +assert.eq(1, result.n); +assert.eq(1, result.writeErrors.length); -// assert.eq(1, result.writeErrors[0].index); -// assert.eq('number', typeof result.writeErrors[0].code); -// assert.eq('string', typeof result.writeErrors[0].errmsg); +assert.eq(1, result.writeErrors[0].index); +assert.eq('number', typeof result.writeErrors[0].code); +assert.eq('string', typeof result.writeErrors[0].errmsg); -// assert.eq(coll.count(), 1); -assert.eq(coll.count(), 0); +assert.eq(coll.count(), 1); // // Ensure _id is the first field in all documents @@ -407,8 +402,8 @@ try { bulk.execute(); assert(false, "should have failed due to duplicate key"); } catch (err) { - // assert(coll.count() == 50, "Unexpected number inserted by bulk write: " + coll.count()); - assert(coll.count() == 1, "Unexpected number inserted by bulk write: " + coll.count()); + assert(coll.count() == 50, "Unexpected number inserted by bulk write: " + coll.count()); + // assert(coll.count() == 1, "Unexpected number inserted by bulk write: " + coll.count()); } // @@ -428,3 +423,50 @@ allIndexes = coll.getIndexes(); spec = GetIndexHelpers.findByName(allIndexes, "x_1"); assert.neq(null, spec, "Index with name 'x_1' not found: " + tojson(allIndexes)); assert.lte(2, spec.v, tojson(spec)); + +// +// Additional tests for bulk write 
error format (ok: 1 with writeErrors) +// These tests verify that DuplicateKey errors return ok: 1 with writeErrors array, +// matching MongoDB's BulkWriteException format for dsync compatibility. +// + +// +// Test ordered batch with DuplicateKey error - stops at first error +coll.drop(); +coll.ensureIndex({a: 1}, {unique: true}); +coll.insert({a: 1}); // Insert initial document +request = { + insert: coll.getName(), + documents: [{a: 1}, {a: 2}, {a: 3}], + writeConcern: {w: 1}, + ordered: true +}; +result = coll.runCommand(request); +assert(result.ok, tojson(result)); +assert.eq(0, result.n, "Expected n: 0 (ordered stops at first error)"); +assert.eq(1, result.writeErrors.length, "Expected one write error"); +assert.eq(result.writeErrors[0].code, ErrorCodes.DuplicateKey, "Expected DuplicateKey error"); +assert.eq(result.writeErrors[0].index, 0, "Expected error at index 0"); +assert.eq(coll.count(), 1, "Expected only initial document"); + +// +// Test unordered batch with multiple DuplicateKey errors - continues after errors +coll.remove({}); +coll.insert({a: 1}); // Insert initial document +request = { + insert: coll.getName(), + documents: [{a: 1}, {a: 2}, {a: 1}, {a: 3}], + writeConcern: {w: 1}, + ordered: false +}; +result = coll.runCommand(request); +assert(result.ok, tojson(result)); +assert.eq(2, result.n, "Expected 2 successful inserts ({a: 2} and {a: 3})"); +assert.gte(result.writeErrors.length, 1, "Expected at least one write error"); +// Verify all writeErrors are DuplicateKey +result.writeErrors.forEach(function(err) { + assert.eq(err.code, ErrorCodes.DuplicateKey, "Expected DuplicateKey error: " + tojson(err)); + assert.eq('number', typeof err.code); + assert.eq('string', typeof err.errmsg); +}); +assert.eq(coll.count(), 3, "Expected 3 documents (initial + 2 successful inserts)"); \ No newline at end of file diff --git a/tests/jstests/core/opcounters_write_cmd.js b/tests/jstests/core/opcounters_write_cmd.js index be3bd55224..29c46eca5d 100644 --- 
a/tests/jstests/core/opcounters_write_cmd.js +++ b/tests/jstests/core/opcounters_write_cmd.js @@ -49,16 +49,16 @@ if (t.getMongo().writeMode() != "compatibility") { opCounters = newdb.serverStatus().opcounters; res = t.insert([{_id: 3}, {_id: 3}, {_id: 4}]); assert.writeError(res); - // assert.eq(opCounters.insert + 2, newdb.serverStatus().opcounters.insert); - assert.eq(opCounters.insert, newdb.serverStatus().opcounters.insert); + assert.eq(opCounters.insert + 2, newdb.serverStatus().opcounters.insert); + // assert.eq(opCounters.insert, newdb.serverStatus().opcounters.insert); // Bulk insert, with error, unordered. var continueOnErrorFlag = 1; opCounters = newdb.serverStatus().opcounters; res = t.insert([{_id: 5}, {_id: 5}, {_id: 6}], continueOnErrorFlag); assert.writeError(res); - // assert.eq(opCounters.insert + 3, newdb.serverStatus().opcounters.insert); - assert.eq(opCounters.insert, newdb.serverStatus().opcounters.insert); + assert.eq(opCounters.insert + 3, newdb.serverStatus().opcounters.insert); + // assert.eq(opCounters.insert, newdb.serverStatus().opcounters.insert); } // // 2. Update. 
diff --git a/tests/jstests/core/write_result.js b/tests/jstests/core/write_result.js index b4dc8ce7fa..ba681a3d1d 100644 --- a/tests/jstests/core/write_result.js +++ b/tests/jstests/core/write_result.js @@ -163,12 +163,12 @@ coll.remove({}); var id = new ObjectId(); // Second insert fails with duplicate _id printjson(result = coll.insert([{_id: id, foo: "bar"}, {_id: id, foo: "baz"}])); -// assert.eq(result.nInserted, 1); -// assert(result.hasWriteErrors()); -// assert(!result.hasWriteConcernError()); -// assert.eq(coll.count(), 1); -assert.commandFailedWithCode(result, ErrorCodes.DuplicateKey, result.errmsg) -assert.eq(coll.count(), 0); +assert.eq(result.nInserted, 1); +assert(result.hasWriteErrors()); +assert(!result.hasWriteConcernError()); +assert.eq(coll.count(), 1); +// assert.commandFailedWithCode(result, ErrorCodes.DuplicateKey, result.errmsg) +// assert.eq(coll.count(), 0); // // Custom write concern