diff --git a/mod_ci/controllers.py b/mod_ci/controllers.py index 7ce8a0c2..bad2c7a2 100755 --- a/mod_ci/controllers.py +++ b/mod_ci/controllers.py @@ -10,7 +10,7 @@ import zipfile from collections import defaultdict from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, Optional import googleapiclient.discovery import requests @@ -43,6 +43,12 @@ TestResult, TestResultFile, TestStatus, TestType) from utility import is_valid_signature, request_from_github +# Timeout constants (in seconds) +GITHUB_API_TIMEOUT = 30 # Timeout for GitHub API calls +GCP_API_TIMEOUT = 60 # Timeout for GCP API calls +ARTIFACT_DOWNLOAD_TIMEOUT = 300 # 5 minutes for artifact downloads +GCP_OPERATION_MAX_WAIT = 1800 # 30 minutes max wait for GCP operations + mod_ci = Blueprint('ci', __name__) @@ -236,18 +242,25 @@ def gcp_instance(app, db, platform, repository, delay) -> None: for test in pending_tests: if test.test_type == TestType.pull_request: - gh_commit = repository.get_commit(test.commit) - if test.pr_nr == 0: - log.warn(f'[{platform}] Test {test.id} is invalid') - deschedule_test(gh_commit, message="Invalid PR number", test=test, db=db) - continue - test_pr = repository.get_pull(test.pr_nr) - if test.commit != test_pr.head.sha: - log.warn(f'[{platform}] Test {test.id} is invalid') - deschedule_test(gh_commit, message="PR closed or updated", test=test, db=db) - continue - if test_pr.state != 'open': - log.debug(f"PR {test.pr_nr} not in open state, skipping test {test.id}") + try: + gh_commit = repository.get_commit(test.commit) + if test.pr_nr == 0: + log.warn(f'[{platform}] Test {test.id} is invalid') + deschedule_test(gh_commit, message="Invalid PR number", test=test, db=db) + continue + test_pr = repository.get_pull(test.pr_nr) + if test.commit != test_pr.head.sha: + log.warn(f'[{platform}] Test {test.id} is invalid') + deschedule_test(gh_commit, message="PR closed or updated", test=test, db=db) + continue + if test_pr.state != 'open': + log.debug(f"PR {test.pr_nr} not in open state, skipping test {test.id}") + continue + except GithubException as e: + log.error(f"GitHub API error for test {test.id}: {e}") + continue # Skip this test, try next one + except Exception as e: + log.error(f"Unexpected error checking PR status for test {test.id}: {e}") continue start_test(compute, app, db, repository, test, github_config['bot_token']) @@ -264,6 +277,33 @@ def get_compute_service_object() -> googleapiclient.discovery.Resource: return googleapiclient.discovery.build('compute', 'v1', credentials=credentials) +def mark_test_failed(db, test, repository, message: str) -> None: + """ + Mark a test as failed and update GitHub status. + + :param db: Database session + :type db: sqlalchemy.orm.scoping.scoped_session + :param test: The test to mark as failed + :type test: mod_test.models.Test + :param repository: GitHub repository object + :type repository: Repository.Repository + :param message: Error message to display + :type message: str + """ + from run import log + + try: + progress = TestProgress(test.id, TestStatus.canceled, message) + db.add(progress) + db.commit() + + gh_commit = repository.get_commit(test.commit) + update_status_on_github(gh_commit, Status.ERROR, message, f"CI - {test.platform.value}") + log.info(f"Marked test {test.id} as failed: {message}") + except Exception as e: + log.error(f"Failed to mark test {test.id} as failed: {e}") + + def start_test(compute, app, db, repository: Repository.Repository, test, bot_token) -> None: """ Start a VM instance and run the tests. @@ -290,6 +330,19 @@ def start_test(compute, app, db, repository: Repository.Repository, test, bot_to :rtype: None """ from run import config, log + + # Check if test is already being processed (basic locking) + existing_instance = GcpInstance.query.filter(GcpInstance.test_id == test.id).first() + if existing_instance is not None: + log.warning(f"Test {test.id} already has a GCP instance, skipping duplicate start") + return + + # Check if test already has progress (already started or finished) + existing_progress = TestProgress.query.filter(TestProgress.test_id == test.id).first() + if existing_progress is not None: + log.warning(f"Test {test.id} already has progress entries, skipping") + return + gcp_instance_name = f"{test.platform.value}-{test.id}" log.debug(f'[{gcp_instance_name}] Starting test {test.id}') @@ -393,16 +446,28 @@ def start_test(compute, app, db, repository: Repository.Repository, test, bot_to artifact_url = artifact.archive_download_url try: auth_header = f"token {bot_token}" - r = requests.get(artifact_url, headers={"Authorization": auth_header}) + r = requests.get( + artifact_url, + headers={"Authorization": auth_header}, + timeout=ARTIFACT_DOWNLOAD_TIMEOUT + ) + except requests.exceptions.Timeout: + log.critical(f"Artifact download timed out after {ARTIFACT_DOWNLOAD_TIMEOUT}s") + mark_test_failed(db, test, repository, "Artifact download timed out") + return except Exception as e: - log.critical("Could not fetch artifact, request timed out") + log.critical(f"Could not fetch artifact: {e}") + mark_test_failed(db, test, repository, f"Artifact download failed: {e}") return if r.status_code != 200: log.critical(f"Could not fetch artifact, response code: {r.status_code}") + mark_test_failed(db, test, repository, f"Artifact download failed: HTTP {r.status_code}") return - open(os.path.join(base_folder, 'ccextractor.zip'), 'wb').write(r.content) - with zipfile.ZipFile(os.path.join(base_folder, 'ccextractor.zip'), 'r') as artifact_zip: + zip_path = os.path.join(base_folder, 'ccextractor.zip') + with open(zip_path, 'wb') as f: + f.write(r.content) + with zipfile.ZipFile(zip_path, 'r') as artifact_zip: artifact_zip.extractall(base_folder) artifact_saved = True @@ -410,6 +475,7 @@ def start_test(compute, app, db, repository: Repository.Repository, test, bot_to if not artifact_saved: log.critical("Could not find an artifact for this commit") + mark_test_failed(db, test, repository, "No build artifact found for this commit") return zone = config.get('ZONE', '') @@ -560,9 +626,9 @@ def get_config_for_gcp_instance(vm_name, source_disk_image, metadata_items) -> D } -def wait_for_operation(compute, project, zone, operation) -> Dict: +def wait_for_operation(compute, project, zone, operation, max_wait: int = GCP_OPERATION_MAX_WAIT) -> Dict: """ - Wait for an operation to get completed. + Wait for an operation to get completed with timeout. :param compute: The cloud compute engine service object :type compute: googleapiclient.discovery.Resource @@ -572,22 +638,47 @@ def wait_for_operation(compute, project, zone, operation) -> Dict: :type zone: str :param operation: Operation name for which server is waiting :type operation: str + :param max_wait: Maximum time to wait in seconds (default: 30 minutes) + :type max_wait: int :return: Response received after operation completion :rtype: Dict """ from run import log - log.info("Waiting for an operation to finish") - while True: - result = compute.zoneOperations().get( - project=project, - zone=zone, - operation=operation).execute() + log.info(f"Waiting for operation {operation} to finish (max {max_wait}s)") + start_time = time.time() + poll_interval = 1.0 # Start with 1 second polling - if result['status'] == 'DONE': - log.info("Operation Completed") - return result + while True: + elapsed = time.time() - start_time + if elapsed >= max_wait: + log.error(f"Operation {operation} timed out after {elapsed:.0f} seconds") + return { + 'status': 'TIMEOUT', + 'error': { + 'errors': [{ + 'code': 'TIMEOUT', + 'message': f'Operation timed out after {max_wait} seconds' + }] + } + } - time.sleep(1) + try: + result = compute.zoneOperations().get( + project=project, + zone=zone, + operation=operation).execute() + + if result['status'] == 'DONE': + log.info(f"Operation {operation} completed in {elapsed:.0f} seconds") + return result + + except Exception as e: + log.error(f"Error checking operation status: {e}") + return {'status': 'ERROR', 'error': {'errors': [{'code': 'API_ERROR', 'message': str(e)}]}} + + # Exponential backoff with cap at 10 seconds + time.sleep(min(poll_interval, 10)) + poll_interval = min(poll_interval * 1.5, 10) def save_xml_to_file(xml_node, folder_name, file_name) -> None: diff --git a/tests/test_ci/test_controllers.py b/tests/test_ci/test_controllers.py index 9af60716..07cb2a42 100644 --- a/tests/test_ci/test_controllers.py +++ b/tests/test_ci/test_controllers.py @@ -168,9 +168,8 @@ def __init__(self): mock_maintenance.query.filter.return_value.first.return_value = MockMaintenance() - resp = gcp_instance(mock.ANY, mock.ANY, "test", mock.ANY, 1) + gcp_instance(mock.ANY, mock.ANY, "test", mock.ANY, 1) - self.assertIsNone(resp) mock_log.info.assert_called_once() mock_log.critical.assert_not_called() self.assertEqual(mock_log.debug.call_count, 2) @@ -207,7 +206,10 @@ def test_cron_job_testing_true(self, mock_gcp_instance, mock_get_compute_service @mock.patch('mod_ci.controllers.create_instance') @mock.patch('builtins.open', new_callable=mock.mock_open()) @mock.patch('mod_ci.controllers.g') - def test_start_test(self, mock_g, mock_open_file, mock_create_instance, mock_wait_for_operation): + @mock.patch('mod_ci.controllers.TestProgress') + @mock.patch('mod_ci.controllers.GcpInstance') + def test_start_test(self, mock_gcp_instance, mock_test_progress, mock_g, mock_open_file, + mock_create_instance, mock_wait_for_operation): """Test start_test function.""" import zipfile @@ -215,6 +217,11 @@ def test_start_test(self, mock_g, mock_open_file, mock_create_instance, mock_wai from github.Artifact import Artifact from mod_ci.controllers import Artifact_names, start_test + + # Mock locking checks to return None (no existing instances/progress) + mock_gcp_instance.query.filter.return_value.first.return_value = None + mock_test_progress.query.filter.return_value.first.return_value = None + test = Test.query.first() repository = MagicMock() @@ -1738,6 +1745,294 @@ def config_get(key, *args, **kwargs): mock_log.critical.assert_called_with('GCP project name is empty!') + @mock.patch('run.log') + @mock.patch('mod_ci.controllers.update_status_on_github') + @mock.patch('mod_ci.controllers.TestProgress') + @mock.patch('mod_ci.controllers.g') + def test_mark_test_failed_success(self, mock_g, mock_test_progress, mock_update_status, mock_log): + """Test mark_test_failed function successfully marks a test as failed.""" + from mod_ci.controllers import mark_test_failed + + test = Test.query.first() + repository = MagicMock() + mock_commit = MagicMock() + repository.get_commit.return_value = mock_commit + + mark_test_failed(mock_g.db, test, repository, "Test error message") + + mock_test_progress.assert_called_once() + mock_g.db.add.assert_called_once() + mock_g.db.commit.assert_called_once() + mock_update_status.assert_called_once() + mock_log.info.assert_called() + + @mock.patch('run.log') + @mock.patch('mod_ci.controllers.update_status_on_github') + @mock.patch('mod_ci.controllers.TestProgress') + @mock.patch('mod_ci.controllers.g') + def test_mark_test_failed_exception(self, mock_g, mock_test_progress, mock_update_status, mock_log): + """Test mark_test_failed function handles exceptions gracefully.""" + from mod_ci.controllers import mark_test_failed + + test = Test.query.first() + repository = MagicMock() + repository.get_commit.side_effect = Exception("GitHub API error") + + # Should not raise, just log the error + mark_test_failed(mock_g.db, test, repository, "Test error message") + + mock_log.error.assert_called() + + @mock.patch('github.Github.get_repo') + @mock.patch('mod_ci.controllers.start_test') + @mock.patch('mod_ci.controllers.get_compute_service_object') + @mock.patch('mod_ci.controllers.g') + @mock.patch('run.log') + def test_gcp_instance_github_exception(self, mock_log, mock_g, mock_get_compute, + mock_start_test, mock_repo): + """Test gcp_instance handles GithubException gracefully.""" + from github import GithubException + + from mod_ci.controllers import gcp_instance + + repo = mock_repo() + # Make get_commit raise GithubException + repo.get_commit.side_effect = GithubException(404, "Not found", None) + + # Create a PR type test that will trigger the exception + test = Test(TestPlatform.linux, TestType.pull_request, 1, "test", "abc123", 1) + g.db.add(test) + g.db.commit() + + gcp_instance(self.app, mock_g.db, TestPlatform.linux, repo, None) + + # Should log error and continue + mock_log.error.assert_called() + + @mock.patch('github.Github.get_repo') + @mock.patch('mod_ci.controllers.start_test') + @mock.patch('mod_ci.controllers.get_compute_service_object') + @mock.patch('mod_ci.controllers.g') + @mock.patch('run.log') + def test_gcp_instance_unexpected_exception(self, mock_log, mock_g, mock_get_compute, + mock_start_test, mock_repo): + """Test gcp_instance handles unexpected exceptions gracefully.""" + from mod_ci.controllers import gcp_instance + + repo = mock_repo() + # Make get_commit raise unexpected exception + repo.get_commit.side_effect = RuntimeError("Unexpected error") + + # Create a PR type test that will trigger the exception + test = Test(TestPlatform.linux, TestType.pull_request, 1, "test", "def456", 2) + g.db.add(test) + g.db.commit() + + gcp_instance(self.app, mock_g.db, TestPlatform.linux, repo, None) + + # Should log error and continue + mock_log.error.assert_called() + + @mock.patch('mod_ci.controllers.wait_for_operation') + @mock.patch('mod_ci.controllers.create_instance') + @mock.patch('builtins.open', new_callable=mock.mock_open()) + @mock.patch('mod_ci.controllers.g') + @mock.patch('mod_ci.controllers.TestProgress') + @mock.patch('mod_ci.controllers.GcpInstance') + @mock.patch('run.log') + def test_start_test_duplicate_instance_check(self, mock_log, mock_gcp_instance, mock_test_progress, + mock_g, mock_open_file, mock_create_instance, + mock_wait_for_operation): + """Test start_test skips if GCP instance already exists for test.""" + from mod_ci.controllers import start_test + + test = Test.query.first() + repository = MagicMock() + + # Mock that an instance already exists + mock_gcp_instance.query.filter.return_value.first.return_value = MagicMock() + + start_test(mock.ANY, self.app, mock_g.db, repository, test, mock.ANY) + + # Should log warning and return early + mock_log.warning.assert_called() + mock_create_instance.assert_not_called() + + @mock.patch('mod_ci.controllers.wait_for_operation') + @mock.patch('mod_ci.controllers.create_instance') + @mock.patch('builtins.open', new_callable=mock.mock_open()) + @mock.patch('mod_ci.controllers.g') + @mock.patch('mod_ci.controllers.TestProgress') + @mock.patch('mod_ci.controllers.GcpInstance') + @mock.patch('run.log') + def test_start_test_duplicate_progress_check(self, mock_log, mock_gcp_instance, mock_test_progress, + mock_g, mock_open_file, mock_create_instance, + mock_wait_for_operation): + """Test start_test skips if test already has progress entries.""" + from mod_ci.controllers import start_test + + test = Test.query.first() + repository = MagicMock() + + # Mock no instance but progress exists + mock_gcp_instance.query.filter.return_value.first.return_value = None + mock_test_progress.query.filter.return_value.first.return_value = MagicMock() + + start_test(mock.ANY, self.app, mock_g.db, repository, test, mock.ANY) + + # Should log warning and return early + mock_log.warning.assert_called() + mock_create_instance.assert_not_called() + + @mock.patch('mod_ci.controllers.mark_test_failed') + @mock.patch('mod_ci.controllers.wait_for_operation') + @mock.patch('mod_ci.controllers.create_instance') + @mock.patch('builtins.open', new_callable=mock.mock_open()) + @mock.patch('mod_ci.controllers.g') + @mock.patch('mod_ci.controllers.TestProgress') + @mock.patch('mod_ci.controllers.GcpInstance') + @mock.patch('run.log') + @mock.patch('requests.get') + def test_start_test_artifact_timeout(self, mock_requests_get, mock_log, mock_gcp_instance, + mock_test_progress, mock_g, mock_open_file, + mock_create_instance, mock_wait_for_operation, + mock_mark_failed): + """Test start_test handles artifact download timeout.""" + import requests + from github.Artifact import Artifact + + from mod_ci.controllers import Artifact_names, start_test + + test = Test.query.first() + repository = MagicMock() + + # Mock locking checks + mock_gcp_instance.query.filter.return_value.first.return_value = None + mock_test_progress.query.filter.return_value.first.return_value = None + + # Mock artifact + artifact = MagicMock(Artifact) + artifact.name = Artifact_names.linux + artifact.workflow_run.head_sha = test.commit + repository.get_artifacts.return_value = [artifact] + + # Mock timeout exception + mock_requests_get.side_effect = requests.exceptions.Timeout() + + customized_test = CustomizedTest(1, 1) + g.db.add(customized_test) + g.db.commit() + + start_test(mock.ANY, self.app, mock_g.db, repository, test, mock.ANY) + + # Should log critical and mark test failed + mock_log.critical.assert_called() + mock_mark_failed.assert_called() + mock_create_instance.assert_not_called() + + @mock.patch('mod_ci.controllers.mark_test_failed') + @mock.patch('mod_ci.controllers.wait_for_operation') + @mock.patch('mod_ci.controllers.create_instance') + @mock.patch('builtins.open', new_callable=mock.mock_open()) + @mock.patch('mod_ci.controllers.g') + @mock.patch('mod_ci.controllers.TestProgress') + @mock.patch('mod_ci.controllers.GcpInstance') + @mock.patch('run.log') + @mock.patch('requests.get') + def test_start_test_artifact_http_error(self, mock_requests_get, mock_log, mock_gcp_instance, + mock_test_progress, mock_g, mock_open_file, + mock_create_instance, mock_wait_for_operation, + mock_mark_failed): + """Test start_test handles artifact download HTTP errors.""" + import requests + from github.Artifact import Artifact + + from mod_ci.controllers import Artifact_names, start_test + + test = Test.query.first() + repository = MagicMock() + + # Mock locking checks + mock_gcp_instance.query.filter.return_value.first.return_value = None + mock_test_progress.query.filter.return_value.first.return_value = None + + # Mock artifact + artifact = MagicMock(Artifact) + artifact.name = Artifact_names.linux + artifact.workflow_run.head_sha = test.commit + repository.get_artifacts.return_value = [artifact] + + # Mock HTTP 500 response + response = requests.models.Response() + response.status_code = 500 + mock_requests_get.return_value = response + + customized_test = CustomizedTest(1, 1) + g.db.add(customized_test) + g.db.commit() + + start_test(mock.ANY, self.app, mock_g.db, repository, test, mock.ANY) + + # Should log critical and mark test failed + mock_log.critical.assert_called() + mock_mark_failed.assert_called() + mock_create_instance.assert_not_called() + + @mock.patch('run.log') + @mock.patch('time.sleep') + @mock.patch('time.time') + def test_wait_for_operation_timeout(self, mock_time, mock_sleep, mock_log): + """Test wait_for_operation returns timeout error after max wait.""" + from mod_ci.controllers import wait_for_operation + + compute = MagicMock() + # Simulate time passing beyond max_wait + mock_time.side_effect = [0, 100] # Start at 0, then jump to 100 seconds + + result = wait_for_operation(compute, "project", "zone", "operation", max_wait=50) + + self.assertEqual(result['status'], 'TIMEOUT') + self.assertIn('error', result) + mock_log.error.assert_called() + + @mock.patch('run.log') + @mock.patch('time.sleep') + @mock.patch('time.time') + def test_wait_for_operation_api_error(self, mock_time, mock_sleep, mock_log): + """Test wait_for_operation handles API errors gracefully.""" + from mod_ci.controllers import wait_for_operation + + compute = MagicMock() + # Make the API call raise an exception + compute.zoneOperations.return_value.get.return_value.execute.side_effect = Exception("API Error") + mock_time.return_value = 0 + + result = wait_for_operation(compute, "project", "zone", "operation", max_wait=100) + + self.assertEqual(result['status'], 'ERROR') + self.assertIn('error', result) + mock_log.error.assert_called() + + @mock.patch('run.log') + @mock.patch('time.sleep') + @mock.patch('time.time') + def test_wait_for_operation_success(self, mock_time, mock_sleep, mock_log): + """Test wait_for_operation returns successfully when operation completes.""" + from mod_ci.controllers import wait_for_operation + + compute = MagicMock() + # First call returns RUNNING, second returns DONE + compute.zoneOperations.return_value.get.return_value.execute.side_effect = [ + {'status': 'RUNNING'}, + {'status': 'DONE'} + ] + mock_time.side_effect = [0, 0, 5, 5] # Various time readings during the loop + + result = wait_for_operation(compute, "project", "zone", "operation", max_wait=100) + + self.assertEqual(result['status'], 'DONE') + mock_log.info.assert_called() + @staticmethod def generate_header(data, event, ci_key=None): """