diff --git a/ravendb/documents/operations/operation.py b/ravendb/documents/operations/operation.py index 868198d9..2504a044 100644 --- a/ravendb/documents/operations/operation.py +++ b/ravendb/documents/operations/operation.py @@ -1,14 +1,55 @@ # todo: DatabaseChanges class import time +from datetime import timedelta from typing import Callable, TYPE_CHECKING, Optional from ravendb.documents.operations.definitions import OperationExceptionResult from ravendb.exceptions.exception_dispatcher import ExceptionDispatcher +from ravendb.exceptions.exceptions import InvalidOperationException from ravendb.http.raven_command import RavenCommand from ravendb.primitives.exceptions import OperationCancelledException from ravendb.tools.utils import Utils from ravendb.documents.operations.misc import GetOperationStateOperation + +class BulkOperationResult: + def __init__( + self, + total: int = 0, + documents_processed: int = 0, + attachments_processed: int = 0, + counters_processed: int = 0, + time_series_processed: int = 0, + query: Optional[str] = None, + details: Optional[list] = None, + ): + self.total = total + self.documents_processed = documents_processed + self.attachments_processed = attachments_processed + self.counters_processed = counters_processed + self.time_series_processed = time_series_processed + self.query = query + self.details = details if details is not None else [] + + @property + def message(self) -> str: + return f"Processed {self.total:,} items." + + @classmethod + def from_json(cls, json_dict: dict) -> "BulkOperationResult": + if json_dict is None: + return cls() + return cls( + total=json_dict.get("Total", 0), + documents_processed=json_dict.get("DocumentsProcessed", 0), + attachments_processed=json_dict.get("AttachmentsProcessed", 0), + counters_processed=json_dict.get("CountersProcessed", 0), + time_series_processed=json_dict.get("TimeSeriesProcessed", 0), + query=json_dict.get("Query"), + details=json_dict.get("Details", []), + ) + + if TYPE_CHECKING: from ravendb.documents.conventions import DocumentConventions from ravendb.http.request_executor import RequestExecutor @@ -30,23 +71,37 @@ def __init__( self.node_tag = node_tag def fetch_operations_status(self) -> dict: - command = self._get_operation_state_command(self.__conventions, self.__key, self.node_tag) - self.__request_executor.execute_command(command) - - return command.result + for _ in range(10): + command = self._get_operation_state_command(self.__conventions, self.__key, self.node_tag) + self.__request_executor.execute_command(command) + if command.result is not None: + return command.result + time.sleep(0.5) + raise InvalidOperationException( + f"Could not fetch state of operation '{self.__key}' from node '{self.node_tag}'." + ) def _get_operation_state_command( self, conventions: "DocumentConventions", key: int, node_tag: str = None ) -> RavenCommand[dict]: return GetOperationStateOperation.GetOperationStateCommand(self.__key, node_tag) - def wait_for_completion(self) -> None: + def wait_for_completion(self, timeout: Optional[timedelta] = None) -> Optional[BulkOperationResult]: + deadline = time.monotonic() + timeout.total_seconds() if timeout is not None else None while True: + if deadline is not None and time.monotonic() > deadline: + raise TimeoutError( + f"Operation {self.__key} did not complete within the specified timeout of {timeout}." + ) + status = self.fetch_operations_status() operation_status = status.get("Status") if operation_status == "Completed": - return + result = status.get("Result") + if result is not None: + return BulkOperationResult.from_json(result) + return None elif operation_status == "Canceled": raise OperationCancelledException() elif operation_status == "Faulted": @@ -59,4 +114,7 @@ def wait_for_completion(self) -> None: ) raise ExceptionDispatcher.get(schema, exception_result.status_code) - time.sleep(0.5) + sleep_secs = 0.5 + if deadline is not None: + sleep_secs = min(sleep_secs, max(0.0, deadline - time.monotonic())) + time.sleep(sleep_secs) diff --git a/ravendb/tests/issue_tests/test_RDBC_1032.py b/ravendb/tests/issue_tests/test_RDBC_1032.py new file mode 100644 index 00000000..0aa9df06 --- /dev/null +++ b/ravendb/tests/issue_tests/test_RDBC_1032.py @@ -0,0 +1,140 @@ +""" +RDBC-1032: Operation.wait_for_completion() returns BulkOperationResult instead of None. + +C# reference: FastTests/Client/Operations/BasicChangesOperationsTests.cs + Can_Perform_Patch_By_Query_Operation() +""" + +import unittest +from datetime import timedelta +from unittest.mock import MagicMock, patch + +from ravendb.documents.operations.operation import BulkOperationResult, Operation +from ravendb.exceptions.exceptions import InvalidOperationException +from ravendb.tests.test_base import TestBase + + +class TestBulkOperationResultUnit(unittest.TestCase): + """Unit tests — no server required.""" + + def test_bulk_operation_result_from_json(self): + result = BulkOperationResult.from_json( + {"Total": 5, "DocumentsProcessed": 3, "AttachmentsProcessed": 1, "CountersProcessed": 1} + ) + self.assertEqual(5, result.total) + self.assertEqual(3, result.documents_processed) + self.assertEqual(1, result.attachments_processed) + self.assertEqual(1, result.counters_processed) + + def test_bulk_operation_result_defaults(self): + result = BulkOperationResult() + self.assertEqual(0, result.total) + self.assertEqual(0, result.documents_processed) + self.assertEqual(0, result.attachments_processed) + self.assertEqual(0, result.counters_processed) + self.assertEqual(0, result.time_series_processed) + self.assertIsNone(result.query) + self.assertEqual([], result.details) + + def test_bulk_operation_result_message(self): + result = BulkOperationResult(total=1234) + self.assertEqual("Processed 1,234 items.", result.message) + + def test_bulk_operation_result_details(self): + detail = {"Id": "docs/1", "ChangeVector": "A:1", "Status": "Patched"} + result = BulkOperationResult.from_json({"Total": 1, "Details": [detail]}) + self.assertEqual(1, len(result.details)) + self.assertEqual("docs/1", result.details[0]["Id"]) + + def test_bulk_operation_result_from_json_none(self): + result = BulkOperationResult.from_json(None) + self.assertEqual(0, result.total) + + def test_bulk_operation_result_from_json_zero_total(self): + result = BulkOperationResult.from_json({"Total": 0}) + self.assertEqual(0, result.total) + self.assertEqual(0, result.documents_processed) + + def test_wait_for_completion_raises_on_timeout(self): + op = Operation(None, None, None, key=1) + with self.assertRaises(TimeoutError): + op.wait_for_completion(timeout=timedelta(microseconds=-1)) + + def test_fetch_operations_status_raises_when_state_always_null(self): + # Mocks are unavoidable here: the null-state scenario is a server-side race condition + # (operation submitted but not yet registered) that cannot be triggered deterministically + # without a real server. _get_operation_state_command is stubbed to always return + # result=None; time.sleep is suppressed so the 10-retry loop completes instantly. + executor = MagicMock() + op = Operation(executor, None, None, key=42) + command_stub = MagicMock() + command_stub.result = None + with ( + patch.object(op, "_get_operation_state_command", return_value=command_stub), + patch("ravendb.documents.operations.operation.time.sleep"), + ): + with self.assertRaises(InvalidOperationException): + op.fetch_operations_status() + + def test_bulk_operation_result_from_json_all_fields(self): + result = BulkOperationResult.from_json( + { + "Total": 10, + "DocumentsProcessed": 4, + "AttachmentsProcessed": 2, + "CountersProcessed": 2, + "TimeSeriesProcessed": 2, + "Query": "FROM Orders", + } + ) + self.assertEqual(10, result.total) + self.assertEqual(4, result.documents_processed) + self.assertEqual(2, result.attachments_processed) + self.assertEqual(2, result.counters_processed) + self.assertEqual(2, result.time_series_processed) + self.assertEqual("FROM Orders", result.query) + + +class TestOperationResult(TestBase): + """Integration tests — require a live server.""" + + def setUp(self): + super().setUp() + self.store = self.get_document_store() + + def tearDown(self): + super().tearDown() + self.store.close() + + def test_patch_by_query_returns_bulk_result(self): + from ravendb.documents.operations.patch import PatchByQueryOperation + from ravendb.infrastructure.orders import Product + + with self.store.open_session() as session: + p1 = Product() + p1.name = "Apple" + p1.price_per_unit = 1.0 + session.store(p1, "products/1") + p2 = Product() + p2.name = "Banana" + p2.price_per_unit = 2.0 + session.store(p2, "products/2") + session.save_changes() + + patch_op = PatchByQueryOperation("FROM Products UPDATE { this.price_per_unit = this.price_per_unit * 2; }") + op = self.store.operations.send_async(patch_op) + result = op.wait_for_completion() + + self.assertIsInstance(result, BulkOperationResult) + self.assertEqual(2, result.total) + self.assertEqual([], result.details) + + with self.store.open_session() as session: + p1 = session.load("products/1", Product) + p2 = session.load("products/2", Product) + self.assertAlmostEqual(2.0, p1.price_per_unit) + self.assertAlmostEqual(4.0, p2.price_per_unit) + + +if __name__ == "__main__": + unittest.main() diff --git a/ravendb/tests/operations_tests/test_operations.py b/ravendb/tests/operations_tests/test_operations.py index 835b5396..ad4a0a43 100644 --- a/ravendb/tests/operations_tests/test_operations.py +++ b/ravendb/tests/operations_tests/test_operations.py @@ -80,7 +80,9 @@ def test_patch_by_index(self): response.operation_id, response.operation_node_tag, ) - operation.wait_for_completion() + bulk_result = operation.wait_for_completion() + self.assertIsNotNone(bulk_result) + self.assertGreater(len(bulk_result.details), 0) with self.store.open_session() as session: result = session.load_starting_with("patches", Patch) values = list(map(lambda patch: patch.patched, result)) diff --git a/ravendb/tests/raven_commands_tests/test_by_index_actions.py b/ravendb/tests/raven_commands_tests/test_by_index_actions.py index 56b29f03..fe495f90 100644 --- a/ravendb/tests/raven_commands_tests/test_by_index_actions.py +++ b/ravendb/tests/raven_commands_tests/test_by_index_actions.py @@ -6,7 +6,7 @@ from ravendb.documents.indexes.definitions import IndexDefinition from ravendb.documents.operations.indexes import PutIndexesOperation from ravendb.documents.operations.misc import QueryOperationOptions, DeleteByQueryOperation -from ravendb.documents.operations.operation import Operation +from ravendb.documents.operations.operation import BulkOperationResult, Operation from ravendb.documents.operations.patch import PatchByQueryOperation from ravendb.documents.queries.index_query import IndexQuery from ravendb.tests.test_base import TestBase @@ -113,8 +113,9 @@ def test_delete_by_index_success(self): response.operation_id, response.operation_node_tag, ) - # wait_for_completion doesnt return anything (None) when operation state is 'Completed' - self.assertIsNone(x.wait_for_completion()) + result = x.wait_for_completion() + self.assertIsInstance(result, BulkOperationResult) + self.assertGreater(result.total, 0) if __name__ == "__main__":