From 522835b3e445df39c724ff0309e5dbfe2ea83701 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 6 Jul 2024 17:14:14 -0700 Subject: [PATCH] Test that checks that `AttemptError` can round trip properly Follows up #27 to add a more elaborate test that checks that non-default job properties can unmarshal to `Job` properties, including `errors`, which includes `AttemptError`. It turned out that of course this wasn't working properly. --- CHANGELOG.md | 4 + .../riversqlalchemy/dbsqlc/river_job.py | 129 ++++++++++++++++++ .../riversqlalchemy/dbsqlc/river_job.sql | 35 ++++- .../riversqlalchemy/sql_alchemy_driver.py | 21 +-- src/riverqueue/job.py | 34 ++++- .../riversqlalchemy/sqlalchemy_driver_test.py | 46 ++++++- 6 files changed, 246 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6946771..9a70b9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- `riverqueue.AttemptError` can now round trip to and from JSON properly, including its `at` timestamp. [PR #31](https://github.com/riverqueue/riverqueue-python/pull/31).
+ ## [0.6.0] - 2024-07-06 ### Added diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py index 6e0c2e2..5265e66 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py @@ -134,6 +134,59 @@ class JobInsertFastManyParams: tags: List[str] +JOB_INSERT_FULL = """-- name: job_insert_full \\:one +INSERT INTO river_job( + args, + attempt, + attempted_at, + created_at, + errors, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) VALUES ( + :p1\\:\\:jsonb, + coalesce(:p2\\:\\:smallint, 0), + :p3, + coalesce(:p4\\:\\:timestamptz, now()), + :p5\\:\\:jsonb[], + :p6, + :p7\\:\\:text, + :p8\\:\\:smallint, + coalesce(:p9\\:\\:jsonb, '{}'), + :p10\\:\\:smallint, + :p11\\:\\:text, + coalesce(:p12\\:\\:timestamptz, now()), + :p13\\:\\:river_job_state, + coalesce(:p14\\:\\:varchar(255)[], '{}') +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +""" + + +@dataclasses.dataclass() +class JobInsertFullParams: + args: Any + attempt: int + attempted_at: Optional[datetime.datetime] + created_at: Optional[datetime.datetime] + errors: List[Any] + finalized_at: Optional[datetime.datetime] + kind: str + max_attempts: int + metadata: Any + priority: int + queue: str + scheduled_at: Optional[datetime.datetime] + state: models.RiverJobState + tags: List[str] + + class Querier: def __init__(self, conn: sqlalchemy.engine.Connection): self._conn = conn @@ -266,6 +319,44 @@ def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int: }) return result.rowcount + def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]: + row = self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), { + "p1": arg.args, + "p2": arg.attempt, + "p3": arg.attempted_at, + 
"p4": arg.created_at, + "p5": arg.errors, + "p6": arg.finalized_at, + "p7": arg.kind, + "p8": arg.max_attempts, + "p9": arg.metadata, + "p10": arg.priority, + "p11": arg.queue, + "p12": arg.scheduled_at, + "p13": arg.state, + "p14": arg.tags, + }).first() + if row is None: + return None + return models.RiverJob( + id=row[0], + args=row[1], + attempt=row[2], + attempted_at=row[3], + attempted_by=row[4], + created_at=row[5], + errors=row[6], + finalized_at=row[7], + kind=row[8], + max_attempts=row[9], + metadata=row[10], + priority=row[11], + queue=row[12], + state=row[13], + scheduled_at=row[14], + tags=row[15], + ) + class AsyncQuerier: def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection): @@ -398,3 +489,41 @@ async def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int: "p9": arg.tags, }) return result.rowcount + + async def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]: + row = (await self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), { + "p1": arg.args, + "p2": arg.attempt, + "p3": arg.attempted_at, + "p4": arg.created_at, + "p5": arg.errors, + "p6": arg.finalized_at, + "p7": arg.kind, + "p8": arg.max_attempts, + "p9": arg.metadata, + "p10": arg.priority, + "p11": arg.queue, + "p12": arg.scheduled_at, + "p13": arg.state, + "p14": arg.tags, + })).first() + if row is None: + return None + return models.RiverJob( + id=row[0], + args=row[1], + attempt=row[2], + attempted_at=row[3], + attempted_by=row[4], + created_at=row[5], + errors=row[6], + finalized_at=row[7], + kind=row[8], + max_attempts=row[9], + metadata=row[10], + priority=row[11], + queue=row[12], + state=row[13], + scheduled_at=row[14], + tags=row[15], + ) diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql index 26b0d05..55b1788 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql @@ 
-103,4 +103,37 @@ INSERT INTO river_job( -- Had trouble getting multi-dimensional arrays to play nicely with sqlc, -- but it might be possible. For now, join tags into a single string. - string_to_array(unnest(@tags::text[]), ','); \ No newline at end of file + string_to_array(unnest(@tags::text[]), ','); + +-- name: JobInsertFull :one +INSERT INTO river_job( + args, + attempt, + attempted_at, + created_at, + errors, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) VALUES ( + @args::jsonb, + coalesce(@attempt::smallint, 0), + @attempted_at, + coalesce(sqlc.narg('created_at')::timestamptz, now()), + @errors::jsonb[], + @finalized_at, + @kind::text, + @max_attempts::smallint, + coalesce(@metadata::jsonb, '{}'), + @priority::smallint, + @queue::text, + coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), + @state::river_job_state, + coalesce(@tags::varchar(255)[], '{}') +) RETURNING *; \ No newline at end of file diff --git a/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py b/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py index 2baec8c..870ab37 100644 --- a/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py +++ b/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py @@ -8,7 +8,6 @@ from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine from typing import ( - Any, AsyncGenerator, AsyncIterator, Iterator, @@ -36,7 +35,7 @@ async def advisory_lock(self, key: int) -> None: await self.pg_misc_querier.pg_advisory_xact_lock(key=key) async def job_insert(self, insert_params: JobInsertParams) -> Job: - return _job_from_row( + return job_from_row( cast( # drop Optional[] because insert always returns a row models.RiverJob, await self.job_querier.job_insert_fast( @@ -57,7 +56,7 @@ async def job_get_by_kind_and_unique_properties( row = await self.job_querier.job_get_by_kind_and_unique_properties( 
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params) ) - return _job_from_row(row) if row else None + return job_from_row(row) if row else None @asynccontextmanager async def transaction(self) -> AsyncGenerator: @@ -103,7 +102,7 @@ def advisory_lock(self, key: int) -> None: self.pg_misc_querier.pg_advisory_xact_lock(key=key) def job_insert(self, insert_params: JobInsertParams) -> Job: - return _job_from_row( + return job_from_row( cast( # drop Optional[] because insert always returns a row models.RiverJob, self.job_querier.job_insert_fast( @@ -122,7 +121,7 @@ def job_get_by_kind_and_unique_properties( row = self.job_querier.job_get_by_kind_and_unique_properties( cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params) ) - return _job_from_row(row) if row else None + return job_from_row(row) if row else None @contextmanager def transaction(self) -> Iterator[None]: @@ -187,21 +186,13 @@ def _build_insert_many_params( return insert_many_params -def _job_from_row(row: models.RiverJob) -> Job: +def job_from_row(row: models.RiverJob) -> Job: """ Converts an internal sqlc generated row to the top level type, issuing a few minor transformations along the way. Timestamps are changed from local timezone to UTC. """ - def attempt_error_from(data: dict[str, Any]) -> AttemptError: - return AttemptError( - at=data["at"], - attempt=data["attempt"], - error=data["error"], - trace=data["trace"], - ) - # Trivial shortcut, but avoids a bunch of ternaries getting line wrapped below. 
def to_utc(t: datetime) -> datetime: return t.astimezone(timezone.utc) @@ -213,7 +204,7 @@ def to_utc(t: datetime) -> datetime: attempted_at=to_utc(row.attempted_at) if row.attempted_at else None, attempted_by=row.attempted_by, created_at=to_utc(row.created_at), - errors=list(map(attempt_error_from, row.errors)) if row.errors else None, + errors=list(map(AttemptError.from_dict, row.errors)) if row.errors else None, finalized_at=to_utc(row.finalized_at) if row.finalized_at else None, kind=row.kind, max_attempts=row.max_attempts, diff --git a/src/riverqueue/job.py b/src/riverqueue/job.py index 91fa0b9..ca58209 100644 --- a/src/riverqueue/job.py +++ b/src/riverqueue/job.py @@ -1,6 +1,7 @@ from dataclasses import dataclass -import datetime +from datetime import datetime, timezone from enum import Enum +import json from typing import Any, Optional @@ -106,7 +107,7 @@ class Job: if it's either snoozed or errors. """ - attempted_at: Optional[datetime.datetime] + attempted_at: Optional[datetime] """ The time that the job was last worked. Starts out as `nil` on a new insert. """ @@ -120,7 +121,7 @@ class Job: time when it starts up. """ - created_at: datetime.datetime + created_at: datetime """ When the job record was created. """ @@ -131,7 +132,7 @@ class Job: Ordered from earliest error to the latest error. """ - finalized_at: Optional[datetime.datetime] + finalized_at: Optional[datetime] """ The time at which the job was "finalized", meaning it was either completed successfully or errored for the last time such that it'll no longer be @@ -170,7 +171,7 @@ class Job: independently and be used to isolate jobs. """ - scheduled_at: datetime.datetime + scheduled_at: datetime """ When the job is scheduled to become available to be worked. Jobs default to running immediately, but may be scheduled for the future when they're @@ -199,7 +200,7 @@ class AttemptError: that occurred. """ - at: datetime.datetime + at: datetime """ The time at which the error occurred. 
""" @@ -221,3 +222,24 @@ class AttemptError: Contains a stack trace from a job that panicked. The trace is produced by invoking `debug.Trace()` in Go. """ + + @staticmethod + def from_dict(data: dict[str, Any]) -> "AttemptError": + return AttemptError( + at=datetime.fromisoformat(data["at"]), + attempt=data["attempt"], + error=data["error"], + trace=data["trace"], + ) + + def to_json(self) -> str: + return json.dumps( + { + "at": self.at.astimezone(timezone.utc) + .isoformat() + .replace("+00:00", "Z"), + "attempt": self.attempt, + "error": self.error, + "trace": self.trace, + } + ) diff --git a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py index d8a7ebe..ef7b97f 100644 --- a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py +++ b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py @@ -1,5 +1,7 @@ +import json import pytest import pytest_asyncio +from riverqueue.job import AttemptError import sqlalchemy import sqlalchemy.ext.asyncio from datetime import datetime, timezone @@ -52,7 +54,7 @@ async def client( # @pytest.mark.asyncio - async def test_insert_job_from_row(self, client, simple_args): + async def test_insert_job_from_row(self, client, simple_args, test_tx): insert_res = await client.insert(simple_args) job = insert_res.job assert job @@ -70,6 +72,48 @@ async def test_insert_job_from_row(self, client, simple_args): assert job.state == JobState.AVAILABLE assert job.tags == [] + now = datetime.now(timezone.utc) + + job_row = await dbsqlc.river_job.AsyncQuerier(test_tx).job_insert_full( + dbsqlc.river_job.JobInsertFullParams( + args=json.dumps(dict(foo="args")), + attempt=0, + attempted_at=None, + created_at=datetime.now(), + errors=[ + AttemptError( + at=now, + attempt=1, + error="message", + trace="trace", + ).to_json(), + ], + finalized_at=datetime.now(), + kind="custom_kind", + max_attempts=MAX_ATTEMPTS_DEFAULT, + metadata=json.dumps(dict(foo="metadata")), + 
priority=PRIORITY_DEFAULT, + queue=QUEUE_DEFAULT, + scheduled_at=datetime.now(), + state=JobState.COMPLETED, + tags=[], + ) + ) + + job = riversqlalchemy.sql_alchemy_driver.job_from_row(job_row) + assert job + assert job.args == dict(foo="args") + assert job.errors == [ + AttemptError( + at=now, + attempt=1, + error="message", + trace="trace", + ) + ] + assert job.finalized_at.tzinfo == timezone.utc + assert job.metadata == dict(foo="metadata") + # # tests below this line should match what are in the sync client tests below #