"""POD representing Kubernetes operator config file."""
from typing import Any, Dict, List, Optional
from dbt_airflow_factory.constants import IS_FIRST_AIRFLOW_VERSION
if IS_FIRST_AIRFLOW_VERSION:
from airflow.contrib.kubernetes.secret import Secret
else:
from airflow.kubernetes.secret import Secret
class KubernetesExecutionParameters:
    """POD representing Kubernetes operator config file.

    :param image: tag of Docker image you wish to launch.
    :type image: str
    :param namespace: the namespace to run within Kubernetes.
    :type namespace: str
    :param image_pull_policy: Specify a policy to cache or always pull an image.
    :type image_pull_policy: str
    :param node_selectors: A dict containing a group of scheduling rules.
    :type node_selectors: dict
    :param tolerations: A list of Kubernetes tolerations.
    :type tolerations: list
    :param labels: labels to apply to the Pod. (templated)
    :type labels: dict
    :param limit: A dict containing resources limits.
    :type limit: dict
    :param requests: A dict containing resources requests.
    :type requests: dict
    :param annotations: non-identifying metadata you can attach to the Pod.
    :type annotations: dict
    :param envs: Environment variables initialized in the container. (templated)
    :type envs: Optional[Dict[str, str]]
    :param secrets: Kubernetes secrets to inject in the container.
        They can be exposed as environment vars or files in a volume.
        Each entry is a dict of keyword arguments that is unpacked into
        ``Secret(...)`` when :attr:`secrets` is read.
    :type secrets: Optional[List[Dict[str, Any]]]
    :param is_delete_operator_pod: What to do when the pod reaches its final
        state, or the execution is interrupted. If True: delete the pod.
    :type is_delete_operator_pod: bool
    :param config_file: The path to the Kubernetes config file. (templated)
        If not specified, default value is ``~/.kube/config``
    :type config_file: Optional[str]
    :param execution_script: Script that will be executed inside pod.
    :type execution_script: str
    :param in_cluster: Run kubernetes client with in_cluster configuration.
    :type in_cluster: bool
    :param cluster_context: Context that points to kubernetes cluster.
        Ignored when in_cluster is True. If None, current-context is used.
    :type cluster_context: str
    :param startup_timeout_seconds: Timeout in seconds, stored unchanged on
        the instance (presumably the pod startup timeout handed to the pod
        operator -- confirm against the caller).
    :type startup_timeout_seconds: int
    :param kwargs: Any additional keyword arguments are accepted and ignored.
    """

    def __init__(
        self,
        image: str,
        namespace: str = "default",
        image_pull_policy: Optional[str] = None,
        node_selectors: Optional[dict] = None,
        tolerations: Optional[list] = None,
        labels: Optional[dict] = None,
        limit: Optional[dict] = None,
        requests: Optional[dict] = None,
        annotations: Optional[dict] = None,
        envs: Optional[Dict[str, str]] = None,
        secrets: Optional[List[Dict[str, Any]]] = None,
        is_delete_operator_pod: bool = True,
        config_file: Optional[str] = None,
        execution_script: str = "dbt --no-write-json",
        in_cluster: Optional[bool] = None,
        cluster_context: Optional[str] = None,
        startup_timeout_seconds: int = 120,
        **kwargs: Any,
    ) -> None:
        self.namespace = namespace
        self.image = image
        self.image_pull_policy = image_pull_policy
        self.node_selectors = node_selectors
        self.tolerations = tolerations
        self.labels = labels
        # Limits, requests, envs and secrets are kept private: they are
        # exposed through properties that convert them into the shape the
        # installed Airflow version expects.
        self._limit = limit
        self._requests = requests
        self.annotations = annotations
        self._env_vars = envs
        self._secrets = secrets
        self.is_delete_operator_pod = is_delete_operator_pod
        self.config_file = config_file
        self.execution_script = execution_script
        self.in_cluster = in_cluster
        self.cluster_context = cluster_context
        self.startup_timeout_seconds = startup_timeout_seconds

    @staticmethod
    def _flat_resources(
        limit: Optional[Dict[str, Any]], requests: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Flatten limits and requests into the Airflow 1 ``resources`` dictionary.

        A missing (or empty) dictionary, or a dictionary lacking the
        ``memory`` or ``cpu`` key, yields ``None`` for the corresponding entry
        instead of raising ``KeyError``.

        :param limit: Resources limits, possibly ``None`` or partial.
        :param requests: Resources requests, possibly ``None`` or partial.
        :return: Dictionary with ``limit_memory``, ``limit_cpu``,
            ``request_memory`` and ``request_cpu`` keys.
        """
        limit = limit or {}
        requests = requests or {}
        return {
            "limit_memory": limit.get("memory"),
            "limit_cpu": limit.get("cpu"),
            "request_memory": requests.get("memory"),
            "request_cpu": requests.get("cpu"),
        }

    @property
    def resources(self):  # type: ignore
        """
        Return dict containing resources requests and limits.

        In the Airflow 1, it was expected to be a real dictionary with
        ``request_memory``, ``request_cpu``, ``limit_memory``, and ``limit_cpu``
        as keys. So for Airflow 1, the function returns such a dictionary.

        Beginning with Airflow 2, :class:`KubernetesPodOperator` expects
        ``V1ResourceRequirements`` class instead. Hence, for Airflow 2, the
        function returns instance of this class.

        :return: Dictionary containing resources requests and limits.
        """
        if IS_FIRST_AIRFLOW_VERSION:
            return self._flat_resources(self._limit, self._requests)

        # Imported lazily: the kubernetes client models are only needed on
        # the Airflow 2 code path.
        from kubernetes.client import models as k8s

        return k8s.V1ResourceRequirements(limits=self._limit, requests=self._requests)

    @property
    def env_vars(self):  # type: ignore
        """
        Return dict containing environment variables to set in Kubernetes.

        For Airflow 1, the function returns a dictionary.

        Beginning with Airflow 2, :class:`KubernetesPodOperator` expects
        a list of ``V1EnvVar`` instances instead. Hence, for Airflow 2, the
        function returns a ``List[k8s.V1EnvVar]``.

        :return: Dictionary or list containing environment variables, or
            ``None`` when no environment variables were configured.
        """
        if self._env_vars is None:
            return None
        if IS_FIRST_AIRFLOW_VERSION:
            return self._env_vars

        # Imported lazily: the kubernetes client models are only needed on
        # the Airflow 2 code path.
        from kubernetes.client import models as k8s

        return [
            k8s.V1EnvVar(name=name, value=value)
            for name, value in self._env_vars.items()
        ]

    @property
    def secrets(self) -> "Optional[List[Secret]]":
        """
        Return list containing secrets to be set in Kubernetes.

        Every configured entry is unpacked as keyword arguments into a new
        ``Secret`` each time the property is read.

        :return: List containing kubernetes Secrets, or ``None`` when no
            secrets were configured.
        """
        if self._secrets is None:
            return None
        return [Secret(**secret) for secret in self._secrets]