Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 94 additions & 18 deletions backend/dashboard_metrics/management/commands/backfill_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

Usage:
python manage.py backfill_metrics --days=30
python manage.py backfill_metrics --days=90 --org-id=12
python manage.py backfill_metrics --days=90 --org-id=5
python manage.py backfill_metrics --days=7 --dry-run
python manage.py backfill_metrics --days=90 --active-only
"""

import logging
Expand All @@ -27,6 +28,14 @@

logger = logging.getLogger(__name__)

# Cloud-only: Subscription model for --active-only filtering
try:
from pluggable_apps.subscription_v2.models import Subscription

HAS_SUBSCRIPTION = True
except ImportError:
HAS_SUBSCRIPTION = False
Comment thread
athul-rs marked this conversation as resolved.


class Command(BaseCommand):
help = "Backfill metrics from source tables into aggregated tables"
Expand Down Expand Up @@ -84,6 +93,14 @@ def add_arguments(self, parser):
action="store_true",
help="Skip monthly aggregation",
)
parser.add_argument(
"--active-only",
action="store_true",
help=(
"Only process orgs with an active subscription "
"(cloud-only, requires subscription_v2)"
),
)

def handle(self, *args, **options):
days = options["days"]
Expand All @@ -92,6 +109,7 @@ def handle(self, *args, **options):
skip_hourly = options["skip_hourly"]
skip_daily = options["skip_daily"]
skip_monthly = options["skip_monthly"]
active_only = options["active_only"]

end_date = timezone.now()
start_date = end_date - timedelta(days=days)
Expand All @@ -103,16 +121,16 @@ def handle(self, *args, **options):
self.stdout.write(self.style.WARNING("DRY RUN - no changes will be made"))

# Get organizations to process
if org_id:
try:
orgs = [Organization.objects.get(id=org_id)]
self.stdout.write(f"Processing single org: {org_id}")
except Organization.DoesNotExist:
self.stderr.write(self.style.ERROR(f"Organization {org_id} not found"))
return
else:
orgs = list(Organization.objects.all())
self.stdout.write(f"Processing {len(orgs)} organizations")
org_ids = self._resolve_org_ids(
org_id=org_id,
active_only=active_only,
)

if not org_ids:
self.stdout.write(self.style.WARNING("No organizations to process"))
return

self.stdout.write(f"Processing {len(org_ids)} organizations")

total_stats = {
"hourly": {"upserted": 0},
Expand All @@ -121,14 +139,15 @@ def handle(self, *args, **options):
"errors": 0,
}

for org in orgs:
org_id_str = str(org.id)
self.stdout.write(f"\nProcessing org: {org.display_name} ({org_id_str})")
for i, current_org_id in enumerate(org_ids):
self.stdout.write(
f"\n[{i + 1}/{len(org_ids)}] Processing org: {current_org_id}"
)

try:
# Collect all metric data for this org
hourly_data, daily_data, monthly_data = self._collect_metrics(
org_id_str, start_date, end_date
current_org_id, start_date, end_date
)

self.stdout.write(
Expand All @@ -155,10 +174,10 @@ def handle(self, *args, **options):

except Exception as e:
self.stderr.write(
self.style.ERROR(f" Error processing org {org_id_str}: {e}")
self.style.ERROR(f" Error processing org {current_org_id}: {e}")
)
total_stats["errors"] += 1
logger.exception(f"Error backfilling org {org_id_str}")
logger.exception("Error backfilling org %s", current_org_id)

# Print summary
self.stdout.write("\n" + "=" * 50)
Expand All @@ -168,6 +187,63 @@ def handle(self, *args, **options):
self.stdout.write(f"Monthly: {total_stats['monthly']['upserted']} upserted")
self.stdout.write(f"Errors: {total_stats['errors']}")

def _resolve_org_ids(
    self,
    org_id: str | None,
    active_only: bool,
) -> list[str]:
    """Resolve the list of organization PKs to process.

    Returns Organization.id (PK) values as strings, since all
    downstream queries (services, bulk upserts) use the FK which
    references Organization.id, not Organization.organization_id.

    Applies filters in order:
    1. Single org (--org-id) — returns immediately; --active-only is
       not consulted in this mode.
    2. Active subscription filter (--active-only) — cloud only. When the
       subscription app could not be imported, a warning is printed and
       all organizations are returned instead.

    Args:
        org_id: Value of --org-id, or None/empty to process many orgs.
        active_only: Whether --active-only was passed.

    Returns:
        Stringified Organization PKs. An empty list means there is
        nothing to process (including the "org not found" case, which is
        reported on stderr). PKs are ordered by their native value
        *before* being converted to strings, so integer PKs are processed
        in numeric order (2 before 10) rather than lexicographic order.
    """
    # Single org mode: short-circuits every other filter.
    if org_id:
        try:
            org = Organization.objects.filter(id=org_id).first()
        except (ValueError, TypeError):
            # A value that cannot be coerced to the PK type is reported
            # the same way as a missing organization.
            org = None
        if not org:
            self.stderr.write(self.style.ERROR(f"Organization {org_id} not found"))
            return []
        self.stdout.write(f"Single org mode: {org_id}")
        return [str(org.id)]

    if active_only and HAS_SUBSCRIPTION:
        active_org_id_strings = set(
            Subscription.objects.filter(is_active=True).values_list(
                "organization_id", flat=True
            )
        )
        # Map org_* strings back to Organization PKs
        all_org_ids = set(
            Organization.objects.filter(
                organization_id__in=active_org_id_strings
            ).values_list("id", flat=True)
        )
        self.stdout.write(
            f"Active organizations (subscription filter): {len(all_org_ids)}"
        )
    else:
        if active_only:
            # --active-only was requested but the Subscription model is
            # unavailable; fall back to every organization.
            self.stdout.write(
                self.style.WARNING(
                    "subscription_v2 not available (OSS mode), ignoring --active-only"
                )
            )
        all_org_ids = set(Organization.objects.values_list("id", flat=True))
        self.stdout.write(f"Total organizations: {len(all_org_ids)}")

    # Sort on the native PK type first; sorting the strings would put
    # "10" ahead of "2" and make the progress output hard to follow.
    return [str(oid) for oid in sorted(all_org_ids)]

def _collect_metrics(
self, org_id: str, start_date: datetime, end_date: datetime
) -> tuple[dict, dict, dict]:
Expand Down Expand Up @@ -238,7 +314,7 @@ def _collect_metrics(
monthly_agg[key]["count"] += 1

except Exception as e:
logger.warning(f"Error querying {metric_name} for org {org_id}: {e}")
logger.warning("Error querying %s for org %s: %s", metric_name, org_id, e)

return hourly_agg, daily_agg, monthly_agg

Expand Down