Recipient Aggregates¶
File: app/core/models/recipient_aggregates.py
Pre-computed daily recipient counts and costs per campaign.
Key fields (RecipientDailyAggregate)¶
- Dimensions:
organization_id,campaign_id,day_bucket,is_holdout - Metrics:
recipient_count(int),summed_price(float)
Computation — two layers¶
The incremental and transition-triggered layers below were introduced to fix a permanent drift class where a recipient's status changed (Pending→Sent, Holdout→Sent, or out of {Sent,Holdout} entirely) more than 2 days after its created_at. The previous upsert-only design never updated the aggregate row for that bucket, leaving permanent stale data until an admin full_refresh=True rebuild.
Layer 1 is the authoritative cadence. Layer 3 is belt-and-braces for the moments users notice drift the most (right after a campaign goes live, resumes, or completes), shortening convergence from ≤15 min to "next slow-queue tick" and rebuilding the full org via full_refresh=True so its own mark-and-sweep covers any orphans that Layer 1 cannot.
(Layer 2 — a nightly orphan-sweep cron — was the third leg of an earlier design and was retired. The labels Layer 1 / Layer 3 are kept rather than renumbered so older commit messages, PR threads, and code comments stay coherent. See git history before commit 230b8022 for what it did.)
One drift class is not automatically covered after this design: a long-Active campaign whose recipients are all hard-deleted (or all status-flipped out of {Sent, Holdout}) more than 2 days after created_at. Layer 1 detects the bucket via updated_at > last_run_at but the recompute returns no rows and the sweep is intentionally gated (so a transient compute glitch can't wipe live data). Layer 3 never fires because the campaign never transitions. Recovery is the admin full_refresh=True rebuild — see runbook below.
Layer 1 — 15-minute watermark task¶
Task: recipient_aggregates.process_recipient_aggregations (schedule=900s, fast queue).
Bucket selection — get_buckets_to_recompute(org_id, inserted_after, inserted_before, updated_since) returns day-buckets where any recipient meets either of:
created_atfalls in the trailing window —[today - SLIDING_WINDOW_DAYS, db_now())(2 days in steady state,FIRST_RUN_SEED_DAYS = 90on a brand-new org).updated_at > progress.last_run_at— catches every status change since the previous tick, no matter how far in the past the bucket sits.
Recompute + sweep — compute_recipient_aggregates_for_buckets(org_id, day_buckets) re-aggregates every selected bucket from scratch via date_trunc('day', cr.created_at)::date = ANY(%s), filtered by status IN ('Sent','Holdout'). The result is upserted, then delete_stale_recipient_aggregates(org_id, before=run_started_at, day_buckets=…) deletes any tuple inside the recomputed buckets that the upsert didn't touch. Today's bucket is excluded from the sweep so a partial mid-day compute can't wipe an in-progress row.
Progress watermark — update_recipient_aggregation_progress records last_run_at = run_started_at (the scan-start, not upsert NOW()) so the next tick's updated_at > last_run_at clause covers anything updated mid-scan. A CASE WHEN EXCLUDED.last_error IS NULL guard preserves the watermark on failure so the retry re-detects everything the failed run was supposed to catch.
Today's bucket is excluded from Layer 1's sweep: a recipient inserted seconds ago, or a status flipping mid-day, would briefly produce a (campaign, today, is_holdout) tuple with zero matches — sweeping it would race the next 15-min tick that's about to repopulate it.
Layer 3 — transition-triggered recalc pipeline (belt-and-braces)¶
Helper: app/methods/campaigns/status_handlers.py::enqueue_aggregate_rebuild_on_status_change(org_id, campaign_id, old_status, new_status).
Three campaign status transitions reliably coincide with bulk recipient mutations users would notice if the dashboards lagged:
- Pending → Active — recipient list finalises; many rows flip into the proofing/sending pipeline within the next few ticks.
- Paused → Active — resume often kicks off a recipient resync.
- Scheduled → Completed — final reconciliation; dashboards switch into completed-campaign mode.
For any of these transitions, the helper dispatches recalc_campaign_pipeline.apply_async(kwargs={…, full_refresh=True}) — a single Celery task (app/celery/recalc_pipeline.py) that runs order-aggregate rebuild → recipient-aggregate rebuild → per-campaign stats cache warm in a fixed sequence. Both backfills reset the Redis reporting cache, so the warm step writes against an empty cache and the next dashboard reader sees fresh stats without a recompute. QueueOnce keyed on (organization_id, campaign_id) coalesces duplicate dispatches; an org-scope and a campaign-scope enqueue do not dedupe each other. The helper short-circuits for every other transition (Active→Paused, Pending→Scheduled, etc.) so it can be called unconditionally from every status-mutation site without the call sites knowing which transitions matter.
Call sites:
app/routes/campaign_routes.py::update_campaign_status— capturesold_statusbefore the in-place mutation, calls the helper afterc.update_campaign(campaign)succeeds. Covers manual user-driven transitions.app/methods/campaigns/status_handlers.py::resume_paused_campaign_if_needed— covers celery-driven scheduled resumes.app/methods/campaigns/status_handlers.py::complete_scheduled_campaign_if_needed— covers celery-driven scheduled completions.
Dispatch failures (broker down, etc.) are caught inside the helper and logged at exception level. Best-effort by design — Layer 1 still runs on cadence and remains authoritative.
Source filter¶
Both layers source from campaign_recipients filtered by status IN ('Sent', 'Holdout') and created_at IS NOT NULL. The Layer 1 bucket-detection query is unfiltered by status — any insertion or update qualifies a bucket for recompute.
Idempotency¶
upsert_daily_recipient_aggregates uses ON CONFLICT (organization_id, campaign_id, day_bucket, is_holdout) DO UPDATE. Layer 1's scoped sweep is keyed on last_updated < run_started_at so the same run can't sweep what it just wrote. Layer 3 dispatches are deduped by the pipeline task's QueueOnce lock on (organization_id, campaign_id).
Reporting integration¶
Both aggregate models feed into methods/reporting_aggregates.py to produce StatsResponse objects. Reduces 100k+ raw scans to indexed lookups, enabling 10-100x faster dashboards.
Admin rebuild¶
Operators can rebuild a single org or one campaign's aggregates out-of-band (e.g. after a rule change or to validate a fix) without shelling into a container.
- Endpoint:
POST /api/v1/admin/aggregations/recipient/rebuild— admin-only, body{organization_id, campaign_id?, full_refresh?}.full_refreshdefaults tofalse(incremental); passtrueto additionally sweep stale rows after the rebuild. Returns202 {message, task_id, ...}. Seeroutes/aggregation_trigger_routes.py. - Task:
recipient_aggregates.rebuild_recipient_aggregates_for_organization(organization_id, campaign_id=None, full_refresh=False)on theslowqueue, deduped byQueueOnceon(organization_id, campaign_id). Delegates tomethods/aggregation_rebuild.backfill_recipient_aggregatesand resets the Redis reporting cache on completion so fresh data is visible immediately — the scheduled 15-min task does not reset the cache. The same method powers thescripts/backfill_recipient_aggregates.pyCLI. - Mark-and-sweep on
full_refresh=True: existing rows stay visible the whole run. The orchestrator capturesdb_now()before the chunk loop, every chunk'sUPSERTrefresheslast_updated = NOW(), and at the enddelete_stale_recipient_aggregates(org_id, campaign_id, before=run_started_at)removes only rows the run did not touch. Readers never see an empty stats window.
Runbook — drift detected¶
If a campaign's reported recipient count disagrees with COUNT(campaign_recipients):
- Confirm the gap — run
app/scripts/compare_recipient_aggregates.py(modeaudit-drift) to see real vs false drift. False drift = org disabled / raw zero / no aggregate data. - Check whether Layer 3 should have fired. If the campaign recently transitioned Pending→Active, Paused→Active, or Scheduled→Completed, a
recalc_campaign_pipelinetask should have hit the slow queue at the transition moment. If it didn't, the broker may have been down — check theFailed to enqueue recalc pipelinelog line for the campaign id. - Wait one tick for the 15-min Layer 1 watermark task to pick up any in-window changes. If the drift persists, it's older than the 2-day window or involves a fully-deleted/all-flipped bucket Layer 1 cannot sweep.
- Trigger an admin full-refresh rebuild. Dispatch
rebuild_recipient_aggregates_for_organization(org_id, full_refresh=True)via the admin endpoint — it rebuilds and sweeps stale tuples in one shot. (Use the recipient-only admin task here, not the Layer 3 pipeline: an isolated recipient-side drift doesn't need the order rebuild + stats warm the pipeline performs.) - Inspect the mechanism per campaign with
app/scripts/inspect_drift_recipient_states.pyandapp/scripts/analyze_drift_buckets.pyto see per-(day_bucket, status) live counts vs aggregate values.
Key files¶
- Model:
core/models/recipient_aggregates.py - Rebuild orchestration:
methods/aggregation_rebuild.py - Scheduled + admin-trigger task:
celery/recipient_aggregates.py - Transition-triggered recalc pipeline (Layer 3):
celery/recalc_pipeline.py - Transition-triggered dispatch helper:
methods/campaigns/status_handlers.py - Admin route:
routes/aggregation_trigger_routes.py - Backfill CLI:
scripts/backfill_recipient_aggregates.py - Drift diagnostic scripts:
scripts/compare_recipient_aggregates.py,scripts/inspect_drift_recipient_states.py,scripts/analyze_drift_buckets.py - Reporting:
methods/reporting_aggregates.py