
Recipient Pipeline: Sync -> Proof -> Send

Recipients move through a state machine driven entirely by Celery beat tasks. There is no single orchestrator — each task picks up recipients in the right state and advances them to the next.

This is the core product loop: sync recipients from a CDP, personalize images, print postcards, and track delivery.

Overview

graph LR
    subgraph Sync["Recipient Sync (15 min)"]
        S1[Klaviyo Segment] --> S2[Pending]
    end

    subgraph Enrich["Address Enrichment (4x daily)"]
        E1[AddressEnrichable] --> E2[AddressEnriching] --> S2
    end

    subgraph Proof["Proofing (7-15s)"]
        S2 --> P1[ProofingStaged]
        P1 --> P2[Proofing]
        P2 --> P3[Proofed]
    end

    subgraph Mail["Mailing (7-15s)"]
        P3 --> M1[SendingStaged]
        M1 --> M2[Sending]
        M2 --> M3[Sent]
    end

    subgraph Track["Status Tracking (15s)"]
        M3 --> T1[Received]
    end

    S2 --> SK[Skipped / Holdout / Blacklisted]
    P2 --> ER[Error]
    M2 --> ER

    classDef sync fill:#e1f5ff,stroke:#0288d1;
    classDef proof fill:#fff3e0,stroke:#ef6c00;
    classDef mail fill:#e8f5e9,stroke:#388e3c;
    classDef track fill:#f3e5f5,stroke:#7b1fa2;
    classDef terminal fill:#ffd6d6,stroke:#c62828;

    class S1,S2 sync;
    class P1,P2,P3 proof;
    class M1,M2,M3 mail;
    class T1 track;
    class SK,ER terminal;

Each section below traces the Celery task -> method -> integration path with real file references.

1. Recipient Syncing

Task: process_automated_recipient_sync_campaigns in app/celery/automator.py
Schedule: Every 15 minutes
State transition: Creates new recipients in Pending status

How it works:

  1. Queries sync candidates with status in [Pending, Active, Paused], grouped by organization. Candidacy is gated on campaign_mode (see list_automation_campaigns_by_organization): automated campaigns sync in any of those statuses, while one_off campaigns sync only while Pending (Pending-sync). The SQL filter is campaign_mode = 'automated' OR (campaign_mode = 'one_off' AND status = 'Pending').
  2. For each organization, dispatches sync_recipients_for_org_campaigns.apply_async(): one task per org, deduplicated by QueueOnce on organization_id.
  3. That task iterates the org's campaigns serially, calling sync_recipients_for_campaign(campaign_id) in-process for each. Serial-per-org execution preserves cross-campaign recipient exclusion (parallel campaigns within an org would race on the read-then-write window in has_recipient_match_disqualification).
  4. For each campaign, the worker acquires a distributed lock via campaign_sync_lock() to stay mutually exclusive with manual syncs of the same campaign, then calls recipient_sync.sync_automation_recipients() in app/methods/campaigns/recipient_sync.py.
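The per-organization dispatch in steps 1 to 3 can be sketched in plain Python. This is only an illustration under stated assumptions: the `in_flight` set stands in for the QueueOnce deduplication, `enqueue` stands in for sync_recipients_for_org_campaigns.apply_async(), and the helper names `group_by_org` and `dispatch_org_syncs` are hypothetical.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Set, Tuple


def group_by_org(campaigns: Iterable[Tuple[int, int]]) -> Dict[int, List[int]]:
    """Group (organization_id, campaign_id) pairs by organization."""
    grouped: Dict[int, List[int]] = defaultdict(list)
    for org_id, campaign_id in campaigns:
        grouped[org_id].append(campaign_id)
    return dict(grouped)


def dispatch_org_syncs(
    campaigns: Iterable[Tuple[int, int]],
    in_flight: Set[int],
    enqueue: Callable[[int, List[int]], None],
) -> List[int]:
    """Enqueue at most one sync task per organization.

    `in_flight` emulates task deduplication keyed on organization_id:
    an org that already has a task queued or running is skipped.
    """
    dispatched = []
    for org_id, campaign_ids in group_by_org(campaigns).items():
        if org_id in in_flight:
            continue  # a task for this org already exists
        in_flight.add(org_id)
        # The org task later walks campaign_ids one by one (serially).
        enqueue(org_id, campaign_ids)
        dispatched.append(org_id)
    return dispatched
```

Keeping one task per organization means the campaigns inside it are processed in order, which is what protects the cross-campaign exclusion check from racing with itself.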

The sync method re-applies the same campaign_mode gate as a safety check (the campaign may have changed since selection, and manual/forced syncs hit this path too): it proceeds only for an automated campaign or a one_off campaign still in Pending, otherwise logs and returns 0. A one_off with no provider_segment_id simply has nothing to sync; only automated campaigns treat a missing segment as a misconfiguration. See docs/models/campaign.md §Mode for the canonical campaign_mode contract.
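The two gates described above can be written as small predicates. The sketch below is an assumption-laden illustration, not the project's code: the `Campaign` dataclass is a hypothetical stand-in for the real model, and the function names are invented for this example. Only the rules themselves come from the text.

```python
from dataclasses import dataclass
from typing import Optional

# Statuses considered when the beat task selects sync candidates.
SYNCABLE_STATUSES = {"Pending", "Active", "Paused"}


@dataclass
class Campaign:
    # Hypothetical minimal shape; the real model carries many more fields.
    campaign_mode: str  # "automated" or "one_off"
    status: str
    provider_segment_id: Optional[str] = None


def passes_mode_gate(campaign: Campaign) -> bool:
    """campaign_mode = 'automated' OR (campaign_mode = 'one_off' AND status = 'Pending')."""
    if campaign.campaign_mode == "automated":
        return True
    return campaign.campaign_mode == "one_off" and campaign.status == "Pending"


def is_sync_candidate(campaign: Campaign) -> bool:
    """Selection-time check: status filter plus the mode gate."""
    return campaign.status in SYNCABLE_STATUSES and passes_mode_gate(campaign)


def missing_segment_is_misconfiguration(campaign: Campaign) -> bool:
    """Only automated campaigns treat a missing segment as an error."""
    return campaign.campaign_mode == "automated" and not campaign.provider_segment_id
```

Re-running `passes_mode_gate` inside the sync method is cheap, and it covers the window in which a campaign changes between selection and execution.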

The sync method:

  1. Gets the campaign's mail integration via get_mail_integration()
  2. Extracts recipients from the integration (e.g., Klaviyo segment) via extract_integration_recipients()
  3. Processes them via process_automation_recipients(): deduplicates, applies blocklists, and creates DB records
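The processing step (deduplicate, apply blocklists, decide what to create) might look roughly like the following. This is a simplified sketch, not the behavior of process_automation_recipients(): it assumes recipients are dictionaries keyed by a lowercase email, and that a recipient without a physical address starts as AddressEnrichable, as described in the next section. The helper names are hypothetical.

```python
from typing import Dict, Iterable, List, Optional


def initial_status(recipient: Dict[str, Optional[str]]) -> str:
    """Recipients with an address start Pending; email-only ones need enrichment."""
    return "Pending" if recipient.get("address") else "AddressEnrichable"


def select_recipients_to_create(
    extracted: Iterable[Dict[str, Optional[str]]],
    existing_emails: Iterable[str],
    blocklist: Iterable[str],
) -> List[Dict[str, Optional[str]]]:
    """Drop duplicates and blocklisted entries, then attach a starting status."""
    seen = {email.lower() for email in existing_emails}
    blocked = {email.lower() for email in blocklist}
    to_create = []
    for recipient in extracted:
        key = (recipient.get("email") or "").strip().lower()
        if not key or key in seen or key in blocked:
            continue
        seen.add(key)  # also removes duplicates within this batch
        to_create.append({**recipient, "status": initial_status(recipient)})
    return to_create
```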

Key files:

- app/celery/automator.py: Beat task and async dispatch
- app/methods/campaigns/recipient_sync.py: Sync orchestration
- app/core/integrations/klaviyo/: Klaviyo API client

2. Address Enrichment (Optional)

Some recipients enter as AddressEnrichable instead of Pending — they have an email but no physical address.

Task: collect_and_start_enrichment_batch in app/celery/address_enrichment.py
Schedule: 4x daily (1am, 7am, 1pm, 7pm ET)
State transition: AddressEnrichable -> AddressEnriching -> Pending

Batch uploads to Faraday for geocoding/address appending. When results return, recipients move to Pending and enter the proofing pipeline.

For full details, see Address Enrichment.

3. Proofing

Two Celery tasks work in tandem using an SQS queue as a buffer:

Staging: stage_proofs in app/celery/proofer.py (line ~68)

Schedule: Every 15 seconds
State transition: Pending -> ProofingStaged

  1. Atomically updates up to 50 recipients from Pending -> ProofingStaged using cr.atomically_batch_update_and_retrieve_recipient_statuses()
  2. Enqueues them to SQS via bulk_enqueue_for_proofing()

The atomic update prevents two workers from grabbing the same recipients.
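To make the claim-a-batch idea concrete, here is a minimal sketch using SQLite from the standard library. It is not the project's query: the table layout, the `claim_batch` helper, and the use of SQLite are assumptions made so the example runs on its own (SQLite needs version 3.35 or newer for RETURNING). The point it illustrates is that a single UPDATE ... RETURNING both moves the rows and tells the caller which rows it moved.

```python
import sqlite3
from typing import List

# One statement selects a batch, changes its status, and returns the ids.
CLAIM_SQL = """
UPDATE recipients
SET status = :new_status
WHERE id IN (
    SELECT id FROM recipients
    WHERE status = :old_status
    ORDER BY id
    LIMIT :batch_size
)
RETURNING id
"""


def claim_batch(
    conn: sqlite3.Connection, old_status: str, new_status: str, batch_size: int = 50
) -> List[int]:
    """Move up to batch_size rows to new_status and return their ids."""
    rows = conn.execute(
        CLAIM_SQL,
        {"old_status": old_status, "new_status": new_status, "batch_size": batch_size},
    ).fetchall()  # fetch every returned row before committing
    conn.commit()
    return sorted(row[0] for row in rows)


def make_demo_db(pending_count: int) -> sqlite3.Connection:
    """Build an in-memory table with pending_count Pending recipients."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE recipients (id INTEGER PRIMARY KEY, status TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO recipients (status) VALUES (?)", [("Pending",)] * pending_count
    )
    conn.commit()
    return conn
```

Two consecutive calls to `claim_batch(conn, "Pending", "ProofingStaged")` return disjoint id lists, because rows claimed by the first call no longer match the Pending filter when the second call runs.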

Processing: dispatch_proofing_queue in app/celery/proofer.py (line ~110)

Schedule: Every 7 seconds
State transition: ProofingStaged -> Proofing -> Proofed

  1. Spawns multiple process_proofing_queue workers (count = config.QUEUE_DISPATCH_COUNT)
  2. Each worker dequeues up to 10 messages from SQS
  3. For each recipient:
     - Sets status to Proofing
     - Calls make_proof(rec, campaign) which calls runner.make_image_proofs()
     - Image generation: generate_images_v2() in app/methods/generator.py calls the personalization layer (Figma Plugin, Backup Generator, or legacy Pixelixe)
     - Uploads result to Cloudflare Images for hosting
     - Sets status to Proofed
     - Deletes the SQS message
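The per-worker loop can be sketched with an in-memory stand-in for SQS. Everything named here is hypothetical (`InMemoryQueue`, `process_proofing_batch`, the `statuses` dictionary), and two behaviors are assumptions rather than facts from the codebase: a failed proof moves the recipient to Error (the diagram shows Proofing -> Error), and the message is deleted after a failure so it is not retried.

```python
from collections import deque
from typing import Callable, Dict, List, Tuple


class InMemoryQueue:
    """Stand-in for SQS: receive() hands out messages, delete() acknowledges them."""

    def __init__(self, bodies: List[int]) -> None:
        self._visible = deque(enumerate(bodies))  # (receipt handle, body)
        self._in_flight: Dict[int, int] = {}

    def receive(self, max_messages: int = 10) -> List[Tuple[int, int]]:
        batch = []
        while self._visible and len(batch) < max_messages:
            handle, body = self._visible.popleft()
            self._in_flight[handle] = body
            batch.append((handle, body))
        return batch

    def delete(self, handle: int) -> None:
        self._in_flight.pop(handle, None)

    def pending(self) -> int:
        return len(self._visible) + len(self._in_flight)


def process_proofing_batch(
    queue: InMemoryQueue,
    statuses: Dict[int, str],
    make_proof: Callable[[int], None],
    max_messages: int = 10,
) -> int:
    """Dequeue up to max_messages recipients and advance each one."""
    batch = queue.receive(max_messages)
    for handle, recipient_id in batch:
        statuses[recipient_id] = "Proofing"
        try:
            make_proof(recipient_id)  # render and upload happen in here
            statuses[recipient_id] = "Proofed"
        except Exception:
            statuses[recipient_id] = "Error"  # assumed failure handling
        queue.delete(handle)  # acknowledge so the message is not redelivered
    return len(batch)
```

Setting Proofing before the slow work starts makes in-progress recipients visible, which helps when diagnosing a worker that died mid-batch.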

Key files:

- app/celery/proofer.py: Beat tasks, staging, dispatch
- app/methods/proofs.py: make_proof() orchestration
- app/methods/runner.py: make_image_proofs() image generation
- app/methods/generator.py: generate_images_v2() rendering engine integration
- app/core/integrations/internal/: Figma Plugin, Backup Generator, rendering gateway
- app/core/integrations/pixelixe.py: Pixelixe API client (deprecated)
- app/core/integrations/cloudflare_images.py: Image upload/storage

4. Mailing

Same two-task pattern as proofing:

Staging: stage_mailers in app/celery/mailer.py (line ~120)

Schedule: Every 15 seconds
State transition: Proofed -> SendingStaged

  1. Atomically updates up to 50 recipients from Proofed -> SendingStaged
  2. Enqueues to SQS via bulk_enqueue_for_mailing()

Processing: dispatch_mailing_queue in app/celery/mailer.py (line ~158)

Schedule: Every 7 seconds
State transition: SendingStaged -> Sending -> Sent

  1. Spawns multiple process_mailing_queue workers
  2. Each worker dequeues up to 10 messages from SQS
  3. For each recipient:
     - Sets status to Sending
     - Calls make_mailer(rec, campaign) which calls runner.create_mailer_v2()
     - Selects printer via generate_printer_strategy(): LOB or IntelliPrint for US, Stannp for UK
     - Printer creates the mailpiece, returns tracking info
     - Saves mailpiece via save_mailpiece()
     - Updates billing meter via Stripe
     - Submits event to Klaviyo for analytics
     - Cleans up Cloudflare images
     - Sets status to Sent
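The country-to-printer mapping in the printer selection step can be illustrated as a lookup. This sketch only encodes what the list states (LOB or IntelliPrint for US, Stannp for UK). How generate_printer_strategy() chooses between LOB and IntelliPrint is not described here, so the function returns the candidates rather than picking one. The name `candidate_printers` and the use of two-letter country codes are assumptions.

```python
from typing import Tuple

# Candidates per destination, as listed in the steps above.
US_PRINTERS: Tuple[str, ...] = ("LOB", "IntelliPrint")
UK_PRINTERS: Tuple[str, ...] = ("Stannp",)


def candidate_printers(country_code: str) -> Tuple[str, ...]:
    """Return the printers that can serve a destination country."""
    code = country_code.strip().upper()
    if code == "US":
        return US_PRINTERS
    if code in {"GB", "UK"}:  # accept both common spellings
        return UK_PRINTERS
    raise ValueError(f"No printer configured for country {country_code!r}")
```

Raising on an unknown country, instead of silently defaulting, keeps a misaddressed recipient from being billed and mailed through the wrong printer.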

Key files:

- app/celery/mailer.py: Beat tasks, staging, dispatch
- app/methods/runner.py: create_mailer_v2() printer orchestration
- app/methods/mailpieces.py: Mailpiece creation and status management
- app/core/integrations/printers/: LOB, IntelliPrint, Stannp clients
- app/methods/billing.py: Stripe usage meter updates

5. Status Tracking

Task: process_status_updates in app/celery/mail_statuses.py (line ~13)
Schedule: Every 15 seconds
State transition: Sent -> Received (and intermediate statuses)

  1. Dequeues up to 50 messages (5 pages x 10) from the mail status SQS queue
  2. Printers push status webhooks to this queue (in transit, delivered, returned, etc.)
  3. Parses each update via parse_status_update_from_sqs_response() in app/methods/mailpieces.py
  4. Bulk updates mailpiece statuses via m.update_mailpiece_statuses()
  5. Deletes processed messages from queue
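The paged dequeue (5 pages of 10) and the parse step can be sketched as follows. The message format is an assumption (a JSON body with `mailpiece_id` and `status` keys), as are the helper names and the choice to stop early on an empty page and to keep only the last status seen per mailpiece. The real parsing lives in parse_status_update_from_sqs_response().

```python
import json
from typing import Callable, Dict, Iterable, List, Tuple


def drain_status_queue(
    receive_page: Callable[[int], List[str]], pages: int = 5, page_size: int = 10
) -> List[str]:
    """Read up to pages * page_size message bodies, stopping at an empty page."""
    bodies: List[str] = []
    for _ in range(pages):
        page = receive_page(page_size)
        if not page:
            break  # queue is empty, no need to poll further
        bodies.extend(page)
    return bodies


def parse_status_update(body: str) -> Tuple[int, str]:
    """Extract (mailpiece_id, status) from one message body (assumed JSON)."""
    payload = json.loads(body)
    return payload["mailpiece_id"], payload["status"]


def latest_status_by_mailpiece(bodies: Iterable[str]) -> Dict[int, str]:
    """Collapse a batch so each mailpiece is written once in the bulk update."""
    latest: Dict[int, str] = {}
    for body in bodies:
        mailpiece_id, status = parse_status_update(body)
        latest[mailpiece_id] = status  # later messages overwrite earlier ones
    return latest
```

Capping each run at 50 messages bounds how long one beat tick can take. A backlog larger than that is picked up by the next run 15 seconds later.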

Key files:

- app/celery/mail_statuses.py: Beat task
- app/methods/mailpieces.py: Status parsing and updates
- app/core/models/mailpieces.py: Mailpiece DB operations

What Happens Next

After recipients reach Sent/Received, the attribution pipeline kicks in separately:

- Order Syncing: How orders are synced from Klaviyo/Shopify and attributed back to campaigns

Common Patterns

Across all pipeline stages, you'll see these patterns:

  1. Two-task staging pattern — A staging task atomically grabs a batch and enqueues to SQS. A dispatch task spawns workers to process the queue. This prevents double-processing and enables horizontal scaling.

  2. Distributed locks: campaign_sync_lock() prevents concurrent operations on the same campaign. Uses Redis.

  3. Atomic status updates: atomically_batch_update_and_retrieve_recipient_statuses() uses a single SQL UPDATE...RETURNING to grab and transition recipients without race conditions.

  4. SQS as buffer — All high-throughput work (proofing, mailing, status updates) flows through SQS queues, decoupling the beat scheduler from processing speed.

For details on the Celery infrastructure itself (queues, workers, broker, scheduler, observability, troubleshooting), see Celery Infrastructure.