|
| 1 | +import pytest |
| 2 | + |
| 3 | +from mpt_api_client.exceptions import MPTAPIError |
| 4 | +from mpt_api_client.rql.query_builder import RQLQuery |
| 5 | + |
| 6 | +pytestmark = [pytest.mark.flaky] |
| 7 | + |
| 8 | + |
| 9 | +@pytest.fixture |
| 10 | +def async_attachment_service(async_mpt_ops, pricing_policy_id): |
| 11 | + return async_mpt_ops.catalog.pricing_policies.attachments(pricing_policy_id) |
| 12 | + |
| 13 | + |
| 14 | +@pytest.fixture |
| 15 | +async def created_attachment_async(async_attachment_service, attachment_data, pdf_fd): |
| 16 | + attachment = await async_attachment_service.create(attachment_data, file=pdf_fd) |
| 17 | + yield attachment |
| 18 | + try: |
| 19 | + await async_attachment_service.delete(attachment.id) |
| 20 | + except MPTAPIError as error: |
| 21 | + print(f"TEARDOWN - Unable to delete attachment {attachment.id}: {error.title}") |
| 22 | + |
| 23 | + |
| 24 | +def test_create_attachment_async(created_attachment_async, attachment_data): |
| 25 | + result = created_attachment_async |
| 26 | + |
| 27 | + assert result.name == attachment_data["name"] |
| 28 | + assert result.description == attachment_data["description"] |
| 29 | + |
| 30 | + |
| 31 | +async def test_update_attachment_async(async_attachment_service, created_attachment_async): |
| 32 | + update_data = {"name": "Updated e2e test attachment - please delete"} |
| 33 | + |
| 34 | + result = await async_attachment_service.update(created_attachment_async.id, update_data) |
| 35 | + |
| 36 | + assert result.name == update_data["name"] |
| 37 | + |
| 38 | + |
| 39 | +async def test_get_attachment_async(async_attachment_service, attachment_id): |
| 40 | + result = await async_attachment_service.get(attachment_id) |
| 41 | + |
| 42 | + assert result.id == attachment_id |
| 43 | + |
| 44 | + |
| 45 | +async def test_download_attachment_async(async_attachment_service, attachment_id): |
| 46 | + result = await async_attachment_service.download(attachment_id) |
| 47 | + |
| 48 | + assert result.file_contents is not None |
| 49 | + assert result.filename == "empty.pdf" |
| 50 | + |
| 51 | + |
| 52 | +async def test_iterate_attachments_async(async_attachment_service, created_attachment_async): |
| 53 | + result = [att async for att in async_attachment_service.iterate()] |
| 54 | + |
| 55 | + assert any(att.id == created_attachment_async.id for att in result) |
| 56 | + |
| 57 | + |
| 58 | +async def test_filter_attachments_async(async_attachment_service, created_attachment_async): |
| 59 | + filtered_service = async_attachment_service.filter(RQLQuery(id=created_attachment_async.id)) |
| 60 | + |
| 61 | + result = [att async for att in filtered_service.iterate()] |
| 62 | + |
| 63 | + assert len(result) == 1 |
| 64 | + assert result[0].id == created_attachment_async.id |
| 65 | + |
| 66 | + |
| 67 | +async def test_not_found_async(async_attachment_service): |
| 68 | + with pytest.raises(MPTAPIError): |
| 69 | + await async_attachment_service.get("ATT-000-000-000") |
| 70 | + |
| 71 | + |
| 72 | +async def test_delete_attachment_async(async_attachment_service, created_attachment_async): |
| 73 | + await async_attachment_service.delete(created_attachment_async.id) |
| 74 | + |
| 75 | + with pytest.raises(MPTAPIError): |
| 76 | + await async_attachment_service.get(created_attachment_async.id) |
0 commit comments