Documentation Index
Fetch the complete documentation index at: https://astronomer-preview.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
The @task.kubernetes decorator provides a TaskFlow alternative to the traditional KubernetesPodOperator, which allows you to run a specified task in its own Kubernetes pod. Note that the Docker image provided to the @task.kubernetes decorator’s image parameter must support executing Python scripts in order to leverage the KubernetesPodOperator decorator.
Like regular @task decorated functions, XComs can be passed to the Python script running in the dedicated Kubernetes pod. If do_xcom_push is set to True in the decorator parameters, the value returned by the decorated function is pushed to XCom.
Astronomer recommends using the @task.kubernetes decorator instead of the KubernetesPodOperator when using XCom with Python scripts in a dedicated Kubernetes pod.
from pendulum import datetime
from airflow.configuration import conf
from airflow.decorators import dag, task
import random
# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")
@dag(
start_date=datetime(2023, 1, 1),
catchup=False,
schedule="@daily",
)
def kubernetes_decorator_example_dag():
@task
def extract_data():
# simulating querying from a database
data_point = random.randint(0, 100)
return data_point
@task.kubernetes(
# specify the Docker image to launch, it needs to be able to run a Python script
image="python",
# launch the Pod on the same cluster as Airflow is running on
in_cluster=True,
# launch the Pod in the same namespace as Airflow is running in
namespace=namespace,
# Pod configuration
# naming the Pod
name="my_pod",
# log stdout of the container as task logs
get_logs=True,
# log events in case of Pod failure
log_events_on_failure=True,
# enable pushing to XCom
do_xcom_push=True,
)
def transform(data_point):
multiplied_data_point = 23 * int(data_point)
return multiplied_data_point
@task
def load_data(**context):
# pull the XCom value that has been pushed by the KubernetesPodOperator
transformed_data_point = context["ti"].xcom_pull(
task_ids="transform", key="return_value"
)
print(transformed_data_point)
load_data(transform(extract_data()))
kubernetes_decorator_example_dag()