Source code for dbt_airflow_factory.k8s.k8s_parameters

"""POD representing Kubernetes operator config file."""

from typing import Any, Dict, List, Optional

from dbt_airflow_factory.constants import IS_FIRST_AIRFLOW_VERSION

if IS_FIRST_AIRFLOW_VERSION:
    from airflow.contrib.kubernetes.secret import Secret
else:
    from airflow.kubernetes.secret import Secret


class KubernetesExecutionParameters:
    """POD representing Kubernetes operator config file.

    :param image: tag of Docker image you wish to launch.
    :type image: str
    :param namespace: the namespace to run within Kubernetes.
    :type namespace: str
    :param image_pull_policy: Specify a policy to cache or always pull an image.
    :type image_pull_policy: str
    :param node_selectors: A dict containing a group of scheduling rules.
    :type node_selectors: dict
    :param tolerations: A list of Kubernetes tolerations.
    :type tolerations: list
    :param labels: labels to apply to the Pod. (templated)
    :type labels: dict
    :param limit: A dict containing resources limits. On Airflow 1 only the
        ``memory`` and ``cpu`` keys are read; either may be omitted.
    :type limit: dict
    :param requests: A dict containing resources requests. On Airflow 1 only
        the ``memory`` and ``cpu`` keys are read; either may be omitted.
    :type requests: dict
    :param annotations: non-identifying metadata you can attach to the Pod.
    :type annotations: dict
    :param envs: Environment variables initialized in the container. (templated)
    :type envs: Optional[Dict[str, str]]
    :param secrets: Kubernetes secrets to inject in the container.
        They can be exposed as environment vars or files in a volume.
        Each entry is either a ``Secret`` instance or a mapping of keyword
        arguments accepted by the ``Secret`` constructor.
    :type secrets: List[Secret]
    :param is_delete_operator_pod: What to do when the pod reaches its final
        state, or the execution is interrupted. If True: delete the pod.
    :type is_delete_operator_pod: bool
    :param config_file: The path to the Kubernetes config file. (templated)
        If not specified, default value is ``~/.kube/config``
    :type config_file: Optional[str]
    :param execution_script: Script that will be executed inside pod.
    :type execution_script: str
    :param in_cluster: Run kubernetes client with in_cluster configuration.
    :type in_cluster: bool
    :param cluster_context: Context that points to kubernetes cluster.
        Ignored when in_cluster is True. If None, current-context is used.
    :type cluster_context: str
    :param startup_timeout_seconds: Stored unchanged on the instance
        (default ``120``); presumably the pod startup timeout handed to the
        operator -- the consumer lives outside this module.
    :type startup_timeout_seconds: int
    :param kwargs: Any additional keyword arguments are accepted and ignored.
    """

    def __init__(
        self,
        image: str,
        namespace: str = "default",
        image_pull_policy: Optional[str] = None,
        node_selectors: Optional[dict] = None,
        tolerations: Optional[list] = None,
        labels: Optional[dict] = None,
        limit: Optional[dict] = None,
        requests: Optional[dict] = None,
        annotations: Optional[dict] = None,
        envs: Optional[Dict[str, str]] = None,
        secrets: Optional[List[Secret]] = None,
        is_delete_operator_pod: bool = True,
        config_file: Optional[str] = None,
        execution_script: str = "dbt --no-write-json",
        in_cluster: Optional[bool] = None,
        cluster_context: Optional[str] = None,
        startup_timeout_seconds: int = 120,
        **kwargs: Any,
    ) -> None:
        self.namespace = namespace
        self.image = image
        self.image_pull_policy = image_pull_policy
        self.node_selectors = node_selectors
        self.tolerations = tolerations
        self.labels = labels
        # Kept private: the Airflow-version-specific representation is
        # exposed through the ``resources`` property.
        self._limit = limit
        self._requests = requests
        self.annotations = annotations
        # Kept private: converted on access by ``env_vars`` / ``secrets``.
        self._env_vars = envs
        self._secrets = secrets
        self.is_delete_operator_pod = is_delete_operator_pod
        self.config_file = config_file
        self.execution_script = execution_script
        self.in_cluster = in_cluster
        self.cluster_context = cluster_context
        self.startup_timeout_seconds = startup_timeout_seconds

    @property
    def resources(self):  # type: ignore
        """
        Return dict containing resources requests and limits.

        In the Airflow 1, it was expected to be a real dictionary with
        ``request_memory``, ``request_cpu``, ``limit_memory``, and ``limit_cpu``
        as keys. So for Airflow 1, the function returns such a dictionary.
        A value is ``None`` when the corresponding dict is missing/empty or
        does not define that key.

        Beginning with Airflow 2, :class:`KubernetesPodOperator` expects
        ``V1ResourceRequirements`` class instead. Hence, for Airflow 2, the
        function returns instance of this class.

        :return: Dictionary containing resources requests and limits.
        """
        if IS_FIRST_AIRFLOW_VERSION:
            # ``.get`` instead of indexing: a partially specified dict
            # (e.g. only ``cpu``) must not raise KeyError.
            limit = self._limit or {}
            requests = self._requests or {}
            return {
                "limit_memory": limit.get("memory"),
                "limit_cpu": limit.get("cpu"),
                "request_memory": requests.get("memory"),
                "request_cpu": requests.get("cpu"),
            }
        else:
            # Imported lazily so the kubernetes client is only required on
            # the code path that actually uses it.
            from kubernetes.client import models as k8s

            return k8s.V1ResourceRequirements(limits=self._limit, requests=self._requests)

    @property
    def env_vars(self):  # type: ignore
        """
        Return dict containing environment variables to set in Kubernetes.

        For Airflow 1, the function returns a dictionary.

        Beginning with Airflow 2, :class:`KubernetesPodOperator` expects
        a list of ``V1EnvVar`` instances instead. Hence, for Airflow 2, the
        function returns a ``List[k8s.V1EnvVar]``.

        :return: Dictionary or list containing environment variables, or
            ``None`` when no environment variables were configured.
        """
        if self._env_vars is None:
            return None

        if IS_FIRST_AIRFLOW_VERSION:
            return self._env_vars
        else:
            from kubernetes.client import models as k8s

            return [k8s.V1EnvVar(k, v) for k, v in self._env_vars.items()]

    @property
    def secrets(self) -> Optional[List[Secret]]:
        """
        Return list containing secrets to be set in Kubernetes.

        Entries that already are ``Secret`` instances are returned as they
        are; any other entry is treated as a mapping of keyword arguments
        and used to construct a ``Secret``.

        :return: List containing kubernetes Secrets, or ``None`` when no
            secrets were configured.
        """
        if self._secrets is None:
            return None

        return [
            secret if isinstance(secret, Secret) else Secret(**secret)
            for secret in self._secrets
        ]