Address Enrichment Flow

This document describes the batch address enrichment system that runs every 6 hours.

Overview

All enrichable recipients across all campaigns are processed together in a single batch. The system uploads the identities of recipients not already in the address cache to Faraday for address enrichment, polls the Faraday pipeline until the enriched output is ready, then downloads and applies the results.

Task Flow

```mermaid
flowchart TD
    Beat([Celery Beat — every 6h]) --> T1

    T1[Task 1: collect_and_start_enrichment_batch] --> CacheCheck{Any uncached\nrecipients?}

    CacheCheck -->|No — all cached| Done1([Run completed])
    CacheCheck -->|Yes — upload needed| Upload[Upload CSV to Faraday\nCreate fresh pipeline components]

    Upload -->|Transient 429/500/502/503| Retry1[Reschedule T1 in 5min]
    Retry1 --> T1
    Upload -->|Permanent error| Done2([Log error, return])
    Upload -->|Success| MarkRecipients[Mark recipients AddressEnriching\nCreate run record with pipeline IDs]

    MarkRecipients --> T2[Task 2: poll_enrichment_run_status\nafter 2h initial delay]

    T2 -->|Transient 429/500/502/503| Reschedule[Reschedule T2 in 5min]
    T2 -->|Target not ready| Reschedule
    Reschedule --> T2
    T2 -->|Target ready| T3[Task 3: download_and_apply_enrichment_run]
    T2 -->|Timeout 5.5h| Fail1([Revert recipients, mark Failed\nDelete pipeline])

    T3 -->|Transient 429/500/502/503| Retry2[Reschedule T2 in 5min]
    Retry2 --> T2
    T3 -->|Permanent error| Fail2([Revert recipients, mark Failed\nDelete pipeline])
    T3 -->|Success| Done3([Apply addresses, holdout logic\nmark Completed, delete pipeline])

    style T1 fill:#e1f5ff
    style T2 fill:#e1f5ff
    style T3 fill:#e1f5ff
    style Done1 fill:#e1ffe1
    style Done3 fill:#e1ffe1
    style Fail1 fill:#ffe1e1
    style Fail2 fill:#ffe1e1
```
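The non-error branches out of T2 reduce to a small routing decision. The sketch below is a hypothetical pure function (decide_poll_action and PollAction are illustrative names, not the real task code). It assumes the 5.5h timeout is measured from run creation and is checked before the target status; Faraday API errors are left out because they follow the separate transient/permanent rules.

```python
from enum import Enum


class PollAction(Enum):
    """Possible outcomes of a single T2 poll (illustrative)."""
    RESCHEDULE = "reschedule"  # target not ready: run T2 again in 5 min
    DOWNLOAD = "download"      # target ready: queue T3
    FAIL = "fail"              # timed out: revert recipients, mark Failed, delete pipeline


def decide_poll_action(target_status: str, elapsed_seconds: float,
                       timeout_seconds: float = 19800) -> PollAction:
    """Route one T2 poll; API errors are handled elsewhere.

    Assumptions: elapsed_seconds is measured from run creation, and the
    timeout check runs before the readiness check.
    """
    if elapsed_seconds >= timeout_seconds:
        return PollAction.FAIL
    # "ready" is the only status value this document names; any other
    # value is treated as "not ready yet".
    if target_status == "ready":
        return PollAction.DOWNLOAD
    return PollAction.RESCHEDULE
```

Keeping the decision free of Celery and HTTP calls makes the branch table easy to unit-test; the task body would only perform the action returned.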

Tasks

| # | Task | Trigger | celery-once protection |
|---|------|---------|------------------------|
| 1 | `collect_and_start_enrichment_batch` | Beat every 6h, or manual | Global: skips if the same task is already running |
| 2 | `poll_enrichment_run_status` | Queued by T1 after upload; reschedules itself every 5min | Global singleton (`keys=[]`): only one poll is active at any time |
| 3 | `download_and_apply_enrichment_run` | Queued by T2 when the pipeline is ready | Keyed on `run_id`, lock held for the full execution (`unlock_before_run=False`) |
| 4 | `cleanup_expired_cache` | Beat daily | None |
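celery-once's `keys` option chooses which task arguments make up the lock key, which is what separates T2's global singleton from T3's per-run lock. The sketch below imitates that idea with a toy in-memory lock; once_lock_key, InMemoryOnceLock, and the run_id argument on T2 are hypothetical, and the key string format is illustrative rather than celery-once's real format (the real library stores locks in a shared backend such as Redis).

```python
from typing import Any, Mapping, Optional, Sequence, Set


def once_lock_key(task_name: str, kwargs: Mapping[str, Any],
                  keys: Optional[Sequence[str]] = None) -> str:
    """Build a lock key in the spirit of celery-once's `keys` option.

    keys=None  -> all kwargs are part of the key
    keys=[]    -> no kwargs: one global lock per task name
    keys=[...] -> only the named kwargs are part of the key
    """
    names = sorted(kwargs) if keys is None else sorted(keys)
    return task_name + "".join(f"|{name}={kwargs[name]}" for name in names)


class InMemoryOnceLock:
    """Toy stand-in for the shared lock backend."""

    def __init__(self) -> None:
        self._held: Set[str] = set()

    def acquire(self, key: str) -> bool:
        # False means a task with the same key is already queued or running.
        if key in self._held:
            return False
        self._held.add(key)
        return True

    def release(self, key: str) -> None:
        self._held.discard(key)


# T2 uses keys=[], so polls for different runs share one lock.
poll_a = once_lock_key("poll_enrichment_run_status", {"run_id": 1}, keys=[])
poll_b = once_lock_key("poll_enrichment_run_status", {"run_id": 2}, keys=[])
# T3 is keyed on run_id, so downloads for different runs do not block each other.
dl_a = once_lock_key("download_and_apply_enrichment_run", {"run_id": 1}, keys=["run_id"])
dl_b = once_lock_key("download_and_apply_enrichment_run", {"run_id": 2}, keys=["run_id"])
```

With unlock_before_run=False the lock is released only after the task finishes, so a second T3 for the same run is rejected for the whole download-and-apply step rather than just while it sits in the queue.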

Faraday Pipeline (Scorched Earth)

Each enrichment run gets a fresh set of Faraday pipeline components named after a timestamp-derived trait_name (e.g. enrichment_20260302_120000). Components are created at the start of each run and deleted after any terminal outcome (success or failure).
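A timestamp-derived trait_name like the example above can be produced with a single strftime call. This is a minimal sketch; make_trait_name is a hypothetical helper, and it assumes the timestamp is the run start time in UTC at one-second resolution.

```python
from datetime import datetime, timezone


def make_trait_name(started_at: datetime) -> str:
    """Derive a run-specific trait_name, e.g. enrichment_20260302_120000.

    Assumes started_at is timezone-aware; it is normalised to UTC.
    """
    return started_at.astimezone(timezone.utc).strftime("enrichment_%Y%m%d_%H%M%S")


trait_name = make_trait_name(datetime.now(timezone.utc))
```

One-second resolution is presumably sufficient here because T1 is protected by a global celery-once lock, so two batch runs should not start in the same second.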

Pipeline creation order: upload → dataset → cohort → scope → target

Pipeline deletion order (reverse): target → scope → cohort → (nullify trait) → (delete trait) → dataset → upload

Component IDs (faraday_dataset_id, faraday_cohort_id, faraday_scope_id, faraday_target_id) are stored on the address_enrichment_runs record.
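The creation order and the stored IDs fit together as a simple chain. The sketch below assumes the upload has already happened and that each component is created from the previous component's ID; RecordingClient and its create_* methods are hypothetical stand-ins, since the real Faraday client's method names and arguments are not documented here.

```python
from typing import Dict, List


class RecordingClient:
    """Hypothetical Faraday client stub: records each create_* call."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def _create(self, kind: str) -> str:
        self.calls.append(kind)
        return f"{kind}-id"  # fake component ID

    def create_dataset(self, trait_name: str) -> str:
        return self._create("dataset")

    def create_cohort(self, trait_name: str, dataset_id: str) -> str:
        return self._create("cohort")

    def create_scope(self, trait_name: str, cohort_id: str) -> str:
        return self._create("scope")

    def create_target(self, trait_name: str, scope_id: str) -> str:
        return self._create("target")


def create_pipeline_sketch(client, trait_name: str) -> Dict[str, str]:
    """Create dataset -> cohort -> scope -> target, feeding each ID forward.

    Returns the four IDs keyed by their address_enrichment_runs column names.
    """
    dataset_id = client.create_dataset(trait_name)
    cohort_id = client.create_cohort(trait_name, dataset_id)
    scope_id = client.create_scope(trait_name, cohort_id)
    target_id = client.create_target(trait_name, scope_id)
    return {
        "faraday_dataset_id": dataset_id,
        "faraday_cohort_id": cohort_id,
        "faraday_scope_id": scope_id,
        "faraday_target_id": target_id,
    }


client = RecordingClient()
ids = create_pipeline_sketch(client, "enrichment_20260302_120000")
```

Returning the IDs as a dict keyed by column name lets the caller write them straight onto the run record in one update.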

Readiness check: poll_enrichment_run_status calls GET /targets/{id} and checks status == "ready" — no multi-component fetch needed.
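The readiness check needs only the target ID from the run record. In this minimal sketch, EnrichmentRun is an illustrative subset of the address_enrichment_runs row, and StubTargetsClient.get_target is a hypothetical wrapper around GET /targets/{id}; the "pending" status used in testing is an assumed placeholder for any non-ready value.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class EnrichmentRun:
    """Illustrative subset of an address_enrichment_runs row."""
    id: int
    faraday_target_id: str


class StubTargetsClient:
    """Hypothetical client; get_target stands in for GET /targets/{id}."""

    def __init__(self, statuses: Dict[str, str]) -> None:
        self._statuses = statuses

    def get_target(self, target_id: str) -> Dict[str, Any]:
        return {"id": target_id, "status": self._statuses[target_id]}


def is_run_ready(client, run: EnrichmentRun) -> bool:
    # One request per poll: only the target is fetched, and its status
    # alone decides whether the enriched output can be downloaded.
    return client.get_target(run.faraday_target_id).get("status") == "ready"
```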

Deletion is best-effort: each step is individually wrapped in try/except so deletion errors never affect the main task flow.
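Best-effort deletion amounts to running every step in the documented order and logging, rather than raising, any failure. In this sketch, delete_pipeline_best_effort and fake_step are hypothetical; the step callables stand in for real Faraday client calls, and the cohort step is made to fail on purpose.

```python
import logging
from typing import Callable, List, Tuple

logger = logging.getLogger("address_enrichment.pipeline")

# Documented deletion order (reverse of creation, plus the trait steps).
DELETION_ORDER = ["target", "scope", "cohort", "nullify trait",
                  "delete trait", "dataset", "upload"]


def delete_pipeline_best_effort(steps: List[Tuple[str, Callable[[], None]]]) -> List[str]:
    """Run every deletion step in order; log failures instead of raising.

    Returns the names of the steps that failed.
    """
    failed: List[str] = []
    for name, step in steps:
        try:
            step()
        except Exception:  # deliberately broad: cleanup must never break the task
            logger.exception("Faraday cleanup step %r failed", name)
            failed.append(name)
    return failed


attempted: List[str] = []


def fake_step(name: str) -> Callable[[], None]:
    """Placeholder for a real client call; the cohort step simulates a failure."""
    def run() -> None:
        attempted.append(name)
        if name == "cohort":
            raise RuntimeError("simulated Faraday error")
    return run


failed = delete_pipeline_best_effort([(n, fake_step(n)) for n in DELETION_ORDER])
```

Because a failure is only logged, the remaining steps still run, and the task that called the cleanup proceeds to its terminal state regardless.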

Transient Error Handling

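All Faraday API calls treat 429, 500, 502, and 503 as transient errors: the task reschedules instead of failing the run. Other 4xx responses and network failures are permanent. In T2 and T3 a permanent error fails the run immediately and reverts its recipients; in T1, which has not changed any state yet, the error is logged and the task returns.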

| Task | Transient error action | Permanent error action |
|------|------------------------|------------------------|
| T1 (upload/create) | Reschedule T1 in 5min (no state changes made yet) | Log error, return |
| T2 (poll) | Reschedule T2 in 5min | Fail run, revert recipients, delete pipeline |
| T3 (download) | Reschedule T2 in 5min | Fail run, revert recipients, delete pipeline |
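The table can be encoded as a small classification function. This sketch is hypothetical: is_transient and error_action are illustrative names, status_code=None stands for a network failure with no HTTP response, and status codes the document does not mention (such as 504) are treated as permanent here, which is an assumption.

```python
from typing import Optional

TRANSIENT_STATUS_CODES = frozenset({429, 500, 502, 503})


def is_transient(status_code: Optional[int]) -> bool:
    """True only for the documented transient codes.

    None (network failure) and unlisted codes count as permanent.
    """
    return status_code in TRANSIENT_STATUS_CODES


def error_action(task: str, status_code: Optional[int]) -> str:
    """Map a (task, error) pair to the action in the table above."""
    if task not in ("T1", "T2", "T3"):
        raise ValueError(f"unknown task: {task}")
    if is_transient(status_code):
        # T3 hands control back to the poller rather than retrying itself.
        return "reschedule T1" if task == "T1" else "reschedule T2"
    # T1 has not changed any state yet, so there is nothing to revert.
    return "log and return" if task == "T1" else "fail run"
```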

Timing Configuration

| Config | Default | Description |
|--------|---------|-------------|
| `FARADAY_BATCH_INTERVAL_SECONDS` | 21600 (6h) | How often the batch task runs |
| `FARADAY_INITIAL_POLL_DELAY_SECONDS` | 7200 (2h) | Delay before the first poll after upload |
| `FARADAY_POLL_INTERVAL_SECONDS` | 300 (5min) | Poll retry interval |
| `FARADAY_POLL_TIMEOUT_SECONDS` | 19800 (5.5h) | Maximum time before the run is failed |
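The defaults determine how many readiness checks fit into one run. The sketch below, with the hypothetical helper readiness_check_offsets, assumes the timeout is measured from upload, that polls fire exactly on schedule, and that a poll at or past the timeout fails the run instead of checking the target.

```python
from typing import List

FARADAY_INITIAL_POLL_DELAY_SECONDS = 7200
FARADAY_POLL_INTERVAL_SECONDS = 300
FARADAY_POLL_TIMEOUT_SECONDS = 19800


def readiness_check_offsets(initial_delay: int = FARADAY_INITIAL_POLL_DELAY_SECONDS,
                            interval: int = FARADAY_POLL_INTERVAL_SECONDS,
                            timeout: int = FARADAY_POLL_TIMEOUT_SECONDS) -> List[int]:
    """Seconds after upload at which T2 checks the target before timing out."""
    return list(range(initial_delay, timeout, interval))


offsets = readiness_check_offsets()
```

With the defaults this yields 42 checks, from 2h to 5h25m after upload; in practice Celery countdowns drift slightly, so the real count can be a little lower.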

Cache

Enriched addresses are cached for 90 days. On each batch run, enrichable recipients are checked against the cache before uploading to Faraday. Cache hits are applied immediately (with per-campaign holdout logic); only uncached recipients are uploaded.
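The cache check splits recipients into fresh hits and keys that still need uploading. In this minimal sketch, split_by_cache and CachedAddress are hypothetical, and keying the cache by a per-recipient identity string is an assumption; the 90-day TTL comes from the text.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

CACHE_TTL = timedelta(days=90)


@dataclass(frozen=True)
class CachedAddress:
    address: str
    cached_at: datetime


def split_by_cache(identity_keys: List[str],
                   cache: Dict[str, CachedAddress],
                   now: datetime) -> Tuple[Dict[str, str], List[str]]:
    """Return (hits, misses): fresh cached addresses and keys still to upload."""
    hits: Dict[str, str] = {}
    misses: List[str] = []
    for key in identity_keys:
        entry = cache.get(key)
        if entry is not None and now - entry.cached_at < CACHE_TTL:
            hits[key] = entry.address   # applied immediately, then holdout logic
        else:
            misses.append(key)          # absent or 90+ days old: upload to Faraday
    return hits, misses
```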

Key Helper Functions

| Function | Purpose |
|----------|---------|
| `_create_pipeline(client, trait_name)` | Create all 4 Faraday pipeline components (dataset, cohort, scope, target) for a run |
| `_delete_pipeline(client, run)` | Best-effort deletion of all pipeline components in reverse dependency order |
| `apply_holdout_logic(recipients, campaign)` | Partition enriched recipients into pending/holdout groups per campaign settings |
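The document does not say which campaign settings drive the holdout split. One common approach, shown here purely as an assumption, is deterministic hash bucketing against a holdout fraction; Campaign.holdout_fraction, _bucket, and apply_holdout_sketch are hypothetical names, not the real apply_holdout_logic.

```python
import hashlib
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Campaign:
    id: int
    holdout_fraction: float  # hypothetical setting in [0.0, 1.0]


def _bucket(campaign_id: int, recipient_id: int) -> float:
    """Stable pseudo-random value in [0, 1) for a (campaign, recipient) pair."""
    digest = hashlib.sha256(f"{campaign_id}:{recipient_id}".encode()).digest()
    # Keep 53 bits so the division is exact and can never round up to 1.0.
    return (int.from_bytes(digest[:8], "big") >> 11) / 2**53


def apply_holdout_sketch(recipient_ids: List[int],
                         campaign: Campaign) -> Tuple[List[int], List[int]]:
    """Split recipients into (pending, holdout) lists for one campaign."""
    pending: List[int] = []
    holdout: List[int] = []
    for rid in recipient_ids:
        if _bucket(campaign.id, rid) < campaign.holdout_fraction:
            holdout.append(rid)
        else:
            pending.append(rid)
    return pending, holdout
```

Hashing instead of calling a random generator keeps the assignment stable, so a recipient applied from the cache in one batch and from Faraday output in another would land in the same group.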