Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 139 additions & 123 deletions testing/backend/unit/test_routes_json_helpers.py
Original file line number Diff line number Diff line change
@@ -1,156 +1,172 @@
"""
Unit tests for routes.py JSON deserialization helpers.
Unit tests for backend.secuscan.routes_json_helpers.

Covers:
- parse_json_fields: parses JSON string fields, leaves non-string/non-truthy unchanged
- parse_json_fields: JSON decode errors keep original string value
- parse_json_fields: does not mutate the original rows
- deserialize_finding_rows: strips *_json suffix and renames to friendly keys
- deserialize_finding_rows: rows without any *_json fields pass through unchanged
- deserialize_finding_rows: does not mutate the original rows
- deserialize_asset_service_rows: parses and renames metadata_json and cert_san_json
- deserialize_asset_service_rows: unknown *_json fields are not renamed
"""

import json

import pytest

from backend.secuscan.routes import (
parse_json_fields,
deserialize_finding_rows,
from backend.secuscan.routes_json_helpers import (
FINDING_JSON_FIELDS,
deserialize_asset_service_rows,
_parse_workflow_steps,
deserialize_finding_rows,
parse_json_fields,
)


# ---------------------------------------------------------------------------
# parse_json_fields
# ---------------------------------------------------------------------------


class TestParseJsonFields:
def test_decodes_valid_json_string(self):
rows = [{"field": '{"key": "val"}'}]
result = parse_json_fields(rows, ["field"])
assert result[0]["field"] == {"key": "val"}

def test_skips_non_string_values(self):
rows = [{"field": 42}]
result = parse_json_fields(rows, ["field"])
assert result[0]["field"] == 42

def test_skips_missing_fields(self):
rows = [{}]
result = parse_json_fields(rows, ["field"])
assert result[0] == {}

def test_handles_bad_json_unchanged(self):
rows = [{"field": "not-json"}]
result = parse_json_fields(rows, ["field"])
assert result[0]["field"] == "not-json"

def test_handles_none_value(self):
rows = [{"field": None}]
result = parse_json_fields(rows, ["field"])
assert result[0]["field"] is None

def test_multiple_rows_and_fields(self):
rows = [
{"a": '{"x":1}', "b": '{"y":2}'},
{"a": '{"z":3}', "b": None},
]
def test_parses_json_string_fields(self):
"""Fields that are JSON-encoded strings are replaced with parsed objects."""
rows = [{"name": "n1", "config_json": '{"timeout": 30}'}]
result = parse_json_fields(rows, ["config_json"])
assert result[0]["config_json"] == {"timeout": 30}

def test_preserves_non_string_fields(self):
"""Fields that are not strings are passed through."""
rows = [{"name": "n1", "count": 5, "enabled": True}]
result = parse_json_fields(rows, ["count", "enabled"])
assert result[0]["count"] == 5
assert result[0]["enabled"] is True

def test_skips_falsy_fields(self):
"""Fields with falsy values (None, empty string) are not parsed."""
rows = [{"name": "n1", "data": None}, {"name": "n2", "data": ""}]
result = parse_json_fields(rows, ["data"])
assert result[0]["data"] is None
assert result[1]["data"] == ""

def test_json_decode_error_keeps_original_string(self):
"""A JSON decode error preserves the original string value."""
rows = [{"name": "n1", "bad_json": "not valid json{"}]
result = parse_json_fields(rows, ["bad_json"])
assert result[0]["bad_json"] == "not valid json{"

def test_parses_multiple_fields_per_row(self):
"""Multiple fields in the same row are all parsed when specified."""
rows = [{"a": '{"x": 1}', "b": '{"y": 2}', "c": "plain"}]
result = parse_json_fields(rows, ["a", "b"])
assert result[0]["a"] == {"x": 1}
assert result[0]["b"] == {"y": 2}
assert result[1]["a"] == {"z": 3}
assert result[1]["b"] is None
assert result[0]["c"] == "plain"

def test_does_not_mutate_input_rows(self):
"""parse_json_fields must not modify the input list or its dicts."""
original = [{"config_json": '{"timeout": 30}'}]
snapshot = json.dumps(original)
parse_json_fields(original, ["config_json"])
assert json.dumps(original) == snapshot


def test_empty_rows_list(self):
result = parse_json_fields([], ["field"])
assert result == []
# ---------------------------------------------------------------------------
# deserialize_finding_rows
# ---------------------------------------------------------------------------


class TestDeserializeFindingRows:
def test_renames_json_keys_to_clean_api_names(self):
def test_strips_json_suffix_and_renames(self):
"""Each *_json field is parsed and renamed to the base name."""
rows = [
{
"metadata_json": '{"src": "test"}',
"risk_factors_json": '[]',
"evidence_json": '[]',
"asset_refs_json": '[]',
"references_json": '[]',
"corroborating_sources_json": '[]',
"title": "Test Finding",
"id": "f1",
"metadata_json": '{"severity": "high"}',
"evidence_json": '["url1", "url2"]',
}
]
result = deserialize_finding_rows(rows)
f = result[0]
assert "metadata" in f
assert "metadata_json" not in f
assert f["metadata"] == {"src": "test"}
assert "risk_factors" in f
assert "evidence" in f
assert "asset_refs" in f
assert "references" in f
assert "corroborating_sources" in f
assert f["title"] == "Test Finding"

def test_preserves_non_json_fields(self):
rows = [{"title": "Hello", "severity": "high", "metadata_json": "{}"}]
assert "metadata" in result[0]
assert "metadata_json" not in result[0]
assert result[0]["metadata"] == {"severity": "high"}
assert result[0]["evidence"] == ["url1", "url2"]

def test_rows_without_json_fields_pass_through(self):
"""Rows that have no *_json fields are returned unchanged (renamed to nothing)."""
rows = [{"id": "f1", "title": "SQL Injection", "severity": "high"}]
result = deserialize_finding_rows(rows)
assert result[0]["title"] == "Hello"
assert result[0]["severity"] == "high"
assert result[0]["id"] == "f1"
assert result[0]["title"] == "SQL Injection"
assert "metadata" not in result[0]

def test_bad_json_in_metadata_field_unchanged(self):
rows = [{"metadata_json": "broken{", "risk_factors_json": "also-broken{"}]
def test_partial_json_fields_only_rename_present_keys(self):
"""Only the *_json keys that are present get renamed."""
rows = [{"id": "f1", "risk_factors_json": '[1, 2, 3]'}]
result = deserialize_finding_rows(rows)
assert result[0]["metadata"] == "broken{"
assert result[0]["risk_factors"] == "also-broken{"
assert result[0]["risk_factors"] == [1, 2, 3]
assert "evidence" not in result[0]
assert "references" not in result[0]

def test_does_not_mutate_input_rows(self):
"""deserialize_finding_rows must not modify the input."""
original = [{"id": "f1", "metadata_json": '{"x": 1}'}]
snapshot_keys = set(original[0].keys())
deserialize_finding_rows(original)
assert set(original[0].keys()) == snapshot_keys


# ---------------------------------------------------------------------------
# deserialize_asset_service_rows
# ---------------------------------------------------------------------------


class TestDeserializeAssetServiceRows:
def test_renames_metadata_json_and_cert_san_json(self):
def test_parses_and_renames_metadata_and_cert_san(self):
"""metadata_json -> metadata and cert_san_json -> cert_san."""
rows = [
{
"metadata_json": '{"host": "web"}',
"cert_san_json": '["corp.local"]',
"port": 443,
"id": "svc1",
"metadata_json": '{"port": 443}',
"cert_san_json": '["host1.example.com"]',
}
]
result = deserialize_asset_service_rows(rows)
r = result[0]
assert "metadata" in r
assert "cert_san" in r
assert r["metadata"] == {"host": "web"}
assert r["cert_san"] == ["corp.local"]
assert r["port"] == 443

def test_bad_json_unchanged(self):
rows = [{"metadata_json": "not-json"}]
assert result[0]["metadata"] == {"port": 443}
assert "metadata_json" not in result[0]
assert result[0]["cert_san"] == ["host1.example.com"]
assert "cert_san_json" not in result[0]

def test_ignores_unknown_json_fields(self):
"""Fields not in the target set are left alone."""
rows = [{"id": "svc1", "extra_json": '{"useless": true}'}]
result = deserialize_asset_service_rows(rows)
assert result[0]["metadata"] == "not-json"


class TestParseWorkflowSteps:
def test_from_list_with_valid_step(self):
steps = [{"plugin_id": "nmap", "inputs": {"target": "127.0.0.1"}}]
result = _parse_workflow_steps(steps)
assert len(result) == 1
assert result[0]["plugin_id"] == "nmap"
assert result[0]["inputs"] == {"target": "127.0.0.1"}

def test_from_json_string(self):
result = _parse_workflow_steps('[{"plugin_id": "nmap", "inputs": {}}]')
assert len(result) == 1
assert result[0]["plugin_id"] == "nmap"

def test_from_none_returns_empty_list(self):
result = _parse_workflow_steps(None)
assert result == []

def test_from_empty_string_returns_empty_list(self):
result = _parse_workflow_steps("")
assert result == []

def test_skips_non_dict_items(self):
result = _parse_workflow_steps(["not-a-dict", 123, {"plugin_id": "x", "inputs": {}}])
assert len(result) == 1
assert result[0]["plugin_id"] == "x"

def test_dicts_without_plugin_id_produce_empty_plugin_id(self):
# plugin_id="" is a valid WorkflowStep; the function coerces missing key to ""
result = _parse_workflow_steps([{"inputs": {}}])
assert len(result) == 1
assert result[0]["plugin_id"] == ""

def test_preserves_preset_field(self):
steps = [{"plugin_id": "nmap", "inputs": {}, "preset": "fast", "execution_context": {}}]
result = _parse_workflow_steps(steps)
assert result[0]["preset"] == "fast"

def test_empty_list_returns_empty(self):
result = _parse_workflow_steps([])
assert result == []
assert "extra_json" in result[0]
assert "metadata" not in result[0]

def test_does_not_mutate_input_rows(self):
"""deserialize_asset_service_rows must not modify the input."""
original = [{"id": "svc1", "metadata_json": '{"x": 1}'}]
snapshot_keys = set(original[0].keys())
deserialize_asset_service_rows(original)
assert set(original[0].keys()) == snapshot_keys


# ---------------------------------------------------------------------------
# FINDING_JSON_FIELDS
# ---------------------------------------------------------------------------


class TestFindingJsonFields:
def test_contains_expected_field_names(self):
"""FINDING_JSON_FIELDS lists all the *_json columns used by findings."""
expected = [
"metadata_json",
"risk_factors_json",
"evidence_json",
"asset_refs_json",
"references_json",
"corroborating_sources_json",
]
assert set(FINDING_JSON_FIELDS) == set(expected)
Loading