Data quality is the foundation of trustworthy analytics. When your event data is unreliable, every dashboard, report, and decision built on that data is suspect. Yet data quality issues are surprisingly common: Gartner research puts the average cost of poor data quality at $12.9 million per organization per year, while IBM estimates that US businesses lose $3.1 trillion annually to bad data. Beyond the financial impact, employees spend up to 27% of their time correcting bad data, slowing decision-making and increasing operational costs.

This guide covers the dimensions of data quality, common issues in event streams, how to build monitoring and observability systems, testing strategies, and incident response procedures.

Dimensions of Data Quality

Data quality isn't a single metric—it's multi-dimensional. The widely accepted framework includes six core dimensions. Understanding these dimensions helps you measure and improve quality systematically:

Accuracy

Does the data correctly represent reality?

  • Definition: Events reflect what actually happened in the real world
  • Example issues: Wrong values, incorrect calculations, misattributed events, typos in data entry
  • Measurement: Compare against known sources of truth; validate with primary research or third-party reference data
-- Accuracy check example
SELECT
    source_system,
    COUNT(*) as event_count,
    COUNT(DISTINCT user_id) as unique_users
FROM events
WHERE event_date = CURRENT_DATE
GROUP BY source_system;

-- Compare against source system's own counts
-- Cross-validate with external reference data where available

Completeness

Is all expected data present?

  • Definition: No missing events, properties, records, or time periods; all required fields are populated
  • Example issues: Missing required properties, dropped events, gaps in time series, null values in mandatory fields
  • Measurement: Check for nulls, missing periods, expected volumes, and required field population rates
-- Completeness check example
SELECT
    event_name,
    COUNT(*) as total_events,
    COUNT(user_id) as events_with_user_id,
    COUNT(session_id) as events_with_session_id,
    ROUND(100.0 * COUNT(user_id) / COUNT(*), 2) as user_id_completeness,
    ROUND(100.0 * COUNT(session_id) / COUNT(*), 2) as session_id_completeness
FROM events
GROUP BY event_name
ORDER BY user_id_completeness ASC;

Timeliness

Is data available when needed?

  • Definition: Events arrive within expected time windows and reflect the most current situation
  • Example issues: Late-arriving data, stale dashboards, processing delays, data not ready for scheduled reports
  • Measurement: Track event_time vs. received_time latency, monitor SLA compliance
-- Timeliness check example
SELECT
    DATE_TRUNC('hour', received_at) as hour,
    AVG(EXTRACT(EPOCH FROM (received_at - event_timestamp))) as avg_latency_seconds,
    MAX(EXTRACT(EPOCH FROM (received_at - event_timestamp))) as max_latency_seconds,
    PERCENTILE_CONT(0.95) WITHIN GROUP (
        ORDER BY EXTRACT(EPOCH FROM (received_at - event_timestamp))
    ) as p95_latency_seconds,
    PERCENTILE_CONT(0.99) WITHIN GROUP (
        ORDER BY EXTRACT(EPOCH FROM (received_at - event_timestamp))
    ) as p99_latency_seconds
FROM events
WHERE received_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY DATE_TRUNC('hour', received_at)
ORDER BY hour;

Consistency

Is data coherent across the system?

  • Definition: Same entity has same values everywhere; data is uniform across all instances and systems
  • Example issues: User exists in one table with different ID in another; inconsistent naming conventions; conflicting values across CRM and ERP systems
  • Measurement: Cross-reference validation, schema enforcement, format consistency checks
-- Consistency check example
-- Check for user_id inconsistencies between events and users table
SELECT
    e.user_id as missing_user_id
FROM (SELECT DISTINCT user_id FROM events WHERE user_id IS NOT NULL) e
LEFT JOIN users u ON e.user_id = u.user_id
WHERE u.user_id IS NULL
LIMIT 100;

-- Check for format consistency
SELECT
    event_name,
    properties->>'date_format' as date_value,
    CASE 
        WHEN properties->>'date_format' ~ '^\d{4}-\d{2}-\d{2}$' THEN 'ISO format'
        WHEN properties->>'date_format' ~ '^\d{2}/\d{2}/\d{4}$' THEN 'US format'
        ELSE 'Other format'
    END as format_type
FROM events
WHERE event_name = 'Form Submitted';

Validity

Does data conform to expected formats and constraints?

  • Definition: Values are within expected ranges, formats, and conform to business rules
  • Example issues: Negative prices, future timestamps, invalid enums, malformed email addresses, ZIP codes with wrong character counts
  • Measurement: Constraint validation, range checks, regex pattern matching, business rule validation
-- Validity check example
SELECT
    event_name,
    COUNT(*) as invalid_events,
    STRING_AGG(DISTINCT 
        CASE
            WHEN event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour' THEN 'future_timestamp'
            WHEN event_name = 'Purchase Completed' AND CAST(properties->>'price' AS DECIMAL) < 0 THEN 'negative_price'
            WHEN properties->>'platform' NOT IN ('web', 'ios', 'android', 'unknown') THEN 'invalid_platform'
            WHEN properties->>'email' !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 'invalid_email'
        END, ', '
    ) as violation_types
FROM events
WHERE
    event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour'
    OR (event_name = 'Purchase Completed' AND CAST(properties->>'price' AS DECIMAL) < 0)
    OR properties->>'platform' NOT IN ('web', 'ios', 'android', 'unknown')
    OR properties->>'email' !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'
GROUP BY event_name;

Uniqueness

Is the data free from unwanted duplicates?

  • Definition: Each record represents a single, unique entity with no unintended duplicates
  • Example issues: Duplicate customer records (e.g., "Fred Smith" and "Freddy Smith" as separate entries), same event recorded multiple times, records duplicated after migration
  • Measurement: Primary key validation, duplicate detection queries, fuzzy matching for near-duplicates
-- Uniqueness check example
-- Find exact duplicates by event_id
SELECT
    event_id,
    COUNT(*) as occurrence_count,
    MIN(received_at) as first_received,
    MAX(received_at) as last_received
FROM events
GROUP BY event_id
HAVING COUNT(*) > 1
ORDER BY occurrence_count DESC
LIMIT 100;

-- Find potential duplicate users via shared email (an exact-match starting point;
-- true fuzzy matching would also compare names, e.g. with trigram similarity)
SELECT 
    a.user_id as user_id_1,
    b.user_id as user_id_2,
    a.email,
    a.name as name_1,
    b.name as name_2
FROM users a
JOIN users b ON a.email = b.email AND a.user_id < b.user_id
LIMIT 100;

Common Issues in Event Streams

Understanding typical problems helps you build better prevention and detection:

Duplicate Events

Same event recorded multiple times.

-- Detection query
SELECT
    event_id,
    user_id,
    event_name,
    COUNT(*) as occurrence_count,
    MIN(event_timestamp) as first_occurrence,
    MAX(event_timestamp) as last_occurrence
FROM events
GROUP BY event_id, user_id, event_name
HAVING COUNT(*) > 1
ORDER BY occurrence_count DESC
LIMIT 100;

Common causes:

  • Retry logic without idempotency keys
  • Multiple SDK initializations on page load
  • Async processing without deduplication
  • Network timeouts causing resubmission
  • Message queue redelivery without proper handling
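The first cause on this list is both the most common and the easiest to prevent: derive the event_id deterministically from the event's content, so a retried send produces the same key and ingestion can deduplicate on it. A minimal sketch (the field choice is illustrative; include whatever uniquely identifies one logical event):

```python
import hashlib
import json

def idempotency_key(user_id, event_name, client_timestamp, properties):
    """Derive a deterministic event_id so retries of the same logical
    event collapse to a single row at ingestion."""
    # Canonical serialization: sorted keys and fixed separators ensure
    # the same logical payload always hashes to the same key.
    payload = json.dumps(
        {"user_id": user_id, "event_name": event_name,
         "ts": client_timestamp, "props": properties},
        sort_keys=True, separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

With this in place, a retry after a network timeout resends the same event_id, and the uniqueness check shown earlier flags (or the pipeline silently drops) the second copy.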

Missing Events

Expected events not arriving.

-- Detection: Compare hourly volumes with statistical anomaly detection
WITH hourly_volumes AS (
    SELECT
        DATE_TRUNC('hour', event_timestamp) as hour,
        event_name,
        COUNT(*) as event_count
    FROM events
    WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '7 days'
    GROUP BY 1, 2
),
averages AS (
    SELECT
        event_name,
        EXTRACT(HOUR FROM hour) as hour_of_day,
        EXTRACT(DOW FROM hour) as day_of_week,
        AVG(event_count) as avg_count,
        STDDEV(event_count) as stddev_count
    FROM hourly_volumes
    GROUP BY 1, 2, 3
)
SELECT
    hv.hour,
    hv.event_name,
    hv.event_count,
    ROUND(a.avg_count, 2) as expected_count,
    ROUND((hv.event_count - a.avg_count) / NULLIF(a.stddev_count, 0), 2) as z_score,
    CASE 
        WHEN hv.event_count < a.avg_count - 2 * a.stddev_count THEN 'LOW_ANOMALY'
        WHEN hv.event_count > a.avg_count + 2 * a.stddev_count THEN 'HIGH_ANOMALY'
        ELSE 'OK' 
    END as status
FROM hourly_volumes hv
JOIN averages a ON hv.event_name = a.event_name
    AND EXTRACT(HOUR FROM hv.hour) = a.hour_of_day
    AND EXTRACT(DOW FROM hv.hour) = a.day_of_week
WHERE hv.hour > CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY hv.hour DESC;

Schema Drift

Event structure changes unexpectedly—fields added, removed, renamed, or changed type.

-- Detection: Track property presence over time
WITH daily_schema AS (
    SELECT
        event_name,
        DATE_TRUNC('day', event_timestamp) as day,
        jsonb_object_keys(properties) as property_name
    FROM events
    WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '14 days'
    GROUP BY 1, 2, 3
),
property_timeline AS (
    SELECT
        event_name,
        property_name,
        MIN(day) as first_seen,
        MAX(day) as last_seen,
        COUNT(DISTINCT day) as days_present
    FROM daily_schema
    GROUP BY 1, 2
)
SELECT
    event_name,
    property_name,
    first_seen,
    last_seen,
    days_present,
    CASE 
        WHEN first_seen > CURRENT_DATE - INTERVAL '3 days' THEN 'NEW_PROPERTY'
        WHEN last_seen < CURRENT_DATE - INTERVAL '3 days' THEN 'DISAPPEARED'
        ELSE 'STABLE'
    END as status
FROM property_timeline
WHERE first_seen > CURRENT_DATE - INTERVAL '3 days'
   OR last_seen < CURRENT_DATE - INTERVAL '3 days'
ORDER BY event_name, first_seen DESC;

Data Type Mismatches

Properties sent with wrong types.

-- Detection: Find type inconsistencies
SELECT
    event_name,
    'price' as property_name,
    properties->>'price' as value,
    CASE
        WHEN properties->>'price' ~ '^-?[0-9]+\.?[0-9]*$' THEN 'numeric'
        WHEN properties->>'price' ~ '^\$' THEN 'string_with_currency_symbol'
        WHEN properties->>'price' ~ '^[0-9,]+\.?[0-9]*$' THEN 'numeric_with_commas'
        WHEN properties->>'price' IS NULL THEN 'null'
        ELSE 'non_numeric_string'
    END as detected_type
FROM events
WHERE event_name = 'Purchase Completed'
    AND properties->>'price' IS NOT NULL
    AND NOT (properties->>'price' ~ '^-?[0-9]+\.?[0-9]*$')
LIMIT 100;

Incorrect Timestamps

Events with wrong time values.

-- Detection: Find suspicious timestamps
SELECT
    event_name,
    event_id,
    event_timestamp,
    received_at,
    EXTRACT(EPOCH FROM (received_at - event_timestamp)) as latency_seconds,
    CASE
        WHEN event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour' THEN 'FUTURE_EVENT'
        WHEN event_timestamp < '2020-01-01' THEN 'SUSPICIOUSLY_OLD'
        WHEN EXTRACT(EPOCH FROM (received_at - event_timestamp)) > 86400 THEN 'EXCESSIVE_LATENCY'
        WHEN EXTRACT(EPOCH FROM (received_at - event_timestamp)) < 0 THEN 'RECEIVED_BEFORE_EVENT'
        ELSE 'OK'
    END as issue_type
FROM events
WHERE
    event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour'
    OR event_timestamp < '2020-01-01'
    OR ABS(EXTRACT(EPOCH FROM (received_at - event_timestamp))) > 86400
LIMIT 100;

PII Leakage

Personally identifiable information appearing in unexpected places.

-- Detection: Scan for potential PII in properties
SELECT
    event_name,
    event_id,
    CASE
        WHEN properties::text ~ '[0-9]{3}-[0-9]{2}-[0-9]{4}' THEN 'POTENTIAL_SSN'
        WHEN properties::text ~ '[0-9]{16}' THEN 'POTENTIAL_CREDIT_CARD'
        WHEN properties::text ~* '[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}' 
             AND event_name NOT IN ('User Signup', 'Login') THEN 'UNEXPECTED_EMAIL'
        ELSE 'OK'
    END as pii_risk
FROM events
WHERE 
    properties::text ~ '[0-9]{3}-[0-9]{2}-[0-9]{4}'
    OR properties::text ~ '[0-9]{16}'
LIMIT 100;
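Detection tells you PII has already leaked; prevention means scrubbing it before events leave the client or ingestion layer. A regex-based scrubber sketch mirroring the patterns in the SQL scan above (these patterns are heuristics, not a compliance guarantee):

```python
import re

# Heuristic patterns matching the SQL scan above; tune for your data.
PII_PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "credit_card": re.compile(r"\b\d{16}\b"),
    "email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
}

def redact_pii(value: str) -> str:
    """Replace anything matching a PII pattern with a tagged placeholder."""
    for label, pattern in PII_PATTERNS.items():
        value = pattern.sub(f"[REDACTED_{label.upper()}]", value)
    return value
```

Run this over serialized properties at the SDK or collector boundary, and log redaction counts so unexpected PII sources surface in monitoring rather than in the warehouse.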

Monitoring and Observability Systems

Proactive monitoring catches issues before they impact decisions. Modern data observability extends traditional monitoring by applying DevOps best practices to data pipelines.

Key Metrics to Track

Implement monitoring for these essential metrics:

  • MTTD (Mean Time to Detect): Average time from when an issue occurs to when it's detected
  • MTTR (Mean Time to Resolve): Average time from detection to resolution
  • Data Freshness: Time since last data update
  • Volume Anomalies: Deviations from expected event counts
  • Schema Changes: New, modified, or removed fields
  • Distribution Drift: Statistical shifts in data values
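MTTD and MTTR are straightforward to compute once incident timestamps are recorded consistently. A sketch, assuming each incident record carries occurred/detected/resolved timestamps (the field names and sample data are illustrative):

```python
from datetime import datetime
from statistics import mean

# Hypothetical incident records; in practice, pull these from your
# incident tracker.
incidents = [
    {"occurred": datetime(2024, 1, 10, 9, 0),
     "detected": datetime(2024, 1, 10, 9, 45),
     "resolved": datetime(2024, 1, 10, 12, 0)},
    {"occurred": datetime(2024, 1, 12, 14, 0),
     "detected": datetime(2024, 1, 12, 14, 15),
     "resolved": datetime(2024, 1, 12, 16, 15)},
]

def mttd_minutes(incidents):
    """Mean time from occurrence to detection, in minutes."""
    return mean((i["detected"] - i["occurred"]).total_seconds() / 60
                for i in incidents)

def mttr_minutes(incidents):
    """Mean time from detection to resolution, in minutes."""
    return mean((i["resolved"] - i["detected"]).total_seconds() / 60
                for i in incidents)
```

Tracking these two numbers over time is the quickest way to see whether monitoring investments are actually paying off.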

Volume Monitoring

# Alert configuration example (pseudo-code)
monitor:
  name: event_volume_anomaly
  query: |
    SELECT
      event_name,
      COUNT(*) as current_hour_count,
      AVG(historical_count) as expected_count,
      STDDEV(historical_count) as stddev_count
    FROM current_events
    JOIN historical_averages USING (event_name, hour_of_day, day_of_week)
    GROUP BY event_name
  alert_condition: |
    current_hour_count < expected_count - 2 * stddev_count
    OR current_hour_count > expected_count + 2 * stddev_count
  severity: warning
  channels: [slack, pagerduty]
  runbook_url: https://wiki.company.com/runbooks/volume-anomaly

Freshness Monitoring

-- Check data freshness across all critical tables
WITH freshness_check AS (
    SELECT 
        'events' as table_name, 
        MAX(event_timestamp) as latest_record,
        30 as max_age_minutes  -- SLA threshold
    FROM events
    UNION ALL
    SELECT 
        'page_views' as table_name, 
        MAX(timestamp) as latest_record,
        15 as max_age_minutes
    FROM page_views
    UNION ALL
    SELECT 
        'transactions' as table_name, 
        MAX(created_at) as latest_record,
        5 as max_age_minutes
    FROM transactions
)
SELECT
    table_name,
    latest_record,
    EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 as minutes_since_update,
    max_age_minutes as sla_threshold_minutes,
    CASE 
        WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 > max_age_minutes 
        THEN 'SLA_BREACH'
        WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 > max_age_minutes * 0.8
        THEN 'WARNING'
        ELSE 'OK'
    END as status
FROM freshness_check
ORDER BY status DESC, minutes_since_update DESC;

Schema Monitoring

-- Track required properties completeness
WITH property_checks AS (
    SELECT
        event_name,
        required_property,
        COUNT(*) as total_events,
        COUNT(CASE WHEN properties->>required_property IS NOT NULL THEN 1 END) as events_with_property,
        COUNT(CASE WHEN properties->>required_property = '' THEN 1 END) as events_with_empty_value
    FROM events
    CROSS JOIN (
        SELECT unnest(ARRAY['user_id', 'session_id', 'timestamp', 'platform']) as required_property
    ) required
    WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '1 hour'
    GROUP BY event_name, required_property
)
SELECT
    event_name,
    required_property,
    total_events,
    events_with_property,
    events_with_empty_value,
    ROUND(100.0 * events_with_property / NULLIF(total_events, 0), 2) as completeness_pct,
    CASE 
        WHEN events_with_property < total_events * 0.95 THEN 'CRITICAL'
        WHEN events_with_property < total_events * 0.99 THEN 'WARNING'
        ELSE 'OK'
    END as status
FROM property_checks
WHERE events_with_property < total_events * 0.99
ORDER BY completeness_pct ASC;

Distribution Drift Monitoring

-- Detect statistical drift in numeric fields
WITH baseline AS (
    SELECT
        event_name,
        AVG(CAST(properties->>'amount' AS DECIMAL)) as baseline_mean,
        STDDEV(CAST(properties->>'amount' AS DECIMAL)) as baseline_stddev,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY CAST(properties->>'amount' AS DECIMAL)) as baseline_median
    FROM events
    WHERE event_timestamp BETWEEN CURRENT_TIMESTAMP - INTERVAL '30 days' 
                              AND CURRENT_TIMESTAMP - INTERVAL '1 day'
      AND event_name = 'Purchase Completed'
),
current_window AS (
    SELECT
        event_name,
        AVG(CAST(properties->>'amount' AS DECIMAL)) as current_mean,
        STDDEV(CAST(properties->>'amount' AS DECIMAL)) as current_stddev,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY CAST(properties->>'amount' AS DECIMAL)) as current_median
    FROM events
    WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '1 day'
      AND event_name = 'Purchase Completed'
)
SELECT
    b.event_name,
    ROUND(b.baseline_mean, 2) as baseline_mean,
    ROUND(c.current_mean, 2) as current_mean,
    ROUND(ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0), 2) as z_score_shift,
    CASE 
        WHEN ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0) > 3 THEN 'SIGNIFICANT_DRIFT'
        WHEN ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0) > 2 THEN 'MODERATE_DRIFT'
        ELSE 'OK'
    END as drift_status
FROM baseline b
CROSS JOIN current_window c;

Data Quality Dashboard

Build a comprehensive data quality dashboard with these panels:

  1. Volume trends: Events per hour with anomaly highlighting and day-over-day comparisons
  2. Latency metrics: p50, p95, p99 ingestion latency with SLA threshold lines
  3. Completeness scores: Required property fill rates by event type
  4. Error rates: Failed validations by event type and error category
  5. Schema changes: New properties detected with first-seen timestamps
  6. MTTD/MTTR trends: Incident detection and resolution times over time
  7. Data freshness: Time since last update per critical table with SLA status
  8. Duplicate rates: Percentage of duplicate events by source
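These panels can also roll up into a single headline number for stakeholders. One simple approach is a weighted average of per-dimension scores; the function below is a sketch (weights are purely illustrative and should reflect your business priorities):

```python
def quality_score(metrics: dict, weights: dict = None) -> float:
    """Weighted average of per-dimension quality scores on a 0-100 scale.

    metrics: e.g. {"completeness": 99.0, "freshness": 95.0, ...}
    weights: optional per-dimension weights; defaults to equal weighting.
    """
    weights = weights or {k: 1.0 for k in metrics}
    total_weight = sum(weights[k] for k in metrics)
    return round(sum(metrics[k] * weights[k] for k in metrics) / total_weight, 1)
```

A composite score hides detail, so keep the per-panel breakdowns alongside it; the single number is for trend lines, not diagnosis.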

Testing Strategies

Catch issues before they reach production with a comprehensive testing approach:

Unit Tests for Tracking Code

// JavaScript example: Testing event tracking
describe('Purchase Event', () => {
    it('should include all required properties', () => {
        const event = trackPurchase({
            orderId: '12345',
            total: 99.99,
            currency: 'USD',
            items: [{ sku: 'PROD-1', quantity: 2, price: 49.99 }]
        });

        expect(event.event_name).toBe('Purchase Completed');
        expect(event.properties.order_id).toBeDefined();
        expect(event.properties.total).toBeGreaterThan(0);
        expect(event.properties.currency).toMatch(/^[A-Z]{3}$/);
        expect(event.properties.item_count).toBeGreaterThan(0);
        expect(event.timestamp).toBeDefined();
        expect(new Date(event.timestamp)).toBeInstanceOf(Date);
    });

    it('should not include PII', () => {
        const event = trackPurchase({
            orderId: '12345',
            total: 99.99,
            currency: 'USD',
            items: [{ sku: 'PROD-1', quantity: 2, price: 49.99 }],
            customerEmail: 'test@example.com',  // This should be filtered
            creditCard: '4111111111111111'       // This should be filtered
        });

        expect(event.properties.email).toBeUndefined();
        expect(event.properties.customerEmail).toBeUndefined();
        expect(event.properties.credit_card).toBeUndefined();
        expect(event.properties.creditCard).toBeUndefined();
        expect(JSON.stringify(event)).not.toContain('4111111111111111');
    });

    it('should handle edge cases gracefully', () => {
        const event = trackPurchase({
            orderId: '12345',
            total: 0,  // Free order
            currency: 'USD',
            items: []
        });

        expect(event.properties.total).toBe(0);
        expect(event.properties.item_count).toBe(0);
    });
});

Integration Tests

# Python example: End-to-end tracking test
import uuid
import time
from datetime import datetime

def test_event_pipeline_end_to_end():
    """Test that events flow through the entire pipeline correctly."""
    
    # Generate test event with unique identifier
    test_event_id = f"test_{uuid.uuid4()}"
    test_timestamp = datetime.utcnow().isoformat()
    
    # Send test event
    send_event({
        "event_name": "Test Event",
        "event_id": test_event_id,
        "user_id": "test_user_integration",
        "timestamp": test_timestamp,
        "properties": {
            "test_property": "test_value",
            "numeric_property": 42
        }
    })

    # Wait for processing (adjust based on your pipeline latency SLA)
    max_wait_seconds = 60
    poll_interval = 5
    elapsed = 0
    result = None
    
    while elapsed < max_wait_seconds:
        time.sleep(poll_interval)
        elapsed += poll_interval
        
        result = query_warehouse(f"""
            SELECT * FROM events
            WHERE event_id = '{test_event_id}'
        """)
        
        if len(result) > 0:
            break
    
    # Verify event arrived
    assert len(result) == 1, f"Event should arrive exactly once, got {len(result)}"
    
    # Verify data integrity
    event = result[0]
    assert event['user_id'] == 'test_user_integration'
    assert event['event_name'] == 'Test Event'
    assert event['properties']['test_property'] == 'test_value'
    assert event['properties']['numeric_property'] == 42
    
    # Verify latency is within SLA
    received_time = event['received_at']
    event_time = datetime.fromisoformat(test_timestamp)
    latency_seconds = (received_time - event_time).total_seconds()
    assert latency_seconds < 60, f"Latency {latency_seconds}s exceeds 60s SLA"


def test_duplicate_handling():
    """Test that duplicate events are handled correctly."""
    
    test_event_id = f"test_dup_{uuid.uuid4()}"
    
    # Send same event twice
    for _ in range(2):
        send_event({
            "event_name": "Test Duplicate",
            "event_id": test_event_id,
            "user_id": "test_user",
            "timestamp": datetime.utcnow().isoformat()
        })
    
    time.sleep(30)
    
    result = query_warehouse(f"""
        SELECT COUNT(*) as count FROM events
        WHERE event_id = '{test_event_id}'
    """)
    
    # Depending on your deduplication strategy:
    # - If deduped at ingestion: count should be 1
    # - If deduped at query time: verify dedup logic works
    assert result[0]['count'] == 1, "Duplicate events should be deduplicated"

Contract Tests with JSON Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Purchase Completed Event",
    "type": "object",
    "required": ["event_name", "event_id", "user_id", "timestamp", "properties"],
    "properties": {
        "event_name": {
            "const": "Purchase Completed"
        },
        "event_id": {
            "type": "string",
            "minLength": 1,
            "pattern": "^[a-zA-Z0-9_-]+$"
        },
        "user_id": {
            "type": "string",
            "minLength": 1
        },
        "timestamp": {
            "type": "string",
            "format": "date-time"
        },
        "properties": {
            "type": "object",
            "required": ["order_id", "total", "currency", "item_count"],
            "properties": {
                "order_id": { 
                    "type": "string",
                    "minLength": 1
                },
                "total": { 
                    "type": "number", 
                    "minimum": 0 
                },
                "currency": { 
                    "type": "string", 
                    "pattern": "^[A-Z]{3}$",
                    "enum": ["USD", "EUR", "GBP", "CAD", "AUD", "JPY"]
                },
                "item_count": {
                    "type": "integer",
                    "minimum": 0
                },
                "discount_code": {
                    "type": ["string", "null"]
                }
            },
            "additionalProperties": true
        }
    },
    "additionalProperties": false
}
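In CI, every outgoing event can be checked against its contract before a release ships. Production code would use a real JSON Schema validator (for example the jsonschema package); the stdlib-only sketch below covers just the required-field, type, and range checks from the schema above:

```python
import re

# Subset of the Purchase Completed contract above; illustrative only.
REQUIRED_TOP = ["event_name", "event_id", "user_id", "timestamp", "properties"]
REQUIRED_PROPS = {"order_id": str, "total": (int, float),
                  "currency": str, "item_count": int}

def violations(event: dict) -> list:
    """Return a list of contract violations (an empty list means valid)."""
    errors = [f"missing: {k}" for k in REQUIRED_TOP if k not in event]
    props = event.get("properties", {})
    for key, expected in REQUIRED_PROPS.items():
        if key not in props:
            errors.append(f"missing property: {key}")
        elif not isinstance(props[key], expected):
            errors.append(f"wrong type: {key}")
    if "currency" in props and not re.fullmatch(r"[A-Z]{3}", str(props["currency"])):
        errors.append("invalid currency format")
    if isinstance(props.get("total"), (int, float)) and props["total"] < 0:
        errors.append("negative total")
    return errors
```

Wiring a check like this into the test suite (fail the build if any fixture event returns violations) turns the contract from documentation into an enforced guarantee.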

Data Quality Tests with dbt

# schema.yml - dbt data tests
version: 2

models:
  - name: fct_events
    description: "Fact table containing all tracked events"
    columns:
      - name: event_id
        description: "Unique identifier for the event"
        tests:
          - unique
          - not_null
      
      - name: user_id
        description: "User who triggered the event"
        tests:
          - not_null
          - relationships:
              to: ref('dim_users')
              field: user_id
      
      - name: event_timestamp
        description: "When the event occurred"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "event_timestamp <= current_timestamp"
              config:
                error_if: ">100"
                warn_if: ">10"
      
      - name: platform
        description: "Platform where event originated"
        tests:
          - accepted_values:
              values: ['web', 'ios', 'android', 'unknown']

    tests:
      - dbt_utils.recency:
          datepart: hour
          field: event_timestamp
          interval: 1
          config:
            severity: error

Great Expectations Validation

# Great Expectations validation suite
import great_expectations as gx

# Create expectation suite for events table
context = gx.get_context()

suite = context.add_expectation_suite("events_quality_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="event_id")
)

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="event_id")
)

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="price",
        min_value=0,
        max_value=100000  # Reasonable upper bound
    )
)

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    )
)

suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000,  # Alert if too few rows
        max_value=10000000  # Alert if suspiciously many
    )
)

Staging Environment Validation

  • Run all tracking through staging before production deployment
  • Compare staging metrics to production baselines for anomaly detection
  • Validate schema changes don't break downstream consumers
  • Use data diff tools to compare staging and production outputs
  • Run contract tests against staging data before merging code
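The comparison in the second and fourth bullets can be as simple as diffing per-event volumes between environments. A sketch, assuming you can pull event counts per event_name from each environment (the 10% tolerance is illustrative):

```python
def diff_metrics(staging: dict, production: dict, tolerance_pct: float = 10.0) -> dict:
    """Flag event types whose staging volume deviates from the production
    baseline by more than tolerance_pct. Inputs map event_name -> count."""
    flagged = {}
    for event_name, baseline in production.items():
        if baseline == 0:
            continue  # no baseline to compare against
        current = staging.get(event_name, 0)
        deviation = abs(current - baseline) / baseline * 100
        if deviation > tolerance_pct:
            flagged[event_name] = round(deviation, 1)
    return flagged
```

An empty result means staging looks like production within tolerance; anything flagged deserves a look before the deploy proceeds.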

Incident Response

When data quality issues occur, respond systematically with clear processes:

Incident Severity Levels

| Level | Description | Response Time | Example | Escalation |
|-------|-------------|---------------|---------|------------|
| Critical (P1) | Complete data loss or corruption affecting business-critical systems | Immediate (< 15 min) | All events dropping, production pipeline down | Page on-call, notify leadership |
| High (P2) | Significant quality degradation affecting key metrics | < 1 hour | 50% event loss, critical property missing | Page on-call, notify stakeholders |
| Medium (P3) | Partial quality issues with limited business impact | < 4 hours | Non-critical properties missing, minor schema drift | Slack alert, next available engineer |
| Low (P4) | Minor quality issues with minimal impact | Next business day | Cosmetic schema changes, low-volume anomalies | Ticket creation, backlog prioritization |
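Encoding these levels in the alerting layer keeps triage consistent across on-call engineers. A sketch with illustrative thresholds (your real cut-offs belong in your SLA definitions):

```python
def triage_severity(event_loss_pct: float, business_critical: bool) -> str:
    """Map an observed event-loss percentage onto the P1-P4 levels above.
    Thresholds are illustrative, not prescriptive."""
    if business_critical and event_loss_pct >= 90:
        return "P1"  # effectively complete loss of critical data
    if event_loss_pct >= 50:
        return "P2"  # significant degradation of key metrics
    if event_loss_pct >= 5:
        return "P3"  # partial issue, limited impact
    return "P4"      # minor issue
```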

Incident Response Process

  1. Detect: Automated monitoring alert or user report triggers incident
  2. Acknowledge: On-call engineer acknowledges within SLA, starts incident channel
  3. Triage: Assess severity, impact scope, and affected downstream systems
  4. Communicate: Notify affected stakeholders with initial assessment
  5. Mitigate: Stop the bleeding—pause pipelines, enable circuit breakers, or roll back if needed
  6. Investigate: Root cause analysis using logs, lineage, and metrics
  7. Fix: Implement and test solution in staging
  8. Deploy: Roll out fix to production with monitoring
  9. Recover: Backfill or correct affected data as needed
  10. Review: Conduct post-incident review within 48 hours
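Step 5's circuit breaker deserves a concrete shape: after a run of failed validation batches, stop forwarding events so bad data doesn't propagate downstream while you investigate. A minimal sketch (the threshold and manual-reset policy are illustrative):

```python
class PipelineCircuitBreaker:
    """Trips after too many consecutive validation failures, holding
    events back until an engineer resets it during mitigation."""

    def __init__(self, failure_threshold: int = 5):
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0
        self.open = False  # open = tripped, events are held back

    def record(self, batch_ok: bool) -> None:
        """Record the validation outcome of one batch."""
        if batch_ok:
            self.consecutive_failures = 0
        else:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.failure_threshold:
                self.open = True

    def allow(self) -> bool:
        """Whether the pipeline should keep forwarding events."""
        return not self.open

    def reset(self) -> None:
        """Manual reset once the incident is mitigated."""
        self.consecutive_failures = 0
        self.open = False
```

The deliberate choice here is a manual reset rather than an automatic half-open retry: for data pipelines, resuming flow is usually a human decision made after the root cause is understood.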

Runbook Template

# Data Quality Incident Runbook

## Incident Information
- **Incident ID:** [auto-generated]
- **Alert Source:** [monitoring system/user report]
- **Time Detected:** [timestamp]
- **Severity:** [P1/P2/P3/P4]
- **On-Call Engineer:** [name]

## Initial Symptoms
- Description: [what was observed]
- Affected Events/Tables: [list]
- Error Messages: [relevant logs]

## Impact Assessment
- **Time Range Affected:** [start] to [end]
- **Data Volume Affected:** [estimated records]
- **Downstream Systems Impacted:** [dashboards, reports, ML models]
- **Business Impact:** [revenue, decisions, compliance]
- **Users Notified:** [list]

## Diagnostic Steps
1. [ ] Check pipeline status in orchestration tool
2. [ ] Review error logs for the affected time window
3. [ ] Query for data anomalies (volume, schema, values)
4. [ ] Check upstream data sources for issues
5. [ ] Review recent deployments or configuration changes
6. [ ] Trace data lineage to identify failure point

## Resolution Steps
1. [ ] Implement immediate mitigation (pause, rollback, circuit breaker)
2. [ ] Identify and document root cause
3. [ ] Develop fix and test in staging environment
4. [ ] Deploy fix to production
5. [ ] Verify fix resolves the issue
6. [ ] Backfill or correct affected data
7. [ ] Re-enable paused pipelines
8. [ ] Verify downstream systems are healthy

## Communication Log
| Time | Update | Audience |
|------|--------|----------|
| [timestamp] | Incident acknowledged | #data-incidents |
| [timestamp] | Initial assessment | Stakeholders |
| [timestamp] | Resolution deployed | All affected parties |

## Metrics
- **MTTD (Time to Detect):** [duration]
- **MTTR (Time to Resolve):** [duration]
- **Data Loss/Corruption:** [volume]

## Post-Incident
- **Root Cause:** [detailed description]
- **Contributing Factors:** [list]
- **Prevention Measures:** [action items]
- **Follow-up Tasks:** [tickets created]
- **Post-mortem Scheduled:** [date/time]

Common Runbook Scenarios

Pipeline Failure

# Quick diagnostic commands

# Check recent pipeline runs
$ airflow dags list-runs -d data_quality_pipeline --limit 10

# Check a task's state for a given run
$ airflow tasks state data_quality_pipeline validate_events 2024-01-15

# Task logs live under $AIRFLOW_HOME/logs (or your configured remote log store)

# Check for stuck tasks
$ airflow tasks list data_quality_pipeline --tree

Volume Drop Alert

-- Diagnose volume drop
-- Step 1: Identify when the drop started
SELECT 
    DATE_TRUNC('hour', event_timestamp) as hour,
    COUNT(*) as events
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '48 hours'
GROUP BY 1
ORDER BY 1;

-- Step 2: Check by source/platform
SELECT 
    properties->>'source' as source,
    DATE_TRUNC('hour', event_timestamp) as hour,
    COUNT(*) as events
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY 1, 2
ORDER BY 1, 2;

-- Step 3: Check for errors in raw events
SELECT 
    error_type,
    COUNT(*) as error_count
FROM raw_events_errors
WHERE received_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY 1
ORDER BY 2 DESC;

Building a Data Quality Culture

Technical solutions alone aren't enough—organizational commitment is essential:

Ownership and Accountability

  • Assign clear data owners for each event type, table, and pipeline
  • Include data quality metrics in team OKRs and performance reviews
  • Create data quality SLOs (Service Level Objectives) with defined thresholds
  • Make quality dashboards visible and accessible to all stakeholders
  • Establish RACI matrix for data quality responsibilities

Process Integration

  • Require data quality checks in code review checklists
  • Include tracking validation in QA test plans for feature releases
  • Add data contract review to the definition of done
  • Integrate data tests into CI/CD pipelines with blocking failures
  • Schedule regular data quality review meetings

Education and Training

  • Train all engineers on tracking best practices and common pitfalls
  • Share incident post-mortems widely to spread learnings
  • Create internal documentation and style guides for data standards
  • Celebrate quality improvements and recognize good practices
  • Run data quality workshops and brown bag sessions

Continuous Improvement

  • Track quality metrics trends over time
  • Conduct quarterly data quality audits
  • Review and update data contracts as business needs evolve
  • Benchmark against industry standards
  • Invest in tooling improvements based on incident patterns

Next Steps

  1. Audit current state: Measure quality across all six dimensions for your critical data assets
  2. Identify gaps: Prioritize the biggest quality issues by business impact
  3. Build monitoring: Start with volume, freshness, and schema alerts for high-priority tables
  4. Implement testing: Add validation to your CI/CD pipeline with contract tests
  5. Document processes: Create runbooks for your top 5 most common incident types
  6. Assign ownership: Designate data owners and establish SLOs
  7. Iterate: Continuously improve based on incidents and stakeholder feedback

Data quality is not a one-time project—it's an ongoing practice that requires sustained investment. The payoff is trustworthy analytics, confident decisions, reduced time debugging data issues, and ultimately better business outcomes. Organizations that treat data quality as a first-class concern consistently outperform those that treat it as an afterthought.