Skip to content
Merged
149 changes: 120 additions & 29 deletions mod_ci/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import zipfile
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict
from typing import Any, Dict, Optional

import googleapiclient.discovery
import requests
Expand Down Expand Up @@ -43,6 +43,12 @@
TestResult, TestResultFile, TestStatus, TestType)
from utility import is_valid_signature, request_from_github

# Timeout constants (all values are in seconds).
# NOTE(review): GITHUB_API_TIMEOUT and GCP_API_TIMEOUT are not referenced in the
# visible part of this file -- confirm they are actually passed to the API calls.
GITHUB_API_TIMEOUT = 30  # Timeout for GitHub API calls
GCP_API_TIMEOUT = 60  # Timeout for GCP API calls
ARTIFACT_DOWNLOAD_TIMEOUT = 300  # 5 minutes; passed as `timeout=` to the artifact download request
GCP_OPERATION_MAX_WAIT = 1800  # 30 minutes; default `max_wait` of wait_for_operation()

mod_ci = Blueprint('ci', __name__)


Expand Down Expand Up @@ -236,18 +242,25 @@ def gcp_instance(app, db, platform, repository, delay) -> None:

for test in pending_tests:
if test.test_type == TestType.pull_request:
gh_commit = repository.get_commit(test.commit)
if test.pr_nr == 0:
log.warn(f'[{platform}] Test {test.id} is invalid')
deschedule_test(gh_commit, message="Invalid PR number", test=test, db=db)
continue
test_pr = repository.get_pull(test.pr_nr)
if test.commit != test_pr.head.sha:
log.warn(f'[{platform}] Test {test.id} is invalid')
deschedule_test(gh_commit, message="PR closed or updated", test=test, db=db)
continue
if test_pr.state != 'open':
log.debug(f"PR {test.pr_nr} not in open state, skipping test {test.id}")
try:
gh_commit = repository.get_commit(test.commit)
if test.pr_nr == 0:
log.warn(f'[{platform}] Test {test.id} is invalid')
deschedule_test(gh_commit, message="Invalid PR number", test=test, db=db)
continue
test_pr = repository.get_pull(test.pr_nr)
if test.commit != test_pr.head.sha:
log.warn(f'[{platform}] Test {test.id} is invalid')
deschedule_test(gh_commit, message="PR closed or updated", test=test, db=db)
continue
if test_pr.state != 'open':
log.debug(f"PR {test.pr_nr} not in open state, skipping test {test.id}")
continue
except GithubException as e:
log.error(f"GitHub API error for test {test.id}: {e}")
continue # Skip this test, try next one
except Exception as e:
log.error(f"Unexpected error checking PR status for test {test.id}: {e}")
continue
start_test(compute, app, db, repository, test, github_config['bot_token'])

Expand All @@ -264,6 +277,33 @@ def get_compute_service_object() -> googleapiclient.discovery.Resource:
return googleapiclient.discovery.build('compute', 'v1', credentials=credentials)


def mark_test_failed(db, test, repository, message: str) -> None:
    """
    Mark a test as failed and update GitHub status.

    A ``TestStatus.canceled`` progress entry carrying ``message`` is committed
    first; then the commit's GitHub status is set to ``Status.ERROR`` under the
    ``CI - <platform>`` context. The whole operation is best-effort: any
    exception is logged and swallowed, never propagated to the caller. If the
    database step fails, the GitHub status is not updated.

    :param db: Database session
    :type db: sqlalchemy.orm.scoping.scoped_session
    :param test: The test to mark as failed
    :type test: mod_test.models.Test
    :param repository: GitHub repository object
    :type repository: Repository.Repository
    :param message: Error message to display
    :type message: str
    """
    from run import log

    try:
        progress = TestProgress(test.id, TestStatus.canceled, message)
        db.add(progress)
        db.commit()

        gh_commit = repository.get_commit(test.commit)
        update_status_on_github(gh_commit, Status.ERROR, message, f"CI - {test.platform.value}")
        log.info(f"Marked test {test.id} as failed: {message}")
    except Exception as e:
        log.error(f"Failed to mark test {test.id} as failed: {e}")
        # A failed flush/commit leaves the session in an invalid transaction
        # state; without a rollback every later use of the shared session
        # raises. Rolling back is harmless when the failure came from the
        # GitHub call after a successful commit.
        try:
            db.rollback()
        except Exception as rollback_error:
            log.error(f"Failed to roll back session for test {test.id}: {rollback_error}")


def start_test(compute, app, db, repository: Repository.Repository, test, bot_token) -> None:
"""
Start a VM instance and run the tests.
Expand All @@ -290,6 +330,19 @@ def start_test(compute, app, db, repository: Repository.Repository, test, bot_to
:rtype: None
"""
from run import config, log

# Check if test is already being processed (basic locking)
existing_instance = GcpInstance.query.filter(GcpInstance.test_id == test.id).first()
if existing_instance is not None:
log.warning(f"Test {test.id} already has a GCP instance, skipping duplicate start")
return

# Check if test already has progress (already started or finished)
existing_progress = TestProgress.query.filter(TestProgress.test_id == test.id).first()
if existing_progress is not None:
log.warning(f"Test {test.id} already has progress entries, skipping")
return

gcp_instance_name = f"{test.platform.value}-{test.id}"
log.debug(f'[{gcp_instance_name}] Starting test {test.id}')

Expand Down Expand Up @@ -393,23 +446,36 @@ def start_test(compute, app, db, repository: Repository.Repository, test, bot_to
artifact_url = artifact.archive_download_url
try:
auth_header = f"token {bot_token}"
r = requests.get(artifact_url, headers={"Authorization": auth_header})
r = requests.get(
artifact_url,
headers={"Authorization": auth_header},
timeout=ARTIFACT_DOWNLOAD_TIMEOUT
)
except requests.exceptions.Timeout:
log.critical(f"Artifact download timed out after {ARTIFACT_DOWNLOAD_TIMEOUT}s")
mark_test_failed(db, test, repository, "Artifact download timed out")
return
except Exception as e:
log.critical("Could not fetch artifact, request timed out")
log.critical(f"Could not fetch artifact: {e}")
mark_test_failed(db, test, repository, f"Artifact download failed: {e}")
return
if r.status_code != 200:
log.critical(f"Could not fetch artifact, response code: {r.status_code}")
mark_test_failed(db, test, repository, f"Artifact download failed: HTTP {r.status_code}")
return

open(os.path.join(base_folder, 'ccextractor.zip'), 'wb').write(r.content)
with zipfile.ZipFile(os.path.join(base_folder, 'ccextractor.zip'), 'r') as artifact_zip:
zip_path = os.path.join(base_folder, 'ccextractor.zip')
with open(zip_path, 'wb') as f:
f.write(r.content)
with zipfile.ZipFile(zip_path, 'r') as artifact_zip:
artifact_zip.extractall(base_folder)

artifact_saved = True
break

if not artifact_saved:
log.critical("Could not find an artifact for this commit")
mark_test_failed(db, test, repository, "No build artifact found for this commit")
return

zone = config.get('ZONE', '')
Expand Down Expand Up @@ -560,9 +626,9 @@ def get_config_for_gcp_instance(vm_name, source_disk_image, metadata_items) -> D
}


def wait_for_operation(compute, project, zone, operation, max_wait: int = GCP_OPERATION_MAX_WAIT) -> Dict:
    """
    Wait for an operation to get completed with timeout.

    The operation is polled through ``compute.zoneOperations().get(...)``,
    starting at a one-second interval that grows by a factor of 1.5 up to a
    ten-second cap. This function never raises for polling problems; instead
    it returns a dict whose ``status`` tells the outcome:

    * ``'DONE'``    - the API response, returned unchanged;
    * ``'TIMEOUT'`` - ``max_wait`` seconds elapsed before completion;
    * ``'ERROR'``   - polling raised (the exception text is in the message).

    The ``TIMEOUT`` and ``ERROR`` results carry an ``error.errors`` list of
    ``{'code', 'message'}`` entries.

    :param compute: The cloud compute engine service object
    :type compute: googleapiclient.discovery.Resource
    :param project: Project value forwarded to the zone-operations lookup
    :param zone: Zone value forwarded to the zone-operations lookup
    :type zone: str
    :param operation: Operation name for which server is waiting
    :type operation: str
    :param max_wait: Maximum time to wait in seconds (default: GCP_OPERATION_MAX_WAIT)
    :type max_wait: int
    :return: Response received after operation completion, or a synthetic
        TIMEOUT/ERROR result as described above
    :rtype: Dict
    """
    from run import log

    max_poll_interval = 10.0  # Upper bound for the delay between polls
    backoff_factor = 1.5  # Growth of the delay after each unsuccessful poll

    log.info(f"Waiting for operation {operation} to finish (max {max_wait}s)")
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    start_time = time.monotonic()
    poll_interval = 1.0  # Start with 1 second polling

    while True:
        elapsed = time.monotonic() - start_time
        if elapsed >= max_wait:
            log.error(f"Operation {operation} timed out after {elapsed:.0f} seconds")
            return {
                'status': 'TIMEOUT',
                'error': {
                    'errors': [{
                        'code': 'TIMEOUT',
                        'message': f'Operation timed out after {max_wait} seconds'
                    }]
                }
            }

        try:
            result = compute.zoneOperations().get(
                project=project,
                zone=zone,
                operation=operation).execute()

            if result['status'] == 'DONE':
                # Re-measure so the log includes the time spent in the API call.
                elapsed = time.monotonic() - start_time
                log.info(f"Operation {operation} completed in {elapsed:.0f} seconds")
                return result

        except Exception as e:
            log.error(f"Error checking operation status: {e}")
            return {'status': 'ERROR', 'error': {'errors': [{'code': 'API_ERROR', 'message': str(e)}]}}

        # Exponential backoff with cap at 10 seconds; never sleep past the
        # deadline, so the timeout is reported close to max_wait.
        remaining = max_wait - (time.monotonic() - start_time)
        time.sleep(max(0.0, min(poll_interval, remaining)))
        poll_interval = min(poll_interval * backoff_factor, max_poll_interval)


def save_xml_to_file(xml_node, folder_name, file_name) -> None:
Expand Down
Loading