diff --git a/backend/dashboard_metrics/management/commands/backfill_metrics.py b/backend/dashboard_metrics/management/commands/backfill_metrics.py index 2c8287ab60..4c71fcd329 100644 --- a/backend/dashboard_metrics/management/commands/backfill_metrics.py +++ b/backend/dashboard_metrics/management/commands/backfill_metrics.py @@ -5,8 +5,9 @@ Usage: python manage.py backfill_metrics --days=30 - python manage.py backfill_metrics --days=90 --org-id=12 + python manage.py backfill_metrics --days=90 --org-id=5 python manage.py backfill_metrics --days=7 --dry-run + python manage.py backfill_metrics --days=90 --active-only """ import logging @@ -27,6 +28,14 @@ logger = logging.getLogger(__name__) +# Cloud-only: Subscription model for --active-only filtering +try: + from pluggable_apps.subscription_v2.models import Subscription + + HAS_SUBSCRIPTION = True +except ImportError: + HAS_SUBSCRIPTION = False + class Command(BaseCommand): help = "Backfill metrics from source tables into aggregated tables" @@ -84,6 +93,14 @@ def add_arguments(self, parser): action="store_true", help="Skip monthly aggregation", ) + parser.add_argument( + "--active-only", + action="store_true", + help=( + "Only process orgs with an active subscription " + "(cloud-only, requires subscription_v2)" + ), + ) def handle(self, *args, **options): days = options["days"] @@ -92,6 +109,7 @@ def handle(self, *args, **options): skip_hourly = options["skip_hourly"] skip_daily = options["skip_daily"] skip_monthly = options["skip_monthly"] + active_only = options["active_only"] end_date = timezone.now() start_date = end_date - timedelta(days=days) @@ -103,16 +121,16 @@ def handle(self, *args, **options): self.stdout.write(self.style.WARNING("DRY RUN - no changes will be made")) # Get organizations to process - if org_id: - try: - orgs = [Organization.objects.get(id=org_id)] - self.stdout.write(f"Processing single org: {org_id}") - except Organization.DoesNotExist: - self.stderr.write(self.style.ERROR(f"Organization {org_id} not found")) - return - else: - orgs = list(Organization.objects.all()) - self.stdout.write(f"Processing {len(orgs)} organizations") + org_ids = self._resolve_org_ids( + org_id=org_id, + active_only=active_only, + ) + + if not org_ids: + self.stdout.write(self.style.WARNING("No organizations to process")) + return + + self.stdout.write(f"Processing {len(org_ids)} organizations") total_stats = { "hourly": {"upserted": 0}, @@ -121,14 +139,15 @@ def handle(self, *args, **options): "errors": 0, } - for org in orgs: - org_id_str = str(org.id) - self.stdout.write(f"\nProcessing org: {org.display_name} ({org_id_str})") + for i, current_org_id in enumerate(org_ids): + self.stdout.write( + f"\n[{i + 1}/{len(org_ids)}] Processing org: {current_org_id}" + ) try: # Collect all metric data for this org hourly_data, daily_data, monthly_data = self._collect_metrics( - org_id_str, start_date, end_date + current_org_id, start_date, end_date ) self.stdout.write( @@ -155,10 +174,10 @@ def handle(self, *args, **options): except Exception as e: self.stderr.write( - self.style.ERROR(f" Error processing org {org_id_str}: {e}") + self.style.ERROR(f" Error processing org {current_org_id}: {e}") ) total_stats["errors"] += 1 - logger.exception(f"Error backfilling org {org_id_str}") + logger.exception("Error backfilling org %s", current_org_id) # Print summary self.stdout.write("\n" + "=" * 50) @@ -168,6 +187,63 @@ def handle(self, *args, **options): self.stdout.write(f"Monthly: {total_stats['monthly']['upserted']} upserted") self.stdout.write(f"Errors: {total_stats['errors']}") + def _resolve_org_ids( + self, + org_id: str | None, + active_only: bool, + ) -> list[str]: + """Resolve the list of organization PKs to process. + + Returns Organization.id (int PK) values as strings, since all + downstream queries (services, bulk upserts) use the FK which + references Organization.id, not Organization.organization_id. + + Applies filters in order: + 1. Single org (--org-id) — returns immediately + 2. Active subscription filter (--active-only) — cloud only + """ + # Single org mode + if org_id: + try: + org = Organization.objects.filter(id=org_id).first() + except (ValueError, TypeError): + org = None + if not org: + self.stderr.write(self.style.ERROR(f"Organization {org_id} not found")) + return [] + self.stdout.write(f"Single org mode: {org_id}") + return [str(org.id)] + + # Get org PKs based on filtering mode + if active_only and HAS_SUBSCRIPTION: + active_org_id_strings = set( + Subscription.objects.filter(is_active=True).values_list( + "organization_id", flat=True + ) + ) + # Map org_* strings back to Organization PKs + all_org_ids = set( + Organization.objects.filter( + organization_id__in=active_org_id_strings + ).values_list("id", flat=True) + ) + self.stdout.write( + f"Active organizations (subscription filter): {len(all_org_ids)}" + ) + elif active_only and not HAS_SUBSCRIPTION: + self.stdout.write( + self.style.WARNING( + "subscription_v2 not available (OSS mode), ignoring --active-only" + ) + ) + all_org_ids = set(Organization.objects.values_list("id", flat=True)) + self.stdout.write(f"Total organizations: {len(all_org_ids)}") + else: + all_org_ids = set(Organization.objects.values_list("id", flat=True)) + self.stdout.write(f"Total organizations: {len(all_org_ids)}") + + return sorted(str(oid) for oid in all_org_ids) + def _collect_metrics( self, org_id: str, start_date: datetime, end_date: datetime ) -> tuple[dict, dict, dict]: @@ -238,7 +314,7 @@ def _collect_metrics( monthly_agg[key]["count"] += 1 except Exception as e: - logger.warning(f"Error querying {metric_name} for org {org_id}: {e}") + logger.warning("Error querying %s for org %s: %s", metric_name, org_id, e) return hourly_agg, daily_agg, monthly_agg