title: Outbox Approval Protocol tags: [security, pubsub, approval, discord, email] library: cowboy
Outbox Approval Protocol
Cross-service approval flow for broker-owned outbox services. Any outbox service can hold messages for manual approval before sending. Approval notifications and responses route through a configurable channel (default: Discord).
This protocol is implemented in cowboy's agent-bridge package. Bridge services in agent-pkgs consume it without needing to know how approval works internally.
Related: [[Security]]
Flow
Agent (${cfg.name}-ns) Broker services (host namespace)
────────────────────── ───────────────────────────────
1. Agent composes email
|
+-- XADD email:outbox
| channel_id: alice@example.com
| subject: "Re: meeting"
| content: "Sounds good..."
|
| 2. ${cfg.name}-email-outbox reads stream
| OutboxService.run() sees
| approval_required = true
| |
| +-- request_approval():
| | HSET approval:{uuid}
| | status: pending
| | source: email
| | channel_id: alice@example.com
| | content_preview: "Sounds good..."
| |
| +-- XADD discord:outbox
| | channel_id: {notify_channel}
| | content: "Approval needed [email]
| | To: alice@example.com
| | Preview: Sounds good...
| | React to approve/reject"
| | approval_id: {uuid}
| |
| 3. ${cfg.name}-discord-outbox reads stream
| OutboxService.send() posts to Discord
| |
| +-- gets discord_message_id from API
| +-- sees approval_id in message fields
| +-- auto-tracks: HSET approval:discord_map
| | {discord_msg_id} -> {uuid}
| | (handled by base class via
| | SendResult.external_id)
| |
| 4. broker sees message in Discord,
| reacts with checkmark
| |
| 5. ${cfg.name}-discord-ingest handles
| on_raw_reaction_add
| |
| +-- calls OutboxService.resolve_approval()
| | HGET approval:discord_map {msg_id}
| | -> finds {uuid}
| | maps emoji -> approved/rejected
| | HSET approval:{uuid}
| | status: approved
| | approver: dylan
| |
| 6. email-outbox was polling via
| wait_for_approval()
| sees status: approved
| |
| +-- OutboxService.send()
| +-- build MIME, DKIM-sign, SMTP send
| +-- XACK email:outbox
| +-- HDEL approval:{uuid}
State Machine
Approval state lives in Redis hashes (not streams). Simple key-value, pollable by any service.
approval:{uuid} = {
status: pending | approved | rejected | expired
source: email | slack | ...
channel_id: destination (email address, slack channel, etc.)
content_preview: first 200 chars of message body
created_at: unix timestamp
approver: user who approved/rejected (set on resolution)
}
Transitions:
pending --+-- approve reaction --> approved --> send + cleanup
+-- reject reaction --> rejected --> discard + notify agent
+-- timeout --> expired --> discard + notify agent
Implementation: cowboy's agent-bridge
The approval protocol is fully implemented in OutboxService (pkgs/agent-bridge/agent_bridge/base.py).
Requesting approval
async def request_approval(self, msg: OutboxMessage) -> bool:
approval_id = str(uuid4())
self.redis.hset(f"approval:{approval_id}", mapping={
"status": "pending",
"source": self.source,
"channel_id": msg.channel_id,
"content_preview": msg.content[:200],
"created_at": str(int(time.time())),
})
self.redis.xadd(f"{self.approval_notify}:outbox", {
"channel_id": self.approval_notify_channel,
"content": f"Approval needed [{self.source}]: ...",
"approval_id": approval_id,
})
return await self.wait_for_approval(approval_id)
Waiting for resolution
async def wait_for_approval(self, approval_id: str) -> bool:
deadline = time.time() + self.approval_timeout
while time.time() < deadline:
status = self.redis.hget(f"approval:{approval_id}", "status")
if status == "approved":
self.redis.delete(f"approval:{approval_id}")
return True
if status in ("rejected", "expired"):
self.redis.delete(f"approval:{approval_id}")
return False
await asyncio.sleep(2)
# Timeout -> mark expired, clean up
self.redis.hset(f"approval:{approval_id}", "status", "expired")
self.redis.delete(f"approval:{approval_id}")
return False
Resolving approval (static, called by ingest services)
@staticmethod
def resolve_approval(r, source, external_id, status, approver=""):
map_key = f"approval:{source}_map"
approval_id = r.hget(map_key, external_id)
if not approval_id:
return False
r.hset(f"approval:{approval_id}", mapping={
"status": status, "approver": approver,
})
r.hdel(map_key, external_id)
return True
Auto-tracking in the main loop
The run() method handles the notification-tracking handshake automatically:
async def run(self):
while True:
messages = self.read_batch()
for msg in messages:
# Messages WITH approval_id are system notifications --
# skip approval for these (they ARE the approval request)
if self.approval_required and not msg.approval_id:
approved = await self.request_approval(msg)
if not approved:
self.ack(msg)
continue
result = await self.send(msg)
if result.ok:
# Auto-track: if this was a notification with approval_id
# and send() returned an external_id, store the mapping
if msg.approval_id and result.external_id:
self.redis.hset(
f"approval:{self.source}_map",
result.external_id,
msg.approval_id,
)
self.ack(msg)
This means bridge service authors (agent-pkgs) only need to:
- Return
SendResult(ok=True, external_id=str(sent_message.id))fromsend() - Call
OutboxService.resolve_approval()in their ingest handler when a reaction/reply arrives
Package Responsibilities
| Concern | Package | Library |
|---|---|---|
request_approval(), wait_for_approval() | agent-bridge | cowboy |
resolve_approval() (static) | agent-bridge | cowboy |
| Approval hash schema | agent-bridge | cowboy |
run() loop with auto-tracking | agent-bridge | cowboy |
approval:discord_map management | discord-service ingest | agent-pkgs |
on_raw_reaction_add handler | discord-service ingest | agent-pkgs |
Return external_id from send() | discord-service outbox | agent-pkgs |
| DKIM sign + SMTP send after approval | email-service outbox | agent-pkgs |
Redis Keys
approval:{uuid} # HASH -- approval state (bridge-owned schema)
approval:{source}_map # HASH -- external_msg_id -> approval_id (per-service)
Bridge writes approval:{uuid} with status: pending.
Notification service stores mapping in approval:{source}_map.
Ingest service resolves by writing status: approved/rejected.
Requesting service polls until resolved.
Discord doesn't know about email. Email doesn't know about Discord. They share only the approval:{uuid} hash key format.
Nix Configuration
Approval options are part of the bridge declaration (nix/agents/bridges.nix):
services.agent.bridges.<name>.approval = {
required = false; # hold for manual approval?
notify = "discord"; # which outbox to route approval requests to
notify_channel = "..."; # channel ID within that outbox
timeout = 3600; # auto-reject after N seconds
};
These flow into sources.json via pubsub.nix and are loaded by OutboxService.__init__() via load_config().
Adding a New Approval Channel
To use something other than Discord for approval responses (e.g. Slack, Telegram):
- The new service's outbox returns
external_idfromsend()(auto-tracked by base class) - The new service's ingest calls
OutboxService.resolve_approval(r, source, ext_id, status, approver) - Set
approval.notify = "{service}"in the bridge config - No changes needed to
agent-bridgeor the requesting service
This is a property of the two-library split: cowboy defines the protocol, agent-pkgs implements the per-platform details.