From 68d2bf0bbf2dd6d8805e5cb21c467b57c40cd8e0 Mon Sep 17 00:00:00 2001 From: tmdeveloper007 Date: Fri, 19 Jun 2026 07:40:08 +0000 Subject: [PATCH] test: add unit tests for routes JSON deserialization helpers --- .../backend/unit/test_routes_json_helpers.py | 262 ++++++++++-------- 1 file changed, 139 insertions(+), 123 deletions(-) diff --git a/testing/backend/unit/test_routes_json_helpers.py b/testing/backend/unit/test_routes_json_helpers.py index a1124717..5bd97004 100644 --- a/testing/backend/unit/test_routes_json_helpers.py +++ b/testing/backend/unit/test_routes_json_helpers.py @@ -1,156 +1,172 @@ """ -Unit tests for routes.py JSON deserialization helpers. +Unit tests for backend.secuscan.routes_json_helpers. + +Covers: +- parse_json_fields: parses JSON string fields, leaves non-string/non-truthy unchanged +- parse_json_fields: JSON decode errors keep original string value +- parse_json_fields: does not mutate the original rows +- deserialize_finding_rows: strips *_json suffix and renames to friendly keys +- deserialize_finding_rows: rows without any *_json fields pass through unchanged +- deserialize_finding_rows: does not mutate the original rows +- deserialize_asset_service_rows: parses and renames metadata_json and cert_san_json +- deserialize_asset_service_rows: unknown *_json fields are not renamed """ + +import json + import pytest -from backend.secuscan.routes import ( - parse_json_fields, - deserialize_finding_rows, +from backend.secuscan.routes_json_helpers import ( + FINDING_JSON_FIELDS, deserialize_asset_service_rows, - _parse_workflow_steps, + deserialize_finding_rows, + parse_json_fields, ) +# --------------------------------------------------------------------------- +# parse_json_fields +# --------------------------------------------------------------------------- + + class TestParseJsonFields: - def test_decodes_valid_json_string(self): - rows = [{"field": '{"key": "val"}'}] - result = parse_json_fields(rows, ["field"]) - assert result[0]["field"] == {"key": "val"} - - def test_skips_non_string_values(self): - rows = [{"field": 42}] - result = parse_json_fields(rows, ["field"]) - assert result[0]["field"] == 42 - - def test_skips_missing_fields(self): - rows = [{}] - result = parse_json_fields(rows, ["field"]) - assert result[0] == {} - - def test_handles_bad_json_unchanged(self): - rows = [{"field": "not-json"}] - result = parse_json_fields(rows, ["field"]) - assert result[0]["field"] == "not-json" - - def test_handles_none_value(self): - rows = [{"field": None}] - result = parse_json_fields(rows, ["field"]) - assert result[0]["field"] is None - - def test_multiple_rows_and_fields(self): - rows = [ - {"a": '{"x":1}', "b": '{"y":2}'}, - {"a": '{"z":3}', "b": None}, - ] + def test_parses_json_string_fields(self): + """Fields that are JSON-encoded strings are replaced with parsed objects.""" + rows = [{"name": "n1", "config_json": '{"timeout": 30}'}] + result = parse_json_fields(rows, ["config_json"]) + assert result[0]["config_json"] == {"timeout": 30} + + def test_preserves_non_string_fields(self): + """Fields that are not strings are passed through.""" + rows = [{"name": "n1", "count": 5, "enabled": True}] + result = parse_json_fields(rows, ["count", "enabled"]) + assert result[0]["count"] == 5 + assert result[0]["enabled"] is True + + def test_skips_falsy_fields(self): + """Fields with falsy values (None, empty string) are not parsed.""" + rows = [{"name": "n1", "data": None}, {"name": "n2", "data": ""}] + result = parse_json_fields(rows, ["data"]) + assert result[0]["data"] is None + assert result[1]["data"] == "" + + def test_json_decode_error_keeps_original_string(self): + """A JSON decode error preserves the original string value.""" + rows = [{"name": "n1", "bad_json": "not valid json{"}] + result = parse_json_fields(rows, ["bad_json"]) + assert result[0]["bad_json"] == "not valid json{" + + def test_parses_multiple_fields_per_row(self): + """Multiple fields in the same row are all parsed when specified.""" + rows = [{"a": '{"x": 1}', "b": '{"y": 2}', "c": "plain"}] result = parse_json_fields(rows, ["a", "b"]) assert result[0]["a"] == {"x": 1} assert result[0]["b"] == {"y": 2} - assert result[1]["a"] == {"z": 3} - assert result[1]["b"] is None + assert result[0]["c"] == "plain" + + def test_does_not_mutate_input_rows(self): + """parse_json_fields must not modify the input list or its dicts.""" + original = [{"config_json": '{"timeout": 30}'}] + snapshot = json.dumps(original) + parse_json_fields(original, ["config_json"]) + assert json.dumps(original) == snapshot + - def test_empty_rows_list(self): - result = parse_json_fields([], ["field"]) - assert result == [] +# --------------------------------------------------------------------------- +# deserialize_finding_rows +# --------------------------------------------------------------------------- class TestDeserializeFindingRows: - def test_renames_json_keys_to_clean_api_names(self): + def test_strips_json_suffix_and_renames(self): + """Each *_json field is parsed and renamed to the base name.""" rows = [ { - "metadata_json": '{"src": "test"}', - "risk_factors_json": '[]', - "evidence_json": '[]', - "asset_refs_json": '[]', - "references_json": '[]', - "corroborating_sources_json": '[]', - "title": "Test Finding", + "id": "f1", + "metadata_json": '{"severity": "high"}', + "evidence_json": '["url1", "url2"]', } ] result = deserialize_finding_rows(rows) - f = result[0] - assert "metadata" in f - assert "metadata_json" not in f - assert f["metadata"] == {"src": "test"} - assert "risk_factors" in f - assert "evidence" in f - assert "asset_refs" in f - assert "references" in f - assert "corroborating_sources" in f - assert f["title"] == "Test Finding" - - def test_preserves_non_json_fields(self): - rows = [{"title": "Hello", "severity": "high", "metadata_json": "{}"}] + assert "metadata" in result[0] + assert "metadata_json" not in result[0] + assert result[0]["metadata"] == {"severity": "high"} + assert result[0]["evidence"] == ["url1", "url2"] + + def test_rows_without_json_fields_pass_through(self): + """Rows that have no *_json fields are returned unchanged (renamed to nothing).""" + rows = [{"id": "f1", "title": "SQL Injection", "severity": "high"}] result = deserialize_finding_rows(rows) - assert result[0]["title"] == "Hello" - assert result[0]["severity"] == "high" + assert result[0]["id"] == "f1" + assert result[0]["title"] == "SQL Injection" + assert "metadata" not in result[0] - def test_bad_json_in_metadata_field_unchanged(self): - rows = [{"metadata_json": "broken{", "risk_factors_json": "also-broken{"}] + def test_partial_json_fields_only_rename_present_keys(self): + """Only the *_json keys that are present get renamed.""" + rows = [{"id": "f1", "risk_factors_json": '[1, 2, 3]'}] result = deserialize_finding_rows(rows) - assert result[0]["metadata"] == "broken{" - assert result[0]["risk_factors"] == "also-broken{" + assert result[0]["risk_factors"] == [1, 2, 3] + assert "evidence" not in result[0] + assert "references" not in result[0] + + def test_does_not_mutate_input_rows(self): + """deserialize_finding_rows must not modify the input.""" + original = [{"id": "f1", "metadata_json": '{"x": 1}'}] + snapshot_keys = set(original[0].keys()) + deserialize_finding_rows(original) + assert set(original[0].keys()) == snapshot_keys + + +# --------------------------------------------------------------------------- +# deserialize_asset_service_rows +# --------------------------------------------------------------------------- class TestDeserializeAssetServiceRows: - def test_renames_metadata_json_and_cert_san_json(self): + def test_parses_and_renames_metadata_and_cert_san(self): + """metadata_json -> metadata and cert_san_json -> cert_san.""" rows = [ { - "metadata_json": '{"host": "web"}', - "cert_san_json": '["corp.local"]', - "port": 443, + "id": "svc1", + "metadata_json": '{"port": 443}', + "cert_san_json": '["host1.example.com"]', } ] result = deserialize_asset_service_rows(rows) - r = result[0] - assert "metadata" in r - assert "cert_san" in r - assert r["metadata"] == {"host": "web"} - assert r["cert_san"] == ["corp.local"] - assert r["port"] == 443 - - def test_bad_json_unchanged(self): - rows = [{"metadata_json": "not-json"}] + assert result[0]["metadata"] == {"port": 443} + assert "metadata_json" not in result[0] + assert result[0]["cert_san"] == ["host1.example.com"] + assert "cert_san_json" not in result[0] + + def test_ignores_unknown_json_fields(self): + """Fields not in the target set are left alone.""" + rows = [{"id": "svc1", "extra_json": '{"useless": true}'}] result = deserialize_asset_service_rows(rows) - assert result[0]["metadata"] == "not-json" - - -class TestParseWorkflowSteps: - def test_from_list_with_valid_step(self): - steps = [{"plugin_id": "nmap", "inputs": {"target": "127.0.0.1"}}] - result = _parse_workflow_steps(steps) - assert len(result) == 1 - assert result[0]["plugin_id"] == "nmap" - assert result[0]["inputs"] == {"target": "127.0.0.1"} - - def test_from_json_string(self): - result = _parse_workflow_steps('[{"plugin_id": "nmap", "inputs": {}}]') - assert len(result) == 1 - assert result[0]["plugin_id"] == "nmap" - - def test_from_none_returns_empty_list(self): - result = _parse_workflow_steps(None) - assert result == [] - - def test_from_empty_string_returns_empty_list(self): - result = _parse_workflow_steps("") - assert result == [] - - def test_skips_non_dict_items(self): - result = _parse_workflow_steps(["not-a-dict", 123, {"plugin_id": "x", "inputs": {}}]) - assert len(result) == 1 - assert result[0]["plugin_id"] == "x" - - def test_dicts_without_plugin_id_produce_empty_plugin_id(self): - # plugin_id="" is a valid WorkflowStep; the function coerces missing key to "" - result = _parse_workflow_steps([{"inputs": {}}]) - assert len(result) == 1 - assert result[0]["plugin_id"] == "" - - def test_preserves_preset_field(self): - steps = [{"plugin_id": "nmap", "inputs": {}, "preset": "fast", "execution_context": {}}] - result = _parse_workflow_steps(steps) - assert result[0]["preset"] == "fast" - - def test_empty_list_returns_empty(self): - result = _parse_workflow_steps([]) - assert result == [] + assert "extra_json" in result[0] + assert "metadata" not in result[0] + + def test_does_not_mutate_input_rows(self): + """deserialize_asset_service_rows must not modify the input.""" + original = [{"id": "svc1", "metadata_json": '{"x": 1}'}] + snapshot_keys = set(original[0].keys()) + deserialize_asset_service_rows(original) + assert set(original[0].keys()) == snapshot_keys + + +# --------------------------------------------------------------------------- +# FINDING_JSON_FIELDS +# --------------------------------------------------------------------------- + + +class TestFindingJsonFields: + def test_contains_expected_field_names(self): + """FINDING_JSON_FIELDS lists all the *_json columns used by findings.""" + expected = [ + "metadata_json", + "risk_factors_json", + "evidence_json", + "asset_refs_json", + "references_json", + "corroborating_sources_json", + ] + assert set(FINDING_JSON_FIELDS) == set(expected)