Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 65 additions & 7 deletions ravendb/documents/operations/operation.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,55 @@
# todo: DatabaseChanges class
import time
from datetime import timedelta
from typing import Callable, TYPE_CHECKING, Optional

from ravendb.documents.operations.definitions import OperationExceptionResult
from ravendb.exceptions.exception_dispatcher import ExceptionDispatcher
from ravendb.exceptions.exceptions import InvalidOperationException
from ravendb.http.raven_command import RavenCommand
from ravendb.primitives.exceptions import OperationCancelledException
from ravendb.tools.utils import Utils
from ravendb.documents.operations.misc import GetOperationStateOperation


class BulkOperationResult:
def __init__(
self,
total: int = 0,
documents_processed: int = 0,
attachments_processed: int = 0,
counters_processed: int = 0,
time_series_processed: int = 0,
query: Optional[str] = None,
details: Optional[list] = None,
):
self.total = total
self.documents_processed = documents_processed
self.attachments_processed = attachments_processed
self.counters_processed = counters_processed
self.time_series_processed = time_series_processed
self.query = query
self.details = details if details is not None else []

@property
def message(self) -> str:
return f"Processed {self.total:,} items."

@classmethod
def from_json(cls, json_dict: dict) -> "BulkOperationResult":
if json_dict is None:
return cls()
return cls(
total=json_dict.get("Total", 0),
documents_processed=json_dict.get("DocumentsProcessed", 0),
attachments_processed=json_dict.get("AttachmentsProcessed", 0),
counters_processed=json_dict.get("CountersProcessed", 0),
time_series_processed=json_dict.get("TimeSeriesProcessed", 0),
query=json_dict.get("Query"),
details=json_dict.get("Details", []),
)


if TYPE_CHECKING:
from ravendb.documents.conventions import DocumentConventions
from ravendb.http.request_executor import RequestExecutor
Expand All @@ -30,23 +71,37 @@ def __init__(
self.node_tag = node_tag

def fetch_operations_status(self) -> dict:
command = self._get_operation_state_command(self.__conventions, self.__key, self.node_tag)
self.__request_executor.execute_command(command)

return command.result
for _ in range(10):
command = self._get_operation_state_command(self.__conventions, self.__key, self.node_tag)
self.__request_executor.execute_command(command)
if command.result is not None:
return command.result
time.sleep(0.5)
raise InvalidOperationException(
f"Could not fetch state of operation '{self.__key}' from node '{self.node_tag}'."
)

def _get_operation_state_command(
self, conventions: "DocumentConventions", key: int, node_tag: str = None
) -> RavenCommand[dict]:
return GetOperationStateOperation.GetOperationStateCommand(self.__key, node_tag)

def wait_for_completion(self) -> None:
def wait_for_completion(self, timeout: Optional[timedelta] = None) -> Optional[BulkOperationResult]:
deadline = time.monotonic() + timeout.total_seconds() if timeout is not None else None
while True:
if deadline is not None and time.monotonic() > deadline:
raise TimeoutError(
f"Operation {self.__key} did not complete within the specified timeout of {timeout}."
)

status = self.fetch_operations_status()
operation_status = status.get("Status")

if operation_status == "Completed":
return
result = status.get("Result")
if result is not None:
return BulkOperationResult.from_json(result)
return None
elif operation_status == "Canceled":
raise OperationCancelledException()
elif operation_status == "Faulted":
Expand All @@ -59,4 +114,7 @@ def wait_for_completion(self) -> None:
)
raise ExceptionDispatcher.get(schema, exception_result.status_code)

time.sleep(0.5)
sleep_secs = 0.5
if deadline is not None:
sleep_secs = min(sleep_secs, max(0.0, deadline - time.monotonic()))
time.sleep(sleep_secs)
140 changes: 140 additions & 0 deletions ravendb/tests/issue_tests/test_RDBC_1032.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
RDBC-1032: Operation.wait_for_completion() returns BulkOperationResult instead of None.

C# reference: FastTests/Client/Operations/BasicChangesOperationsTests.cs
Can_Perform_Patch_By_Query_Operation()
"""

import unittest
from datetime import timedelta
from unittest.mock import MagicMock, patch

from ravendb.documents.operations.operation import BulkOperationResult, Operation
from ravendb.exceptions.exceptions import InvalidOperationException
from ravendb.tests.test_base import TestBase


class TestBulkOperationResultUnit(unittest.TestCase):
"""Unit tests — no server required."""

def test_bulk_operation_result_from_json(self):
result = BulkOperationResult.from_json(
{"Total": 5, "DocumentsProcessed": 3, "AttachmentsProcessed": 1, "CountersProcessed": 1}
)
self.assertEqual(5, result.total)
self.assertEqual(3, result.documents_processed)
self.assertEqual(1, result.attachments_processed)
self.assertEqual(1, result.counters_processed)

def test_bulk_operation_result_defaults(self):
result = BulkOperationResult()
self.assertEqual(0, result.total)
self.assertEqual(0, result.documents_processed)
self.assertEqual(0, result.attachments_processed)
self.assertEqual(0, result.counters_processed)
self.assertEqual(0, result.time_series_processed)
self.assertIsNone(result.query)
self.assertEqual([], result.details)

def test_bulk_operation_result_message(self):
result = BulkOperationResult(total=1234)
self.assertEqual("Processed 1,234 items.", result.message)

def test_bulk_operation_result_details(self):
detail = {"Id": "docs/1", "ChangeVector": "A:1", "Status": "Patched"}
result = BulkOperationResult.from_json({"Total": 1, "Details": [detail]})
self.assertEqual(1, len(result.details))
self.assertEqual("docs/1", result.details[0]["Id"])

def test_bulk_operation_result_from_json_none(self):
result = BulkOperationResult.from_json(None)
self.assertEqual(0, result.total)

def test_bulk_operation_result_from_json_zero_total(self):
result = BulkOperationResult.from_json({"Total": 0})
self.assertEqual(0, result.total)
self.assertEqual(0, result.documents_processed)

def test_wait_for_completion_raises_on_timeout(self):
op = Operation(None, None, None, key=1)
with self.assertRaises(TimeoutError):
op.wait_for_completion(timeout=timedelta(microseconds=-1))

def test_fetch_operations_status_raises_when_state_always_null(self):
# Mocks are unavoidable here: the null-state scenario is a server-side race condition
# (operation submitted but not yet registered) that cannot be triggered deterministically
# without a real server. _get_operation_state_command is stubbed to always return
# result=None; time.sleep is suppressed so the 10-retry loop completes instantly.
executor = MagicMock()
op = Operation(executor, None, None, key=42)
command_stub = MagicMock()
command_stub.result = None
with (
patch.object(op, "_get_operation_state_command", return_value=command_stub),
patch("ravendb.documents.operations.operation.time.sleep"),
):
with self.assertRaises(InvalidOperationException):
op.fetch_operations_status()

def test_bulk_operation_result_from_json_all_fields(self):
result = BulkOperationResult.from_json(
{
"Total": 10,
"DocumentsProcessed": 4,
"AttachmentsProcessed": 2,
"CountersProcessed": 2,
"TimeSeriesProcessed": 2,
"Query": "FROM Orders",
}
)
self.assertEqual(10, result.total)
self.assertEqual(4, result.documents_processed)
self.assertEqual(2, result.attachments_processed)
self.assertEqual(2, result.counters_processed)
self.assertEqual(2, result.time_series_processed)
self.assertEqual("FROM Orders", result.query)


class TestOperationResult(TestBase):
"""Integration tests — require a live server."""

def setUp(self):
super().setUp()
self.store = self.get_document_store()

def tearDown(self):
super().tearDown()
self.store.close()

def test_patch_by_query_returns_bulk_result(self):
from ravendb.documents.operations.patch import PatchByQueryOperation
from ravendb.infrastructure.orders import Product

with self.store.open_session() as session:
p1 = Product()
p1.name = "Apple"
p1.price_per_unit = 1.0
session.store(p1, "products/1")
p2 = Product()
p2.name = "Banana"
p2.price_per_unit = 2.0
session.store(p2, "products/2")
session.save_changes()

patch_op = PatchByQueryOperation("FROM Products UPDATE { this.price_per_unit = this.price_per_unit * 2; }")
op = self.store.operations.send_async(patch_op)
result = op.wait_for_completion()

self.assertIsInstance(result, BulkOperationResult)
self.assertEqual(2, result.total)
self.assertEqual([], result.details)

with self.store.open_session() as session:
p1 = session.load("products/1", Product)
p2 = session.load("products/2", Product)
self.assertAlmostEqual(2.0, p1.price_per_unit)
self.assertAlmostEqual(4.0, p2.price_per_unit)


if __name__ == "__main__":
unittest.main()
4 changes: 3 additions & 1 deletion ravendb/tests/operations_tests/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ def test_patch_by_index(self):
response.operation_id,
response.operation_node_tag,
)
operation.wait_for_completion()
bulk_result = operation.wait_for_completion()
self.assertIsNotNone(bulk_result)
self.assertGreater(len(bulk_result.details), 0)
with self.store.open_session() as session:
result = session.load_starting_with("patches", Patch)
values = list(map(lambda patch: patch.patched, result))
Expand Down
7 changes: 4 additions & 3 deletions ravendb/tests/raven_commands_tests/test_by_index_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ravendb.documents.indexes.definitions import IndexDefinition
from ravendb.documents.operations.indexes import PutIndexesOperation
from ravendb.documents.operations.misc import QueryOperationOptions, DeleteByQueryOperation
from ravendb.documents.operations.operation import Operation
from ravendb.documents.operations.operation import BulkOperationResult, Operation
from ravendb.documents.operations.patch import PatchByQueryOperation
from ravendb.documents.queries.index_query import IndexQuery
from ravendb.tests.test_base import TestBase
Expand Down Expand Up @@ -113,8 +113,9 @@ def test_delete_by_index_success(self):
response.operation_id,
response.operation_node_tag,
)
# wait_for_completion doesnt return anything (None) when operation state is 'Completed'
self.assertIsNone(x.wait_for_completion())
result = x.wait_for_completion()
self.assertIsInstance(result, BulkOperationResult)
self.assertGreater(result.total, 0)


if __name__ == "__main__":
Expand Down