
If some of your tasks require specific resources, such as a GPU, you might want to run them in a different cluster than your Airflow instance. In setups where both clusters belong to the same AWS, Azure, or GCP account, you can manage access between them with roles and permissions.
To launch Pods in an external cluster from a local Airflow environment, your local Airflow environment must have valid credentials that grant it permission to launch a Pod in the external cluster. For managed Kubernetes services from public cloud providers, authentication is federated through the provider's native IAM service. To grant the Astro role permission to launch Pods on your cluster, you can either include static credentials or use workload identity to authorize the Astro role to your cluster.
This example shows how to set up an EKS cluster on AWS and run a Pod on it from an Airflow instance where cross-account access is not available.

Prerequisites

  • Network connectivity between your Airflow execution environment and the external Kubernetes cluster:
    • Hosted execution mode: A network connection between your Astro Deployment and the external cluster.
    • Remote execution mode: Network connectivity between the environment where your Remote Execution Agent runs and the external cluster. You are responsible for managing this connectivity. A direct network connection between Astro and the external cluster is not required.

Setup

1. Set up your external cluster
  • Create an EKS cluster IAM role with a unique name and add the following permission policies:
    • AmazonEKSWorkerNodePolicy
    • AmazonEKS_CNI_Policy
    • AmazonEC2ContainerRegistryReadOnly
    Record the ARN of the new role; you will need it below. For one way to create this role and attach these policies programmatically, see the sketch at the end of this list.
  • Update the trust policy of this new role to include the workload identity of your Deployment. This step ensures that the role can be assumed by your Deployment.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::<aws account id>:<your user>",
            "Service": [
              "ec2.amazonaws.com",
              "eks.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
    
  • If you don’t already have a cluster, create a new EKS cluster and assign the new role to it.
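
The following is a minimal sketch of creating this role programmatically with boto3 instead of the AWS console. The role name is an assumption; replace the placeholders with your account ID and user, and adjust the trust policy to include the workload identity of your Deployment as described above.

    # Minimal sketch: create the IAM role and attach the required policies with boto3.
    # The role name "airflow-remote-eks-role" is an assumption; choose your own.
    import json
    import boto3

    iam = boto3.client("iam")

    # same trust policy as shown above; replace the placeholders before running
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<aws account id>:<your user>",
                    "Service": ["ec2.amazonaws.com", "eks.amazonaws.com"],
                },
                "Action": "sts:AssumeRole",
            }
        ],
    }

    role = iam.create_role(
        RoleName="airflow-remote-eks-role",
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )

    # attach the three permission policies listed above
    for policy_arn in (
        "arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy",
        "arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy",
        "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly",
    ):
        iam.attach_role_policy(RoleName="airflow-remote-eks-role", PolicyArn=policy_arn)

    # record this ARN, you will need it in the following steps
    print(role["Role"]["Arn"])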

2. Retrieve the KubeConfig file from the EKS cluster
  • Use a KubeConfig file to remotely connect to your new cluster. On AWS, you can run the following command to retrieve it:
    aws eks --region <your-region> update-kubeconfig --name <cluster-name> --kubeconfig my_kubeconfig.yaml
    
    This command creates a new KubeConfig file called my_kubeconfig.yaml.
  • Edit the newly generated KubeConfig so that it instructs the AWS IAM Authenticator for Kubernetes to assume the IAM role you created in Step 1, and confirm that your file matches the example below. Replace <your assume role arn> with the IAM role ARN from Step 1. You can verify the edited file with the sketch that follows the example.
    apiVersion: v1
    clusters:
    - cluster:
        certificate-authority-data: <base 64 public certificate>
        server: <Kubernetes API Endpoint>
      name: <arn of your cluster>
    contexts:
    - context:
        cluster: <arn of your cluster>
        user: <arn of your cluster>
      name: <arn of your cluster>
    current-context: <arn of your cluster>
    kind: Config
    preferences: {}
    users:
    - name: <arn of your cluster>
      user:
        exec:
          apiVersion: client.authentication.k8s.io/v1alpha1
          args:
          - --region
          - <your cluster's AWS region>
          - eks
          - get-token
          - --cluster-name
          - <name of your cluster>
          - --role
          - <your assume role arn>
          command: aws
          interactiveMode: IfAvailable
          provideClusterInfo: false
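
To confirm that the edited KubeConfig works before adding it to a connection, you can list the cluster's nodes with the Kubernetes Python client. This is a minimal sketch, assuming the kubernetes package and the AWS CLI are installed on your local machine and that the file is named my_kubeconfig.yaml as generated above.

    # Minimal sketch: verify the edited kubeconfig by listing the cluster's nodes.
    # Assumes the `kubernetes` package and the AWS CLI are installed locally.
    from kubernetes import client, config

    config.load_kube_config(config_file="my_kubeconfig.yaml")

    for node in client.CoreV1Api().list_node().items:
        print(node.metadata.name)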
    

3. Create a Kubernetes cluster connection

Astronomer recommends creating a Kubernetes cluster connection because it’s more secure than adding an unencrypted kubeconfig file directly to your Astro project.
  • Convert the kubeconfig configuration you retrieved from your cluster from YAML to JSON format, for example with the sketch after this list.
  • In either the Airflow UI or the Astro Environment Manager, create a new Kubernetes Cluster Connection. In the Kube config (JSON format) field, paste the converted kubeconfig.
  • Click Save.
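
A minimal sketch of the YAML-to-JSON conversion, assuming the kubeconfig is saved as my_kubeconfig.yaml and that the PyYAML package is available:

    # Minimal sketch: convert the kubeconfig from Step 2 to JSON.
    # Assumes PyYAML is installed and the kubeconfig is saved as my_kubeconfig.yaml.
    import json
    import yaml

    with open("my_kubeconfig.yaml") as f:
        kubeconfig = yaml.safe_load(f)

    # paste this output into the Kube config (JSON format) field
    print(json.dumps(kubeconfig, indent=2))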
You can now specify this connection in the configuration of any KubernetesPodOperator task that needs to access your external cluster.

4. Install the AWS CLI in your Astro environment

To connect to your external EKS cluster, you need to install the AWS CLI in your Astro project.
  • Add the following to your Dockerfile to install the AWS CLI:
    USER root
    
    RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
    # Note: if you are testing your pipeline locally, you might need to adjust the downloaded archive to match your local development environment
    RUN unzip awscliv2.zip
    RUN ./aws/install
    
    USER astro
    
  • Add the unzip package to your packages.txt file to make the unzip command available in your Docker container:
    unzip
    
If you are working locally, restart your Astro project (for example, with astro dev restart) to apply the changes.

5. Configure your task

In your KubernetesPodOperator task configuration, set cluster_context and namespace for your remote cluster. In the following example, the task launches a Pod in the external cluster based on the configuration defined in the k8s connection.
    run_on_EKS = KubernetesPodOperator(
        task_id="run_on_EKS",
        kubernetes_conn_id="k8s",
        cluster_context="<your-cluster-id>",
        namespace="<your-namespace>",
        name="example_pod",
        image="ubuntu",
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        get_logs=True,
        startup_timeout_seconds=240,
    )
    

Example dag

The following dag uses several classes from the Amazon provider package to dynamically spin up and delete Pods for each task in a newly created node group. If your remote Kubernetes cluster already has a node group available, you only need to define your task in the KubernetesPodOperator itself. The example dag contains five consecutive tasks:
  • Create a node group according to the user’s specifications (in this example, with GPU resources).
  • Use a sensor to check that the node group is up and running.
  • Use the KubernetesPodOperator to run any valid Docker image in a Pod on the newly created node group on the remote cluster. The example dag uses the standard Ubuntu image to print “hello” to the console using a bash command.
  • Delete the node group.
  • Verify that the node group has been deleted.
    # import DAG object and utility packages
    from airflow import DAG
    from pendulum import datetime
    from airflow.configuration import conf
    
    # import the KubernetesPodOperator
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
        KubernetesPodOperator,
    )
    
    # import EKS related packages from the Amazon Provider
    from airflow.providers.amazon.aws.hooks.eks import EksHook, NodegroupStates
    from airflow.providers.amazon.aws.operators.eks import (
        EksCreateNodegroupOperator,
        EksDeleteNodegroupOperator,
    )
    from airflow.providers.amazon.aws.sensors.eks import EksNodegroupStateSensor
    
    # custom class to create a node group with Nodes on EKS
    class EksCreateNodegroupWithNodesOperator(EksCreateNodegroupOperator):
        def execute(self, context):
            # instantiate an EksHook based on the AWS connection (aws_conn_id)
            eks_hook = EksHook(
                aws_conn_id=self.aws_conn_id,
                region_name=self.region,
            )
    
            # define the Node group to create
            eks_hook.create_nodegroup(
                clusterName=self.cluster_name,
                nodegroupName=self.nodegroup_name,
                subnets=self.nodegroup_subnets,
                nodeRole=self.nodegroup_role_arn,
                scalingConfig={"minSize": 1, "maxSize": 1, "desiredSize": 1},
                diskSize=20,
                instanceTypes=["g4dn.xlarge"],
                amiType="AL2_x86_64_GPU",  # get GPU resources
                updateConfig={"maxUnavailable": 1},
            )
    
    
    # instantiate the DAG
    with DAG(
        start_date=datetime(2022, 6, 1),
        catchup=False,
        schedule="@daily",
        dag_id="KPO_remote_EKS_cluster_example_dag",
    ) as dag:
        # task 1 creates the node group
        create_gpu_nodegroup = EksCreateNodegroupWithNodesOperator(
            task_id="create_gpu_nodegroup",
            cluster_name="<your cluster name>",
            nodegroup_name="gpu-nodes",
            nodegroup_subnets=["<your subnet>", "<your subnet>"],
            nodegroup_role_arn="<arn of your EKS role>",
            aws_conn_id="<your aws conn id>",
            region="<your region>",
        )
    
        # task 2 checks that the node group is up and running
        check_nodegroup_status = EksNodegroupStateSensor(
            task_id="check_nodegroup_status",
            cluster_name="<your cluster name>",
            nodegroup_name="gpu-nodes",
            mode="reschedule",
            timeout=60 * 30,
            exponential_backoff=True,
            aws_conn_id="<your aws conn id>",
            region="<your region>",
        )
    
        # task 3 the KubernetesPodOperator running a task
        # here, cluster_context and the kubernetes_conn_id are defined at the task level.
        run_on_EKS = KubernetesPodOperator(
            task_id="run_on_EKS",
            cluster_context="<arn of your cluster>",
            namespace="airflow-kpo-default",
            name="example_pod",
            image="ubuntu",
            cmds=["bash", "-cx"],
            arguments=["echo hello"],
            get_logs=True,
            in_cluster=False,
            kubernetes_conn_id="k8s",
            startup_timeout_seconds=240,
        )
    
        # task 4 deleting the node group
        delete_gpu_nodegroup = EksDeleteNodegroupOperator(
            task_id="delete_gpu_nodegroup",
            cluster_name="<your cluster name>",
            nodegroup_name="gpu-nodes",
            aws_conn_id="<your aws conn id>",
            region="<your region>",
        )
    
        # task 5 checking that the node group was deleted successfully
        check_nodegroup_termination = EksNodegroupStateSensor(
            task_id="check_nodegroup_termination",
            cluster_name="<your cluster name>",
            nodegroup_name="gpu-nodes",
            aws_conn_id="<your aws conn id>",
            region="<your region>",
            mode="reschedule",
            timeout=60 * 30,
            target_state=NodegroupStates.NONEXISTENT,
        )
    
        # setting the dependencies
        create_gpu_nodegroup >> check_nodegroup_status >> run_on_EKS
        run_on_EKS >> delete_gpu_nodegroup >> check_nodegroup_termination