Source code for dbt_airflow_factory.config_utils

"""Utilities for reading configuration files."""

from __future__ import annotations

import logging
import os
import pathlib
from typing import Any, Union

import yaml
from airflow.models import Variable
from jinja2 import FileSystemLoader
from jinja2.nativetypes import NativeEnvironment


def read_config(
    dag_path: Union[str, os.PathLike[str]],
    env: str,
    file_name: str,
    replace_jinja: bool = False,
) -> dict:
    """
    Build the effective configuration for *env* from *file_name*.

    The file is looked up first under the ``base`` directory and then under
    the *env* directory. Both results are combined into a single dictionary;
    on a key clash the value coming from *env* wins. The merge is shallow
    (top-level keys only).

    :param dag_path: Path to the directory containing ``config`` directory.
    :type dag_path: Union[str, os.PathLike[str]]
    :param env: Name of the environment.
    :type env: str
    :param file_name: Name of the config file.
    :type file_name: str
    :param replace_jinja: Whether replace Airflow vars using Jinja templating.
    :type replace_jinja: bool
    :return: Dictionary representing the merged config file.
    :rtype: dict
    """
    merged = read_env_config(dag_path, "base", file_name, replace_jinja)
    env_overrides = read_env_config(dag_path, env, file_name, replace_jinja)
    # Environment-specific keys overwrite the ones loaded from ``base``.
    merged.update(env_overrides)
    return merged
def read_env_config(
    dag_path: Union[str, os.PathLike[str]],
    env: str,
    file_name: str,
    replace_jinja: bool = False,
) -> dict:
    """
    Load ``<dag_path>/config/<env>/<file_name>`` if it exists.

    A missing file is not an error: a warning is logged and an empty
    dictionary is returned instead.

    :param dag_path: Path to the directory containing ``config`` directory.
    :type dag_path: Union[str, os.PathLike[str]]
    :param env: Name of the environment.
    :type env: str
    :param file_name: Name of the config file.
    :type file_name: str
    :param replace_jinja: Whether replace Airflow vars using Jinja templating.
    :type replace_jinja: bool
    :return: Dictionary representing the config file, ``{}`` when it is absent.
    :rtype: dict
    """
    target = os.path.join(dag_path, "config", env, file_name)

    # Guard clause: nothing to read for this environment.
    if not os.path.exists(target):
        logging.warning("Missing config file: " + target)
        return {}

    logging.info("Reading config from " + target)
    return read_yaml_file(target, replace_jinja)
def read_yaml_file(file_path: Union[str, os.PathLike[str]], replace_jinja: bool) -> dict:
    """
    Load `yaml` file to dictionary.

    When *replace_jinja* is true the file is first rendered by
    ``_jinja_replace_airflow_vars`` and the rendered text is parsed;
    otherwise the file is parsed directly.

    An empty (or comment-only) document is returned as ``{}`` rather than
    ``None`` so that callers can always treat the result as a mapping.

    :param file_path: Path to the file.
    :type file_path: Union[str, os.PathLike[str]]
    :param replace_jinja: Whether replace Airflow vars using Jinja templating.
    :type replace_jinja: bool
    :return: Loaded dictionary.
    :rtype: dict
    """
    if replace_jinja:
        loaded = yaml.safe_load(_jinja_replace_airflow_vars(file_path))
    else:
        # Read as UTF-8 explicitly instead of the platform locale encoding,
        # matching the default encoding Jinja's FileSystemLoader uses on the
        # templated path.
        with open(file_path, "r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)

    # ``yaml.safe_load`` yields ``None`` for a document with no content.
    return {} if loaded is None else loaded
def _jinja_replace_airflow_vars(file_path: Union[str, os.PathLike[str]]) -> str:
    """
    Render *file_path* as a Jinja template with Airflow Variables available.

    The template context exposes ``var.value``; attribute access on it
    (``var.value.<name>``) and ``var.value.get(<name>)`` are both resolved
    through ``Variable.get``.

    NOTE(review): the template is rendered with ``NativeEnvironment``, which
    may hand back a non-``str`` object when the rendered text looks like a
    Python literal -- confirm callers only pass templates that render to text.

    :param file_path: Path to the template file.
    :type file_path: Union[str, os.PathLike[str]]
    :return: Rendered template.
    :rtype: str
    """

    # Copied from airflow.models.taskinstance
    class VariableAccessor:
        """Resolve attribute lookups to Airflow Variables."""

        def __init__(self) -> None:
            # Last value fetched through attribute access; shown by ``repr``.
            self.var = None

        def __getattr__(self, item: str) -> Any:
            # Only invoked for names not found normally, i.e. variable keys.
            fetched = Variable.get(item)
            self.var = fetched
            return fetched

        def __repr__(self) -> str:
            return str(self.var)

        @staticmethod
        def get(item: str) -> Any:
            return Variable.get(item)

    template_path = pathlib.Path(file_path)
    # Templates are looked up relative to the directory holding the file.
    environment = NativeEnvironment(
        loader=FileSystemLoader(str(template_path.parent)),
    )
    template = environment.get_template(template_path.name)
    airflow_vars = {"value": VariableAccessor()}
    return template.render(var=airflow_vars)