
Commit 351d2bc

Remove default create_dataset method.
Add section in experimental lineage docs.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

1 parent 8d455d6

8 files changed: +185 additions, -91 deletions


airflow/datasets/__init__.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,6 @@ def normalize_noop(parts: SplitResult) -> SplitResult:
4444
return parts
4545

4646

47-
def create_dataset(uri: str) -> Dataset:
48-
"""Create a dataset object from a dataset URI."""
49-
return Dataset(uri=uri)
50-
51-
5247
def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
5348
if scheme == "file":
5449
return normalize_noop
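With the module-level helper removed, call sites construct ``Dataset`` directly. A minimal sketch of the replacement pattern (the URI and ``extra`` payload are invented for illustration; the updated ``airflow/lineage/hook.py`` below does the same inline):

    from airflow.datasets import Dataset

    # Previously: dataset = create_dataset("s3://bucket/key")
    # Now: construct the Dataset directly; extra metadata is optional.
    dataset = Dataset(uri="s3://bucket/key", extra={"source": "example"})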

airflow/lineage/hook.py

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import attr
2323

24-
from airflow.datasets import Dataset, create_dataset
24+
from airflow.datasets import Dataset
2525
from airflow.hooks.base import BaseHook
2626
from airflow.io.store import ObjectStore
2727
from airflow.providers_manager import ProvidersManager
@@ -53,36 +53,73 @@ def __init__(self, **kwargs):
5353
self.inputs: list[tuple[Dataset, LineageContext]] = []
5454
self.outputs: list[tuple[Dataset, LineageContext]] = []
5555

56-
@staticmethod
57-
def create_dataset(dataset_kwargs: dict) -> Dataset:
58-
"""Create a Dataset instance from the given dataset kwargs."""
59-
if "uri" in dataset_kwargs:
56+
def create_dataset(
57+
self, scheme: str | None, uri: str | None, dataset_kwargs: dict | None, dataset_extra: dict | None
58+
) -> Dataset | None:
59+
"""
60+
Create a Dataset instance using the provided parameters.
61+
62+
This method attempts to create a Dataset instance using the given parameters.
63+
It first checks if a URI is provided and falls back to using the default dataset factory
64+
with the given URI if no other information is available.
65+
66+
If a scheme is provided but no URI, it attempts to find a dataset factory that matches
67+
the given scheme. If no such factory is found, it logs an error message and returns None.
68+
69+
If dataset_kwargs is provided, it is used to pass additional parameters to the Dataset
70+
factory. The dataset_extra parameter is also passed to the factory as an ``extra`` parameter.
71+
"""
72+
if uri:
6073
# Fallback to default factory using the provided URI
61-
return create_dataset(dataset_kwargs["uri"])
74+
return Dataset(uri=uri, extra=dataset_extra)
6275

63-
scheme: str = dataset_kwargs.pop("scheme", None)
6476
if not scheme:
65-
raise ValueError(
77+
self.log.debug(
6678
"Missing required parameter: either 'uri' or 'scheme' must be provided to create a Dataset."
6779
)
80+
return None
6881

6982
dataset_factory = ProvidersManager().dataset_factories.get(scheme)
7083
if not dataset_factory:
71-
raise ValueError(
72-
f"Unsupported scheme: '{scheme}'. Please provide a valid URI to create a Dataset."
73-
)
84+
self.log.debug("Unsupported scheme: %s. Please provide a valid URI to create a Dataset.", scheme)
85+
return None
7486

75-
return dataset_factory(**dataset_kwargs)
87+
dataset_kwargs = dataset_kwargs or {}
88+
try:
89+
return dataset_factory(**dataset_kwargs, extra=dataset_extra)
90+
except Exception as e:
91+
self.log.debug("Failed to create dataset. Skipping. Error: %s", e)
92+
return None
7693

77-
def add_input_dataset(self, dataset_kwargs: dict, hook: LineageContext):
94+
def add_input_dataset(
95+
self,
96+
context: LineageContext,
97+
scheme: str | None = None,
98+
uri: str | None = None,
99+
dataset_kwargs: dict | None = None,
100+
dataset_extra: dict | None = None,
101+
):
78102
"""Add the input dataset and its corresponding hook execution context to the collector."""
79-
dataset = self.create_dataset(dataset_kwargs)
80-
self.inputs.append((dataset, hook))
103+
dataset = self.create_dataset(
104+
scheme=scheme, uri=uri, dataset_kwargs=dataset_kwargs, dataset_extra=dataset_extra
105+
)
106+
if dataset:
107+
self.inputs.append((dataset, context))
81108

82-
def add_output_dataset(self, dataset_kwargs: dict, hook: LineageContext):
109+
def add_output_dataset(
110+
self,
111+
context: LineageContext,
112+
scheme: str | None = None,
113+
uri: str | None = None,
114+
dataset_kwargs: dict | None = None,
115+
dataset_extra: dict | None = None,
116+
):
83117
"""Add the output dataset and its corresponding hook execution context to the collector."""
84-
dataset = self.create_dataset(dataset_kwargs)
85-
self.outputs.append((dataset, hook))
118+
dataset = self.create_dataset(
119+
scheme=scheme, uri=uri, dataset_kwargs=dataset_kwargs, dataset_extra=dataset_extra
120+
)
121+
if dataset:
122+
self.outputs.append((dataset, context))
86123

87124
@property
88125
def collected_datasets(self) -> HookLineage:
@@ -112,7 +149,9 @@ def add_output_dataset(self, *_):
112149
def collected_datasets(
113150
self,
114151
) -> HookLineage:
115-
self.log.warning("You should not call this as there's no reader.")
152+
self.log.warning(
153+
"Data lineage tracking is disabled. Register a hook lineage reader to start tracking hook lineage."
154+
)
116155
return HookLineage([], [])
117156

118157

@@ -132,8 +171,10 @@ def get_hook_lineage_collector() -> HookLineageCollector:
132171
"""Get singleton lineage collector."""
133172
global _hook_lineage_collector
134173
if not _hook_lineage_collector:
135-
# is there a better why how to use noop?
136-
if ProvidersManager().hook_lineage_readers:
174+
from airflow import plugins_manager
175+
176+
plugins_manager.initialize_hook_lineage_readers_plugins()
177+
if plugins_manager.hook_lineage_reader_classes:
137178
_hook_lineage_collector = HookLineageCollector()
138179
else:
139180
_hook_lineage_collector = NoOpCollector()
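For orientation, a hedged sketch of how a hook might feed the reworked collector (the hook instance, URIs, and kwargs are invented for illustration, and the file-scheme factory is assumed to accept a "path" argument, as in the docs example in this commit):

    from airflow.lineage.hook import get_hook_lineage_collector

    collector = get_hook_lineage_collector()

    # URI path: short-circuits to Dataset(uri=..., extra=...); no factory lookup.
    collector.add_input_dataset(context=some_hook, uri="s3://bucket/in", dataset_extra={"rows": 42})

    # Scheme path: resolved via ProvidersManager().dataset_factories. If no factory
    # is registered for "file", create_dataset logs at debug level and returns None,
    # and nothing is appended to the collector's outputs.
    collector.add_output_dataset(context=some_hook, scheme="file", dataset_kwargs={"path": "/tmp/out"})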

airflow/plugins_manager.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import os
2828
import sys
2929
import types
30+
from cgitb import Hook
3031
from pathlib import Path
3132
from typing import TYPE_CHECKING, Any, Iterable
3233

@@ -41,6 +42,8 @@
4142
from airflow.utils.module_loading import import_string, qualname
4243

4344
if TYPE_CHECKING:
45+
from airflow.lineage.hook import HookLineageReader
46+
4447
try:
4548
import importlib_metadata as metadata
4649
except ImportError:
@@ -75,6 +78,7 @@
7578
registered_operator_link_classes: dict[str, type] | None = None
7679
registered_ti_dep_classes: dict[str, type] | None = None
7780
timetable_classes: dict[str, type[Timetable]] | None = None
81+
hook_lineage_reader_classes: list[type[Hook]] | None = None
7882
priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | None = None
7983
"""
8084
Mapping of class names to class of OperatorLinks registered by plugins.
@@ -176,8 +180,12 @@ class AirflowPlugin:
176180
# A list of timetable classes that can be used for DAG scheduling.
177181
timetables: list[type[Timetable]] = []
178182

183+
# A list of listeners that can be used for tracking task and DAG states.
179184
listeners: list[ModuleType | object] = []
180185

186+
# A list of hook lineage reader classes that can be used for reading lineage information from a hook.
187+
hook_lineage_readers: list[type[HookLineageReader]] = []
188+
181189
# A list of priority weight strategy classes that can be used for calculating tasks weight priority.
182190
priority_weight_strategies: list[type[PriorityWeightStrategy]] = []
183191

@@ -483,6 +491,25 @@ def initialize_timetables_plugins():
483491
}
484492

485493

494+
def initialize_hook_lineage_readers_plugins():
495+
"""Collect hook lineage reader classes registered by plugins."""
496+
global hook_lineage_reader_classes
497+
498+
if hook_lineage_reader_classes is not None:
499+
return
500+
501+
ensure_plugins_loaded()
502+
503+
if plugins is None:
504+
raise AirflowPluginException("Can't load plugins.")
505+
506+
log.debug("Initialize hook lineage readers plugins")
507+
508+
hook_lineage_reader_classes = []
509+
for plugin in plugins:
510+
hook_lineage_reader_classes.extend(plugin.hook_lineage_readers)
511+
512+
486513
def integrate_executor_plugins() -> None:
487514
"""Integrate executor plugins to the context."""
488515
global plugins
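A hedged sketch of the registration path this adds (class and plugin names are invented; per the TYPE_CHECKING import above, the reader interface itself lives in airflow/lineage/hook.py):

    from airflow import plugins_manager
    from airflow.lineage.hook import HookLineageReader
    from airflow.plugins_manager import AirflowPlugin


    class MyHookLineageReader(HookLineageReader):  # illustrative subclass
        ...


    class MyLineagePlugin(AirflowPlugin):
        name = "my_lineage_plugin"
        hook_lineage_readers = [MyHookLineageReader]


    # get_hook_lineage_collector() runs this lazily: the module-level list stays
    # None until plugins load, then holds every plugin's hook_lineage_readers.
    plugins_manager.initialize_hook_lineage_readers_plugins()
    if plugins_manager.hook_lineage_reader_classes:
        ...  # a real HookLineageCollector is used; otherwise NoOpCollector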

airflow/provider.yaml.schema.json

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,6 @@
220220
}
221221
}
222222
},
223-
"hook-lineage-readers": {
224-
"type": "array",
225-
"description": "Hook lineage readers",
226-
"items": {
227-
"type": "string"
228-
}
229-
},
230223
"transfers": {
231224
"type": "array",
232225
"items": {

airflow/providers_manager.py

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,6 @@ def __init__(self):
428428
self._fs_set: set[str] = set()
429429
self._dataset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
430430
self._dataset_factories: dict[str, Callable[..., Dataset]] = {}
431-
self._hook_lineage_readers: set[str] = set()
432431
self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache() # type: ignore[assignment]
433432
# keeps mapping between connection_types and hook class, package they come from
434433
self._hook_provider_dict: dict[str, HookClassProvider] = {}
@@ -526,17 +525,11 @@ def initialize_providers_filesystems(self):
526525
self._discover_filesystems()
527526

528527
@provider_info_cache("dataset_uris")
529-
def initialize_providers_dataset_uri_handlers(self):
528+
def initialize_providers_dataset_uri_handlers_and_factories(self):
530529
"""Lazy initialization of provider dataset URI handlers."""
531530
self.initialize_providers_list()
532531
self._discover_dataset_uri_handlers_and_factories()
533532

534-
@provider_info_cache("hook_lineage_readers")
535-
def initialize_providers_hook_lineage_readers(self):
536-
"""Lazy initialization of providers hook lineage readers."""
537-
self.initialize_providers_list()
538-
self._discover_hook_lineage_readers()
539-
540533
@provider_info_cache("hook_lineage_writers")
541534
@provider_info_cache("taskflow_decorators")
542535
def initialize_providers_taskflow_decorator(self):
@@ -574,7 +567,7 @@ def initialize_providers_notifications(self):
574567
self.initialize_providers_list()
575568
self._discover_notifications()
576569

577-
@provider_info_cache(cache_name="auth_managers")
570+
@provider_info_cache("auth_managers")
578571
def initialize_providers_auth_managers(self):
579572
"""Lazy initialization of providers notifications information."""
580573
self.initialize_providers_list()
@@ -889,34 +882,28 @@ def _discover_filesystems(self) -> None:
889882
self._fs_set = set(sorted(self._fs_set))
890883

891884
def _discover_dataset_uri_handlers_and_factories(self) -> None:
892-
from airflow.datasets import create_dataset, normalize_noop
885+
from airflow.datasets import normalize_noop
893886

894887
for provider_package, provider in self._provider_dict.items():
895888
for handler_info in provider.data.get("dataset-uris", []):
896889
try:
897890
schemes = handler_info["schemes"]
898891
handler_path = handler_info["handler"]
899-
factory_path = handler_info["factory"]
900892
except KeyError:
901893
continue
902894
if handler_path is None:
903895
handler = normalize_noop
904-
if factory_path is None:
905-
factory = create_dataset
906-
elif not (handler := _correctness_check(provider_package, handler_path, provider)) or not (
907-
factory := _correctness_check(provider_package, factory_path, provider)
908-
):
896+
elif not (handler := _correctness_check(provider_package, handler_path, provider)):
909897
continue
910898
self._dataset_uri_handlers.update((scheme, handler) for scheme in schemes)
899+
factory_path = handler_info.get("factory")
900+
if not (
901+
factory_path is not None
902+
and (factory := _correctness_check(provider_package, factory_path, provider))
903+
):
904+
continue
911905
self._dataset_factories.update((scheme, factory) for scheme in schemes)
912906

913-
def _discover_hook_lineage_readers(self) -> None:
914-
for provider_package, provider in self._provider_dict.items():
915-
for hook_lineage_reader in provider.data.get("hook-lineage-readers", []):
916-
if _correctness_check(provider_package, hook_lineage_reader, provider):
917-
self._hook_lineage_readers.add(hook_lineage_reader)
918-
self._fs_set = set(sorted(self._fs_set))
919-
920907
def _discover_taskflow_decorators(self) -> None:
921908
for name, info in self._provider_dict.items():
922909
for taskflow_decorator in info.data.get("task-decorators", []):
@@ -1314,19 +1301,14 @@ def filesystem_module_names(self) -> list[str]:
13141301

13151302
@property
13161303
def dataset_factories(self) -> dict[str, Callable[..., Dataset]]:
1317-
self.initialize_providers_dataset_uri_handlers()
1304+
self.initialize_providers_dataset_uri_handlers_and_factories()
13181305
return self._dataset_factories
13191306

13201307
@property
13211308
def dataset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
1322-
self.initialize_providers_dataset_uri_handlers()
1309+
self.initialize_providers_dataset_uri_handlers_and_factories()
13231310
return self._dataset_uri_handlers
13241311

1325-
@property
1326-
def hook_lineage_readers(self) -> list[str]:
1327-
self.initialize_providers_hook_lineage_readers()
1328-
return sorted(self._hook_lineage_readers)
1329-
13301312
@property
13311313
def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
13321314
self.initialize_providers_configuration()
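For reference, a hedged sketch of the provider metadata shape this discovery loop consumes (module paths and scheme names are invented; "schemes" and "handler" remain required, while "factory" becomes optional per entry):

    provider_data = {
        "dataset-uris": [
            {
                # Missing "schemes" or "handler" raises KeyError and the entry is skipped.
                "schemes": ["myscheme"],
                "handler": "my_provider.datasets.sanitize_uri",
                # Optional: if absent, or if it fails the correctness check, only
                # the URI handler is registered and no dataset factory is added.
                "factory": "my_provider.datasets.create_my_dataset",
            },
            {
                # handler=None falls back to normalize_noop; without a "factory"
                # key, no dataset factory is registered for this scheme.
                "schemes": ["file"],
                "handler": None,
            },
        ]
    }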

docs/apache-airflow/administration-and-deployment/lineage.rst

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,49 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box sup
8989
9090
.. _precedence: https://docs.python.org/3/reference/expressions.html
9191

92+
Hook Lineage
93+
------------
94+
95+
Airflow provides a powerful feature for tracking data lineage not only between tasks but also from hooks used within those tasks.
96+
This functionality helps you understand how data flows throughout your Airflow pipelines.
97+
98+
A global instance of ``HookLineageCollector`` serves as the central hub for collecting lineage information.
99+
Hooks can send details about datasets they interact with to this collector.
100+
The collector then uses this data to construct AIP-60 compliant Datasets, a standard format for describing datasets.
101+
102+
.. code-block:: python
103+
104+
from airflow.lineage.hook_lineage import get_hook_lineage_collector
105+
106+
107+
class CustomHook(BaseHook):
108+
def run(self):
109+
# run actual code
110+
collector = get_hook_lineage_collector()
111+
collector.add_input_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/in"})
112+
collector.add_output_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/out"})
113+
114+
Lineage data collected by the ``HookLineageCollector`` can be accessed using an instance of ``HookLineageReader``,
115+
which is registered in an Airflow plugin.
116+
117+
.. code-block:: python
118+
119+
from airflow.lineage.hook_lineage import HookLineageReader
120+
from airflow.plugins_manager import AirflowPlugin
121+
122+
123+
class CustomHookLineageReader(HookLineageReader):
124+
def get_inputs(self):
125+
return self.lineage_collector.collected_datasets.inputs
126+
127+
128+
class HookLineageCollectionPlugin(AirflowPlugin):
129+
name = "HookLineageCollectionPlugin"
130+
hook_lineage_readers = [CustomHookLineageReader]
131+
132+
If no ``HookLineageReader`` is registered within Airflow, a default ``NoOpCollector`` is used instead.
133+
This collector does not create AIP-60 compliant datasets or collect lineage information.
134+
92135

93136
Lineage Backend
94137
---------------
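One caveat worth noting: the docs examples above pass "scheme" inside dataset_kwargs, while the add_input_dataset/add_output_dataset signatures introduced in airflow/lineage/hook.py in this commit take scheme as a top-level parameter, so a scheme buried in dataset_kwargs would never reach the factory lookup. A sketch consistent with the new signatures (paths illustrative, assuming the registered file factory accepts a "path" argument):

    collector = get_hook_lineage_collector()
    collector.add_input_dataset(self, scheme="file", dataset_kwargs={"path": "/tmp/in"})
    collector.add_output_dataset(self, scheme="file", dataset_kwargs={"path": "/tmp/out"})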
