# Address Enrichment Flow
This document describes the batch address enrichment system that runs every 6 hours.
## Overview
All enrichable recipients across all campaigns are processed together in a single batch. The system uploads their identities to Faraday for address enrichment, polls the Faraday pipeline until the enriched output is ready, then downloads and applies the results.
## Task Flow
```mermaid
flowchart TD
    Beat([Celery Beat, every 6h]) --> T1
    T1[Task 1: collect_and_start_enrichment_batch] --> CacheCheck{Any uncached\nrecipients?}
    CacheCheck -->|No, all cached| Done1([Run completed])
    CacheCheck -->|Yes, upload needed| Upload[Upload CSV to Faraday\nCreate fresh pipeline components]
    Upload -->|Transient 429/500/502/503| Retry1[Reschedule T1 in 5min]
    Retry1 --> T1
    Upload -->|Permanent error| Done2([Log error, return])
    Upload -->|Success| MarkRecipients[Mark recipients AddressEnriching\nCreate run record with pipeline IDs]
    MarkRecipients --> T2[Task 2: poll_enrichment_run_status\nafter 2h initial delay]
    T2 -->|Transient 429/500/502/503| Reschedule[Reschedule T2 in 5min]
    T2 -->|Target not ready| Reschedule
    Reschedule --> T2
    T2 -->|Target ready| T3[Task 3: download_and_apply_enrichment_run]
    T2 -->|Timeout 5.5h| Fail1([Revert recipients, mark Failed\nDelete pipeline])
    T3 -->|Transient 429/500/502/503| Retry2[Reschedule T2 in 5min]
    Retry2 --> T2
    T3 -->|Permanent error| Fail2([Revert recipients, mark Failed\nDelete pipeline])
    T3 -->|Success| Done3([Apply addresses, holdout logic\nmark Completed, delete pipeline])
    style T1 fill:#e1f5ff
    style T2 fill:#e1f5ff
    style T3 fill:#e1f5ff
    style Done1 fill:#e1ffe1
    style Done3 fill:#e1ffe1
    style Fail1 fill:#ffe1e1
    style Fail2 fill:#ffe1e1
```
## Tasks
| # | Task | Trigger | celery-once protection |
|---|---|---|---|
| 1 | collect_and_start_enrichment_batch | Beat every 6h, or manual | Global: skips if the same task is already running |
| 2 | poll_enrichment_run_status | After T1 upload, then self-rescheduling every 5min | Global singleton (keys=[]): only one poll active at any time |
| 3 | download_and_apply_enrichment_run | T2, when the pipeline is ready | Keyed on run_id; lock held for the full execution (unlock_before_run=False) |
| 4 | cleanup_expired_cache | Beat daily | None |
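The difference between the global and keyed protection comes down to which task arguments feed the lock key. The toy, in-memory sketch below illustrates that idea; the `lock_key` format and the `InMemoryLocks` class are hypothetical and do not reproduce celery-once's internal key scheme. With `keys=[]` every invocation collapses onto one key, while keying on `run_id` gives each run its own lock.

```python
from typing import Any, Dict, Iterable, Set


def lock_key(task_name: str, keys: Iterable[str], kwargs: Dict[str, Any]) -> str:
    """Build a lock key from the task name plus only the kwargs named in `keys`.

    Hypothetical format for illustration; celery-once derives its own keys.
    """
    parts = [task_name] + [f"{k}={kwargs[k]}" for k in sorted(keys)]
    return ":".join(parts)


class InMemoryLocks:
    """Toy stand-in for a celery-once backend: at most one holder per key."""

    def __init__(self) -> None:
        self._held: Set[str] = set()

    def try_acquire(self, key: str) -> bool:
        if key in self._held:
            return False  # a task with the same key is already running
        self._held.add(key)
        return True

    def release(self, key: str) -> None:
        self._held.discard(key)


locks = InMemoryLocks()
poll_key = lock_key("poll_enrichment_run_status", [], {})
print(locks.try_acquire(poll_key))  # True: the first poll takes the global lock
print(locks.try_acquire(poll_key))  # False: a second poll would be skipped
run_7 = lock_key("download_and_apply_enrichment_run", ["run_id"], {"run_id": 7})
run_8 = lock_key("download_and_apply_enrichment_run", ["run_id"], {"run_id": 8})
print(locks.try_acquire(run_7), locks.try_acquire(run_8))  # True True
```

Because T3 holds its lock for the whole execution, a second T3 for the same run cannot start while the first is still applying results, but downloads for different runs never block each other.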
## Faraday Pipeline (Scorched Earth)
Each enrichment run gets a fresh set of Faraday pipeline components named after a
timestamp-derived trait_name (e.g. enrichment_20260302_120000). Components are created
at the start of each run and deleted after any terminal outcome (success or failure).
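A minimal sketch of deriving the trait_name from a timestamp, assuming the format implied by the example above and a UTC clock (the time zone actually used is not stated here). The function name `make_trait_name` is hypothetical.

```python
from datetime import datetime, timezone


def make_trait_name(now: datetime) -> str:
    """Format a timestamp as enrichment_YYYYMMDD_HHMMSS (format inferred from the example)."""
    return now.strftime("enrichment_%Y%m%d_%H%M%S")


print(make_trait_name(datetime(2026, 3, 2, 12, 0, 0, tzinfo=timezone.utc)))
# enrichment_20260302_120000
```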
Pipeline creation order: upload → dataset → cohort → scope → target
Pipeline deletion order (reverse): target → scope → cohort → (nullify trait) → (delete trait) → dataset → upload
Component IDs (faraday_dataset_id, faraday_cohort_id, faraday_scope_id,
faraday_target_id) are stored on the address_enrichment_runs record.
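The creation chain, and the IDs it leaves for the run record, can be sketched as a loop over the four components. `FakeFaradayClient`, its `create` method, and the single `parent_id` link between consecutive components are hypothetical simplifications (the real Faraday resources reference each other in resource-specific ways), and the CSV upload that precedes the dataset is not shown. The returned dict is keyed by the run-record column names listed above.

```python
from typing import Dict, List, Optional, Tuple

# Dependency order from this page; the CSV upload happens before these steps.
CREATION_ORDER = ("dataset", "cohort", "scope", "target")


class FakeFaradayClient:
    """Hypothetical stand-in; a real client would call the Faraday API here."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, Optional[str]]] = []

    def create(self, kind: str, trait_name: str, parent_id: Optional[str]) -> str:
        self.calls.append((kind, trait_name, parent_id))
        return f"{kind}-{trait_name}"  # fake ID


def create_pipeline_sketch(client: FakeFaradayClient, trait_name: str) -> Dict[str, str]:
    """Create each component in order; return IDs keyed by run-record column."""
    ids: Dict[str, str] = {}
    parent_id: Optional[str] = None
    for kind in CREATION_ORDER:
        # Each component is created only after the one it depends on exists.
        parent_id = client.create(kind, trait_name, parent_id)
        ids[f"faraday_{kind}_id"] = parent_id
    return ids


client = FakeFaradayClient()
ids = create_pipeline_sketch(client, "enrichment_20260302_120000")
print(sorted(ids))
```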
Readiness check: poll_enrichment_run_status calls GET /targets/{id} and checks for
status == "ready". Only the target is fetched; the other components are not queried individually.
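The poll decision can be sketched as a pure function over one GET /targets/{id} response. `get_json`, `PollOutcome`, and `poll_once` are hypothetical names, and checking readiness before the timeout (so a target that becomes ready right at the deadline is still downloaded) is an assumed ordering that this page does not specify. Transient HTTP errors are handled separately and are not modeled here.

```python
from enum import Enum
from typing import Any, Callable, Dict


class PollOutcome(Enum):
    DOWNLOAD = "download"      # target ready: hand off to T3
    RESCHEDULE = "reschedule"  # not ready yet: run T2 again after the poll interval
    FAIL = "fail"              # timed out: revert recipients, mark Failed, delete pipeline


def poll_once(
    get_json: Callable[[str], Dict[str, Any]],
    target_id: str,
    elapsed_seconds: float,
    timeout_seconds: float = 19800,
) -> PollOutcome:
    """One readiness check: a single GET /targets/{id}, no other components."""
    target = get_json(f"/targets/{target_id}")
    if target.get("status") == "ready":
        return PollOutcome.DOWNLOAD
    if elapsed_seconds >= timeout_seconds:
        return PollOutcome.FAIL
    return PollOutcome.RESCHEDULE


print(poll_once(lambda path: {"status": "ready"}, "t-1", elapsed_seconds=7200))
# PollOutcome.DOWNLOAD
```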
Deletion is best-effort: each step is individually wrapped in try/except so deletion
errors never affect the main task flow.
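A sketch of the best-effort pattern: every step runs inside its own try/except, failures are logged and collected, and nothing propagates to the caller. The step names mirror the deletion order above; `delete_pipeline_sketch` and the simulated failure are illustrative, and the real `_delete_pipeline(client, run)` presumably builds its steps from the IDs stored on the run record.

```python
import logging
from typing import Callable, List, Sequence, Tuple

logger = logging.getLogger(__name__)

# Deletion order documented on this page (reverse dependency order).
DELETION_ORDER = (
    "target", "scope", "cohort", "nullify trait", "delete trait", "dataset", "upload",
)


def delete_pipeline_sketch(steps: Sequence[Tuple[str, Callable[[], None]]]) -> List[str]:
    """Run every deletion step; a failing step is logged and skipped, never raised."""
    failed: List[str] = []
    for name, action in steps:
        try:
            action()
        except Exception:  # best-effort: swallow so the caller's flow is unaffected
            logger.warning("Pipeline cleanup step %r failed", name, exc_info=True)
            failed.append(name)
    return failed


attempted: List[str] = []


def make_step(name: str) -> Callable[[], None]:
    def step() -> None:
        attempted.append(name)
        if name == "scope":
            raise RuntimeError("simulated API error")
    return step


failed = delete_pipeline_sketch([(n, make_step(n)) for n in DELETION_ORDER])
print(failed)  # ['scope']; all seven steps were still attempted
```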
## Transient Error Handling
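All Faraday API calls treat HTTP 429, 500, 502, and 503 responses as transient errors: the task
reschedules instead of failing the run. Permanent errors (4xx responses other than 429, and network
failures) are not retried. In T2 and T3 they fail the run immediately and revert recipients; in T1,
which has not changed any state yet, the error is logged and the task returns.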
| Task | Transient error action | Permanent error action |
|---|---|---|
| T1 (upload/create) | Reschedule T1 in 5min — no state changes made yet | Log error, return |
| T2 (poll) | Reschedule T2 in 5min | Fail run, revert recipients, delete pipeline |
| T3 (download) | Reschedule T2 in 5min | Fail run, revert recipients, delete pipeline |
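The classification and the per-task table reduce to a small lookup. This is a sketch: representing a network failure as `None` and the returned action strings are illustrative choices, and the transient set contains exactly the four codes listed above.

```python
from typing import Optional

TRANSIENT_STATUS_CODES = frozenset({429, 500, 502, 503})


def is_transient(status_code: Optional[int]) -> bool:
    """None stands for a network failure (no HTTP response), which is permanent here."""
    return status_code in TRANSIENT_STATUS_CODES


def error_action(task: str, status_code: Optional[int]) -> str:
    """Map (task, error) to the action in the table above; task is "T1", "T2" or "T3"."""
    if is_transient(status_code):
        # T1 retries itself; T2 and T3 both go back to polling.
        return "reschedule T1 in 5min" if task == "T1" else "reschedule T2 in 5min"
    if task == "T1":
        return "log error, return"  # no recipients have been marked yet
    return "fail run, revert recipients, delete pipeline"


print(error_action("T3", 503))  # reschedule T2 in 5min
```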
## Timing Configuration
| Config | Default | Description |
|---|---|---|
| FARADAY_BATCH_INTERVAL_SECONDS | 21600 (6h) | How often the batch task runs |
| FARADAY_INITIAL_POLL_DELAY_SECONDS | 7200 (2h) | Delay before the first poll after upload |
| FARADAY_POLL_INTERVAL_SECONDS | 300 (5min) | Interval between polls |
| FARADAY_POLL_TIMEOUT_SECONDS | 19800 (5.5h) | Maximum time before the run is failed |
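The defaults fit together as a simple timeline. Assuming the timeout is measured from the upload and polls land exactly on the 5-minute grid (real reschedules drift slightly), the first poll happens at 2h and the last possible one at 5.5h, which bounds a run at (19800 - 7200) / 300 + 1 = 43 poll attempts and ends polling before the next 6-hourly batch. The dataclass, its field names, and `from_mapping` are hypothetical; where the project actually reads these settings from (environment, Django settings, or elsewhere) is not stated here.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class EnrichmentTiming:
    """Defaults from the table above, in seconds (field names are hypothetical)."""

    batch_interval: int = 21600       # FARADAY_BATCH_INTERVAL_SECONDS
    initial_poll_delay: int = 7200    # FARADAY_INITIAL_POLL_DELAY_SECONDS
    poll_interval: int = 300          # FARADAY_POLL_INTERVAL_SECONDS
    poll_timeout: int = 19800         # FARADAY_POLL_TIMEOUT_SECONDS

    @classmethod
    def from_mapping(cls, settings: Mapping[str, Any]) -> "EnrichmentTiming":
        """Read overrides from any mapping (e.g. os.environ); missing keys keep defaults."""
        d = cls()
        return cls(
            batch_interval=int(settings.get("FARADAY_BATCH_INTERVAL_SECONDS", d.batch_interval)),
            initial_poll_delay=int(settings.get("FARADAY_INITIAL_POLL_DELAY_SECONDS", d.initial_poll_delay)),
            poll_interval=int(settings.get("FARADAY_POLL_INTERVAL_SECONDS", d.poll_interval)),
            poll_timeout=int(settings.get("FARADAY_POLL_TIMEOUT_SECONDS", d.poll_timeout)),
        )

    def max_polls(self) -> int:
        """Polls at initial_delay, +interval, ... up to the timeout, ignoring drift."""
        return (self.poll_timeout - self.initial_poll_delay) // self.poll_interval + 1

    def polling_ends_before_next_batch(self) -> bool:
        return self.poll_timeout < self.batch_interval


timing = EnrichmentTiming()
print(timing.max_polls())                       # 43
print(timing.polling_ends_before_next_batch())  # True
```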
## Cache
Enriched addresses are cached for 90 days. On each batch run, enrichable recipients are checked against the cache before uploading to Faraday. Cache hits are applied immediately (with per-campaign holdout logic); only uncached recipients are uploaded.
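A sketch of the cache check, assuming a cache keyed by some recipient identity that stores the enriched address with its cached_at time (the real key and schema are not described here). Entries exactly 90 days old are treated as expired. `purge_expired` shows what the daily cleanup_expired_cache task would retain under the same assumption; both function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List, Tuple

CACHE_TTL = timedelta(days=90)

# Hypothetical cache shape: identity key -> (enriched address, cached_at).
Cache = Dict[str, Tuple[str, datetime]]


def split_by_cache(keys: Iterable[str], cache: Cache, now: datetime) -> Tuple[Dict[str, str], List[str]]:
    """Return (hits, misses): hits are applied now, misses are uploaded to Faraday."""
    hits: Dict[str, str] = {}
    misses: List[str] = []
    for key in keys:
        entry = cache.get(key)
        if entry is not None and now - entry[1] < CACHE_TTL:
            hits[key] = entry[0]
        else:
            misses.append(key)  # never cached, or 90+ days old
    return hits, misses


def purge_expired(cache: Cache, now: datetime) -> Cache:
    """Keep only entries younger than the TTL."""
    return {k: v for k, v in cache.items() if now - v[1] < CACHE_TTL}


now = datetime(2026, 3, 2, tzinfo=timezone.utc)
cache = {
    "alice": ("1 Main St", now - timedelta(days=10)),
    "bob": ("2 Oak Ave", now - timedelta(days=120)),
}
hits, misses = split_by_cache(["alice", "bob", "carol"], cache, now)
print(hits)    # {'alice': '1 Main St'}
print(misses)  # ['bob', 'carol']
```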
## Key Helper Functions
| Function | Purpose |
|---|---|
| _create_pipeline(client, trait_name) | Create all 4 Faraday pipeline components (dataset, cohort, scope, target) for a run |
| _delete_pipeline(client, run) | Best-effort deletion of all pipeline components in reverse dependency order |
| apply_holdout_logic(recipients, campaign) | Partition enriched recipients into pending/holdout groups per campaign settings |
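This page does not say which campaign settings drive the holdout split. Purely as an illustration, the sketch below assumes a hypothetical `holdout_percent` field and assigns each recipient deterministically by hashing the campaign and recipient IDs, so re-running the batch never moves a recipient between groups. Only the pending/holdout partition shape comes from the table above; `Campaign`, `in_holdout`, and `apply_holdout_sketch` are illustrative names.

```python
import hashlib
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass(frozen=True)
class Campaign:
    id: int
    holdout_percent: int  # hypothetical setting: 0-100


def in_holdout(recipient_id: int, campaign: Campaign) -> bool:
    """Stable bucket in [0, 100) derived from the campaign and recipient IDs."""
    digest = hashlib.sha256(f"{campaign.id}:{recipient_id}".encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100
    return bucket < campaign.holdout_percent


def apply_holdout_sketch(recipient_ids: Iterable[int], campaign: Campaign) -> Tuple[List[int], List[int]]:
    """Partition recipients into (pending, holdout)."""
    pending: List[int] = []
    holdout: List[int] = []
    for rid in recipient_ids:
        (holdout if in_holdout(rid, campaign) else pending).append(rid)
    return pending, holdout


pending, holdout = apply_holdout_sketch(range(1000), Campaign(id=1, holdout_percent=10))
print(len(pending) + len(holdout))  # 1000
```

Hashing rather than random sampling is a design choice: it keeps assignments reproducible without storing them, at the cost of only approximating the target percentage.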