
The Airflow Scheduler and Airflow Triggerer are failing to load the openlineage plugin with Custom extractors #38037

@aditya-7


Apache Airflow Provider(s)

openlineage

Versions of Apache Airflow Providers

[Screenshot: installed provider versions]

Apache Airflow version

2.8.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Docker-Compose

Deployment details

Docker Compose version v2.24.3-desktop.

Created a custom Docker image using this Dockerfile:

FROM apache/airflow:2.8.2
# COPY manager.py /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/openlineage/extractors/manager.py
COPY dags /opt/airflow/dags/
COPY plugins /opt/airflow/plugins/

Changed the x-airflow-common block (YAML anchor &airflow-common) in the docker-compose.yml file:

x-airflow-common:
  &airflow-common
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.2}
  build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-amazon}
    AIRFLOW__OPENLINEAGE__TRANSPORT: '{"type":"http","url":"http://192.168.1.40:9090"}'
    AIRFLOW__OPENLINEAGE__NAMESPACE: MyNamespace
    AIRFLOW__OPENLINEAGE__EXTRACTORS: plugins.extractors.some_lineage_extractor.MyExtractor
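AIRFLOW__OPENLINEAGE__EXTRACTORS holds one or more dotted class paths separated by semicolons (the ExtractorManager code in "What happened" splits the value on ";"). Assuming each entry is resolved the way Airflow's import_string resolves dotted paths, by splitting at the last dot into a module path and a class name, the setting above requires the module plugins.extractors.some_lineage_extractor to be importable in every container. A minimal sketch of that split; split_extractor_paths is a hypothetical helper, not part of the provider:

```python
from __future__ import annotations


def split_extractor_paths(value: str | None) -> list[tuple[str, str]]:
    """Split a semicolon-separated list of dotted class paths.

    Hypothetical helper: returns (module_path, class_name) pairs by
    splitting each entry at its last dot.
    """
    pairs: list[tuple[str, str]] = []
    if not value:  # empty string or None: no custom extractors configured
        return pairs
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing ';'
        module_path, _, class_name = entry.rpartition(".")
        pairs.append((module_path, class_name))
    return pairs


setting = "plugins.extractors.some_lineage_extractor.MyExtractor"
for module_path, class_name in split_extractor_paths(setting):
    print(f"module={module_path} class={class_name}")
```

For this setting the module part is plugins.extractors.some_lineage_extractor, so the import only succeeds if the directory containing plugins/ is on sys.path in that process.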

Built & deployed using the command:
docker-compose build && docker-compose up

This is my project structure:
[Screenshot: project structure]

What happened

When I deploy Airflow, the airflow-scheduler and airflow-triggerer containers fail to load the openlineage plugin. They can load the built-in extractors such as BashExtractor, PythonExtractor, etc.
Interestingly, the airflow-init container loads the plugin successfully. I confirmed this by overriding the library file /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/openlineage/extractors/manager.py with a copy that adds a few debug log statements.
I modified the ExtractorManager constructor to add debug logging like this:

# Excerpt of ExtractorManager from
# airflow/providers/openlineage/extractors/manager.py with debug statements added.
# Module-level names (os, conf, LoggingMixin, BaseExtractor, DefaultExtractor,
# _iter_extractor_types, try_import_from_string) come from the original module
# and are omitted here.
class ExtractorManager(LoggingMixin):
    """Class abstracting management of custom extractors."""

    def __init__(self):
        super().__init__()
        self.extractors: dict[str, type[BaseExtractor]] = {}
        self.default_extractor = DefaultExtractor

        # Built-in Extractors like Bash and Python
        for extractor in _iter_extractor_types():
            print(f"inbuilt extractor: {extractor}")  # debug
            for operator_class in extractor.get_operator_classnames():
                self.extractors[operator_class] = extractor

        # Semicolon-separated extractors in Airflow configuration or OPENLINEAGE_EXTRACTORS variable.
        # Extractors should implement BaseExtractor
        env_extractors = conf.get("openlineage", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS"))
        # skip either when it's empty string or None
        if env_extractors:
            self.log.info("All extractors: %s", env_extractors)  # debug
            for extractor in env_extractors.split(";"):
                self.log.info("extractor: %s", extractor)  # debug
                try:
                    # debug: log PATH and list the copied extractor files
                    self.log.info("PATH = '%s'", os.getenv("PATH"))
                    self.log.info(os.listdir("/opt/airflow/plugins/extractors/"))
                except FileNotFoundError:
                    self.log.error("Extractors directory does not exist.")
                extractor: type[BaseExtractor] = try_import_from_string(extractor.strip())

                for operator_class in extractor.get_operator_classnames():
                    if operator_class in self.extractors:
                        self.log.debug(
                            "Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
                            operator_class,
                            extractor,
                            self.extractors[operator_class],
                        )
                    self.extractors[operator_class] = extractor
  • The airflow-triggerer and airflow-scheduler containers fail to load the openlineage plugin when importing the custom extractor class, with the following error:
[Screenshot: scheduler/triggerer log with the import error]
  • By contrast, the airflow-init container loads the plugin successfully with the same custom extractor:
[Screenshot: airflow-init log showing the custom extractor loaded]
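The debug code logs PATH, but Python resolves imports through sys.path, not PATH. Airflow's Modules Management documentation states that the plugins folder is added to sys.path, which would make the file importable as extractors.some_lineage_extractor, while plugins.extractors.some_lineage_extractor additionally needs /opt/airflow itself on sys.path. A minimal standalone sketch for checking which dotted names resolve; module_resolves is a hypothetical helper and the candidate names are illustrative. Note that a separate python process started with docker compose exec will not necessarily have the same sys.path as the running scheduler, so logging this check next to the existing PATH debug line would be more reliable:

```python
import importlib.util
import sys


def module_resolves(module_path: str) -> bool:
    """Return True if module_path can be found on the current sys.path.

    find_spec imports the parent packages of a dotted name, so a missing
    parent raises ModuleNotFoundError (a subclass of ImportError) instead
    of returning None.
    """
    try:
        return importlib.util.find_spec(module_path) is not None
    except (ImportError, ValueError):
        return False


if __name__ == "__main__":
    # Illustrative candidates: the configured module path, and the same
    # module addressed relative to the plugins folder.
    candidates = [
        "plugins.extractors.some_lineage_extractor",
        "extractors.some_lineage_extractor",
    ]
    print("sys.path:")
    for entry in sys.path:
        print("  ", entry)
    for name in candidates:
        print(f"{name}: {'found' if module_resolves(name) else 'not found'}")
```

Comparing the sys.path output between airflow-init and airflow-scheduler would show whether the two processes search different directories.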

What you think should happen instead

The airflow-triggerer and airflow-scheduler should also be able to import the custom extractor class, as the airflow-init container does, and load the openlineage plugin successfully.

How to reproduce

  1. Create the project folder with a simple DAG in the dags/ folder.
  2. Write a custom extractor in <project_root>/plugins/extractors/some_lineage_extractor.py:

from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
from openlineage.client.run import Dataset


class MyExtractor(BaseExtractor):
    """Custom extractor for PythonOperator tasks."""

    @classmethod
    def get_operator_classnames(cls):
        # Operator class names this extractor is registered for.
        return ['PythonOperator']

    def extract(self) -> OperatorLineage:
        # Assumes the operator exposes input_bucket, output_bucket,
        # resolved_source_objects and resolved_destination_objects;
        # the stock PythonOperator defines none of these attributes.
        return OperatorLineage(inputs=[Dataset(namespace=f"s3a://{self.operator.input_bucket}", name=source)
                                       for source in sorted(self.operator.resolved_source_objects)],
                               outputs=[Dataset(namespace=f"s3a://{self.operator.output_bucket}", name=source)
                                        for source in sorted(self.operator.resolved_destination_objects)])

  3. Create a Dockerfile at the project root and update the docker-compose.yml file as described in the "Deployment details" section above.
  4. Run docker-compose build && docker-compose up and check the logs for the errors.
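Since MyExtractor.extract() from step 2 reads attributes directly off self.operator, its mapping can be checked without a running Airflow by feeding the same comprehensions a stand-in object. A sketch under that assumption: SimpleNamespace plays the operator, the bucket and object names are made up, and the small Dataset dataclass is a hypothetical stand-in that mirrors only the namespace and name fields of openlineage.client.run.Dataset:

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass(frozen=True)
class Dataset:
    # Stand-in for openlineage.client.run.Dataset (namespace and name only).
    namespace: str
    name: str


def build_datasets(operator):
    """Mirror MyExtractor.extract(): return (inputs, outputs) lists."""
    inputs = [Dataset(namespace=f"s3a://{operator.input_bucket}", name=source)
              for source in sorted(operator.resolved_source_objects)]
    outputs = [Dataset(namespace=f"s3a://{operator.output_bucket}", name=source)
               for source in sorted(operator.resolved_destination_objects)]
    return inputs, outputs


# Hypothetical operator carrying the attributes the extractor expects.
fake_operator = SimpleNamespace(
    input_bucket="raw-bucket",
    output_bucket="clean-bucket",
    resolved_source_objects=["b.csv", "a.csv"],
    resolved_destination_objects=["out.parquet"],
)
inputs, outputs = build_datasets(fake_operator)
print(inputs)
print(outputs)
```

An object without these attributes, such as a stock PythonOperator, raises AttributeError in this mapping, which is separate from the import failure described above.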

Anything else

I verified that the extractor class files are present in the containers by adding custom logs to the manager class. The relevant logs are included in the "What happened" section.
I also verified that the default lineage data is sent to the lineage backend (Marquez).

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
