Why Soda Core Over Ad-Hoc SQL Assertions
Every data team has a variation of the same script: a collection of SELECT COUNT(*) WHERE ... queries, threshold comparisons, and Python conditionals that evolved organically over years. These scripts are scattered across notebooks, Airflow DAGs, and CI jobs; they produce different output formats; and they duplicate logic across environments. When a check fails, the error is a generic Python exception rather than a structured incident.
Soda Core is an open-source Python library that replaces this pattern with a declarative YAML-based language called SodaCL (Soda Checks Language). You define what "good data" looks like in plain, readable YAML. Soda executes the checks against your data source and returns structured scan results (pass, warn, or fail) with precise metrics attached to each outcome. The scan results can be sent to Soda Cloud, written to files, or consumed by CI/CD gates and alerting pipelines. The data quality observability guide covers the complementary monitoring layer, Monte Carlo and freshness SLAs, which catches anomalies in data that has already passed static checks. Together, rule-based checks and ML-based anomaly detection provide defense in depth.
SodaCL Checks
Declarative YAML syntax for row count, missing values, invalid values, duplicates, distribution, freshness, and custom SQL metrics. Checks compile to optimized SQL executed directly against your data warehouse.
Data Contracts
Schema enforcement at pipeline boundaries — a YAML contract declares expected columns, types, and check thresholds that both producers and consumers sign off on, with automated verification on every deploy.
CI/CD Integration
Scan results produce non-zero exit codes on failure, enabling GitHub Actions, Airflow, and dbt CI pipelines to gate on data quality before promoting transformations to production.
Installation and Data Source Configuration
Soda Core is distributed as a base package plus data-source-specific connectors. Install only the connector for your target warehouse — each connector bundles the correct driver without requiring you to manage adapter compatibility manually.
# Install Soda Core with the connector for your warehouse
pip install soda-core # base library only
pip install soda-core-bigquery # BigQuery
pip install soda-core-snowflake # Snowflake
pip install soda-core-postgres # PostgreSQL
pip install soda-core-spark-df # Apache Spark DataFrames
pip install soda-core-pandas-dask # Pandas and Dask DataFrames
pip install soda-core-trino # Trino / Starburst
pip install soda-core-redshift # Amazon Redshift
pip install soda-core-sqlserver # SQL Server
# Optional: dbt manifest integration for auto-generated checks
pip install soda-core-dbt
Each data source requires a YAML configuration file (configuration.yml) with connection parameters. Credentials are resolved from environment variables; never hardcode them in configuration files.
# configuration.yml: data source connection profiles
data_source production_bq:
  type: bigquery
  account_info_json_path: /tmp/soda_sa.json  # path to GCP service account JSON
  auth_scopes:
    - https://www.googleapis.com/auth/bigquery.readonly
  project_id: my-gcp-project
  dataset: analytics_production
data_source staging_pg:
  type: postgres
  host: db.internal
  port: 5432
  username: ${POSTGRES_USER}
  password: ${POSTGRES_PASSWORD}
  database: staging
  schema: public
data_source analytics_sf:
  type: snowflake
  username: ${SNOWFLAKE_USER}
  password: ${SNOWFLAKE_PASSWORD}
  account: myaccount.us-east-1
  warehouse: compute_wh
  database: ANALYTICS
  schema: PRODUCTION
  role: SODA_ROLE
Note
Set account_info_json_path to the mount path of the service account JSON file. The service account needs bigquery.jobs.create and bigquery.tables.getData on the target dataset. Avoid granting bigquery.dataViewer project-wide if the service account only needs to scan specific tables.
SodaCL Check Syntax: Rows, Missing, Invalid, and Freshness
SodaCL check files are YAML documents that declare checks scoped to a dataset. Each check has a type, optional configuration, and a threshold expression using fail, warn, or both. Checks that violate fail thresholds set the scan exit code to non-zero. warn-only violations leave the scan status as a warning — useful for progressive rollout of stricter thresholds without breaking the pipeline immediately.
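To make the fail and warn semantics concrete, here is a minimal Python sketch of how a single metric value might be sorted into an outcome. The function name `classify` and the callable-based thresholds are hypothetical illustrations, not Soda's internals; the only behavior assumed is that a fail threshold takes precedence over a warn threshold.

```python
from typing import Callable, Optional

Predicate = Callable[[float], bool]


def classify(value: float,
             fail_when: Optional[Predicate] = None,
             warn_when: Optional[Predicate] = None) -> str:
    """Return 'fail', 'warn', or 'pass' for a metric value.

    The fail threshold is evaluated first, so a value that matches
    both thresholds is reported as a failure.
    """
    if fail_when is not None and fail_when(value):
        return "fail"
    if warn_when is not None and warn_when(value):
        return "warn"
    return "pass"


def fail_above_1(pct: float) -> bool:
    # Hypothetical fail threshold: more than 1% missing
    return pct > 1.0


def warn_band(pct: float) -> bool:
    # Hypothetical warn threshold: between 0.1% and 1% missing
    return 0.1 <= pct <= 1.0


for pct in (0.05, 0.4, 2.5):
    print(pct, classify(pct, fail_when=fail_above_1, warn_when=warn_band))
```

Starting a new check with only `warn_when` set mirrors the progressive rollout described above: the outcome can never be "fail" until a fail threshold is added.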
# checks/orders.yml: SodaCL checks for the fct_orders table
checks for fct_orders:
  # Row count: fail if the table is empty
  - row_count > 0
  # Missing values: no NULLs allowed in required columns
  - missing_count(order_id) = 0
  - missing_count(customer_id) = 0
  # A check takes either a fixed threshold or warn/fail zones, not both
  - missing_percent(shipping_address):
      warn: when between 0.1% and 1%
      fail: when > 1%
  # Invalid values: revenue must be non-negative
  - invalid_count(revenue_usd) = 0:
      valid min: 0
  # Enum validation: order_status must match known values
  - invalid_count(order_status) = 0:
      valid values:
        - pending
        - processing
        - shipped
        - delivered
        - cancelled
        - refunded
  # Duplicate primary key check
  - duplicate_count(order_id) = 0
  # Freshness: the newest created_at value must be less than 26 hours old
  - freshness(created_at) < 26h
  # Schema check: columns must exist with correct types
  - schema:
      fail:
        when required column missing: [order_id, customer_id, revenue_usd, created_at]
        when wrong column type:
          order_id: character varying
          revenue_usd: numeric
# checks/orders_partitioned.yml: partition-scoped checks
# A named dataset filter is declared once, then referenced in "checks for"
filter fct_orders [today]:
  where: created_at >= CURRENT_DATE
checks for fct_orders [today]:
  # Partition must have at least 5000 rows by end of day
  - row_count:
      fail: when < 5000
      warn: when < 10000
  # Null rate stays below threshold for today's partition
  - missing_percent(email) < 2%
  - missing_percent(product_id) < 0.1%
  # Value sanity: no negative order values today, and no single order at or above $1M
  - min(revenue_usd) >= 0
  - max(revenue_usd) < 1000000:
      name: Revenue outlier guard (single order above $1M is suspicious)
Note
The [today] filter in SodaCL is a named dataset filter that adds a WHERE clause to every check in the block. This is essential for large tables where a row_count check on the full table would take minutes. Always define partition filters for date-partitioned tables to scope checks to the most recent partition; full-table scans at daily frequency are expensive and slow even if technically correct. The filter value is evaluated at scan execution time, not at file write time.
Custom SQL Metrics and Multi-Table Referential Integrity
SodaCL built-in checks cover the most common patterns, but business logic often requires custom SQL. Two mechanisms address this: user-defined metrics that define a custom SQL expression as a named metric, and failed rows checks that capture the specific rows that fail a condition for direct inspection. Data pipeline testing covers Great Expectations as an alternative framework for defining expectation suites with a Python API — useful when your team prefers code-first check authoring over YAML — the two frameworks have different ergonomics but overlapping capabilities for most check types.
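The referential-integrity idea described above can be tried locally before it goes into a check file. The following sketch runs an orphan-row count against an in-memory SQLite database; the table and column names (fct_orders, dim_customers, customer_id, id) follow this article's examples, while the helper `count_orphans` and the sample rows are hypothetical.

```python
import sqlite3


def count_orphans(conn: sqlite3.Connection) -> int:
    """Count orders whose customer_id has no matching dim_customers row."""
    row = conn.execute(
        """
        SELECT COUNT(*)
        FROM fct_orders o
        LEFT JOIN dim_customers c ON o.customer_id = c.id
        WHERE c.id IS NULL
        """
    ).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_customers (id INTEGER PRIMARY KEY);
    CREATE TABLE fct_orders (order_id TEXT, customer_id INTEGER);
    INSERT INTO dim_customers VALUES (1), (2);
    -- Order 'c' points at a customer that does not exist
    INSERT INTO fct_orders VALUES ('a', 1), ('b', 2), ('c', 99);
    """
)
orphans = count_orphans(conn)
print(f"orphan_orders = {orphans}")
```

One design point worth checking on your own data: the LEFT JOIN form also counts orders whose customer_id is NULL, whereas a `NOT IN (SELECT ...)` condition silently skips them because the comparison evaluates to NULL. Which behavior you want depends on whether a separate missing_count check already covers that column.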
# Custom SQL metric: checks using arbitrary aggregation expressions
checks for fct_orders:
  - avg_revenue > 10:
      avg_revenue expression: AVG(revenue_usd)
  # Failed rows check: captures rows where referential integrity fails
  - failed rows:
      name: Orders with no matching customer
      fail condition: customer_id NOT IN (SELECT id FROM dim_customers)
      fail: when > 0
  # Multi-table referential integrity via SQL metric query
  - orphan_orders = 0:
      orphan_orders query: |
        SELECT COUNT(*)
        FROM fct_orders o
        LEFT JOIN dim_customers c ON o.customer_id = c.id
        WHERE c.id IS NULL
  # Distribution check: P95 order value must be below threshold
  - percentile_95_revenue < 50000:
      percentile_95_revenue query: |
        SELECT PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY revenue_usd)
        FROM fct_orders
        WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
  # Partition completeness: at least 40 US states in today's orders
  - distinct_states >= 40:
      distinct_states expression: COUNT(DISTINCT shipping_state)
      filter: created_at::date = CURRENT_DATE
# Running a scan programmatically via the Python API
import sys
from soda.scan import Scan
scan = Scan()
scan.set_data_source_name("production_bq")
scan.add_configuration_yaml_file(file_path="configuration.yml")
scan.add_sodacl_yaml_file(file_path="checks/orders.yml")
scan.add_sodacl_yaml_file(file_path="checks/orders_partitioned.yml")
exit_code = scan.execute()
# Access structured results as a dictionary
results = scan.get_scan_results()
for check in results["checks"]:
    print(f"{check['name']}: {check['outcome']}")
    if check["outcome"] == "fail":
        print(f"  Metrics: {check.get('metrics')}")
# Exit with non-zero on failure for CI gates
sys.exit(exit_code)
Data Contracts: Schema Enforcement at Pipeline Boundaries
A data contract is a formal agreement between a data producer and its consumers specifying the schema, data types, and quality thresholds that the producer guarantees. Soda Core's contract verification reads a YAML contract file, scans the actual table, and fails the pipeline if the table does not match the declared specification. Contracts are versioned in Git alongside the pipeline code — when a producer changes a column type, the contract file must be updated, reviewed, and approved by consumers before the change can be deployed.
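As a rough mental model of what contract verification does with the schema portion, the sketch below compares declared columns and types against what a table actually exposes. It is a simplified illustration only: the function `verify_schema` and the dictionary shapes are hypothetical and are not Soda's implementation, which also evaluates the quality checks in the contract.

```python
from typing import Dict, List


def verify_schema(contract: Dict[str, str], actual: Dict[str, str]) -> List[str]:
    """Return a list of human-readable schema violations (empty if none).

    Both arguments map column name -> data type. Extra columns in the
    actual table are tolerated; only declared columns are enforced.
    """
    problems: List[str] = []
    for column, expected_type in contract.items():
        if column not in actual:
            problems.append(f"missing column: {column}")
        elif actual[column] != expected_type:
            problems.append(
                f"wrong type for {column}: "
                f"expected {expected_type}, found {actual[column]}"
            )
    return problems


# Hypothetical contract and table metadata
contract = {
    "order_id": "character varying",
    "customer_id": "bigint",
    "revenue_usd": "numeric",
}
actual = {
    "order_id": "character varying",
    "customer_id": "integer",   # producer changed the type
    "legacy_flag": "boolean",   # extra column, not in the contract
}

problems = verify_schema(contract, actual)
for problem in problems:
    print(problem)
```

Tolerating extra columns is a deliberate choice in this sketch: it lets producers add fields without breaking consumers, while removals and type changes still fail.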
# contracts/fct_orders_v2.yml: data contract for fct_orders
dataset: fct_orders
columns:
  - name: order_id
    data_type: character varying
    not_null: true
    unique: true
  - name: customer_id
    data_type: bigint
    not_null: true
  - name: order_status
    data_type: character varying
    not_null: true
    valid_values: [pending, processing, shipped, delivered, cancelled, refunded]
  - name: revenue_usd
    data_type: numeric
    not_null: true
    checks:
      - name: Revenue is non-negative
        fail: when invalid_count(revenue_usd) > 0
        valid min: 0
  - name: created_at
    data_type: timestamp without time zone
    not_null: true
  - name: updated_at
    data_type: timestamp without time zone
    not_null: true
checks:
  - row_count > 0
  - duplicate_count(order_id) = 0
  - freshness(created_at) < 26h
# Verifying a contract in Python
from soda.contracts.contract_verification import (
    ContractVerification,
    ContractVerificationResult,
)
result: ContractVerificationResult = (
    ContractVerification.builder()
    .with_contract_yaml_file("contracts/fct_orders_v2.yml")
    .with_data_source_yaml_file("configuration.yml")
    .execute()
)
if result.failed():
    print("Contract verification FAILED:")
    for failure in result.contract_failures:
        print(f"  {failure.check}: {failure.outcome}")
    raise SystemExit(1)
print("Contract verification PASSED")
CI/CD Integration: GitHub Actions, dbt, and Airflow
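Soda Core integrates into CI/CD pipelines through its command-line interface (soda scan) and Python API. The CLI exits with 0 when all checks pass, 1 when at least one check warns, 2 when at least one check fails, and 3 when the scan itself hits a runtime error. A shell-based CI gate that treats any non-zero exit code as a failure will therefore also block on warnings unless it handles exit code 1 explicitly.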
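Because a shell gate only sees "zero or non-zero", it can help to make the pass, warn, and fail policy explicit in a small wrapper. The sketch below is one way to do that in Python. The exit-code mapping in `EXIT_MEANING` is an assumption to confirm against the Soda Core release you run, and `should_block` and `run_gate` are hypothetical helper names.

```python
import subprocess
from typing import Dict, List

# Assumed mapping of scan exit codes; confirm against your installed release.
EXIT_MEANING: Dict[int, str] = {0: "pass", 1: "warn", 2: "fail", 3: "error"}


def should_block(exit_code: int, block_on_warn: bool = False) -> bool:
    """Decide whether the pipeline should stop for a given scan exit code.

    Unknown exit codes are treated as errors and always block.
    """
    outcome = EXIT_MEANING.get(exit_code, "error")
    if outcome == "pass":
        return False
    if outcome == "warn":
        return block_on_warn
    return True


def run_gate(command: List[str], block_on_warn: bool = False) -> int:
    """Run a scan command and return 0 (continue) or 1 (block)."""
    completed = subprocess.run(command, check=False)
    return 1 if should_block(completed.returncode, block_on_warn) else 0


for code in (0, 1, 2, 3):
    print(code, EXIT_MEANING[code], "block" if should_block(code) else "continue")
```

A CI step could call `run_gate(["soda", "scan", "--data-source", "production_bq", "--configuration", "configuration.yml", "checks/orders.yml"])` and pass the result to `sys.exit`, so warnings are logged without failing the job while failures and errors still stop it.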
# GitHub Actions: run Soda Core checks after dbt production run
name: Data Quality Gate
on:
  workflow_run:
    workflows: ["dbt Production"]
    types: [completed]
jobs:
  soda-scan:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install Soda Core
        run: pip install soda-core-bigquery
      - name: Write GCP credentials
        env:
          GCP_SA_JSON: ${{ secrets.GCP_SODA_SA_JSON }}
        run: echo "$GCP_SA_JSON" > /tmp/soda_sa.json
      - name: Run Soda scans
        env:
          SODA_API_KEY_ID: ${{ secrets.SODA_API_KEY_ID }}
          SODA_API_KEY_SECRET: ${{ secrets.SODA_API_KEY_SECRET }}
        run: |
          soda scan \
            --data-source production_bq \
            --configuration configuration.yml \
            checks/orders.yml \
            checks/customers.yml \
            checks/revenue.yml
      - name: Clean up credentials
        if: always()
        run: rm -f /tmp/soda_sa.json
# Apache Airflow: Soda scan as a pipeline task
from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
@dag(
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def daily_orders_pipeline():
    @task
    def extract_and_load():
        # ... Fivetran / custom EL logic
        pass
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=(
            "dbt run --target prod --select tag:daily_orders "
            "&& dbt test --target prod --select tag:daily_orders"
        ),
    )
    # Soda quality gate: any non-zero exit code fails the task and the DAG run
    soda_scan = BashOperator(
        task_id="soda_quality_gate",
        bash_command=(
            "soda scan "
            "--data-source analytics_sf "
            "--configuration /opt/soda/configuration.yml "
            "/opt/soda/checks/orders.yml"
        ),
        env={
            "SNOWFLAKE_USER": "{{ var.value.snowflake_user }}",
            "SNOWFLAKE_PASSWORD": "{{ var.value.snowflake_password }}",
        },
        # Keep PATH and other inherited variables; env alone replaces them
        append_env=True,
    )
    @task
    def export_to_salesforce():
        pass
    @task
    def refresh_dashboard_cache():
        pass
    load = extract_and_load()
    load >> dbt_run >> soda_scan >> [export_to_salesforce(), refresh_dashboard_cache()]
daily_orders_pipeline()
dbt Integration: Complementing dbt Tests with SodaCL
dbt's built-in tests cover the most common quality patterns (not_null, unique, accepted_values, relationships), but they run only as part of dbt test. Soda Core runs as a separate post-dbt scan and adds check types that dbt does not provide out of the box: freshness windows, percentile metrics, and custom SQL aggregates, including multi-table queries. The soda-core-dbt package parses the dbt manifest.json to auto-generate SodaCL checks from your existing schema YAML definitions. The dbt testing guide covers unit tests, schema tests, and singular tests in depth, including how dbt's native test framework compares to Soda Core's checks for row-level validation. The two layers are complementary: use dbt tests for the column-level constraints declared alongside each model, and Soda for post-load business logic.
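The correspondence between dbt's generic tests and SodaCL checks is simple enough to sketch by hand, which can be useful for reviewing what a generated file should contain. The mapping below is an illustrative assumption based on the check types used in this article; `sodacl_lines` and `render_checks` are hypothetical helpers, and the input is a plain list of tuples, not the dbt manifest format.

```python
from typing import Iterable, List, Optional, Sequence, Tuple

DbtTest = Tuple[str, str, Optional[Sequence[str]]]  # (test name, column, accepted values)


def sodacl_lines(test: str, column: str,
                 values: Optional[Sequence[str]] = None) -> List[str]:
    """Translate one dbt generic test into indented SodaCL lines."""
    if test == "not_null":
        return [f"  - missing_count({column}) = 0"]
    if test == "unique":
        return [f"  - duplicate_count({column}) = 0"]
    if test == "accepted_values":
        joined = ", ".join(values or [])
        return [
            f"  - invalid_count({column}) = 0:",
            f"      valid values: [{joined}]",
        ]
    raise ValueError(f"no SodaCL equivalent known for dbt test {test!r}")


def render_checks(dataset: str, tests: Iterable[DbtTest]) -> str:
    """Build the text of a SodaCL checks block for one dataset."""
    lines = [f"checks for {dataset}:"]
    for test, column, values in tests:
        lines.extend(sodacl_lines(test, column, values))
    return "\n".join(lines)


example = render_checks(
    "fct_orders",
    [
        ("not_null", "order_id", None),
        ("unique", "order_id", None),
        ("accepted_values", "order_status", ["pending", "shipped"]),
    ],
)
print(example)
```

Raising on unknown test names, instead of skipping them, makes gaps visible: a custom dbt test with no SodaCL counterpart should be a conscious decision, not a silent omission.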
# Generate SodaCL checks from the dbt manifest
# pip install soda-core-dbt
soda generate-checks \
  --dbt-manifest target/manifest.json \
  --output-dir checks/generated/
# The generated file for a model with not_null and unique dbt tests:
# checks/generated/fct_orders.yml
#
# checks for fct_orders:
#   - missing_count(order_id) = 0        # from not_null test
#   - duplicate_count(order_id) = 0      # from unique test
#   - missing_count(customer_id) = 0
#   - missing_count(revenue_usd) = 0
#   - invalid_count(order_status) = 0:   # from accepted_values test
#       valid values: [pending, processing, shipped, delivered, cancelled]
# checks/fct_orders_extended.yml: augment generated checks with business logic
checks for fct_orders:
  # Business logic checks that are not expressible in dbt's schema YAML
  # Freshness: table must update within 26 hours
  - freshness(created_at) < 26h
  # Volume: warn if fewer than 10k rows, fail below 1k
  - row_count:
      warn: when < 10000
      fail: when < 1000
  # Revenue plausibility: daily total between $10K and $10M
  - daily_revenue between 10000 and 10000000:
      daily_revenue query: |
        SELECT COALESCE(SUM(revenue_usd), 0)
        FROM fct_orders
        WHERE created_at::date = CURRENT_DATE
# Run both files together:
# soda scan \
#   --data-source analytics_sf \
#   checks/generated/fct_orders.yml \
#   checks/fct_orders_extended.yml
Soda Cloud: Centralized Results, Alerting, and Incident Tracking
Soda Core is fully functional as a standalone open-source tool — scans run locally and results are printed to stdout or written to files. Soda Cloud is the optional SaaS layer that stores scan results historically, surfaces trends (is missing_percent for this column increasing?), sends Slack or email notifications on failures, and provides a shared dashboard for the data quality program. Cloud connectivity requires API keys configured either in configuration.yml or via environment variables.
# configuration.yml: add Soda Cloud connection
soda_cloud:
  host: cloud.soda.io
  api_key_id: ${SODA_API_KEY_ID}
  api_key_secret: ${SODA_API_KEY_SECRET}
# Running a scan: results are automatically pushed to Soda Cloud
soda scan \
  --data-source production_bq \
  --configuration configuration.yml \
  checks/orders.yml
# Alternative: write structured JSON results locally (no SaaS dependency)
soda scan \
  --data-source production_bq \
  --configuration configuration.yml \
  --scan-results-file scan-results.json \
  checks/orders.yml
# Python: programmatic scan with Soda Cloud reporting
import sys
from soda.scan import Scan
scan = Scan()
scan.set_data_source_name("production_bq")
scan.add_configuration_yaml_file("configuration.yml")  # includes soda_cloud block
scan.add_sodacl_yaml_file("checks/orders.yml")
# Attach scan metadata visible in the Soda Cloud UI
scan.set_scan_definition_name("fct_orders_daily")
exit_code = scan.execute()
# Scan results are uploaded to Soda Cloud during execute()
# Access structured results locally too
for check in scan.get_scan_results()["checks"]:
    status = str(check["outcome"])  # "pass", "warn", or "fail"
    print(f"[{status.upper()}] {check['name']}")
sys.exit(exit_code)
Note
Pass --scan-results-file scan-results.json to every soda scan invocation in CI, then upload the file to S3 as a build artifact. This creates an audit log of check outcomes per pipeline run without any SaaS dependency. Include the Git commit hash and dbt run ID in the artifact path (for example, s3://scans/2026/07/03/abc1234/fct_orders.json) to link quality results to the exact code version and data run that produced them.
Production Checklist
Scope checks to relevant partitions for large tables. Running row_count or missing_count on the entire table for a billion-row fact table can take minutes and consume expensive warehouse slots. Define named filters in SodaCL — [today], [last_7_days] — that scope checks to the most recently loaded partition. Reserve full-table scans for low-frequency checks like schema validation and duplicate detection on dimension tables that are fully refreshed each run.
Separate warn and fail thresholds for gradual threshold tightening. When introducing a new check against an existing table, start with a warn threshold that triggers notification but does not fail the pipeline. Observe the baseline metric distribution over two weeks. Then set fail thresholds at the P1 level of historical values — values that only occur during a genuine data incident. This avoids alert fatigue from checks that fail on normal data variability before the team has calibrated expectations.
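The calibration step above can be sketched with a nearest-rank percentile over the observed metric history. This is a minimal illustration for a "lower is worse" metric such as row count: the fail floor at P1 follows the text, while the warn floor at P10, the helper names, and the sample history are assumptions made for the example.

```python
import math
from typing import Dict, Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest observed value such that
    at least pct percent of the observations are at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct / 100 * len(ordered))
    return ordered[rank - 1]


def suggest_floors(history: Sequence[float]) -> Dict[str, float]:
    """Suggest row_count floors from history (P10 for warn is an assumption)."""
    return {
        "fail_below": percentile(history, 1),
        "warn_below": percentile(history, 10),
    }


# Hypothetical two weeks of daily row counts collected in warn-only mode
history = [10200, 9800, 11050, 10400, 9950, 10800, 10100,
           9700, 10600, 10300, 9900, 10500, 10050, 10250]

floors = suggest_floors(history)
print(floors)
```

With only 14 observations the first percentile is simply the minimum observed value, so two weeks gives a starting point, not a statistically meaningful tail. For metrics where higher is worse, such as missing_percent, the same approach applies with P99 as a ceiling.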
Version contract files alongside the producer pipeline code, not separately. A data contract is part of the pipeline contract — it defines what the pipeline guarantees. When a migration changes a column type, the contract file should be updated in the same pull request as the migration code, reviewed by downstream consumer team leads, and merged only after they have acknowledged the change. Keeping contracts in a separate repository breaks this coupling and makes it easy to merge the migration without updating the contract.
Run Soda checks as a post-dbt, pre-downstream-consumer task in Airflow. The canonical task order is: EL job → dbt run → dbt test → Soda scan → downstream consumers. dbt test catches violations of the column-level constraints declared with each model (not_null, unique, accepted_values). Soda scan catches post-load business logic violations (freshness, volume, referential integrity, custom aggregates). Downstream consumers, such as a Salesforce sync, dashboard refresh, or ML feature computation, run only if both dbt test and Soda scan pass.
Store soda scan results as JSON artifacts in CI for audit trails. Pass --scan-results-file scan-results.json to every soda scan invocation, then upload the file to S3 or GCS as a build artifact. This creates an audit log of check outcomes per pipeline run without requiring Soda Cloud. Include the Git commit hash and dbt run ID in the artifact path to link quality scan results to the exact code version and transformation run that produced the data.
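A small helper keeps artifact paths consistent across pipelines. The sketch below builds an object key from the run date, commit hash, optional dbt run ID, and dataset name. The layout follows the example path used in this article, while the function name `artifact_key`, the default prefix, and the placement of the dbt run ID segment are assumptions; the upload itself is left to whatever storage client you already use.

```python
from datetime import date
from typing import Optional


def artifact_key(run_date: date,
                 commit: str,
                 dataset: str,
                 prefix: str = "scans",
                 dbt_run_id: Optional[str] = None) -> str:
    """Build a storage key such as scans/2026/07/03/abc1234/fct_orders.json.

    The commit hash is shortened to 7 characters; the dbt run ID, when
    given, becomes an extra path segment before the file name.
    """
    parts = [prefix, f"{run_date:%Y/%m/%d}", commit[:7]]
    if dbt_run_id:
        parts.append(dbt_run_id)
    parts.append(f"{dataset}.json")
    return "/".join(parts)


key = artifact_key(date(2026, 7, 3), "abc1234", "fct_orders")
print(key)
```

Putting the date first keeps listings cheap when browsing by day, and the commit segment makes it straightforward to find every scan produced by a given code version.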
Use failed rows checks sparingly — they materialize row samples. When a failed rows check triggers, Soda Core executes an additional query to fetch a sample of failing rows for inspection. This is valuable for debugging but can be expensive on large tables. Reserve failed rows checks for the most critical quality assertions — primary key uniqueness, referential integrity, revenue non-negativity — and use metric-based checks (invalid_count, missing_percent) for cheaper, higher-frequency monitoring.
Configure check names explicitly for clarity in incident triage. SodaCL auto-generates check names from the check expression, but auto-generated names for custom SQL metrics are cryptic. Add a name: field to every check that might trigger an incident. The name appears in Slack notifications, Soda Cloud incident cards, and CI logs. A descriptive name like Revenue non-negative — invalid values found is triaged in seconds; a bare expression string requires opening the checks file to understand.
Gate dbt model promotion with Soda contract verification in addition to dbt test. dbt test validates that a model conforms to its own declared schema. Soda contract verification validates that the model conforms to the schema agreed with downstream consumers — which may be stricter. Run soda contract verify as the final CI step before deploying a dbt model to production. Contract failures block the deploy and require the team to either update the contract or fix the model.
Parameterize data source credentials exclusively through environment variables. Never commit passwords, API keys, or service account JSON content to configuration files. Use dollar-sign brace variable interpolation in configuration.yml for all sensitive values. In CI, inject secrets from the CI secrets store. In Kubernetes, mount Secret volumes and reference the mount path in the configuration. For local development, use a .env file loaded by python-dotenv before running scans — and ensure .env is in .gitignore.
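A pre-flight check that every `${VAR}` placeholder resolves can save a confusing connection error later in the scan. The sketch below shows what such interpolation amounts to, using only the standard library. It is an illustration of the idea, not Soda's own resolver, and the function name `interpolate` is hypothetical.

```python
import os
import re
from typing import List, Mapping, Optional

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def interpolate(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${NAME} placeholders with values from env (default: os.environ).

    Raises KeyError listing every unset variable, so a misconfigured
    CI job fails before any connection is attempted.
    """
    source = os.environ if env is None else env
    missing: List[str] = []

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in source:
            missing.append(name)
            return match.group(0)
        return source[name]

    resolved = _PLACEHOLDER.sub(replace, text)
    if missing:
        raise KeyError("unset variables: " + ", ".join(sorted(set(missing))))
    return resolved


# Example with an explicit mapping so the result does not depend on the shell
snippet = "username: ${POSTGRES_USER}\nhost: db.internal"
print(interpolate(snippet, {"POSTGRES_USER": "soda_reader"}))
```

Running this over configuration.yml at the start of a CI job, and discarding the result, is enough to catch a missing secret without ever printing its value.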
Monitor scan execution time and warehouse slot consumption as part of pipeline SLAs. A Soda scan that takes 20 minutes on a large table is no longer a lightweight quality gate — it has become a bottleneck. Track scan duration per dataset per run in your pipeline monitoring. If a scan exceeds a threshold (e.g., 5 minutes for a daily check), investigate whether the scanned partition is too large, whether filter pushdown is working correctly, or whether the check SQL can be rewritten to use aggregates rather than full scans.
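Tracking scan duration needs little more than a timer around the scan call. The sketch below records elapsed time per dataset and flags runs that exceed a budget. The context manager `timed`, the record layout, and the 300-second budget are illustrative assumptions; in practice the records would go to whatever metrics system the pipeline already uses.

```python
import time
from contextlib import contextmanager
from typing import Callable, Dict, Iterator, List


@contextmanager
def timed(label: str,
          budget_seconds: float,
          report: List[Dict[str, object]],
          clock: Callable[[], float] = time.monotonic) -> Iterator[None]:
    """Measure the wrapped block and append a duration record to report.

    The record is written even if the block raises, so failed scans
    still show up in duration tracking.
    """
    start = clock()
    try:
        yield
    finally:
        elapsed = clock() - start
        report.append({
            "label": label,
            "seconds": elapsed,
            "over_budget": elapsed > budget_seconds,
        })


report: List[Dict[str, object]] = []
with timed("fct_orders", budget_seconds=300.0, report=report):
    pass  # placeholder for the actual scan call, e.g. scan.execute()

print(report)
```

The injectable `clock` parameter exists mainly for testing: a fake clock makes it possible to verify the over-budget logic without waiting for a slow scan.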
Are you running ad-hoc SQL quality scripts scattered across Airflow DAGs and notebooks with no standard output format, discovering data issues from stakeholder complaints hours after pipelines complete, or unable to enforce schema agreements between producer and consumer teams because there is no shared contract definition?
We design and implement Soda Core data quality programs — from SodaCL check file design for your critical tables with row_count, missing, invalid, duplicate, freshness, and custom SQL metric checks, through partition filter configuration for large fact tables to scope scans to recent data, warn-and-fail threshold calibration based on historical metric distributions, data contract authoring with column type enforcement and quality thresholds for producer-consumer agreements, contract verification integration as the first task of downstream Airflow DAGs, GitHub Actions quality gate pipelines running after dbt production builds, soda-core-dbt manifest integration to auto-generate checks from existing schema definitions, JSON artifact audit trail storage in S3 or GCS for teams without Soda Cloud, Soda Cloud setup with Slack and PagerDuty alerting and incident tracking for teams that want the SaaS observability layer, and check naming conventions and runbook integration for fast incident triage when scans fail in production. Let’s talk.