55 changes: 0 additions & 55 deletions .github/workflows/sync.yml

This file was deleted.

2 changes: 2 additions & 0 deletions app/__init__.py
@@ -10,6 +10,7 @@
 from app.filters import else_na, usa_icon, utc_isoformat
 from database.models import db
 from harvester.lib.load_manager import LoadManager
+from scripts.sync_datasets import register_cli
 
 logger = logging.getLogger("harvest_admin")

@@ -40,6 +41,7 @@ def create_app():
     register_routes(app)
 
     add_template_filters(app)
+    register_cli(app)
 
     with app.app_context():
         # SQL-Alchemy can't be used to create the schema here
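Note: `scripts/sync_datasets.py` itself is outside this diff, so only the import and the `register_cli(app)` call are confirmed. A minimal sketch of how such a hook is typically wired in Flask, with the command name and body as assumptions:

```python
# Hypothetical sketch of scripts/sync_datasets.py; the module is not shown
# in this PR, so the command name and body are assumptions.
import click
from flask import Flask


def register_cli(app: Flask) -> None:
    @app.cli.command("sync-datasets")  # assumed command name
    def sync_datasets() -> None:
        """Sync Dataset rows from harvest records (assumed behavior)."""
        click.echo("Syncing datasets...")
```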
136 changes: 125 additions & 11 deletions database/interface.py
@@ -1,7 +1,7 @@
 import logging
 from datetime import datetime, timedelta, timezone
 from functools import wraps
-from typing import List
+from typing import List, Optional
 
 from sqlalchemy import asc, desc, func, inspect, text
 from sqlalchemy.exc import NoResultFound
@@ -11,6 +11,7 @@
 from harvester.utils.general_utils import query_filter_builder
 
 from .models import (
+    Dataset,
     DatasetViewCount,
     HarvestJob,
     HarvestJobError,
@@ -563,6 +564,115 @@ def delete_harvest_record(
     def get_harvest_record(self, record_id):
         return self.db.query(HarvestRecord).filter_by(id=record_id).first()
 
+    ## DATASETS
+    def _apply_popularity_from_view_count(self, dataset: Optional[Dataset]) -> None:
+        if dataset is None or not dataset.slug:
+            return
+
+        view_count = (
+            self.db.query(DatasetViewCount.view_count)
+            .filter(DatasetViewCount.dataset_slug == dataset.slug)
+            .scalar()
+        )
+        if view_count is None:
+            return
+
+        dataset.popularity = view_count

+    def insert_dataset(self, dataset_data: dict):
+        slug = dataset_data.get("slug")
+        if not slug:
+            raise ValueError("dataset_data must include a slug")
+
+        # use a nested transaction (SAVEPOINT) so that a rollback here
+        # doesn't roll back the whole session and fail the enclosing
+        # transaction that the pytest fixtures keep open
+        nested = self.db.begin_nested()
+        try:
+            dataset = Dataset(**dataset_data)
+            self.db.add(dataset)
+            self.db.flush()
+            self._apply_popularity_from_view_count(dataset)
+        except Exception as e:
+            nested.rollback()
+            logger.error("Error inserting dataset '%s': %s", slug, e)
+            raise
+        else:
+            nested.commit()
+        try:
+            self.db.commit()
+        except Exception:
+            self.db.rollback()
+            raise
+        self.db.refresh(dataset)
+        return dataset
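The `begin_nested()` above opens a SQL SAVEPOINT, so the rollback in the `except` branch only unwinds this insert, not any enclosing transaction. The pattern in isolation, with `session` and `obj` as placeholders rather than names from this PR:

```python
# Generic SAVEPOINT pattern used by insert_dataset/upsert_dataset;
# `session` is any SQLAlchemy Session, `obj` any mapped instance.
savepoint = session.begin_nested()   # emits SAVEPOINT
try:
    session.add(obj)
    session.flush()                  # constraint errors surface here
except Exception:
    savepoint.rollback()             # rolls back to the SAVEPOINT only
    raise
else:
    savepoint.commit()               # releases the SAVEPOINT
session.commit()                     # commits the outer transaction
```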

+    def upsert_dataset(self, dataset_data: dict):
+        slug = dataset_data.get("slug")
+        if not slug:
+            raise ValueError("dataset_data must include a slug")
+
+        stmt = insert(Dataset).values(**dataset_data)
+        update_cols = {
+            column: getattr(stmt.excluded, column)
+            for column in dataset_data
+            if column != "slug"
+        }
+        stmt = stmt.on_conflict_do_update(
+            index_elements=[Dataset.slug],
+            set_=update_cols,
+        )
+
+        nested = self.db.begin_nested()
+        dataset = None
+        try:
+            self.db.execute(stmt)
+            self.db.flush()
+            dataset = self.get_dataset_by_slug(slug)
+            self._apply_popularity_from_view_count(dataset)
+        except Exception as e:
+            nested.rollback()
+            logger.error("Error upserting dataset '%s': %s", slug, e)
+            raise
+        else:
+            nested.commit()
+        try:
+            self.db.commit()
+        except Exception:
+            self.db.rollback()
+            raise
+        if dataset is None:
+            dataset = self.get_dataset_by_slug(slug)
+        if dataset is not None:
+            self.db.refresh(dataset)
+        return dataset
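`on_conflict_do_update` compiles to PostgreSQL's `INSERT ... ON CONFLICT (slug) DO UPDATE SET ...`, updating every supplied column except the conflict key. A hedged usage sketch; `db_interface` stands in for an instance of this interface class, and the payload fields are illustrative:

```python
# Illustrative payload; real callers must also supply the NOT NULL
# foreign keys on Dataset (organization_id, harvest_source_id,
# harvest_record_id), elided here for brevity.
payload = {"slug": "ev-population", "dcat": {"title": "EV Population"}}

created = db_interface.upsert_dataset(payload)   # first call: plain INSERT
payload["dcat"] = {"title": "EV Population v2"}
updated = db_interface.upsert_dataset(payload)   # same slug: DO UPDATE
assert created.slug == updated.slug == "ev-population"
```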

+    def delete_dataset_by_slug(self, slug: str) -> bool:
+        """Delete a dataset by slug.
+
+        Returns True if deleted, False if slug is falsy or dataset not found.
+        On unexpected errors the transaction is rolled back and the
+        exception is re-raised.
+        """
+        if not slug:
+            return False
+
+        try:
+            dataset = self.get_dataset_by_slug(slug)
+            if dataset is None:
+                return False
+
+            self.db.delete(dataset)
+            self.db.commit()
+            return True
+        except Exception as e:
+            logger.error("Error deleting dataset '%s': %s", slug, e)
+            self.db.rollback()
+            raise
+
+    def get_dataset_by_slug(self, slug: str):
+        if not slug:
+            return None
+        return self.db.query(Dataset).filter_by(slug=slug).first()
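The boolean contract lets callers distinguish "nothing to delete" from real failures, which raise. A short sketch of that contract, again with the stand-in `db_interface`:

```python
# Falsy and unknown slugs report False instead of raising.
assert db_interface.delete_dataset_by_slug("") is False
assert db_interface.delete_dataset_by_slug("never-existed") is False

# A real deletion returns True and the row is gone afterwards.
if db_interface.get_dataset_by_slug("stale-dataset") is not None:
    assert db_interface.delete_dataset_by_slug("stale-dataset") is True
    assert db_interface.get_dataset_by_slug("stale-dataset") is None
```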

     def get_all_outdated_records(self, days=365, source_id=None):
         """
         gets all outdated versions of records older than [days] ago
@@ -626,16 +736,20 @@ def get_latest_harvest_records_by_source_orm(
         )
         sq_alias = aliased(HarvestRecord, subq)
 
-        return self.db.query(
-            sq_alias.identifier,
-            sq_alias.source_hash,
-            sq_alias.ckan_id,
-            sq_alias.ckan_name,
-            sq_alias.date_created,
-            sq_alias.date_finished,
-            sq_alias.id,
-            sq_alias.action,
-        ).filter(sq_alias.action != "delete")
+        return (
+            self.db.query(
+                sq_alias.identifier,
+                sq_alias.source_hash,
+                sq_alias.ckan_id,
+                sq_alias.date_created,
+                sq_alias.date_finished,
+                sq_alias.id,
+                sq_alias.action,
+                Dataset.slug.label("dataset_slug"),
+            )
+            .outerjoin(Dataset, Dataset.harvest_record_id == sq_alias.id)
+            .filter(sq_alias.action != "delete")
+        )
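Because this is an `outerjoin`, latest records with no linked `Dataset` row are still returned, just with `dataset_slug` as `None`, mirroring the removal of the `ckan_name` column. A consuming sketch; the `source_id` argument is an assumption since the signature is folded out of this diff:

```python
# Rows are SQLAlchemy named-tuple Rows; iterating the returned Query
# executes it. dataset_slug is None when no Dataset is linked.
for row in db_interface.get_latest_harvest_records_by_source_orm(source_id):
    label = row.dataset_slug or row.identifier  # fall back when unlinked
    print(row.action, label)
```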

     def get_latest_harvest_records_by_source(self, source_id):
         return [
39 changes: 32 additions & 7 deletions database/models.py
@@ -1,11 +1,12 @@
 # NOTE: Keep this file in sync between datagov-harvester and datagov-catalog
 
 import uuid
+from typing import Optional
 
 from flask_sqlalchemy import SQLAlchemy
 from geoalchemy2 import Geometry
-from sqlalchemy import CheckConstraint, Column, Enum, String, func, Index
-from sqlalchemy.dialects.postgresql import JSONB, TSVECTOR
+from sqlalchemy import CheckConstraint, Column, Enum, String, func
+from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.orm import DeclarativeBase, backref
 from sqlalchemy.ext.mutable import MutableDict

@@ -178,7 +179,6 @@ class HarvestRecord(db.Model):
     )
     date_finished = db.Column(db.DateTime, index=True)
     ckan_id = db.Column(db.String, index=True)
-    ckan_name = db.Column(db.String, index=True)
     action = db.Column(
         Enum("create", "update", "delete", name="record_action"), index=True
     )
@@ -187,6 +187,13 @@
     status = db.Column(Enum("error", "success", name="record_status"), index=True)
     errors = db.relationship("HarvestRecordError", backref="record", lazy=True)
 
+    @property
+    def dataset_slug(self) -> Optional[str]:
+        dataset = getattr(self, "dataset", None)
+        if dataset is None:
+            return None
+        return dataset.slug
+
 
 class Dataset(db.Model):
     __tablename__ = "dataset"
@@ -205,34 +212,52 @@ class Dataset(db.Model):
     # make it mutable so that in-place mutations (e.g.,
     # dcat["spatial"] = "...", for tests) are tracked
     dcat = db.Column(MutableDict.as_mutable(JSONB), nullable=False)
     translated_spatial = db.Column(JSONB)
 
     organization_id = db.Column(
         db.String(36),
         db.ForeignKey("organization.id", ondelete="CASCADE"),
         nullable=False,
         index=True,
     )
 
     harvest_source_id = db.Column(
         db.String(36),
         db.ForeignKey("harvest_source.id", ondelete="CASCADE"),
         nullable=False,
         index=True,
     )
 
     harvest_record_id = db.Column(
         db.String(36),
         db.ForeignKey("harvest_record.id", ondelete="CASCADE"),
         nullable=False,
         index=True,
     )
 
-    popularity = db.Column(db.Numeric)
+    popularity = db.Column(db.Integer, server_default="0")
+    last_harvested_date = db.Column(
+        db.DateTime,
+        index=True
+    )
-    search_vector = db.Column(TSVECTOR)
-
-    __table_args__ = (
-        Index("ix_dataset_search_vector", "search_vector", postgresql_using="gin"),
-    )
+
+    organization = db.relationship(
+        "Organization",
+        backref=backref("datasets", lazy=True),
+        lazy="joined",
+    )
+
+    harvest_source = db.relationship(
+        "HarvestSource",
+        backref=backref("datasets", lazy=True),
+        lazy="joined",
+    )
+
+    harvest_record = db.relationship(
+        "HarvestRecord",
+        backref=backref("dataset", uselist=False, lazy=True),
+        lazy="joined",
+        uselist=False,
+    )
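With `lazy="joined"`, loading a `Dataset` pulls its organization, source, and record in the same SELECT, and the `uselist=False` backref on `harvest_record` is what the new `HarvestRecord.dataset_slug` property traverses. A traversal sketch; `session` setup is assumed and `Organization`'s columns are not shown in this diff:

```python
# Hedged sketch: `session` is a configured SQLAlchemy session.
dataset = session.query(Dataset).filter_by(slug="ev-population").first()
if dataset is not None:
    org = dataset.organization          # eager-loaded, no extra query
    record = dataset.harvest_record     # one-to-one via uselist=False
    assert record.dataset_slug == dataset.slug  # round-trips the backref
```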

