Skip to main content

Documentation Index

Fetch the complete documentation index at: https://astronomer-preview.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

Great Expectations (GX) is an open source Python-based data validation framework. You can test your data by expressing what you “expect” from it as simple declarative statements in JSON or YAML, then run validations using those Expectation Suites against SQL data, filesystem data, or a pandas DataFrame. The airflow-provider-great-expectations package provides operators for running Great Expectations validations directly in your DAGs.
Version 1.0.0 of the provider introduces three specialized operators that replace the legacy GreatExpectationsOperator:
| Operator | Use case |
| --- | --- |
| GXValidateDataFrameOperator | Validate in-memory Spark or Pandas DataFrames |
| GXValidateBatchOperator | Validate data not in memory using a BatchDefinition |
| GXValidateCheckpointOperator | Most feature-rich: supports triggering actions on validation results |
Requirements: Python 3.10+, Great Expectations 1.7.0+, Apache Airflow 2.1+. Install with:
pip install airflow-provider-great-expectations
Each operator has its own import path:
from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator
from great_expectations_provider.operators.validate_batch import GXValidateBatchOperator
from great_expectations_provider.operators.validate_checkpoint import GXValidateCheckpointOperator

Choosing the right operator

When deciding which operator fits your use case, consider:
  1. Where is your data? In memory as a DataFrame, or in an external data source?
  2. Do you need to trigger actions? Such as sending notifications or updating external systems based on validation results.
  3. What Data Context do you need? Ephemeral for stateless validations, or persistent to track results over time.
| Scenario | Recommended operator |
| --- | --- |
| Data already in memory as a Pandas or Spark DataFrame | GXValidateDataFrameOperator |
| Data in a database, warehouse, or file system | GXValidateBatchOperator |
| Need to trigger Slack notifications, emails, or other actions | GXValidateCheckpointOperator |
| Want full GX Core features with ValidationDefinitions | GXValidateCheckpointOperator |

GXValidateDataFrameOperator

Use this operator when your data is already in memory as a Pandas or Spark DataFrame. This is the simplest option: you only need a DataFrame and your expectations.
from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator

def configure_dataframe():
    """Fetch the daily planet report into a pandas DataFrame via an Airflow hook."""
    from airflow.providers.common.sql.hooks.sql import DbApiHook

    db_hook = DbApiHook.get_hook("my_db_conn")
    return db_hook.get_pandas_df("SELECT * FROM daily_planet_report")


def configure_expectations(dataframe):
    """Build the expectation suite applied to the daily report DataFrame.

    Receives the DataFrame produced by ``configure_dataframe`` (unused here:
    the suite is static) and returns an ExpectationSuite.
    """
    import great_expectations as gx

    expectation_suite = gx.ExpectationSuite(name="daily_report_suite")
    # Required columns must always be populated.
    for required_column in ("planet_name", "total_passengers"):
        expectation_suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column=required_column)
        )
    # Monetary totals can never be negative.
    for money_column in ("total_net_fare_usd", "total_discounts_usd"):
        expectation_suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeBetween(
                column=money_column, min_value=0
            )
        )
    return expectation_suite


# Validate the DataFrame returned by configure_dataframe against the suite
# built by configure_expectations. context_type="ephemeral" uses a stateless
# Data Context (see "Choosing the right operator" above); result_format
# controls the verbosity of the validation result payload.
_validate_dataframe = GXValidateDataFrameOperator(
    task_id="validate_with_gx_dataframe",
    configure_dataframe=configure_dataframe,
    configure_expectations=configure_expectations,
    result_format="SUMMARY",
    context_type="ephemeral",
)
**configure_expectations signature:** The `configure_expectations` callable receives the DataFrame as its first argument and must return an Expectation or ExpectationSuite.

GXValidateBatchOperator

Use this operator when your data is not in memory. You configure GX to connect directly to your data source by defining a BatchDefinition. This approach works with databases, warehouses, and file systems.
The provider includes helper functions to build connection strings from Airflow connections, so you do not need to duplicate credentials.
from great_expectations_provider.operators.validate_batch import GXValidateBatchOperator
from great_expectations_provider.common.external_connections import build_snowflake_connection_string

SNOWFLAKE_CONN_ID = "snowflake_default"


def configure_batch_definition(context):
    """Register a Snowflake SQL source on the GX context and return a
    whole-table batch definition for the daily report table."""
    # Build the connection string from the Airflow connection so credentials
    # are not duplicated in the Dag file.
    conn_str = build_snowflake_connection_string(
        conn_id=SNOWFLAKE_CONN_ID,
        schema="my_schema",
    )

    return (
        context.data_sources.add_sql(
            name="snowflake_ds",
            connection_string=conn_str,
        )
        .add_table_asset(
            name="daily_report_table",
            table_name="daily_planet_report",
        )
        .add_batch_definition_whole_table(name="full_table")
    )


def configure_expectations(context):
    """Register the daily-report expectation suite on the GX context.

    Unlike the DataFrame operator, this callable receives the GX context
    (not the data) and must add the suite to it.
    """
    import great_expectations.expectations as gxe
    from great_expectations import ExpectationSuite

    checks = [
        gxe.ExpectColumnValuesToNotBeNull(column="planet_name"),
        gxe.ExpectColumnValuesToNotBeNull(column="total_passengers"),
        gxe.ExpectColumnValuesToBeBetween(column="total_net_fare_usd", min_value=0),
        gxe.ExpectColumnValuesToBeBetween(column="total_discounts_usd", min_value=0),
    ]
    suite = ExpectationSuite(name="daily_report_batch_suite", expectations=checks)
    return context.suites.add_or_update(suite)


# Validate the batch described by configure_batch_definition against the
# suite registered by configure_expectations. An ephemeral context is used,
# so no validation results are persisted between runs.
_validate_batch = GXValidateBatchOperator(
    task_id="validate_with_gx_batch",
    configure_batch_definition=configure_batch_definition,
    configure_expectations=configure_expectations,
    result_format="SUMMARY",
    context_type="ephemeral",
)
For file-based data (CSV, Parquet), use add_pandas_filesystem instead:
def configure_batch_definition(context):
    """Register a pandas filesystem source and return a batch definition
    that reads a single CSV file."""
    from pathlib import Path

    source = context.data_sources.add_pandas_filesystem(
        name="my_datasource",
        base_directory=Path("/path/to/data"),
    )
    asset = source.add_csv_asset(name="daily_report_csv")
    batch_definition = asset.add_batch_definition_path(
        name="daily_report_batch",
        path="daily_report.csv",
    )
    return batch_definition
**configure_expectations signature:** For GXValidateBatchOperator, the `configure_expectations` callable receives the GX context as its first argument (not the DataFrame). Use `context.suites.add_or_update()` to register your suite.

GXValidateCheckpointOperator

Use this operator when you need the full power of GX Core, including the ability to trigger actions based on validation results. This requires the most configuration. You define a Checkpoint with a BatchDefinition, ExpectationSuite, and ValidationDefinition.
from great_expectations_provider.operators.validate_checkpoint import GXValidateCheckpointOperator
from great_expectations_provider.common.external_connections import build_snowflake_connection_string

SNOWFLAKE_CONN_ID = "snowflake_default"


def configure_checkpoint(context):
    """Assemble the full GX object graph — data source, expectation suite,
    validation definition, and checkpoint — and return the checkpoint.

    The checkpoint is the unit the operator runs; actions attached to it
    fire based on validation results.
    """
    import great_expectations.expectations as gxe
    from great_expectations import Checkpoint, ExpectationSuite, ValidationDefinition

    # Credentials come from the Airflow connection, not the Dag file.
    conn_str = build_snowflake_connection_string(
        conn_id=SNOWFLAKE_CONN_ID,
        schema="my_schema",
    )

    # 1. Data: a whole-table batch over the daily report table in Snowflake.
    data_source = context.data_sources.add_sql(
        name="snowflake_ds",
        connection_string=conn_str,
    )
    table_asset = data_source.add_table_asset(
        name="daily_report_table",
        table_name="daily_planet_report",
    )
    batch_definition = table_asset.add_batch_definition_whole_table(name="full_table")

    # 2. Expectations: required columns present, money columns non-negative.
    expectation_suite = context.suites.add(
        ExpectationSuite(
            name="daily_report_checkpoint_suite",
            expectations=[
                gxe.ExpectColumnValuesToNotBeNull(column="planet_name"),
                gxe.ExpectColumnValuesToNotBeNull(column="total_passengers"),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_net_fare_usd", min_value=0
                ),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_discounts_usd", min_value=0
                ),
            ],
        )
    )

    # 3. Validation: bind the batch to the suite.
    validation_definition = context.validation_definitions.add(
        ValidationDefinition(
            name="daily_report_validation",
            data=batch_definition,
            suite=expectation_suite,
        )
    )

    # 4. Checkpoint: add SlackNotificationAction, EmailAction, etc. to `actions`
    #    to react to validation results.
    checkpoint = Checkpoint(
        name="daily_report_checkpoint",
        validation_definitions=[validation_definition],
        actions=[],
    )
    return context.checkpoints.add(checkpoint)


# Run the checkpoint assembled by configure_checkpoint. No result_format is
# passed here; the checkpoint itself owns validation configuration and any
# actions attached to it.
_validate_checkpoint = GXValidateCheckpointOperator(
    task_id="validate_with_gx_checkpoint",
    configure_checkpoint=configure_checkpoint,
    context_type="ephemeral",
)