Recipient Pipeline: Sync -> Proof -> Send¶
Recipients move through a state machine driven entirely by Celery beat tasks. There is no single orchestrator — each task picks up recipients in the right state and advances them to the next.
This is the core product loop: sync recipients from a CDP, personalize images, print postcards, and track delivery.
Overview¶
graph LR
subgraph Sync["Recipient Sync (15 min)"]
S1[Klaviyo Segment] --> S2[Pending]
end
subgraph Enrich["Address Enrichment (4x daily)"]
E1[AddressEnrichable] --> E2[AddressEnriching] --> S2
end
subgraph Proof["Proofing (7-15s)"]
S2 --> P1[ProofingStaged]
P1 --> P2[Proofing]
P2 --> P3[Proofed]
end
subgraph Mail["Mailing (7-15s)"]
P3 --> M1[SendingStaged]
M1 --> M2[Sending]
M2 --> M3[Sent]
end
subgraph Track["Status Tracking (15s)"]
M3 --> T1[Received]
end
S2 --> SK[Skipped / Holdout / Blacklisted]
P2 --> ER[Error]
M2 --> ER
classDef sync fill:#e1f5ff,stroke:#0288d1;
classDef proof fill:#fff3e0,stroke:#ef6c00;
classDef mail fill:#e8f5e9,stroke:#388e3c;
classDef track fill:#f3e5f5,stroke:#7b1fa2;
classDef terminal fill:#ffd6d6,stroke:#c62828;
class S1,S2 sync;
class P1,P2,P3 proof;
class M1,M2,M3 mail;
class T1 track;
class SK,ER terminal;
Each section below traces the Celery task -> method -> integration path with real file references.
1. Recipient Syncing¶
Task: process_automated_recipient_sync_campaigns — app/celery/automator.py
Schedule: Every 15 minutes
State transition: Creates new recipients in Pending status
How it works:
1. Queries sync candidates with status in [Pending, Active, Paused], grouped by organization. Candidacy is gated on campaign_mode (see list_automation_campaigns_by_organization): automated campaigns sync in any of those statuses, while one_off campaigns sync only while Pending (Pending-sync) — the SQL filter is campaign_mode = 'automated' OR (campaign_mode = 'one_off' AND status = 'Pending')
2. For each organization, dispatches sync_recipients_for_org_campaigns.apply_async() — one task per org, deduplicated by QueueOnce on organization_id
3. That task iterates the org's campaigns serially, calling sync_recipients_for_campaign(campaign_id) in-process for each. Serial-per-org execution preserves cross-campaign recipient exclusion (parallel campaigns within an org would race on the read-then-write window in has_recipient_match_disqualification)
4. For each campaign, the worker acquires a distributed lock via campaign_sync_lock() to stay mutually exclusive with manual syncs of the same campaign, then calls recipient_sync.sync_automation_recipients() in app/methods/campaigns/recipient_sync.py
The sync method re-applies the same campaign_mode gate as a safety check (the campaign may have changed since selection, and manual/forced syncs hit this path too): it proceeds only for an automated campaign or a one_off campaign still in Pending, otherwise logs and returns 0. A one_off with no provider_segment_id simply has nothing to sync; only automated campaigns treat a missing segment as a misconfiguration. See docs/models/campaign.md §Mode for the canonical campaign_mode contract.
The sync method:
1. Gets the campaign's mail integration via get_mail_integration()
2. Extracts recipients from the integration (e.g., Klaviyo segment) via extract_integration_recipients()
3. Processes them — deduplicates, applies blocklists, creates DB records via process_automation_recipients()
Key files:
- app/celery/automator.py — Beat task and async dispatch
- app/methods/campaigns/recipient_sync.py — Sync orchestration
- app/core/integrations/klaviyo/ — Klaviyo API client
2. Address Enrichment (Optional)¶
Some recipients enter as AddressEnrichable instead of Pending — they have an email but no physical address.
Task: collect_and_start_enrichment_batch — app/celery/address_enrichment.py
Schedule: 4x daily (1am, 7am, 1pm, 7pm ET)
State transition: AddressEnrichable -> AddressEnriching -> Pending
Batch uploads to Faraday for geocoding/address appending. When results return, recipients move to Pending and enter the proofing pipeline.
For full details, see Address Enrichment.
3. Proofing¶
Two Celery tasks work in tandem using an SQS queue as a buffer:
Staging: stage_proofs — app/celery/proofer.py (line ~68)¶
Schedule: Every 15 seconds State transition: Pending -> ProofingStaged
- Atomically updates up to 50 recipients from Pending -> ProofingStaged using
cr.atomically_batch_update_and_retrieve_recipient_statuses() - Enqueues them to SQS via
bulk_enqueue_for_proofing()
The atomic update prevents two workers from grabbing the same recipients.
Processing: dispatch_proofing_queue — app/celery/proofer.py (line ~110)¶
Schedule: Every 7 seconds State transition: ProofingStaged -> Proofing -> Proofed
- Spawns multiple
process_proofing_queueworkers (count =config.QUEUE_DISPATCH_COUNT) - Each worker dequeues up to 10 messages from SQS
- For each recipient:
- Sets status to
Proofing - Calls
make_proof(rec, campaign)which callsrunner.make_image_proofs() - Image generation:
generate_images_v2()inapp/methods/generator.pycalls the personalization layer (Figma Plugin, Backup Generator, or legacy Pixelixe) - Uploads result to Cloudflare Images for hosting
- Sets status to
Proofed - Deletes the SQS message
Key files:
- app/celery/proofer.py — Beat tasks, staging, dispatch
- app/methods/proofs.py — make_proof() orchestration
- app/methods/runner.py — make_image_proofs() image generation
- app/methods/generator.py — generate_images_v2() rendering engine integration
- app/core/integrations/internal/ — Figma Plugin, Backup Generator, rendering gateway
- app/core/integrations/pixelixe.py — Pixelixe API client (deprecated)
- app/core/integrations/cloudflare_images.py — Image upload/storage
4. Mailing¶
Same two-task pattern as proofing:
Staging: stage_mailers — app/celery/mailer.py (line ~120)¶
Schedule: Every 15 seconds State transition: Proofed -> SendingStaged
- Atomically updates up to 50 recipients from Proofed -> SendingStaged
- Enqueues to SQS via
bulk_enqueue_for_mailing()
Processing: dispatch_mailing_queue — app/celery/mailer.py (line ~158)¶
Schedule: Every 7 seconds State transition: SendingStaged -> Sending -> Sent
- Spawns multiple
process_mailing_queueworkers - Each worker dequeues up to 10 messages from SQS
- For each recipient:
- Sets status to
Sending - Calls
make_mailer(rec, campaign)which callsrunner.create_mailer_v2() - Selects printer via
generate_printer_strategy()— LOB or IntelliPrint for US, Stannp for UK - Printer creates the mailpiece, returns tracking info
- Saves mailpiece via
save_mailpiece() - Updates billing meter via Stripe
- Submits event to Klaviyo for analytics
- Cleans up Cloudflare images
- Sets status to
Sent
Key files:
- app/celery/mailer.py — Beat tasks, staging, dispatch
- app/methods/runner.py — create_mailer_v2() printer orchestration
- app/methods/mailpieces.py — Mailpiece creation and status management
- app/core/integrations/printers/ — LOB, IntelliPrint, Stannp clients
- app/methods/billing.py — Stripe usage meter updates
5. Status Tracking¶
Task: process_status_updates — app/celery/mail_statuses.py (line ~13)
Schedule: Every 15 seconds
State transition: Sent -> Received (and intermediate statuses)
- Dequeues up to 50 messages (5 pages x 10) from the mail status SQS queue
- Printers push status webhooks to this queue (in transit, delivered, returned, etc.)
- Parses each update via
parse_status_update_from_sqs_response()inapp/methods/mailpieces.py - Bulk updates mailpiece statuses via
m.update_mailpiece_statuses() - Deletes processed messages from queue
Key files:
- app/celery/mail_statuses.py — Beat task
- app/methods/mailpieces.py — Status parsing and updates
- app/core/models/mailpieces.py — Mailpiece DB operations
What Happens Next¶
After recipients reach Sent/Received, the attribution pipeline kicks in separately: - Order Syncing — How orders are synced from Klaviyo/Shopify and attributed back to campaigns
Common Patterns¶
Across all pipeline stages, you'll see these patterns:
-
Two-task staging pattern — A staging task atomically grabs a batch and enqueues to SQS. A dispatch task spawns workers to process the queue. This prevents double-processing and enables horizontal scaling.
-
Distributed locks —
campaign_sync_lock()prevents concurrent operations on the same campaign. Uses Redis. -
Atomic status updates —
atomically_batch_update_and_retrieve_recipient_statuses()uses a single SQL UPDATE...RETURNING to grab and transition recipients without race conditions. -
SQS as buffer — All high-throughput work (proofing, mailing, status updates) flows through SQS queues, decoupling the beat scheduler from processing speed.
For details on the Celery infrastructure itself (queues, workers, broker, scheduler, observability, troubleshooting), see Celery Infrastructure.