dbt_airflow_factory package
Submodules
dbt_airflow_factory.airflow_dag_factory module
dbt_airflow_factory.builder_factory module
dbt_airflow_factory.config_utils module
Utilities for configuration files reading.
- read_config(dag_path: Union[str, os.PathLike[str]], env: str, file_name: str, replace_jinja: bool = False) → dict
Reads dictionaries out of file_name in both the base and env directories and combines them into one. Values from the env directory take precedence over those from base.
- Parameters
dag_path (Union[str, os.PathLike[str]]) – Path to the directory containing the config directory.
env (str) – Name of the environment.
file_name (str) – Name of the config file.
replace_jinja (bool) – Whether to replace Airflow variables using Jinja templating.
- Returns
Dictionary representing the config file.
- Return type
dict
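The precedence rule can be sketched in plain Python. This is a minimal sketch, not the package's code: it assumes both files have already been loaded into dictionaries and that the merge is shallow, so a top-level key in the env file (including a nested mapping such as vars) replaces the base value wholesale. merge_configs is a hypothetical helper name.

```python
from typing import Any, Dict


def merge_configs(base: Dict[str, Any], env_overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: combine base and env configs, env winning.

    Assumes a shallow merge: nested mappings are replaced, not merged.
    """
    merged = dict(base)  # copy so the base dictionary is left untouched
    merged.update(env_overrides)  # env values take precedence
    return merged


base = {"target": "base", "project_dir_path": "/dbt", "vars": {"a": "1"}}
dev = {"target": "dev", "vars": {"b": "2"}}
print(merge_configs(base, dev))
# {'target': 'dev', 'project_dir_path': '/dbt', 'vars': {'b': '2'}}
```

Under a deep merge, vars would instead keep both keys; which behaviour applies should be confirmed against the package source.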
- read_env_config(dag_path: Union[str, os.PathLike[str]], env: str, file_name: str, replace_jinja: bool = False) → dict
Read the config file for the given env.
- Parameters
dag_path (Union[str, os.PathLike[str]]) – Path to the directory containing the config directory.
env (str) – Name of the environment.
file_name (str) – Name of the config file.
replace_jinja (bool) – Whether to replace Airflow variables using Jinja templating.
- Returns
Dictionary representing the config file.
- Return type
dict
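Assuming the layout implied by the parameter descriptions (a config directory inside dag_path, with one subdirectory per environment, base included), the file read for a given env can be located as below. env_config_path is a hypothetical helper, and the exact directory layout is an assumption.

```python
import os
from pathlib import Path
from typing import Union


def env_config_path(dag_path: Union[str, os.PathLike], env: str, file_name: str) -> Path:
    # Assumed layout: <dag_path>/config/<env>/<file_name>
    return Path(dag_path) / "config" / env / file_name


print(env_config_path("/opt/airflow/dags", "dev", "dbt.yml").as_posix())
# /opt/airflow/dags/config/dev/dbt.yml
```

Under the same assumption, read_config would resolve this path once with env="base" and once with the requested environment.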
- read_yaml_file(file_path: Union[str, os.PathLike[str]], replace_jinja: bool) → dict
Load a YAML file into a dictionary.
- Parameters
file_path (Union[str, os.PathLike[str]]) – Path to the file.
replace_jinja (bool) – Whether to replace Airflow variables using Jinja templating.
- Returns
Loaded dictionary.
- Return type
dict
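As a rough, stdlib-only illustration of what replace_jinja enables, the sketch below substitutes placeholders of the form {{ var.value.NAME }} (the syntax Airflow's own templates use for Variables; whether the package expects exactly this form is an assumption) before the text would be handed to a YAML parser. It uses a regular expression rather than Jinja, so it is not the package's implementation; a plain dict stands in for Airflow's Variable store, and render_airflow_vars is hypothetical.

```python
import re
from typing import Mapping

# Matches placeholders such as "{{ var.value.dbt_target }}" and captures the name.
_VAR_PATTERN = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def render_airflow_vars(text: str, variables: Mapping[str, str]) -> str:
    # `variables` is a plain mapping standing in for Airflow Variables;
    # an unknown name raises KeyError instead of rendering an empty string.
    return _VAR_PATTERN.sub(lambda match: variables[match.group(1)], text)


raw = "target: {{ var.value.dbt_target }}\nproject_dir_path: /dbt\n"
print(render_airflow_vars(raw, {"dbt_target": "prod"}))
```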
dbt_airflow_factory.dbt_parameters module
POD representing DBT operator config file.
- class DbtExecutionEnvironmentParameters(target: str, project_dir_path: str = '/dbt', profile_dir_path: str = '/root/.dbt', vars: Optional[Dict[str, str]] = None, **kwargs: Any)
Bases: object
POD representing DBT operator config file.
- Parameters
target (str) – Name of the target environment (passed to dbt as --target).
project_dir_path (str) – Path to the project directory.
profile_dir_path (str) – Path to the directory containing profiles.yml.
vars (Optional[Dict[str, str]]) – Dictionary of variables to pass to dbt.
- property vars: str
String representation of the dictionary of dbt variables.
DBT expects --vars to be passed a string in YAML format. This property returns such a string.
- Returns
String representation of dictionary of dbt variables.
- Return type
str
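For illustration, here is how a vars dictionary and the other fields might end up on a dbt command line. The flags --target, --project-dir, --profiles-dir and --vars are standard dbt CLI options; the helper names are hypothetical, and the sketch serializes vars with json.dumps rather than a YAML dumper, relying on the fact that JSON flow syntax is also valid YAML. The package's own vars property may format the string differently.

```python
import json
import shlex
from typing import Dict, List, Optional


def vars_to_cli_string(dbt_vars: Optional[Dict[str, str]]) -> str:
    # JSON is valid YAML flow syntax, so dbt can read this as a YAML mapping.
    return json.dumps(dbt_vars or {})


def build_dbt_args(
    command: str,
    target: str,
    project_dir_path: str = "/dbt",
    profile_dir_path: str = "/root/.dbt",
    dbt_vars: Optional[Dict[str, str]] = None,
) -> List[str]:
    # Hypothetical helper: defaults mirror DbtExecutionEnvironmentParameters.
    return [
        "dbt", command,
        "--target", target,
        "--project-dir", project_dir_path,
        "--profiles-dir", profile_dir_path,
        "--vars", vars_to_cli_string(dbt_vars),
    ]


args = build_dbt_args("run", "dev", dbt_vars={"run_date": "2023-01-01"})
print(shlex.join(args))  # shell-quoted command line
```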
dbt_airflow_factory.ingestion module
dbt_airflow_factory.operator module
Factories creating Airflow Operators running DBT tasks.
- class DbtRunOperatorBuilder
Bases: object
Base class of a factory creating an Airflow operator (airflow.models.baseoperator.BaseOperator) running a single DBT task.
- abstract create(name: str, command: str, model: Optional[str] = None, additional_dbt_args: Optional[List[str]] = None) → airflow.models.baseoperator.BaseOperator
Create Airflow Operator running a single DBT task.
- Parameters
name (str) – task name.
command (str) – DBT command to run.
model (Optional[str]) – models to include.
additional_dbt_args (Optional[List[str]]) – Additional arguments to pass to dbt.
- Returns
Airflow Operator running a single DBT task.
- Return type
BaseOperator
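The builder contract can be illustrated without Airflow installed. In this sketch FakeOperator is a hypothetical stand-in for BaseOperator, CommandLineBuilder is a hypothetical concrete builder, and passing model through dbt's --select flag is an assumption about how a real implementation would use it.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeOperator:
    # Hypothetical stand-in for airflow.models.baseoperator.BaseOperator.
    task_id: str
    cmd: List[str] = field(default_factory=list)


class RunOperatorBuilderSketch(ABC):
    # Mirrors the documented signature of DbtRunOperatorBuilder.create.
    @abstractmethod
    def create(
        self,
        name: str,
        command: str,
        model: Optional[str] = None,
        additional_dbt_args: Optional[List[str]] = None,
    ) -> FakeOperator:
        ...


class CommandLineBuilder(RunOperatorBuilderSketch):
    # Hypothetical concrete builder that only records the dbt command line.
    def __init__(self, target: str) -> None:
        self.target = target

    def create(
        self,
        name: str,
        command: str,
        model: Optional[str] = None,
        additional_dbt_args: Optional[List[str]] = None,
    ) -> FakeOperator:
        cmd = ["dbt", command, "--target", self.target]
        if model:
            cmd += ["--select", model]  # assumed mapping of `model` to --select
        cmd += additional_dbt_args or []
        return FakeOperator(task_id=name, cmd=cmd)


op = CommandLineBuilder("dev").create("orders_run", "run", model="orders")
print(op.cmd)  # ['dbt', 'run', '--target', 'dev', '--select', 'orders']
```

Keeping create abstract lets different execution backends share the DAG-building logic while each decides how the DBT command is actually run.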
- class EphemeralOperator(task_id: str, owner: str = 'airflow', email: str | Iterable[str] | None = None, email_on_retry: bool = True, email_on_failure: bool = True, retries: int | None = 0, retry_delay: timedelta | float = datetime.timedelta(seconds=300), retry_exponential_backoff: bool = False, max_retry_delay: timedelta | float | None = None, start_date: datetime | None = None, end_date: datetime | None = None, depends_on_past: bool = False, ignore_first_depends_on_past: bool = True, wait_for_past_depends_before_skipping: bool = False, wait_for_downstream: bool = False, dag: DAG | None = None, params: collections.abc.MutableMapping | None = None, default_args: dict | None = None, priority_weight: int = 1, weight_rule: str = WeightRule.DOWNSTREAM, queue: str = 'default', pool: str | None = None, pool_slots: int = 1, sla: timedelta | None = None, execution_timeout: timedelta | None = None, on_execute_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, on_failure_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, on_success_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, on_retry_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, pre_execute: TaskPreExecuteHook | None = None, post_execute: TaskPostExecuteHook | None = None, trigger_rule: str = TriggerRule.ALL_SUCCESS, resources: dict[str, Any] | None = None, run_as_user: str | None = None, task_concurrency: int | None = None, max_active_tis_per_dag: int | None = None, max_active_tis_per_dagrun: int | None = None, executor_config: dict | None = None, do_xcom_push: bool = True, inlets: Any | None = None, outlets: Any | None = None, task_group: TaskGroup | None = None, doc: str | None = None, doc_md: str | None = None, doc_json: str | None = None, doc_yaml: str | None = None, doc_rst: str | None = None, **kwargs)
Bases: airflow.operators.empty.EmptyOperator
DummyOperator (EmptyOperator in current Airflow releases) representing an ephemeral DBT model.
- ui_color: str = '#F3E4F7'
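dbt does not build ephemeral models in the warehouse; it inlines them as CTEs into the models that reference them. A placeholder operator therefore keeps such a model visible in the DAG without running anything. The sketch below shows one way a factory could pick the task type from a node in dbt's manifest.json, where each node records its materialization under config.materialized; task_kind and its return labels are hypothetical, not the package's API.

```python
from typing import Any, Dict


def task_kind(node: Dict[str, Any]) -> str:
    # Hypothetical: decide which operator type a manifest node should get.
    materialized = node.get("config", {}).get("materialized")
    if materialized == "ephemeral":
        return "placeholder"  # e.g. an EmptyOperator subclass; nothing to run
    return "dbt_run"  # a real operator that runs the model


print(task_kind({"config": {"materialized": "ephemeral"}}))  # placeholder
print(task_kind({"config": {"materialized": "table"}}))  # dbt_run
```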
dbt_airflow_factory.tasks module
Classes representing tasks corresponding to a single DBT model.
- class ModelExecutionTask(execution_airflow_task: airflow.models.baseoperator.BaseOperator, test_airflow_task: Optional[airflow.models.baseoperator.BaseOperator] = None, task_group=None)
Bases: object
Wrapper around tasks corresponding to a single DBT model.
- Parameters
execution_airflow_task (BaseOperator) – Operator running DBT’s run task.
test_airflow_task (BaseOperator) – Operator running DBT’s test task (optional).
task_group – TaskGroup consisting of run and test tasks (if Airflow version is at least 2).
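When models are chained, an upstream model's last task must precede a downstream model's first task. The sketch below assumes the run task is always the first task and the test task, when present, is the last; TaskStub and ModelTasksSketch are hypothetical stand-ins, not the package's classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskStub:
    # Hypothetical stand-in for an Airflow operator.
    task_id: str


@dataclass
class ModelTasksSketch:
    execution_task: TaskStub
    test_task: Optional[TaskStub] = None

    def first_task(self) -> TaskStub:
        # Upstream models should be wired to the `run` task.
        return self.execution_task

    def last_task(self) -> TaskStub:
        # Downstream models wait for `test` if it exists, otherwise for `run`.
        return self.test_task if self.test_task is not None else self.execution_task


orders = ModelTasksSketch(TaskStub("orders_run"), TaskStub("orders_test"))
print(orders.first_task().task_id, orders.last_task().task_id)  # orders_run orders_test
```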
- class ModelExecutionTasks(tasks: Dict[str, dbt_airflow_factory.tasks.ModelExecutionTask], starting_task_names: List[str], ending_task_names: List[str])
Bases: object
Dictionary of all Operators corresponding to DBT tasks.
- Parameters
tasks (Dict[str, ModelExecutionTask]) – Dictionary of model tasks.
starting_task_names (List[str]) – List of names of initial tasks (DAG sources).
ending_task_names (List[str]) – List of names of ending tasks (DAG sinks).
- get_ending_tasks() → List[dbt_airflow_factory.tasks.ModelExecutionTask]
Get a list of all DAG sinks.
- Returns
List of all DAG sinks.
- Return type
List[ModelExecutionTask]
- get_starting_tasks() → List[dbt_airflow_factory.tasks.ModelExecutionTask]
Get a list of all DAG sources.
- Returns
List of all DAG sources.
- Return type
List[ModelExecutionTask]
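Starting and ending task names correspond to the sources and sinks of the model dependency graph. A minimal sketch of how such lists could be derived, assuming dependencies are given as a mapping from each task name to the names of its upstream tasks (sources_and_sinks is a hypothetical helper):

```python
from typing import Dict, List, Set, Tuple


def sources_and_sinks(deps: Dict[str, Set[str]]) -> Tuple[List[str], List[str]]:
    # deps maps each task name to the names of its upstream tasks.
    has_downstream = {up for ups in deps.values() for up in ups}
    nodes = set(deps) | has_downstream
    starting = sorted(n for n in nodes if not deps.get(n))  # no upstream tasks
    ending = sorted(n for n in nodes if n not in has_downstream)  # nothing depends on them
    return starting, ending


deps = {
    "stg_orders": set(),
    "stg_customers": set(),
    "orders": {"stg_orders", "stg_customers"},
    "report": {"orders"},
}
print(sources_and_sinks(deps))  # (['stg_customers', 'stg_orders'], ['report'])
```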
- get_task(node_name: str) → dbt_airflow_factory.tasks.ModelExecutionTask
Return ModelExecutionTask for the given model’s node_name.
- Parameters
node_name (str) – Name of the task.
- Returns
Wrapper around tasks corresponding to a given model.
- Return type