Address Enrichment System

Overview

The address enrichment system automatically enriches recipient records that lack physical addresses by looking them up through the Faraday API. All enrichable recipients across all campaigns are processed together in a single batch every 6 hours.

How It Works

High-Level Flow

Campaign Sync → Recipients without address1 → AddressEnrichable status
                                                  ↓
                                     Collected every 6h (all campaigns)
                                                  ↓
                                     Check 90-day address cache
                                          ↙            ↘
                                     Cached          Uncached
                                        ↓                ↓
                                 Apply immediately   Upload CSV to Faraday
                                        ↓            Create fresh pipeline
                                        ↓                ↓
                                        ↓           Poll target status
                                        ↓                ↓
                                        ↓        Download enriched CSV
                                        ↓                ↓
                                        └────→ Apply addresses & cache
                                                         ↓
                                          Apply per-campaign holdout logic
                                                         ↓
                                             Pending / Holdout status
                                                         ↓
                                             Delete pipeline components

Detailed Workflow

1. Recipient Validation (Sync Time)

During campaign recipient sync (app/methods/campaigns/recipient_sync.py):

  • With address_enrichment: true: Recipients without address1 get AddressEnrichable status. No auto-queue happens — they are picked up by the next scheduled batch.
  • Without the setting (default): Recipients without address1 get Skipped status with the reason "Recipient has no address"
  • International recipients: Always Skipped (enrichment only works for US addresses)
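
The three rules above can be read as one decision function. The sketch below is an illustration only: the function name, its arguments, and the use of plain strings for statuses are assumptions and do not come from recipient_sync.py. Returning None for recipients that already have an address is also an assumption about how the real code treats them.

```python
from typing import Optional


def enrichment_status(
    address1: Optional[str],
    is_us: bool,
    address_enrichment: bool = False,
) -> Optional[str]:
    """Return the sync-time status for a recipient (hypothetical helper)."""
    if address1 and address1.strip():
        # Assumption: recipients with an address are outside these rules.
        return None
    if not is_us:
        # Enrichment only works for US addresses.
        return "Skipped"
    if address_enrichment:
        # No auto-queue: the next scheduled batch picks these up.
        return "AddressEnrichable"
    # Default behaviour, skip reason "Recipient has no address".
    return "Skipped"
```

For example, enrichment_status(None, is_us=True, address_enrichment=True) yields "AddressEnrichable", while the same recipient in a campaign without the setting yields "Skipped".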

2. Batch Collection (every 6 hours)

Celery Beat triggers collect_and_start_enrichment_batch every 6 hours:

  • Fetches all AddressEnrichable recipients across all US-size campaigns (30-day lookback)
  • Checks cache for each email
  • Cached: Applies addresses immediately with per-campaign holdout logic. If no uncached recipients remain, creates a Completed run record and returns
  • Uncached: Uploads CSV to Faraday; creates fresh pipeline components; marks recipients AddressEnriching; creates a Processing run record with all component IDs
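
The cached/uncached split can be sketched as a simple partition over the collected emails. Everything here is hypothetical: the helper names, the dictionary standing in for address_enrichment_cache, and the lowercase normalization of emails are assumptions. The second helper assumes that a batch with nothing left to upload is the case recorded as completed right away.

```python
from typing import Dict, Iterable, List, Tuple


def split_by_cache(
    emails: Iterable[str], cache: Dict[str, dict]
) -> Tuple[List[str], List[str]]:
    """Partition emails into (cached, uncached), preserving input order."""
    cached: List[str] = []
    uncached: List[str] = []
    for email in emails:
        key = email.strip().lower()  # assumption: cache keys are normalized
        (cached if key in cache else uncached).append(email)
    return cached, uncached


def initial_run_status(uncached: List[str]) -> str:
    """Status of the run record created by the batch task (assumed rule)."""
    return "processing" if uncached else "completed"
```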

3. Faraday Pipeline (Scorched Earth)

Each run creates a fresh set of named Faraday components:

  • Upload CSV → Create dataset → Create cohort → Create scope → Create target
  • All components share a timestamped name (enrichment_YYYYMMDD_HHMMSS)
  • Component IDs are stored on the address_enrichment_runs record
  • After any terminal outcome (success or failure), all components are deleted in reverse order (best-effort)
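
A minimal sketch of the naming and teardown rules above. The delete callable is a stand-in for whatever Faraday client the project uses, and the dictionary keys are illustrative; neither is taken from the real code. Only the four components whose IDs are stored on the run record are handled.

```python
from datetime import datetime
from typing import Callable, Dict, List, Optional

# Creation order from the pipeline description; deletion walks it backwards.
CREATION_ORDER = ["dataset", "cohort", "scope", "target"]


def pipeline_name(now: datetime) -> str:
    """Build the shared timestamped name, enrichment_YYYYMMDD_HHMMSS."""
    return now.strftime("enrichment_%Y%m%d_%H%M%S")


def delete_pipeline(
    component_ids: Dict[str, Optional[str]],
    delete: Callable[[str, str], None],
) -> List[str]:
    """Delete components in reverse creation order, best-effort.

    Errors are collected instead of raised so that one failed delete
    does not stop the remaining components from being removed.
    """
    failures: List[str] = []
    for kind in reversed(CREATION_ORDER):
        component_id = component_ids.get(kind)
        if not component_id:
            continue  # never created, e.g. the run failed mid-creation
        try:
            delete(kind, component_id)
        except Exception as exc:
            failures.append(f"{kind}:{component_id}: {exc}")
    return failures
```

Collecting failures rather than raising mirrors the "warnings only" behaviour described for cleanup: a leftover component is logged and can be removed by hand later.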

4. Polling

Task poll_enrichment_run_status (self-rescheduling, global singleton):

  • Initial delay: 2 hours (Faraday average run time)
  • Polls GET /targets/{id} every 5 minutes
  • Ready when target.status == "ready"
  • Times out after 5.5 hours → reverts recipients, marks run Failed, deletes pipeline
  • On transient Faraday error (429/500/502/503): reschedules in 5 minutes
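
The polling rules can be summarized as one decision per poll. This is a sketch under stated assumptions: the function name, the returned action labels, and measuring elapsed time from the start of the run are all hypothetical. Checking for "ready" before the timeout is also an assumption, chosen so that a target that finishes right at the deadline is not discarded.

```python
from typing import Optional, Tuple

POLL_INTERVAL_SECONDS = 300    # 5 minutes
POLL_TIMEOUT_SECONDS = 19800   # 5.5 hours
TRANSIENT_STATUS_CODES = {429, 500, 502, 503}


def next_poll_action(
    elapsed_seconds: int,
    target_status: Optional[str] = None,
    http_error: Optional[int] = None,
) -> Tuple[str, Optional[int]]:
    """Return (action, delay_seconds) for one poll of GET /targets/{id}."""
    if http_error is None and target_status == "ready":
        return ("download", None)
    if elapsed_seconds >= POLL_TIMEOUT_SECONDS:
        return ("fail", None)  # revert recipients, mark Failed, delete pipeline
    if http_error is not None and http_error not in TRANSIENT_STATUS_CODES:
        return ("fail", None)  # permanent error
    # Not ready yet, or a transient Faraday error: try again in 5 minutes.
    return ("retry", POLL_INTERVAL_SECONDS)
```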

5. Download and Apply

Task download_and_apply_enrichment_run:

  • Downloads enriched CSV from Faraday target output
  • Updates the 90-day address cache with new results
  • Groups recipients by campaign; applies per-campaign holdout logic (sets Pending/Holdout)
  • Recipients with no enrichment result: set to Skipped
  • Marks run Completed with counts
  • Deletes all Faraday pipeline components (best-effort)
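
The grouping step above can be sketched as follows. The helper name and the input shapes, (campaign_id, email) pairs plus a set of emails that came back with an address, are assumptions made for illustration. The "enriched" bucket is what would then go through per-campaign holdout logic; the "skipped" bucket corresponds to recipients with no enrichment result.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def group_results_by_campaign(
    recipients: Iterable[Tuple[int, str]],
    enriched_emails: Set[str],
) -> Dict[int, Dict[str, List[str]]]:
    """Bucket each campaign's recipients into enriched and skipped."""
    by_campaign: Dict[int, Dict[str, List[str]]] = defaultdict(
        lambda: {"enriched": [], "skipped": []}
    )
    for campaign_id, email in recipients:
        bucket = "enriched" if email in enriched_emails else "skipped"
        by_campaign[campaign_id][bucket].append(email)
    return dict(by_campaign)
```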

Components

Database Tables

address_enrichment_runs

One record per batch run (all campaigns together):

id               SERIAL PRIMARY KEY
status           VARCHAR(50)        -- processing, completed, failed
total_recipients INTEGER
cached_recipients INTEGER
uploaded_recipients INTEGER
enriched_recipients INTEGER
failed_recipients INTEGER
error_message    TEXT
faraday_upload_filename VARCHAR(255)
faraday_upload_directory VARCHAR(255)  -- also the trait_name
faraday_dataset_id  VARCHAR(255)
faraday_cohort_id   VARCHAR(255)
faraday_scope_id    VARCHAR(255)
faraday_target_id   VARCHAR(255)
initiated_at     TIMESTAMP
started_at       TIMESTAMP
completed_at     TIMESTAMP
updated_at       TIMESTAMP

address_enrichment_cache

email            VARCHAR(255) PRIMARY KEY
first_name       VARCHAR(255)
last_name        VARCHAR(255)
house_number_and_street TEXT
city             VARCHAR(255)
state            VARCHAR(255)
postcode         VARCHAR(20)
enriched_at      TIMESTAMP
created_at       TIMESTAMP
  • 90-day TTL; cleaned up daily by cleanup_expired_cache
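
A small sketch of the TTL check. It assumes the 90 days are measured from enriched_at rather than created_at, which the schema does not state; the function name is hypothetical.

```python
from datetime import datetime, timedelta

CACHE_TTL = timedelta(days=90)


def is_cache_entry_fresh(enriched_at: datetime, now: datetime) -> bool:
    """True while the entry is younger than the 90-day TTL."""
    return now - enriched_at < CACHE_TTL
```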

campaign_address_enrichments (deprecated)

Legacy per-campaign enrichment records. No longer written by the new batch pipeline. Preserved for historical data.

Recipient Statuses

  • AddressEnrichable: Needs address enrichment (ready to be picked up by next batch)
  • AddressEnriching: Enrichment in progress (uploaded to Faraday this run)

Both statuses are excluded from billing counts and campaign recipient counts (unless include_errored=true).

Celery Tasks

| Task | Trigger | Description |
|---|---|---|
| collect_and_start_enrichment_batch | Beat every 6h, or manual | Collect all enrichable recipients, check cache, upload to Faraday, create pipeline |
| poll_enrichment_run_status | After upload (2h initial delay, then every 5min) | Poll target status until ready, then trigger download |
| download_and_apply_enrichment_run | When target is ready | Download CSV, apply to recipients, delete pipeline |
| cleanup_expired_cache | Beat daily | Delete cache entries older than 90 days |

Faraday Pipeline Components

Created fresh per run:

  • Dataset: Stores uploaded recipient identities; configured with output_to_traits
  • Cohort: Filters to recipients where the run's trait is true
  • Scope: Defines the population for the target
  • Target: Produces the enriched output CSV (5 columns: precision, phone, last/first name, email)

Configuration

Environment Variables

# Faraday API
FARADAY_API_KEY=<api_key>

# Timing
FARADAY_BATCH_INTERVAL_SECONDS=21600       # 6 hours — how often the batch runs
FARADAY_INITIAL_POLL_DELAY_SECONDS=7200    # 2 hours — delay before first poll
FARADAY_POLL_INTERVAL_SECONDS=300          # 5 minutes — poll retry interval
FARADAY_POLL_TIMEOUT_SECONDS=19800         # 5.5 hours — max run time before failure
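
One way these timing variables might be read is shown below. This is a sketch, not the project's settings module: the FaradayTiming class and load_timing function are hypothetical, and treating the documented values as fallbacks when a variable is unset is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class FaradayTiming:
    batch_interval: int      # seconds between scheduled batches
    initial_poll_delay: int  # seconds before the first poll
    poll_interval: int       # seconds between polls
    poll_timeout: int        # maximum run time before failure


def load_timing(env: Mapping[str, str] = os.environ) -> FaradayTiming:
    """Read the timing variables, falling back to the documented values."""
    def read(name: str, default: int) -> int:
        return int(env.get(name, default))

    return FaradayTiming(
        batch_interval=read("FARADAY_BATCH_INTERVAL_SECONDS", 21600),
        initial_poll_delay=read("FARADAY_INITIAL_POLL_DELAY_SECONDS", 7200),
        poll_interval=read("FARADAY_POLL_INTERVAL_SECONDS", 300),
        poll_timeout=read("FARADAY_POLL_TIMEOUT_SECONDS", 19800),
    )
```

With the documented values, batch_interval - poll_timeout is 1800 seconds, so a timed-out run is resolved before the next scheduled batch starts.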

Campaign Settings

In campaign settings JSON:

{
  "address_enrichment": true,
  "holdout": {
    "enabled": true,
    "holdout_percentage": 0.1,
    "holdout_absolute_maximum": 1000
  }
}
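
The holdout fields can be illustrated with a short calculation. The exact semantics are not spelled out here, so the sketch assumes that holdout_percentage is a fraction of the campaign's enriched recipients, rounded down, and that holdout_absolute_maximum caps the result. The function name is hypothetical.

```python
import math


def holdout_count(recipient_count: int, settings: dict) -> int:
    """Number of recipients to place in Holdout (assumed semantics)."""
    holdout = settings.get("holdout") or {}
    if not holdout.get("enabled"):
        return 0
    by_percentage = math.floor(
        recipient_count * holdout.get("holdout_percentage", 0)
    )
    maximum = holdout.get("holdout_absolute_maximum")
    return by_percentage if maximum is None else min(by_percentage, maximum)
```

Under these assumptions, the settings above would hold out 150 of 1,500 recipients, and never more than 1,000 regardless of campaign size.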

API Endpoints

All endpoints require admin authentication.

Trigger Batch Manually

POST /api/v1/address-enrichment/check
Authorization: Bearer <token>

Queues an immediate batch run. Silently dropped if a batch is already running (celery-once).

Response 202:

{"message": "Enrichment batch triggered"}

Get Run Status

GET /api/v1/address-enrichment/status
Authorization: Bearer <token>

Returns the in-progress run if one exists, otherwise the most recent run.

Response 200:

{
  "run_id": 42,
  "status": "completed",
  "progress": {
    "total_recipients": 1500,
    "cached_recipients": 400,
    "uploaded_recipients": 1100,
    "enriched_recipients": 1080,
    "failed_recipients": 20
  },
  "initiated_at": "2026-03-02T12:00:00",
  "started_at": "2026-03-02T12:00:02",
  "completed_at": "2026-03-02T14:35:00",
  "updated_at": "2026-03-02T14:35:01"
}
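
A minimal client sketch for the two endpoints above, using only the standard library. The base URL and token are placeholders supplied by the caller, and build_request is a hypothetical helper; it only constructs the request so the method, path, and header can be inspected before anything is sent.

```python
import urllib.request

ROUTES = {
    "check": ("POST", "/api/v1/address-enrichment/check"),
    "status": ("GET", "/api/v1/address-enrichment/status"),
}


def build_request(base_url: str, token: str, action: str) -> urllib.request.Request:
    """Build an authenticated request for the given enrichment endpoint."""
    method, path = ROUTES[action]
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        method=method,
        headers={"Authorization": f"Bearer {token}"},
    )
```

Passing the result to urllib.request.urlopen sends it; a successful "check" call is expected to return 202 and a "status" call 200, as documented above.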

Status Flow

Campaign sync
AddressEnrichable  ──────────────────────────────────────────────────┐
    ↓                                                                 │
(next batch — every 6h)                                              │
    ↓                                                                 │
Cache hit? ──Yes──→ Pending / Holdout (applied immediately)          │
    │                                                                 │
   No                                                                 │
    ↓                                                                 │
AddressEnriching (upload sent to Faraday)                            │
    ↓                                                                 │
(poll every 5min after 2h delay)                                      │
    ↓                                                                 │
Faraday target ready?                                                 │
    ↓                                                                 │
Download enriched CSV                                                 │
    ↓                                                                 │
Address found? ──Yes──→ Pending / Holdout                            │
               ──No───→ Skipped (skip_reason: EnrichmentFailed)      │
                                  ↓                                   │
                        NOT re-queued (no address exists)             │
Pipeline error / timeout → revert to AddressEnrichable ──────────────┘
  (json: {enrichment_pipeline_failed: true})
(picked up by next batch)

Error Handling

Transient Errors (429 / 500 / 502 / 503)

All Faraday API calls treat these as retryable — the task reschedules and no run state changes. The batch task reschedules itself; the poll and download tasks reschedule the poll task.

Permanent Errors (other 4xx, network failures)

  • Recipients in AddressEnriching are reverted to AddressEnrichable with enrichment_pipeline_failed: true added to their JSON (set by revert_all_enriching_to_enrichable)
  • Run is marked Failed with the error message
  • Pipeline components are deleted best-effort
  • Recipients are picked up again by the next scheduled batch

Note: recipients skipped individually during download because Faraday returned no address for them are set directly to Skipped with skip_reason = EnrichmentFailed — they do not get enrichment_pipeline_failed and are not re-queued.
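
The difference between the two outcomes can be sketched on plain dictionaries. These helpers are illustrative stand-ins, not revert_all_enriching_to_enrichable itself, and the "status", "json", and "skip_reason" keys are assumed field names.

```python
def revert_to_enrichable(recipient: dict) -> dict:
    """Pipeline failure or timeout: put the recipient back in the queue."""
    updated = dict(recipient)
    updated["status"] = "AddressEnrichable"
    updated["json"] = {
        **(recipient.get("json") or {}),
        "enrichment_pipeline_failed": True,
    }
    return updated


def skip_unenriched(recipient: dict) -> dict:
    """Faraday returned no address: terminal, never re-queued."""
    updated = dict(recipient)
    updated["status"] = "Skipped"
    updated["skip_reason"] = "EnrichmentFailed"
    return updated


def will_be_retried(recipient: dict) -> bool:
    """Only AddressEnrichable recipients are collected by the next batch."""
    return recipient["status"] == "AddressEnrichable"
```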

Polling Timeout (5.5 hours)

  • Same outcome as a permanent error: revert, mark Failed, delete pipeline
  • The 30-minute gap between the timeout (5.5h) and the next scheduled batch (6h) means reverted recipients are picked up by the very next batch instead of waiting another full cycle

Monitoring

Database Queries

-- Recent run history
SELECT id, status, total_recipients, cached_recipients, uploaded_recipients,
       enriched_recipients, failed_recipients, initiated_at, completed_at
FROM address_enrichment_runs
ORDER BY initiated_at DESC
LIMIT 10;

-- Current run in progress
SELECT * FROM address_enrichment_runs
WHERE status = 'processing'
ORDER BY initiated_at DESC
LIMIT 1;

-- Recipients currently being enriched
SELECT COUNT(*), campaign_id
FROM campaign_recipients
WHERE status = 'AddressEnriching'
GROUP BY campaign_id;

-- Cache size
SELECT COUNT(*) FROM address_enrichment_cache;

Key Log Lines

# Batch start
grep "Starting address enrichment batch" celery.log

# Upload + pipeline creation
grep "Uploaded.*CSV to Faraday" celery.log
grep "Created Faraday.*for enrichment_" celery.log

# Poll progress
grep "pipeline not ready yet" celery.log
grep "pipeline ready" celery.log

# Download + completion
grep "Downloaded enrichment CSV" celery.log
grep "Enrichment run.*completed" celery.log

# Failures
grep "Enrichment run.*failed\|timed out\|permanent error" celery.log
grep "Reverted.*recipients to AddressEnrichable" celery.log

# Pipeline cleanup
grep "Deleted Faraday" celery.log
grep "Failed to delete Faraday" celery.log   # best-effort — warnings only

Troubleshooting

Run Stuck in "Processing"

  1. Check how long it's been running:
    SELECT id, started_at,
           EXTRACT(EPOCH FROM (NOW() - started_at))/3600 AS hours_running
    FROM address_enrichment_runs
    WHERE status = 'processing';
    
  2. Under 2h: Normal — waiting for initial poll delay
  3. 2–5.5h: Normal — polling
  4. Over 5.5h: Should have timed out; check Celery worker is alive

  5. Check Celery logs for the poll task:

    grep "poll_enrichment_run_status" celery.log | tail -20
    

  6. The poll task is a global singleton (keys=[]). Check whether it is still scheduled in Celery:

    celery -A main.celery_app inspect scheduled
    

  7. If the worker restarted and the poll chain was lost, manually trigger a check:

    curl -X POST -H "Authorization: Bearer <token>" https://<host>/api/v1/address-enrichment/check   # re-runs batch from scratch
    
    Or re-enqueue the poll directly from a script.

No Recipients Being Enriched

  1. Verify address_enrichment: true in campaign settings
  2. Check recipients have AddressEnrichable status (not Skipped)
  3. Verify campaign is US-size (4x6, 6x9, 6x18_bifold, 12x9_bifold)
  4. Check if a run is currently in progress — recipients become AddressEnriching during upload and will be processed by the current run
  5. Check the 30-day lookback window — recipients older than 30 days are excluded

Low Cache Hit Rate

  1. Cache expires after 90 days — check cleanup_expired_cache isn't over-aggressive
  2. Monitor cache size: SELECT COUNT(*) FROM address_enrichment_cache
  3. Cache hit rate per run is visible in cached_recipients / total_recipients
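
The per-run hit rate from step 3 is a single division; the sketch below only adds a guard for empty runs. The function name is hypothetical.

```python
def cache_hit_rate(cached_recipients: int, total_recipients: int) -> float:
    """Fraction of a run's recipients that were served from the cache."""
    if total_recipients <= 0:
        return 0.0  # empty run: avoid division by zero
    return cached_recipients / total_recipients
```

A run with 400 cached out of 1,500 total recipients has a hit rate of roughly 0.27.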

Faraday API Errors

  1. Check FARADAY_API_KEY is set and valid
  2. Check Faraday API status
  3. Review error_message in address_enrichment_runs:
    SELECT id, error_message, initiated_at
    FROM address_enrichment_runs
    WHERE status = 'failed'
    ORDER BY initiated_at DESC
    LIMIT 5;
    
  4. Orphaned Faraday components (from mid-creation failures) can be cleaned up manually via the Faraday dashboard — they follow the naming convention enrichment_YYYYMMDD_HHMMSS_*

Testing

# All enrichment tests
just test app/tests/celery/test_address_enrichment.py

# Route tests
just test app/tests/routes/test_address_enrichment_routes.py