Great Expectations (GX) is an open-source, Python-based data validation framework. You test your data by expressing what you “expect” of it as simple declarative statements in JSON or YAML, then run validations with those Expectation Suites against SQL data, filesystem data, or a pandas DataFrame. The airflow-provider-great-expectations package provides operators for running Great Expectations validations directly in your DAGs.
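For example, a single expectation is one declarative statement about a column. A minimal sketch in Python (the column name is illustrative):

```python
import great_expectations as gx

# Declare that every value in the "total_passengers" column must be non-negative.
expectation = gx.expectations.ExpectColumnValuesToBeBetween(
    column="total_passengers", min_value=0
)
```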
Version 1.0.0 of the provider introduces three specialized operators that replace the legacy GreatExpectationsOperator:

| Operator | Use case |
| --- | --- |
| GXValidateDataFrameOperator | Validate in-memory Spark or pandas DataFrames |
| GXValidateBatchOperator | Validate data that is not in memory using a BatchDefinition |
| GXValidateCheckpointOperator | Most feature-rich: supports triggering actions on validation results |
Requirements: Python 3.10+, Great Expectations 1.7.0+, Apache Airflow 2.1+. Install with:

```bash
pip install airflow-provider-great-expectations
```
Each operator has its own import path:
```python
from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator
from great_expectations_provider.operators.validate_batch import GXValidateBatchOperator
from great_expectations_provider.operators.validate_checkpoint import GXValidateCheckpointOperator
```
Use this operator when your data is already in memory as a pandas or Spark DataFrame. This is the simplest option: you only need a DataFrame and your expectations.
GXValidateDataFrameOperator example
```python
from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator


def configure_dataframe():
    """Load data into a DataFrame using an Airflow hook."""
    from airflow.providers.common.sql.hooks.sql import DbApiHook

    hook = DbApiHook.get_hook("my_db_conn")
    return hook.get_pandas_df("SELECT * FROM daily_planet_report")


def configure_expectations(dataframe):
    """Define expectations for the DataFrame."""
    import great_expectations as gx

    suite = gx.ExpectationSuite(name="daily_report_suite")
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToNotBeNull(column="planet_name")
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToNotBeNull(column="total_passengers")
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="total_net_fare_usd", min_value=0
        )
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="total_discounts_usd", min_value=0
        )
    )
    return suite


_validate_dataframe = GXValidateDataFrameOperator(
    task_id="validate_with_gx_dataframe",
    configure_dataframe=configure_dataframe,
    configure_expectations=configure_expectations,
    result_format="SUMMARY",
    context_type="ephemeral",
)
```
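The operator's return value is the serialized validation result, so a downstream task can react to it via XCom. A minimal sketch, assuming the serialized result exposes a top-level success key (the downstream task name is illustrative):

```python
from airflow.decorators import task


@task
def halt_on_failure(validation_result: dict):
    # Assumed result shape: a top-level "success" flag in the serialized result.
    if not validation_result["success"]:
        raise ValueError("Great Expectations validation failed")


halt_on_failure(_validate_dataframe.output)
```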
**configure_expectations signature:** The configure_expectations callable receives the DataFrame as its first argument and must return an Expectation or ExpectationSuite.
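Because a bare Expectation is also accepted, a single-check callable can skip the suite entirely; a minimal sketch:

```python
def configure_expectations(dataframe):
    """Return a single Expectation instead of a full suite."""
    import great_expectations as gx

    return gx.expectations.ExpectColumnValuesToNotBeNull(column="planet_name")
```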
Use this operator when your data is not in memory. You configure GX to connect directly to your data source by defining a BatchDefinition. This approach works with databases, warehouses, and file systems.
The provider includes helper functions to build connection strings from Airflow connections, so you do not need to duplicate credentials.
GXValidateBatchOperator example (SQL)
```python
from great_expectations_provider.operators.validate_batch import GXValidateBatchOperator
from great_expectations_provider.common.external_connections import (
    build_snowflake_connection_string,
)

SNOWFLAKE_CONN_ID = "snowflake_default"


def configure_batch_definition(context):
    """Configure a batch definition for a SQL table."""
    connection_string = build_snowflake_connection_string(
        conn_id=SNOWFLAKE_CONN_ID, schema="my_schema"
    )
    data_source = context.data_sources.add_sql(
        name="snowflake_ds",
        connection_string=connection_string,
    )
    table_asset = data_source.add_table_asset(
        name="daily_report_table",
        table_name="daily_planet_report",
    )
    return table_asset.add_batch_definition_whole_table(name="full_table")


def configure_expectations(context):
    """Define expectations and add them to the context."""
    import great_expectations.expectations as gxe
    from great_expectations import ExpectationSuite

    return context.suites.add_or_update(
        ExpectationSuite(
            name="daily_report_batch_suite",
            expectations=[
                gxe.ExpectColumnValuesToNotBeNull(column="planet_name"),
                gxe.ExpectColumnValuesToNotBeNull(column="total_passengers"),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_net_fare_usd", min_value=0
                ),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_discounts_usd", min_value=0
                ),
            ],
        )
    )


_validate_batch = GXValidateBatchOperator(
    task_id="validate_with_gx_batch",
    configure_batch_definition=configure_batch_definition,
    configure_expectations=configure_expectations,
    result_format="SUMMARY",
    context_type="ephemeral",
)
```
GXValidateBatchOperator example (file-based)
For file-based data (CSV, Parquet), use add_pandas_filesystem instead:
```python
def configure_batch_definition(context):
    """Configure a batch definition to read from a CSV file."""
    from pathlib import Path

    data_source = context.data_sources.add_pandas_filesystem(
        name="my_datasource",
        base_directory=Path("/path/to/data"),
    )
    csv_asset = data_source.add_csv_asset(name="daily_report_csv")
    return csv_asset.add_batch_definition_path(
        name="daily_report_batch",
        path="daily_report.csv",
    )
```
**configure_expectations signature:** For GXValidateBatchOperator, the configure_expectations callable receives the GX context as its first argument (not the DataFrame). Use context.suites.add_or_update() to register your suite.
Use this operator when you need the full power of GX Core, including the ability to trigger actions based on validation results. It requires the most configuration: you define a Checkpoint that ties together a BatchDefinition, an ExpectationSuite, and a ValidationDefinition, as in the sketch below.
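A minimal sketch of such a Checkpoint, assuming the operator invokes a configure_checkpoint callable with the GX context and expects a Checkpoint back (the file path, names, and Slack action settings are illustrative):

```python
import great_expectations as gx
from great_expectations_provider.operators.validate_checkpoint import (
    GXValidateCheckpointOperator,
)


def configure_checkpoint(context):
    """Wire a BatchDefinition, ExpectationSuite, and ValidationDefinition into a Checkpoint."""
    from pathlib import Path

    # BatchDefinition: which data to validate (path and names are illustrative).
    batch_definition = (
        context.data_sources.add_pandas_filesystem(
            name="my_datasource", base_directory=Path("/path/to/data")
        )
        .add_csv_asset(name="daily_report_csv")
        .add_batch_definition_path(name="daily_report_batch", path="daily_report.csv")
    )

    # ExpectationSuite: what the data should look like.
    suite = context.suites.add(
        gx.ExpectationSuite(
            name="daily_report_checkpoint_suite",
            expectations=[
                gx.expectations.ExpectColumnValuesToNotBeNull(column="planet_name"),
            ],
        )
    )

    # ValidationDefinition: binds the batch to the suite.
    validation_definition = context.validation_definitions.add(
        gx.ValidationDefinition(
            name="daily_report_validation", data=batch_definition, suite=suite
        )
    )

    # Checkpoint: runs the validation and triggers actions on the result.
    return context.checkpoints.add(
        gx.Checkpoint(
            name="daily_report_checkpoint",
            validation_definitions=[validation_definition],
            actions=[
                gx.checkpoint.SlackNotificationAction(
                    name="notify_slack_on_failure",
                    slack_token="${SLACK_TOKEN}",  # illustrative; store as a secret
                    slack_channel="${SLACK_CHANNEL}",
                    notify_on="failure",
                )
            ],
        )
    )


_validate_checkpoint = GXValidateCheckpointOperator(
    task_id="validate_with_gx_checkpoint",
    configure_checkpoint=configure_checkpoint,
    context_type="ephemeral",
)
```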