dbt_airflow_factory package

Submodules

dbt_airflow_factory.airflow_dag_factory module

dbt_airflow_factory.builder_factory module

dbt_airflow_factory.config_utils module

Utilities for reading configuration files.

read_config(dag_path: Union[str, os.PathLike[str]], env: str, file_name: str, replace_jinja: bool = False) → dict[source]

Reads dictionaries out of file_name in both the base and env directories and merges them into one. Values from the env directory take precedence over the base ones.

Parameters
  • dag_path (Union[str, os.PathLike[str]]) – Path to the directory containing the config directory.

  • env (str) – Name of the environment.

  • file_name (str) – Name of the config file.

  • replace_jinja (bool) – Whether to replace Airflow variables using Jinja templating.

Returns

Dictionary representing the config file.

Return type

dict
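
For illustration, a minimal sketch of calling read_config; the dag_path, environment name, and file name below are hypothetical:

    from dbt_airflow_factory.config_utils import read_config

    # Assumed layout (hypothetical paths):
    #   /opt/airflow/dags/my_dbt_dag/config/base/airflow.yml
    #   /opt/airflow/dags/my_dbt_dag/config/dev/airflow.yml
    # Keys defined in config/dev/airflow.yml override the same keys from config/base/airflow.yml.
    config = read_config(
        dag_path="/opt/airflow/dags/my_dbt_dag",
        env="dev",
        file_name="airflow.yml",
        replace_jinja=True,
    )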

read_env_config(dag_path: Union[str, os.PathLike[str]], env: str, file_name: str, replace_jinja: bool = False) → dict[source]

Read the config file for the given env.

Parameters
  • dag_path (Union[str, os.PathLike[str]]) – Path to the directory containing the config directory.

  • env (str) – Name of the environment.

  • file_name (str) – Name of the config file.

  • replace_jinja (bool) – Whether to replace Airflow variables using Jinja templating.

Returns

Dictionary representing the config file.

Return type

dict

read_yaml_file(file_path: Union[str, os.PathLike[str]], replace_jinja: bool) → dict[source]

Load a YAML file into a dictionary.

Parameters
  • file_path (Union[str, os.PathLike[str]]) – Path to the file.

  • replace_jinja (bool) – Whether to replace Airflow variables using Jinja templating.

Returns

Loaded dictionary.

Return type

dict
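
A short usage sketch for read_yaml_file; the path is hypothetical, and the exact Jinja expressions resolved by replace_jinja (e.g. references to Airflow variables) depend on your Airflow deployment:

    from dbt_airflow_factory.config_utils import read_yaml_file

    # Load a single YAML file into a dict. With replace_jinja=True, Jinja
    # expressions are rendered (using Airflow variables) before parsing.
    settings = read_yaml_file(
        "/opt/airflow/dags/my_dbt_dag/config/base/airflow.yml",
        replace_jinja=True,
    )
    assert isinstance(settings, dict)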

dbt_airflow_factory.dbt_parameters module

POD representing the DBT operator config file.

class DbtExecutionEnvironmentParameters(target: str, project_dir_path: str = '/dbt', profile_dir_path: str = '/root/.dbt', vars: Optional[Dict[str, str]] = None, **kwargs: Any)[source]

Bases: object

POD representing the DBT operator config file.

Parameters
  • target (str) – Name of the target environment (passed to dbt as --target).

  • project_dir_path (str) – Path to the project directory.

  • profile_dir_path (str) – Path to the directory containing profiles.yml.

  • vars (Optional[Dict[str, str]]) – Dictionary of variables to pass to dbt.

property vars: str

String representation of dictionary of dbt variables.

DBT expects the --vars argument to be a string in YAML format; this property returns such a string.

Returns

String representation of dictionary of dbt variables.

Return type

str
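
A minimal construction sketch; the variable name and value are made up, and the exact YAML formatting of the vars property is up to the library:

    from dbt_airflow_factory.dbt_parameters import DbtExecutionEnvironmentParameters

    params = DbtExecutionEnvironmentParameters(
        target="dev",
        project_dir_path="/dbt",         # default from the signature above
        profile_dir_path="/root/.dbt",   # default from the signature above
        vars={"schema_suffix": "_dev"},  # hypothetical dbt variable
    )

    # params.vars is a YAML-formatted string, ready to be placed after --vars
    # on a dbt command line.
    command_fragment = f"--vars '{params.vars}'"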

dbt_airflow_factory.ingestion module

dbt_airflow_factory.operator module

Factories creating Airflow Operators running DBT tasks.

class DbtRunOperatorBuilder[source]

Bases: object

Base class for factories that create an airflow.models.baseoperator.BaseOperator running a single DBT task.

abstract create(name: str, command: str, model: Optional[str] = None, additional_dbt_args: Optional[List[str]] = None) → airflow.models.baseoperator.BaseOperator[source]

Create Airflow Operator running a single DBT task.

Parameters
  • name (str) – Task name.

  • command (str) – DBT command to run.

  • model (Optional[str]) – Models to include.

  • additional_dbt_args (Optional[List[str]]) – Additional arguments to pass to dbt.

Returns

Airflow Operator running a single DBT task.

Return type

BaseOperator
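
As a sketch only, a custom builder could implement the abstract create method with a plain BashOperator; this is not one of the builders shipped with the library, and mapping model onto dbt's --select flag is an assumption:

    from typing import List, Optional

    from airflow.models.baseoperator import BaseOperator
    from airflow.operators.bash import BashOperator

    from dbt_airflow_factory.operator import DbtRunOperatorBuilder


    class BashDbtRunOperatorBuilder(DbtRunOperatorBuilder):
        """Illustrative builder running dbt through a BashOperator."""

        def create(
            self,
            name: str,
            command: str,
            model: Optional[str] = None,
            additional_dbt_args: Optional[List[str]] = None,
        ) -> BaseOperator:
            args = list(additional_dbt_args or [])
            if model:
                args += ["--select", model]  # assumption: model filter maps to --select
            return BashOperator(task_id=name, bash_command=" ".join(["dbt", command, *args]))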

class EphemeralOperator(task_id: str, owner: str = 'airflow', email: str | Iterable[str] | None = None, email_on_retry: bool = True, email_on_failure: bool = True, retries: int | None = 0, retry_delay: timedelta | float = datetime.timedelta(seconds=300), retry_exponential_backoff: bool = False, max_retry_delay: timedelta | float | None = None, start_date: datetime | None = None, end_date: datetime | None = None, depends_on_past: bool = False, ignore_first_depends_on_past: bool = True, wait_for_past_depends_before_skipping: bool = False, wait_for_downstream: bool = False, dag: DAG | None = None, params: collections.abc.MutableMapping | None = None, default_args: dict | None = None, priority_weight: int = 1, weight_rule: str = WeightRule.DOWNSTREAM, queue: str = 'default', pool: str | None = None, pool_slots: int = 1, sla: timedelta | None = None, execution_timeout: timedelta | None = None, on_execute_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, on_failure_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, on_success_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, on_retry_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, pre_execute: TaskPreExecuteHook | None = None, post_execute: TaskPostExecuteHook | None = None, trigger_rule: str = TriggerRule.ALL_SUCCESS, resources: dict[str, Any] | None = None, run_as_user: str | None = None, task_concurrency: int | None = None, max_active_tis_per_dag: int | None = None, max_active_tis_per_dagrun: int | None = None, executor_config: dict | None = None, do_xcom_push: bool = True, inlets: Any | None = None, outlets: Any | None = None, task_group: TaskGroup | None = None, doc: str | None = None, doc_md: str | None = None, doc_json: str | None = None, doc_yaml: str | None = None, doc_rst: str | None = None, **kwargs)[source]

Bases: airflow.operators.empty.EmptyOperator

Placeholder operator (based on EmptyOperator) representing an ephemeral DBT model.

ui_color: str = '#F3E4F7'

dbt_airflow_factory.tasks module

Classes representing tasks corresponding to a single DBT model.

class ModelExecutionTask(execution_airflow_task: airflow.models.baseoperator.BaseOperator, test_airflow_task: Optional[airflow.models.baseoperator.BaseOperator] = None, task_group=None)[source]

Bases: object

Wrapper around tasks corresponding to a single DBT model.

Parameters
  • execution_airflow_task (BaseOperator) – Operator running DBT’s run task.

  • test_airflow_task (BaseOperator) – Operator running DBT’s test task (optional).

  • task_group – TaskGroup consisting of run and test tasks (if Airflow version is at least 2).

get_end_task()[source]

Return model’s last task.

It is either the whole TaskGroup, the test task, or the run task, depending on the Airflow version and whether a test task exists.

get_start_task()[source]

Return model’s first task.

It is either the whole TaskGroup or the run task.
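
The sketch below wires two model wrappers together, using EmptyOperator stand-ins for the real dbt run and test operators; the DAG, model, and task names are hypothetical:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    from dbt_airflow_factory.tasks import ModelExecutionTask

    with DAG("dbt_example", start_date=datetime(2023, 1, 1), schedule=None):
        stg_orders = ModelExecutionTask(
            execution_airflow_task=EmptyOperator(task_id="stg_orders__run"),
            test_airflow_task=EmptyOperator(task_id="stg_orders__test"),
        )
        orders = ModelExecutionTask(
            execution_airflow_task=EmptyOperator(task_id="orders__run"),
        )

        # Chain the models: the upstream model's last task feeds the downstream model's first task.
        stg_orders.get_end_task() >> orders.get_start_task()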

class ModelExecutionTasks(tasks: Dict[str, dbt_airflow_factory.tasks.ModelExecutionTask], starting_task_names: List[str], ending_task_names: List[str])[source]

Bases: object

Dictionary of all Operators corresponding to DBT tasks.

Parameters
  • tasks (Dict[str, ModelExecutionTask]) – Dictionary of model tasks.

  • starting_task_names (List[str]) – List of names of initial tasks (DAG sources).

  • ending_task_names (List[str]) – List of names of ending tasks (DAG sinks).

get_ending_tasks() → List[dbt_airflow_factory.tasks.ModelExecutionTask][source]

Get a list of all DAG sinks.

Returns

List of all DAG sinks.

Return type

List[ModelExecutionTask]

get_starting_tasks() → List[dbt_airflow_factory.tasks.ModelExecutionTask][source]

Get a list of all DAG sources.

Returns

List of all DAG sources.

Return type

List[ModelExecutionTask]

get_task(node_name: str) → dbt_airflow_factory.tasks.ModelExecutionTask[source]

Return the ModelExecutionTask for the given model’s node_name.

Parameters

node_name (str) – Name of the task.

Returns

Wrapper around tasks corresponding to a given model.

Return type

ModelExecutionTask

length() → int[source]

Return the number of ModelExecutionTask objects, one per DBT model.
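
Finally, a sketch of assembling the container by hand and querying it; normally the library builds this object for you, and the node names below merely follow the dbt manifest convention (model.<project>.<model>) as an assumption:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    from dbt_airflow_factory.tasks import ModelExecutionTask, ModelExecutionTasks

    with DAG("dbt_graph_example", start_date=datetime(2023, 1, 1), schedule=None):
        tasks = {
            "model.my_project.stg_orders": ModelExecutionTask(EmptyOperator(task_id="stg_orders__run")),
            "model.my_project.orders": ModelExecutionTask(EmptyOperator(task_id="orders__run")),
        }
        graph = ModelExecutionTasks(
            tasks=tasks,
            starting_task_names=["model.my_project.stg_orders"],
            ending_task_names=["model.my_project.orders"],
        )

        print(graph.length())                 # 2 (one entry per model)
        sources = graph.get_starting_tasks()  # DAG sources
        sinks = graph.get_ending_tasks()      # DAG sinks
        orders_task = graph.get_task("model.my_project.orders")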