Source code for dbt_airflow_factory.operator

"""Factories creating Airflow Operators running DBT tasks."""

import abc
from typing import List, Optional

from dbt_airflow_factory.constants import IS_FIRST_AIRFLOW_VERSION

if IS_FIRST_AIRFLOW_VERSION:
    from airflow.operators.dummy_operator import DummyOperator
else:
    from airflow.operators.dummy import DummyOperator

from airflow.models.baseoperator import BaseOperator


[docs]class DbtRunOperatorBuilder(metaclass=abc.ABCMeta): """ Base class of a factory creating Airflow :class:`airflow.models.baseoperator.BaseOperator` running a single DBT task. """
[docs] @abc.abstractmethod def create( self, name: str, command: str, model: Optional[str] = None, additional_dbt_args: Optional[List[str]] = None, ) -> BaseOperator: """ Create Airflow Operator running a single DBT task. :param name: task name. :type name: str :param command: DBT command to run. :type command: str :param model: models to include. :type model: Optional[str] :param additional_dbt_args: Additional arguments to pass to dbt. :type additional_dbt_args: Optional[List[str]] :return: Airflow Operator running a single DBT task. :rtype: BaseOperator """ raise NotImplementedError
[docs]class EphemeralOperator(DummyOperator): """ :class:`DummyOperator` representing ephemeral DBT model. """ ui_color = "#F3E4F7"