Data quality is the foundation of trustworthy analytics. When your event data is unreliable, every dashboard, report, and decision built on that data is suspect. Yet data quality issues are surprisingly common—according to Gartner research, poor data quality costs organizations an average of $12.9 million per year, while IBM estimates that in the US alone, businesses lose $3.1 trillion annually due to poor data quality. Beyond financial impact, employees spend up to 27% of their time correcting bad data, slowing decision-making and increasing operational costs.
This guide covers the dimensions of data quality, common issues in event streams, how to build monitoring and observability systems, testing strategies, and incident response procedures.
Dimensions of Data Quality
Data quality isn't a single metric—it's multi-dimensional. The widely accepted framework includes six core dimensions. Understanding these dimensions helps you measure and improve quality systematically:
Accuracy
Does the data correctly represent reality?
- Definition: Events reflect what actually happened in the real world
- Example issues: Wrong values, incorrect calculations, misattributed events, typos in data entry
- Measurement: Compare against known sources of truth; validate against primary research or third-party reference data
-- Accuracy check example
SELECT
source_system,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM events
WHERE event_date = CURRENT_DATE
GROUP BY source_system;
-- Compare against source system's own counts
-- Cross-validate with external reference data where available
Completeness
Is all expected data present?
- Definition: No missing events, properties, records, or time periods; all required fields are populated
- Example issues: Missing required properties, dropped events, gaps in time series, null values in mandatory fields
- Measurement: Check for nulls, missing periods, expected volumes, and required field population rates
-- Completeness check example
SELECT
event_name,
COUNT(*) as total_events,
COUNT(user_id) as events_with_user_id,
COUNT(session_id) as events_with_session_id,
ROUND(100.0 * COUNT(user_id) / COUNT(*), 2) as user_id_completeness,
ROUND(100.0 * COUNT(session_id) / COUNT(*), 2) as session_id_completeness
FROM events
GROUP BY event_name
ORDER BY user_id_completeness ASC;
Timeliness
Is data available when needed?
- Definition: Events arrive within expected time windows and reflect the most current situation
- Example issues: Late-arriving data, stale dashboards, processing delays, data not ready for scheduled reports
- Measurement: Track event_time vs. received_time latency, monitor SLA compliance
-- Timeliness check example
SELECT
DATE_TRUNC('hour', received_at) as hour,
AVG(EXTRACT(EPOCH FROM (received_at - event_timestamp))) as avg_latency_seconds,
MAX(EXTRACT(EPOCH FROM (received_at - event_timestamp))) as max_latency_seconds,
PERCENTILE_CONT(0.95) WITHIN GROUP (
ORDER BY EXTRACT(EPOCH FROM (received_at - event_timestamp))
) as p95_latency_seconds,
PERCENTILE_CONT(0.99) WITHIN GROUP (
ORDER BY EXTRACT(EPOCH FROM (received_at - event_timestamp))
) as p99_latency_seconds
FROM events
WHERE received_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY DATE_TRUNC('hour', received_at)
ORDER BY hour;
Consistency
Is data coherent across the system?
- Definition: Same entity has same values everywhere; data is uniform across all instances and systems
- Example issues: User exists in one table with different ID in another; inconsistent naming conventions; conflicting values across CRM and ERP systems
- Measurement: Cross-reference validation, schema enforcement, format consistency checks
-- Consistency check example
-- Check for user_id inconsistencies between events and users table
SELECT
e.user_id as event_user_id,
u.user_id as users_table_user_id,
CASE WHEN u.user_id IS NULL THEN 'Missing from users' ELSE 'OK' END as status
FROM (SELECT DISTINCT user_id FROM events WHERE user_id IS NOT NULL) e
LEFT JOIN users u ON e.user_id = u.user_id
WHERE u.user_id IS NULL
LIMIT 100;
-- Check for format consistency
SELECT
event_name,
properties->>'date_format' as date_value,
CASE
WHEN properties->>'date_format' ~ '^\d{4}-\d{2}-\d{2}$' THEN 'ISO format'
WHEN properties->>'date_format' ~ '^\d{2}/\d{2}/\d{4}$' THEN 'US format'
ELSE 'Other format'
END as format_type
FROM events
WHERE event_name = 'Form Submitted';
Validity
Does data conform to expected formats and constraints?
- Definition: Values are within expected ranges, formats, and conform to business rules
- Example issues: Negative prices, future timestamps, invalid enums, malformed email addresses, ZIP codes with wrong character counts
- Measurement: Constraint validation, range checks, regex pattern matching, business rule validation
-- Validity check example
SELECT
event_name,
COUNT(*) as invalid_events,
STRING_AGG(DISTINCT
CASE
WHEN event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour' THEN 'future_timestamp'
WHEN event_name = 'Purchase Completed' AND CAST(properties->>'price' AS DECIMAL) < 0 THEN 'negative_price'
WHEN properties->>'platform' NOT IN ('web', 'ios', 'android', 'unknown') THEN 'invalid_platform'
WHEN properties->>'email' !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 'invalid_email'
END, ', '
) as violation_types
FROM events
WHERE
event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour'
OR (event_name = 'Purchase Completed' AND CAST(properties->>'price' AS DECIMAL) < 0)
OR properties->>'platform' NOT IN ('web', 'ios', 'android', 'unknown')
OR properties->>'email' !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'
GROUP BY event_name;
Uniqueness
Is the data free from unwanted duplicates?
- Definition: Each record represents a single, unique entity with no unintended duplicates
- Example issues: Duplicate customer records (e.g., "Fred Smith" and "Freddy Smith" as separate entries), same event recorded multiple times, records duplicated after migration
- Measurement: Primary key validation, duplicate detection queries, fuzzy matching for near-duplicates
-- Uniqueness check example
-- Find exact duplicates by event_id
SELECT
event_id,
COUNT(*) as occurrence_count,
MIN(received_at) as first_received,
MAX(received_at) as last_received
FROM events
GROUP BY event_id
HAVING COUNT(*) > 1
ORDER BY occurrence_count DESC
LIMIT 100;
-- Find potential duplicate users (exact shared-email match; a first pass before fuzzy matching)
SELECT
a.user_id as user_id_1,
b.user_id as user_id_2,
a.email,
a.name as name_1,
b.name as name_2
FROM users a
JOIN users b ON a.email = b.email AND a.user_id < b.user_id
LIMIT 100;
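The query above only catches users who share an exact email; the fuzzy matching mentioned under Measurement typically runs outside SQL. A minimal sketch using Python's standard-library difflib (the `users` records and the 0.85 threshold are illustrative assumptions, not values from any particular system):

```python
from difflib import SequenceMatcher

def name_similarity(a: str, b: str) -> float:
    """Return a 0-1 similarity ratio between two normalized names."""
    return SequenceMatcher(None, a.lower().strip(), b.lower().strip()).ratio()

def find_near_duplicates(users, threshold=0.85):
    """Compare every pair of users and flag names whose similarity exceeds the threshold."""
    flagged = []
    for i, a in enumerate(users):
        for b in users[i + 1:]:
            score = name_similarity(a["name"], b["name"])
            if score >= threshold:
                flagged.append((a["user_id"], b["user_id"], round(score, 2)))
    return flagged

users = [
    {"user_id": 1, "name": "Fred Smith"},
    {"user_id": 2, "name": "Freddy Smith"},
    {"user_id": 3, "name": "Alice Jones"},
]
print(find_near_duplicates(users))  # → [(1, 2, 0.91)]
```

The pairwise loop is O(n²), so in practice you would first block candidates (for example, by shared email or ZIP code, as in the SQL above) and only fuzzy-match within each block.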
Common Issues in Event Streams
Understanding typical problems helps you build better prevention and detection:
Duplicate Events
Same event recorded multiple times.
-- Detection query
SELECT
event_id,
user_id,
event_name,
COUNT(*) as occurrence_count,
MIN(event_timestamp) as first_occurrence,
MAX(event_timestamp) as last_occurrence
FROM events
GROUP BY event_id, user_id, event_name
HAVING COUNT(*) > 1
ORDER BY occurrence_count DESC
LIMIT 100;
Common causes:
- Retry logic without idempotency keys
- Multiple SDK initializations on page load
- Async processing without deduplication
- Network timeouts causing resubmission
- Message queue redelivery without proper handling
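Most of these causes share one remedy: attach a deterministic idempotency key at the point of capture so any downstream layer can safely drop replays. A minimal in-memory sketch (the event fields and the `Deduplicator` set are illustrative; a production consumer would back this with Redis SETNX or a warehouse MERGE):

```python
import hashlib
import json

def idempotency_key(event: dict) -> str:
    """Derive a stable key from the fields that define 'the same event'."""
    basis = json.dumps(
        {k: event[k] for k in ("user_id", "event_name", "client_timestamp")},
        sort_keys=True,
    )
    return hashlib.sha256(basis.encode()).hexdigest()

class Deduplicator:
    """Drops events whose key was already seen (stand-in for a shared key store)."""
    def __init__(self):
        self.seen_keys = set()

    def accept(self, event: dict) -> bool:
        key = event.get("event_id") or idempotency_key(event)
        if key in self.seen_keys:
            return False  # replay: retry, double SDK init, or queue redelivery
        self.seen_keys.add(key)
        return True

dedup = Deduplicator()
e = {"user_id": "u1", "event_name": "Purchase Completed",
     "client_timestamp": "2024-01-15T10:00:00Z"}
print(dedup.accept(e), dedup.accept(e))  # → True False
```

Generating the key on the client (or falling back to a hash of identifying fields, as here) means retries and redeliveries carry the same key, so deduplication works at every hop.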
Missing Events
Expected events not arriving.
-- Detection: Compare hourly volumes with statistical anomaly detection
WITH hourly_volumes AS (
SELECT
DATE_TRUNC('hour', event_timestamp) as hour,
event_name,
COUNT(*) as event_count
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '7 days'
GROUP BY 1, 2
),
averages AS (
SELECT
event_name,
EXTRACT(HOUR FROM hour) as hour_of_day,
EXTRACT(DOW FROM hour) as day_of_week,
AVG(event_count) as avg_count,
STDDEV(event_count) as stddev_count
FROM hourly_volumes
GROUP BY 1, 2, 3
)
SELECT
hv.hour,
hv.event_name,
hv.event_count,
ROUND(a.avg_count, 2) as expected_count,
ROUND((hv.event_count - a.avg_count) / NULLIF(a.stddev_count, 0), 2) as z_score,
CASE
WHEN hv.event_count < a.avg_count - 2 * a.stddev_count THEN 'LOW_ANOMALY'
WHEN hv.event_count > a.avg_count + 2 * a.stddev_count THEN 'HIGH_ANOMALY'
ELSE 'OK'
END as status
FROM hourly_volumes hv
JOIN averages a ON hv.event_name = a.event_name
AND EXTRACT(HOUR FROM hv.hour) = a.hour_of_day
AND EXTRACT(DOW FROM hv.hour) = a.day_of_week
WHERE hv.hour > CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY hv.hour DESC;
Schema Drift
Event structure changes unexpectedly—fields added, removed, renamed, or changed type.
-- Detection: Track property presence over time
WITH daily_schema AS (
-- Note: jsonb_object_keys is a set-returning function, so it cannot appear
-- in GROUP BY; SELECT DISTINCT deduplicates the expanded rows instead
SELECT DISTINCT
event_name,
DATE_TRUNC('day', event_timestamp) as day,
jsonb_object_keys(properties) as property_name
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '14 days'
),
property_timeline AS (
SELECT
event_name,
property_name,
MIN(day) as first_seen,
MAX(day) as last_seen,
COUNT(DISTINCT day) as days_present
FROM daily_schema
GROUP BY 1, 2
)
SELECT
event_name,
property_name,
first_seen,
last_seen,
days_present,
CASE
WHEN first_seen > CURRENT_DATE - INTERVAL '3 days' THEN 'NEW_PROPERTY'
WHEN last_seen < CURRENT_DATE - INTERVAL '3 days' THEN 'DISAPPEARED'
ELSE 'STABLE'
END as status
FROM property_timeline
WHERE first_seen > CURRENT_DATE - INTERVAL '3 days'
OR last_seen < CURRENT_DATE - INTERVAL '3 days'
ORDER BY event_name, first_seen DESC;
Data Type Mismatches
Properties sent with wrong types.
-- Detection: Find type inconsistencies
SELECT
event_name,
'price' as property_name,
properties->>'price' as value,
CASE
WHEN properties->>'price' ~ '^-?[0-9]+\.?[0-9]*$' THEN 'numeric'
WHEN properties->>'price' ~ '^\$' THEN 'string_with_currency_symbol'
WHEN properties->>'price' ~ '^[0-9,]+\.?[0-9]*$' THEN 'numeric_with_commas'
WHEN properties->>'price' IS NULL THEN 'null'
ELSE 'non_numeric_string'
END as detected_type
FROM events
WHERE event_name = 'Purchase Completed'
AND properties->>'price' IS NOT NULL
AND NOT (properties->>'price' ~ '^-?[0-9]+\.?[0-9]*$')
LIMIT 100;
Incorrect Timestamps
Events with wrong time values.
-- Detection: Find suspicious timestamps
SELECT
event_name,
event_id,
event_timestamp,
received_at,
EXTRACT(EPOCH FROM (received_at - event_timestamp)) as latency_seconds,
CASE
WHEN event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour' THEN 'FUTURE_EVENT'
WHEN event_timestamp < '2020-01-01' THEN 'SUSPICIOUSLY_OLD'
WHEN EXTRACT(EPOCH FROM (received_at - event_timestamp)) > 86400 THEN 'EXCESSIVE_LATENCY'
WHEN EXTRACT(EPOCH FROM (received_at - event_timestamp)) < 0 THEN 'RECEIVED_BEFORE_EVENT'
ELSE 'OK'
END as issue_type
FROM events
WHERE
event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour'
OR event_timestamp < '2020-01-01'
OR ABS(EXTRACT(EPOCH FROM (received_at - event_timestamp))) > 86400
LIMIT 100;
PII Leakage
Personally identifiable information appearing in unexpected places.
-- Detection: Scan for potential PII in properties
SELECT
event_name,
event_id,
CASE
WHEN properties::text ~ '[0-9]{3}-[0-9]{2}-[0-9]{4}' THEN 'POTENTIAL_SSN'
WHEN properties::text ~ '[0-9]{16}' THEN 'POTENTIAL_CREDIT_CARD'
WHEN properties::text ~* '[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}'
AND event_name NOT IN ('User Signup', 'Login') THEN 'UNEXPECTED_EMAIL'
ELSE 'OK'
END as pii_risk
FROM events
WHERE
properties::text ~ '[0-9]{3}-[0-9]{2}-[0-9]{4}'
OR properties::text ~ '[0-9]{16}'
OR (properties::text ~* '[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}'
AND event_name NOT IN ('User Signup', 'Login'))
LIMIT 100;
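Warehouse scans like this catch leaks after ingestion; it is cheaper to scrub known PII shapes before the event is ever sent. A hedged sketch whose regexes mirror the detection query above (the field names and sample payload are illustrative):

```python
import re

# Patterns mirroring the detection query: SSN-like, 16-digit card-like, email-like
PII_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[REDACTED_SSN]"),
    (re.compile(r"\b\d{16}\b"), "[REDACTED_CARD]"),
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), "[REDACTED_EMAIL]"),
]

def scrub(value):
    """Recursively redact PII-looking substrings in strings, dicts, and lists."""
    if isinstance(value, str):
        for pattern, replacement in PII_PATTERNS:
            value = pattern.sub(replacement, value)
        return value
    if isinstance(value, dict):
        return {k: scrub(v) for k, v in value.items()}
    if isinstance(value, list):
        return [scrub(v) for v in value]
    return value

event = {"event_name": "Form Submitted",
         "properties": {"note": "card 4111111111111111, mail bob@example.com"}}
print(scrub(event))
```

Pattern-based scrubbing is a safety net, not a guarantee; keep it alongside allowlisted properties so unexpected fields never reach the pipeline in the first place.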
Monitoring and Observability Systems
Proactive monitoring catches issues before they impact decisions. Modern data observability extends traditional monitoring by applying DevOps best practices to data pipelines.
Key Metrics to Track
Implement monitoring for these essential metrics:
- MTTD (Mean Time to Detect): Average time from when an issue occurs to when it's detected
- MTTR (Mean Time to Resolve): Average time from detection to resolution
- Data Freshness: Time since last data update
- Volume Anomalies: Deviations from expected event counts
- Schema Changes: New, modified, or removed fields
- Distribution Drift: Statistical shifts in data values
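MTTD and MTTR are simple averages over incident timestamps, but teams often compute them inconsistently; a small sketch pinning down the definitions used here (the incident records and field names are illustrative assumptions):

```python
from datetime import datetime

def mean_minutes(deltas):
    """Average a list of timedeltas, expressed in minutes."""
    return sum(d.total_seconds() for d in deltas) / len(deltas) / 60

def mttd_mttr(incidents):
    """MTTD averages occurred->detected; MTTR averages detected->resolved."""
    detect = [i["detected"] - i["occurred"] for i in incidents]
    resolve = [i["resolved"] - i["detected"] for i in incidents]
    return round(mean_minutes(detect), 1), round(mean_minutes(resolve), 1)

ts = datetime.fromisoformat
incidents = [
    {"occurred": ts("2024-01-15T10:00"), "detected": ts("2024-01-15T10:20"),
     "resolved": ts("2024-01-15T11:20")},
    {"occurred": ts("2024-01-16T09:00"), "detected": ts("2024-01-16T09:10"),
     "resolved": ts("2024-01-16T09:40")},
]
print(mttd_mttr(incidents))  # → (15.0, 45.0)
```

Whichever convention you choose, document it: mixing detected->resolved with occurred->resolved across teams makes MTTR trends meaningless.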
Volume Monitoring
# Alert configuration example (pseudo-code)
monitor:
name: event_volume_anomaly
query: |
SELECT
event_name,
COUNT(*) as current_hour_count,
AVG(historical_count) as expected_count,
STDDEV(historical_count) as stddev_count
FROM current_events
JOIN historical_averages USING (event_name, hour_of_day, day_of_week)
GROUP BY event_name
alert_condition: |
current_hour_count < expected_count - 2 * stddev_count
OR current_hour_count > expected_count + 2 * stddev_count
severity: warning
channels: [slack, pagerduty]
runbook_url: https://wiki.company.com/runbooks/volume-anomaly
Freshness Monitoring
-- Check data freshness across all critical tables
WITH freshness_check AS (
SELECT
'events' as table_name,
MAX(event_timestamp) as latest_record,
30 as max_age_minutes -- SLA threshold
FROM events
UNION ALL
SELECT
'page_views' as table_name,
MAX(timestamp) as latest_record,
15 as max_age_minutes
FROM page_views
UNION ALL
SELECT
'transactions' as table_name,
MAX(created_at) as latest_record,
5 as max_age_minutes
FROM transactions
)
SELECT
table_name,
latest_record,
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 as minutes_since_update,
max_age_minutes as sla_threshold_minutes,
CASE
WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 > max_age_minutes
THEN 'SLA_BREACH'
WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 > max_age_minutes * 0.8
THEN 'WARNING'
ELSE 'OK'
END as status
FROM freshness_check
ORDER BY status DESC, minutes_since_update DESC;
Schema Monitoring
-- Track required properties completeness
WITH property_checks AS (
SELECT
event_name,
required_property,
COUNT(*) as total_events,
COUNT(CASE WHEN properties->>required_property IS NOT NULL THEN 1 END) as events_with_property,
COUNT(CASE WHEN properties->>required_property = '' THEN 1 END) as events_with_empty_value
FROM events
CROSS JOIN (
SELECT unnest(ARRAY['user_id', 'session_id', 'timestamp', 'platform']) as required_property
) required
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY event_name, required_property
)
SELECT
event_name,
required_property,
total_events,
events_with_property,
events_with_empty_value,
ROUND(100.0 * events_with_property / NULLIF(total_events, 0), 2) as completeness_pct,
CASE
WHEN events_with_property < total_events * 0.95 THEN 'CRITICAL'
WHEN events_with_property < total_events * 0.99 THEN 'WARNING'
ELSE 'OK'
END as status
FROM property_checks
WHERE events_with_property < total_events * 0.99
ORDER BY completeness_pct ASC;
Distribution Drift Monitoring
-- Detect statistical drift in numeric fields
WITH baseline AS (
SELECT
event_name,
AVG(CAST(properties->>'amount' AS DECIMAL)) as baseline_mean,
STDDEV(CAST(properties->>'amount' AS DECIMAL)) as baseline_stddev,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY CAST(properties->>'amount' AS DECIMAL)) as baseline_median
FROM events
WHERE event_timestamp BETWEEN CURRENT_TIMESTAMP - INTERVAL '30 days'
AND CURRENT_TIMESTAMP - INTERVAL '1 day'
AND event_name = 'Purchase Completed'
),
current_window AS (
SELECT
event_name,
AVG(CAST(properties->>'amount' AS DECIMAL)) as current_mean,
STDDEV(CAST(properties->>'amount' AS DECIMAL)) as current_stddev,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY CAST(properties->>'amount' AS DECIMAL)) as current_median
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '1 day'
AND event_name = 'Purchase Completed'
)
SELECT
b.event_name,
ROUND(b.baseline_mean, 2) as baseline_mean,
ROUND(c.current_mean, 2) as current_mean,
ROUND(ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0), 2) as z_score_shift,
CASE
WHEN ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0) > 3 THEN 'SIGNIFICANT_DRIFT'
WHEN ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0) > 2 THEN 'MODERATE_DRIFT'
ELSE 'OK'
END as drift_status
FROM baseline b
CROSS JOIN current_window c;
Data Quality Dashboard
Build a comprehensive data quality dashboard with these panels:
- Volume trends: Events per hour with anomaly highlighting and day-over-day comparisons
- Latency metrics: p50, p95, p99 ingestion latency with SLA threshold lines
- Completeness scores: Required property fill rates by event type
- Error rates: Failed validations by event type and error category
- Schema changes: New properties detected with first-seen timestamps
- MTTD/MTTR trends: Incident detection and resolution times over time
- Data freshness: Time since last update per critical table with SLA status
- Duplicate rates: Percentage of duplicate events by source
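For the latency panel, percentiles are normally computed in the warehouse (as in the timeliness query earlier), but a lightweight sketch with Python's statistics module pins down the definition (the synthetic latencies stand in for `received_at - event_timestamp` values):

```python
import statistics

def latency_percentiles(latencies_seconds):
    """Compute p50/p95/p99 ingestion latency from a list of latencies in seconds."""
    # quantiles(n=100) returns the 99 interpolated cut points p1..p99
    qs = statistics.quantiles(latencies_seconds, n=100)
    return {"p50": qs[49], "p95": qs[94], "p99": qs[98]}

# Synthetic latencies: 1 to 100 seconds
print(latency_percentiles(list(range(1, 101))))
```

Tail percentiles (p95/p99) matter more than the mean here: a healthy average can hide a long late-arrival tail that breaks freshness SLAs.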
Testing Strategies
Catch issues before they reach production with a comprehensive testing approach:
Unit Tests for Tracking Code
// JavaScript example: Testing event tracking
describe('Purchase Event', () => {
it('should include all required properties', () => {
const event = trackPurchase({
orderId: '12345',
total: 99.99,
currency: 'USD',
items: [{ sku: 'PROD-1', quantity: 2, price: 49.99 }]
});
expect(event.event_name).toBe('Purchase Completed');
expect(event.properties.order_id).toBeDefined();
expect(event.properties.total).toBeGreaterThan(0);
expect(event.properties.currency).toMatch(/^[A-Z]{3}$/);
expect(event.properties.item_count).toBeGreaterThan(0);
expect(event.timestamp).toBeDefined();
expect(new Date(event.timestamp)).toBeInstanceOf(Date);
});
it('should not include PII', () => {
const event = trackPurchase({
orderId: '12345',
total: 99.99,
currency: 'USD',
items: [{ sku: 'PROD-1', quantity: 2, price: 49.99 }],
customerEmail: 'test@example.com', // This should be filtered
creditCard: '4111111111111111' // This should be filtered
});
expect(event.properties.email).toBeUndefined();
expect(event.properties.customerEmail).toBeUndefined();
expect(event.properties.credit_card).toBeUndefined();
expect(event.properties.creditCard).toBeUndefined();
expect(JSON.stringify(event)).not.toContain('4111111111111111');
});
it('should handle edge cases gracefully', () => {
const event = trackPurchase({
orderId: '12345',
total: 0, // Free order
currency: 'USD',
items: []
});
expect(event.properties.total).toBe(0);
expect(event.properties.item_count).toBe(0);
});
});
Integration Tests
# Python example: End-to-end tracking test
import uuid
import time
from datetime import datetime
def test_event_pipeline_end_to_end():
"""Test that events flow through the entire pipeline correctly."""
# Generate test event with unique identifier
test_event_id = f"test_{uuid.uuid4()}"
test_timestamp = datetime.utcnow().isoformat()
# Send test event
send_event({
"event_name": "Test Event",
"event_id": test_event_id,
"user_id": "test_user_integration",
"timestamp": test_timestamp,
"properties": {
"test_property": "test_value",
"numeric_property": 42
}
})
# Wait for processing (adjust based on your pipeline latency SLA)
max_wait_seconds = 60
poll_interval = 5
elapsed = 0
result = None
while elapsed < max_wait_seconds:
time.sleep(poll_interval)
elapsed += poll_interval
result = query_warehouse(f"""
SELECT * FROM events
WHERE event_id = '{test_event_id}'
""")
if len(result) > 0:
break
# Verify event arrived
assert len(result) == 1, f"Event should arrive exactly once, got {len(result)}"
# Verify data integrity
event = result[0]
assert event['user_id'] == 'test_user_integration'
assert event['event_name'] == 'Test Event'
assert event['properties']['test_property'] == 'test_value'
assert event['properties']['numeric_property'] == 42
# Verify latency is within SLA
received_time = event['received_at']
event_time = datetime.fromisoformat(test_timestamp)
latency_seconds = (received_time - event_time).total_seconds()
assert latency_seconds < 60, f"Latency {latency_seconds}s exceeds 60s SLA"
def test_duplicate_handling():
"""Test that duplicate events are handled correctly."""
test_event_id = f"test_dup_{uuid.uuid4()}"
# Send same event twice
for _ in range(2):
send_event({
"event_name": "Test Duplicate",
"event_id": test_event_id,
"user_id": "test_user",
"timestamp": datetime.utcnow().isoformat()
})
time.sleep(30)
result = query_warehouse(f"""
SELECT COUNT(*) as count FROM events
WHERE event_id = '{test_event_id}'
""")
# Depending on your deduplication strategy:
# - If deduped at ingestion: count should be 1
# - If deduped at query time: verify dedup logic works
assert result[0]['count'] == 1, "Duplicate events should be deduplicated"
Contract Tests with JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Purchase Completed Event",
"type": "object",
"required": ["event_name", "event_id", "user_id", "timestamp", "properties"],
"properties": {
"event_name": {
"const": "Purchase Completed"
},
"event_id": {
"type": "string",
"minLength": 1,
"pattern": "^[a-zA-Z0-9_-]+$"
},
"user_id": {
"type": "string",
"minLength": 1
},
"timestamp": {
"type": "string",
"format": "date-time"
},
"properties": {
"type": "object",
"required": ["order_id", "total", "currency", "item_count"],
"properties": {
"order_id": {
"type": "string",
"minLength": 1
},
"total": {
"type": "number",
"minimum": 0
},
"currency": {
"type": "string",
"pattern": "^[A-Z]{3}$",
"enum": ["USD", "EUR", "GBP", "CAD", "AUD", "JPY"]
},
"item_count": {
"type": "integer",
"minimum": 0
},
"discount_code": {
"type": ["string", "null"]
}
},
"additionalProperties": true
}
},
"additionalProperties": false
}
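A contract like the one above can be enforced in CI with a validator library. The sketch below uses the Python `jsonschema` package against a trimmed version of the schema (the trimmed schema and payloads are illustrative; note that `format: date-time` is only enforced when an explicit format checker is supplied):

```python
from jsonschema import Draft7Validator

# Trimmed version of the Purchase Completed contract above
schema = {
    "type": "object",
    "required": ["event_name", "event_id", "properties"],
    "properties": {
        "event_name": {"const": "Purchase Completed"},
        "event_id": {"type": "string", "pattern": "^[a-zA-Z0-9_-]+$"},
        "properties": {
            "type": "object",
            "required": ["order_id", "total", "currency"],
            "properties": {
                "total": {"type": "number", "minimum": 0},
                "currency": {"type": "string", "pattern": "^[A-Z]{3}$"},
            },
        },
    },
}

good = {"event_name": "Purchase Completed", "event_id": "evt_1",
        "properties": {"order_id": "o1", "total": 99.99, "currency": "USD"}}
bad = {"event_name": "Purchase Completed", "event_id": "evt_2",
       "properties": {"order_id": "o2", "total": -5, "currency": "usd"}}

validator = Draft7Validator(schema)
print(len(list(validator.iter_errors(good))))  # → 0
for error in validator.iter_errors(bad):       # negative total, lowercase currency
    print(list(error.path), error.message)
```

Running `iter_errors` (rather than `validate`, which raises on the first failure) lets a CI job report every contract violation in one pass.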
Data Quality Tests with dbt
# schema.yml - dbt data tests
version: 2
models:
- name: fct_events
description: "Fact table containing all tracked events"
columns:
- name: event_id
description: "Unique identifier for the event"
tests:
- unique
- not_null
- name: user_id
description: "User who triggered the event"
tests:
- not_null
- relationships:
to: ref('dim_users')
field: user_id
- name: event_timestamp
description: "When the event occurred"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "event_timestamp <= current_timestamp"
config:
error_if: ">100"
warn_if: ">10"
- name: platform
description: "Platform where event originated"
tests:
- accepted_values:
values: ['web', 'ios', 'android', 'unknown']
tests:
- dbt_utils.recency:
datepart: hour
field: event_timestamp
interval: 1
config:
severity: error
Great Expectations Validation
# Great Expectations validation suite
import great_expectations as gx
# Create expectation suite for events table
context = gx.get_context()
suite = context.add_expectation_suite("events_quality_suite")
# Add expectations
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="event_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="event_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="price",
min_value=0,
max_value=100000 # Reasonable upper bound
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToMatchRegex(
column="email",
regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)
)
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=1000, # Alert if too few rows
max_value=10000000 # Alert if suspiciously many
)
)
Staging Environment Validation
- Run all tracking through staging before production deployment
- Compare staging metrics to production baselines for anomaly detection
- Validate schema changes don't break downstream consumers
- Use data diff tools to compare staging and production outputs
- Run contract tests against staging data before merging code
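The "data diff" comparison in the bullets above can start as a simple metric-by-metric check. A sketch comparing per-event-type counts between environments (the metric dicts and 10% tolerance are illustrative stand-ins for warehouse query results):

```python
def diff_metrics(staging: dict, production: dict, tolerance_pct: float = 10.0):
    """Flag event types whose staging volume deviates from production beyond tolerance."""
    issues = []
    for event_name in sorted(set(staging) | set(production)):
        s, p = staging.get(event_name, 0), production.get(event_name, 0)
        if p == 0:
            if s > 0:
                issues.append((event_name, "missing_in_production"))
            continue
        deviation = abs(s - p) / p * 100
        if deviation > tolerance_pct:
            issues.append((event_name, f"deviation_{deviation:.0f}pct"))
    return issues

staging = {"Page View": 950, "Purchase Completed": 40, "New Event": 10}
production = {"Page View": 1000, "Purchase Completed": 100}
print(diff_metrics(staging, production))
```

Events present only in staging are expected when a release adds tracking; events present only in production, or with large volume deviations, usually mean the change under test drops or duplicates data.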
Incident Response
When data quality issues occur, respond systematically with clear processes:
Incident Severity Levels
| Level | Description | Response Time | Example | Escalation |
|---|---|---|---|---|
| Critical (P1) | Complete data loss or corruption affecting business-critical systems | Immediate (< 15 min) | All events dropping, production pipeline down | Page on-call, notify leadership |
| High (P2) | Significant quality degradation affecting key metrics | < 1 hour | 50% event loss, critical property missing | Page on-call, notify stakeholders |
| Medium (P3) | Partial quality issues with limited business impact | < 4 hours | Non-critical properties missing, minor schema drift | Slack alert, next available engineer |
| Low (P4) | Minor quality issues with minimal impact | Next business day | Cosmetic schema changes, low-volume anomalies | Ticket creation, backlog prioritization |
Incident Response Process
- Detect: Automated monitoring alert or user report triggers incident
- Acknowledge: On-call engineer acknowledges within SLA, starts incident channel
- Triage: Assess severity, impact scope, and affected downstream systems
- Communicate: Notify affected stakeholders with initial assessment
- Mitigate: Stop the bleeding—pause pipelines, enable circuit breakers, or roll back if needed
- Investigate: Root cause analysis using logs, lineage, and metrics
- Fix: Implement and test solution in staging
- Deploy: Roll out fix to production with monitoring
- Recover: Backfill or correct affected data as needed
- Review: Conduct post-incident review within 48 hours
Runbook Template
# Data Quality Incident Runbook
## Incident Information
- **Incident ID:** [auto-generated]
- **Alert Source:** [monitoring system/user report]
- **Time Detected:** [timestamp]
- **Severity:** [P1/P2/P3/P4]
- **On-Call Engineer:** [name]
## Initial Symptoms
- Description: [what was observed]
- Affected Events/Tables: [list]
- Error Messages: [relevant logs]
## Impact Assessment
- **Time Range Affected:** [start] to [end]
- **Data Volume Affected:** [estimated records]
- **Downstream Systems Impacted:** [dashboards, reports, ML models]
- **Business Impact:** [revenue, decisions, compliance]
- **Users Notified:** [list]
## Diagnostic Steps
1. [ ] Check pipeline status in orchestration tool
2. [ ] Review error logs for the affected time window
3. [ ] Query for data anomalies (volume, schema, values)
4. [ ] Check upstream data sources for issues
5. [ ] Review recent deployments or configuration changes
6. [ ] Trace data lineage to identify failure point
## Resolution Steps
1. [ ] Implement immediate mitigation (pause, rollback, circuit breaker)
2. [ ] Identify and document root cause
3. [ ] Develop fix and test in staging environment
4. [ ] Deploy fix to production
5. [ ] Verify fix resolves the issue
6. [ ] Backfill or correct affected data
7. [ ] Re-enable paused pipelines
8. [ ] Verify downstream systems are healthy
## Communication Log
| Time | Update | Audience |
|------|--------|----------|
| [timestamp] | Incident acknowledged | #data-incidents |
| [timestamp] | Initial assessment | Stakeholders |
| [timestamp] | Resolution deployed | All affected parties |
## Metrics
- **MTTD (Time to Detect):** [duration]
- **MTTR (Time to Resolve):** [duration]
- **Data Loss/Corruption:** [volume]
## Post-Incident
- **Root Cause:** [detailed description]
- **Contributing Factors:** [list]
- **Prevention Measures:** [action items]
- **Follow-up Tasks:** [tickets created]
- **Post-mortem Scheduled:** [date/time]
Common Runbook Scenarios
Pipeline Failure
# Quick diagnostic commands
# Check recent pipeline runs
$ airflow dags list-runs -d data_quality_pipeline
# Check a task's state for a specific run
$ airflow tasks state data_quality_pipeline validate_events 2024-01-15
# List the DAG's tasks to spot missing or misconfigured ones
$ airflow tasks list data_quality_pipeline --tree
Volume Drop Alert
-- Diagnose volume drop
-- Step 1: Identify when the drop started
SELECT
DATE_TRUNC('hour', event_timestamp) as hour,
COUNT(*) as events
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '48 hours'
GROUP BY 1
ORDER BY 1;
-- Step 2: Check by source/platform
SELECT
properties->>'source' as source,
DATE_TRUNC('hour', event_timestamp) as hour,
COUNT(*) as events
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY 1, 2
ORDER BY 1, 2;
-- Step 3: Check for errors in raw events
SELECT
error_type,
COUNT(*) as error_count
FROM raw_events_errors
WHERE received_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY 1
ORDER BY 2 DESC;
Building a Data Quality Culture
Technical solutions alone aren't enough—organizational commitment is essential:
Ownership and Accountability
- Assign clear data owners for each event type, table, and pipeline
- Include data quality metrics in team OKRs and performance reviews
- Create data quality SLOs (Service Level Objectives) with defined thresholds
- Make quality dashboards visible and accessible to all stakeholders
- Establish RACI matrix for data quality responsibilities
Process Integration
- Require data quality checks in code review checklists
- Include tracking validation in QA test plans for feature releases
- Add data contract review to the definition of done
- Integrate data tests into CI/CD pipelines with blocking failures
- Schedule regular data quality review meetings
Education and Training
- Train all engineers on tracking best practices and common pitfalls
- Share incident post-mortems widely to spread learnings
- Create internal documentation and style guides for data standards
- Celebrate quality improvements and recognize good practices
- Run data quality workshops and brown bag sessions
Continuous Improvement
- Track quality metrics trends over time
- Conduct quarterly data quality audits
- Review and update data contracts as business needs evolve
- Benchmark against industry standards
- Invest in tooling improvements based on incident patterns
Next Steps
- Audit current state: Measure quality across all six dimensions for your critical data assets
- Identify gaps: Prioritize the biggest quality issues by business impact
- Build monitoring: Start with volume, freshness, and schema alerts for high-priority tables
- Implement testing: Add validation to your CI/CD pipeline with contract tests
- Document processes: Create runbooks for your top 5 most common incident types
- Assign ownership: Designate data owners and establish SLOs
- Iterate: Continuously improve based on incidents and stakeholder feedback
Data quality is not a one-time project—it's an ongoing practice that requires sustained investment. The payoff is trustworthy analytics, confident decisions, reduced time debugging data issues, and ultimately better business outcomes. Organizations that treat data quality as a first-class concern consistently outperform those that treat it as an afterthought.