Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions src/mongo/db/catalog/collection_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,21 +513,43 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx,
Timestamp timestamp = Timestamp(it->oplogSlot.opTime.getTimestamp());
timestamps.push_back(timestamp);
}
Status status =
_recordStore->insertRecords(opCtx, &records, &timestamps, _enforceQuota(enforceQuota));

// Batch check for duplicate keys before inserting
Status status = _recordStore->batchCheckDuplicateKey(opCtx, &records);
if (!status.isOK())
{
return status;
}

// Extract BSONObj pointers for batchCheckDuplicateKey
std::vector<const BSONObj*> bsonObjPtrs;
bsonObjPtrs.reserve(count);
for (auto it = begin; it != end; it++) {
bsonObjPtrs.push_back(&(it->doc));
}

// Batch check for duplicate keys in unique indexes
status = _indexCatalog.batchCheckDuplicateKey(opCtx, bsonObjPtrs);
if (!status.isOK()) {
return status;
}

// Now insert records (primary keys already checked)
status = _recordStore->insertRecords(opCtx, &records, &timestamps, _enforceQuota(enforceQuota));
if (!status.isOK()) {
return status;
}

// Prepare BsonRecords for indexRecords call (after insertRecords sets RecordIds)
std::vector<BsonRecord> bsonRecords;
bsonRecords.reserve(count);
int recordIndex = 0;
for (auto it = begin; it != end; it++) {
RecordId loc = records[recordIndex++].id;
// invariant(RecordId::min() < loc);
// invariant(loc < RecordId::max());

RecordId loc = records[recordIndex].id;
assert(!loc.isNull());
BsonRecord bsonRecord = {loc, Timestamp(it->oplogSlot.opTime.getTimestamp()), &(it->doc)};
bsonRecords.push_back(bsonRecord);
recordIndex++;
}

int64_t keysInserted;
Expand Down Expand Up @@ -699,6 +721,24 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx,
updateTicket,
entry->getFilterExpression()));
}

// Check for duplicate keys in the added keys of each index before updating the record
ii = _indexCatalog.getIndexIterator(opCtx, true);
while (ii.more()) {
IndexDescriptor* descriptor = ii.next();
IndexCatalogEntry* entry = ii.catalogEntry(descriptor);
IndexAccessMethod* iam = ii.accessMethod(descriptor);
if (!entry->isReady(opCtx)) {
continue;
}

if (!descriptor->unique()) {
continue;
}

UpdateTicket* updateTicket = updateTickets.mutableMap()[descriptor];
uassertStatusOK(iam->checkDuplicateKeysForUpdate(opCtx, *updateTicket));
}
}

args->preImageDoc = oldDoc.value().getOwned();
Expand Down
15 changes: 15 additions & 0 deletions src/mongo/db/catalog/index_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ class IndexCatalog {
const std::vector<BsonRecord>& bsonRecords,
int64_t* keysInsertedOut) = 0;

virtual Status batchCheckDuplicateKey(OperationContext* opCtx,
const std::vector<const BSONObj*>& bsonObjPtrs) = 0;

virtual void unindexRecord(OperationContext* opCtx,
const BSONObj& obj,
const RecordId& loc,
Expand Down Expand Up @@ -527,6 +530,18 @@ class IndexCatalog {
return this->_impl().unindexRecord(opCtx, obj, loc, noWarn, keysDeletedOut);
}

/**
* Batch check for duplicate keys in unique indexes before inserting records.
*
* @param opCtx - Operation context
* @param bsonObjPtrs - Vector of pointers to BSON objects to check
* @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found
*/
inline Status batchCheckDuplicateKey(OperationContext* const opCtx,
const std::vector<const BSONObj*>& bsonObjPtrs) {
return this->_impl().batchCheckDuplicateKey(opCtx, bsonObjPtrs);
}

// ------- temp internal -------

inline std::string getAccessMethodName(OperationContext* const opCtx,
Expand Down
35 changes: 33 additions & 2 deletions src/mongo/db/catalog/index_catalog_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1446,9 +1446,40 @@ Status IndexCatalogImpl::_unindexRecord(OperationContext* opCtx,
}


Status IndexCatalogImpl::batchCheckDuplicateKey(OperationContext* opCtx,
const std::vector<const BSONObj*>& bsonObjPtrs) {
// Only check unique indexes for duplicates
for (IndexCatalogEntryContainer::const_iterator i = _entries.begin(); i != _entries.end();
++i) {
IndexCatalogEntry* entry = i->get();
if (!entry->isReady(opCtx)) {
continue; // Skip unfinished indexes
}

IndexDescriptor* desc = entry->descriptor();
if (!desc->unique()) {
continue; // Only check unique indexes
}

IndexAccessMethod* accessMethod = entry->accessMethod();
if (!accessMethod) {
continue;
}

// Call batchCheckDuplicateKey on the IndexAccessMethod
// This will delegate to EloqIndex for Eloq storage engine
Status s = accessMethod->batchCheckDuplicateKey(opCtx, bsonObjPtrs);
if (!s.isOK()) {
return s;
}
}

return Status::OK();
}

Status IndexCatalogImpl::indexRecords(OperationContext* opCtx,
const std::vector<BsonRecord>& bsonRecords,
int64_t* keysInsertedOut) {
const std::vector<BsonRecord>& bsonRecords,
int64_t* keysInsertedOut) {
if (keysInsertedOut) {
*keysInsertedOut = 0;
}
Expand Down
3 changes: 3 additions & 0 deletions src/mongo/db/catalog/index_catalog_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ class IndexCatalogImpl : public IndexCatalog::Impl {
const std::vector<BsonRecord>& bsonRecords,
int64_t* keysInsertedOut) override;

Status batchCheckDuplicateKey(OperationContext* opCtx,
const std::vector<const BSONObj*>& bsonObjPtrs);

/**
* When 'keysDeletedOut' is not null, it will be set to the number of index keys removed by
* this operation.
Expand Down
5 changes: 4 additions & 1 deletion src/mongo/db/commands/write_commands/write_commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* it in the license file.
*/


#include "mongo/base/init.h"
#include "mongo/bson/mutable/document.h"
#include "mongo/bson/mutable/element.h"
Expand Down Expand Up @@ -87,8 +88,9 @@ void serializeReply(OperationContext* opCtx,
size_t opsInBatch,
WriteResult result,
BSONObjBuilder* out) {
if (shouldSkipOutput(opCtx))
if (shouldSkipOutput(opCtx)) {
return;
}

if (continueOnError && !result.results.empty()) {
const auto& lastResult = result.results.back();
Expand Down Expand Up @@ -301,6 +303,7 @@ class CmdInsert final : public WriteCommand {

void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
auto reply = performInserts(opCtx, _batch);

serializeReply(opCtx,
ReplyStyle::kNotUpdate,
!_batch.getWriteCommandBase().getOrdered(),
Expand Down
29 changes: 29 additions & 0 deletions src/mongo/db/index/index_access_method.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,35 @@ bool IndexAccessMethod::BulkBuilder::isMultikey() const {
return _everGeneratedMultipleKeys || isMultikeyFromPaths(_indexMultikeyPaths);
}

Status IndexAccessMethod::batchCheckDuplicateKey(OperationContext* opCtx,
const std::vector<const BSONObj*>& bsonObjPtrs) {
// Delegate to the underlying SortedDataInterface
SortedDataInterface* sdi = getSortedDataInterface();
if (sdi) {
return sdi->batchCheckDuplicateKey(opCtx, bsonObjPtrs);
}
// If SortedDataInterface is not available, return OK
return Status::OK();
}

Status IndexAccessMethod::checkDuplicateKeysForUpdate(OperationContext* opCtx,
const std::vector<BSONObj>& addedKeys,
const RecordId& currentRecordId) {
// Delegate to the underlying SortedDataInterface
SortedDataInterface* sdi = getSortedDataInterface();
if (sdi) {
return sdi->checkDuplicateKeysForUpdate(opCtx, addedKeys, currentRecordId);
}
// If SortedDataInterface is not available, return OK
return Status::OK();
}

Status IndexAccessMethod::checkDuplicateKeysForUpdate(OperationContext* opCtx,
const UpdateTicket& ticket) {
// Extract added keys from UpdateTicket and delegate to the main method
return checkDuplicateKeysForUpdate(opCtx, ticket.added, ticket.loc);
}

} // namespace mongo

#include "mongo/db/sorter/sorter.cpp"
Expand Down
44 changes: 44 additions & 0 deletions src/mongo/db/index/index_access_method.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,50 @@ class IndexAccessMethod {
*/
Status touch(OperationContext* opCtx, const BSONObj& obj);

/**
* Get the underlying SortedDataInterface for this index.
* This allows storage engine specific implementations (like EloqIndex) to be accessed.
*
* @return Pointer to the SortedDataInterface, or nullptr if not available
*/
SortedDataInterface* getSortedDataInterface() const {
return _newInterface.get();
}

/**
* Batch check for duplicate keys before inserting records.
*
* @param opCtx - Operation context
* @param bsonObjPtrs - Vector of pointers to BSON objects to check
* @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found
*/
virtual Status batchCheckDuplicateKey(OperationContext* opCtx,
const std::vector<const BSONObj*>& bsonObjPtrs);

/**
* Check for duplicate keys in UpdateTicket's added keys.
* This is called during update operations to validate that the new keys being added
* don't conflict with existing keys in the index (excluding the current document).
*
* @param opCtx - Operation context
* @param addedKeys - Vector of BSONObj keys that will be added to the index
* @param currentRecordId - RecordId of the document being updated (to exclude from duplicate check)
* @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found
*/
virtual Status checkDuplicateKeysForUpdate(OperationContext* opCtx,
const std::vector<BSONObj>& addedKeys,
const RecordId& currentRecordId);

/**
* Check for duplicate keys in UpdateTicket's added keys.
* Convenience wrapper that extracts added keys from UpdateTicket.
*
* @param opCtx - Operation context
* @param ticket - UpdateTicket containing the added keys to check
* @return Status::OK if no duplicates found, ErrorCodes::DuplicateKey if duplicate found
*/
Status checkDuplicateKeysForUpdate(OperationContext* opCtx, const UpdateTicket& ticket);

/**
* this pages in the entire index
*/
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/modules/eloq/data_substrate
Loading