Skip to content

Commit 8d2d672

Browse files
committed
Fix OOM during Flask dataset check
1 parent 56fc286 commit 8d2d672

File tree

1 file changed

+52
-22
lines changed

1 file changed

+52
-22
lines changed

scripts/sync_datasets.py

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def _insert_dataset_for_record(interface: HarvesterDBInterface, record: HarvestR
6565
raise click.ClickException(str(exc))
6666

6767

68-
def _records_missing_datasets(session) -> List[HarvestRecord]:
68+
def _records_missing_datasets(session):
6969
return (
7070
session.query(HarvestRecord)
7171
.outerjoin(Dataset, Dataset.harvest_record_id == HarvestRecord.id)
@@ -74,11 +74,10 @@ def _records_missing_datasets(session) -> List[HarvestRecord]:
7474
HarvestRecord.action.in_(["create", "update"]),
7575
Dataset.id.is_(None),
7676
)
77-
.all()
7877
)
7978

8079

81-
def _datasets_with_unexpected_records(session) -> List[Dataset]:
80+
def _datasets_with_unexpected_records(session):
8281
return (
8382
session.query(Dataset)
8483
.join(HarvestRecord, Dataset.harvest_record_id == HarvestRecord.id)
@@ -88,54 +87,85 @@ def _datasets_with_unexpected_records(session) -> List[Dataset]:
8887
HarvestRecord.action.notin_(["create", "update"]),
8988
)
9089
)
91-
.all()
9290
)
9391

9492

95-
def _report(records_missing, datasets_bad):
93+
def _report(records_missing_count: int, datasets_bad_count: int):
9694
click.echo("Dataset Sync Report\n====================")
97-
click.echo(f"Records needing datasets: {len(records_missing)}")
95+
click.echo(f"Records needing datasets: {records_missing_count}")
9896
click.echo(
99-
f"Datasets tied to non-success/non-create records: {len(datasets_bad)}"
97+
f"Datasets tied to non-success/non-create records: {datasets_bad_count}"
10098
)
10199

102100

103101
def _sync_impl(apply_changes: bool):
104102
interface = HarvesterDBInterface(session=db.session)
105103

106104
try:
107-
records_missing = _records_missing_datasets(db.session)
108-
datasets_bad = _datasets_with_unexpected_records(db.session)
105+
records_missing_query = _records_missing_datasets(db.session)
106+
datasets_bad_query = _datasets_with_unexpected_records(db.session)
109107

110-
_report(records_missing, datasets_bad)
108+
records_missing_count = records_missing_query.count()
109+
datasets_bad_count = datasets_bad_query.count()
110+
111+
_report(records_missing_count, datasets_bad_count)
111112

112113
if apply_changes:
113114
synced = 0
114-
total = len(records_missing)
115-
for start in range(0, total, BATCH_SIZE):
116-
batch = records_missing[start : start + BATCH_SIZE]
115+
batches = (records_missing_count + BATCH_SIZE - 1) // BATCH_SIZE
116+
current_batch = 0
117+
batch_records: List[HarvestRecord] = []
118+
for record in records_missing_query.yield_per(BATCH_SIZE):
119+
batch_records.append(record)
120+
if len(batch_records) == BATCH_SIZE:
121+
current_batch += 1
122+
click.echo(
123+
f"Processing batch {current_batch} "
124+
f"({len(batch_records)} records)..."
125+
)
126+
for record_in_batch in batch_records:
127+
try:
128+
slug = _insert_dataset_for_record(
129+
interface, record_in_batch
130+
)
131+
synced += 1
132+
click.echo(
133+
f"Created dataset for record {record_in_batch.id} "
134+
f"(slug: {slug})"
135+
)
136+
except click.ClickException as exc:
137+
click.echo(
138+
f"Failed to sync record {record_in_batch.id}: {exc}"
139+
)
140+
batch_records = []
141+
142+
if batch_records:
143+
current_batch += 1
117144
click.echo(
118-
f"Processing batch {start // BATCH_SIZE + 1}"
119-
f" ({len(batch)} records)..."
145+
f"Processing batch {current_batch} "
146+
f"({len(batch_records)} records)..."
120147
)
121-
for record in batch:
148+
for record_in_batch in batch_records:
122149
try:
123-
slug = _insert_dataset_for_record(interface, record)
150+
slug = _insert_dataset_for_record(interface, record_in_batch)
124151
synced += 1
125152
click.echo(
126-
f"Created dataset for record {record.id} (slug: {slug})"
153+
f"Created dataset for record {record_in_batch.id} "
154+
f"(slug: {slug})"
127155
)
128156
except click.ClickException as exc:
129-
click.echo(f"Failed to sync record {record.id}: {exc}")
157+
click.echo(
158+
f"Failed to sync record {record_in_batch.id}: {exc}"
159+
)
130160
click.echo(f"Datasets created: {synced}")
131161

132162
deleted = 0
133-
if datasets_bad:
163+
if datasets_bad_count:
134164
click.echo(
135-
f"Deleting {len(datasets_bad)} dataset(s) tied "
165+
f"Deleting {datasets_bad_count} dataset(s) tied "
136166
"to invalid harvest records..."
137167
)
138-
for dataset in datasets_bad:
168+
for dataset in datasets_bad_query.yield_per(BATCH_SIZE):
139169
try:
140170
interface.db.delete(dataset)
141171
interface.db.commit()

0 commit comments

Comments
 (0)