title: Outbox Approval Protocol
tags: [security, pubsub, approval, discord, email]
library: cowboy

Outbox Approval Protocol

Cross-service approval flow for broker-owned outbox services. Any outbox service can hold messages for manual approval before sending. Approval notifications and responses route through a configurable channel (default: Discord).

This protocol is implemented in cowboy's agent-bridge package. Bridge services in agent-pkgs consume it without needing to know how approval works internally.

Related: [[Security]]

Flow

              Agent (${cfg.name}-ns)                 Broker services (host namespace)
              ──────────────────────                 ───────────────────────────────

1. Agent composes email
   |
   +-- XADD email:outbox
   |   channel_id: alice@example.com
   |   subject: "Re: meeting"
   |   content: "Sounds good..."
   |
   |                                          2. ${cfg.name}-email-outbox reads stream
   |                                             OutboxService.run() sees
   |                                               approval_required = true
   |                                             |
   |                                             +-- request_approval():
   |                                             |   HSET approval:{uuid}
   |                                             |     status: pending
   |                                             |     source: email
   |                                             |     channel_id: alice@example.com
   |                                             |     content_preview: "Sounds good..."
   |                                             |
   |                                             +-- XADD discord:outbox
   |                                             |     channel_id: {notify_channel}
   |                                             |     content: "Approval needed [email]
   |                                             |               To: alice@example.com
   |                                             |               Preview: Sounds good...
   |                                             |               React to approve/reject"
   |                                             |     approval_id: {uuid}
   |                                             |
   |                                          3. ${cfg.name}-discord-outbox reads stream
   |                                             OutboxService.send() posts to Discord
   |                                             |
   |                                             +-- gets discord_message_id from API
   |                                             +-- sees approval_id in message fields
   |                                             +-- auto-tracks: HSET approval:discord_map
   |                                             |     {discord_msg_id} -> {uuid}
   |                                             |     (handled by base class via
   |                                             |      SendResult.external_id)
   |                                             |
   |                                          4. broker sees message in Discord,
   |                                             reacts with checkmark
   |                                             |
   |                                          5. ${cfg.name}-discord-ingest handles
   |                                             on_raw_reaction_add
   |                                             |
   |                                             +-- calls OutboxService.resolve_approval()
   |                                             |   HGET approval:discord_map {msg_id}
   |                                             |     -> finds {uuid}
   |                                             |   maps emoji -> approved/rejected
   |                                             |   HSET approval:{uuid}
   |                                             |     status: approved
   |                                             |     approver: dylan
   |                                             |
   |                                          6. email-outbox was polling via
   |                                             wait_for_approval()
   |                                             sees status: approved
   |                                             |
   |                                             +-- OutboxService.send()
   |                                             +-- build MIME, DKIM-sign, SMTP send
   |                                             +-- XACK email:outbox
   |                                             +-- DEL approval:{uuid}
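
Step 1 of the flow can be sketched from the agent's side as follows. This is a minimal sketch, assuming a redis-py style client that exposes `xadd(name, fields)`; `StreamClient` and `enqueue_email` are hypothetical names used for illustration and are not part of agent-bridge. The field names are the ones shown in the diagram.

```python
from typing import Any, Protocol


class StreamClient(Protocol):
    """Anything with a redis-py style xadd(); a real redis.Redis client fits."""

    def xadd(self, name: str, fields: dict[str, str]) -> Any: ...


def enqueue_email(r: StreamClient, to: str, subject: str, body: str) -> Any:
    """Append one outbound email to the email:outbox stream (hypothetical helper)."""
    fields = {
        "channel_id": to,  # destination address, as in the diagram
        "subject": subject,
        "content": body,
    }
    # No approval_id field is set: the outbox service treats this as a
    # normal message and holds it when approval_required is true.
    return r.xadd("email:outbox", fields)
```

The agent never touches the approval keys itself; it only appends to its own outbox stream.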

State Machine

Approval state lives in Redis hashes, not streams: a hash is a plain key-value record that any service can poll.

approval:{uuid} = {
  status:          pending | approved | rejected | expired
  source:          email | slack | ...
  channel_id:      destination (email address, slack channel, etc.)
  content_preview: first 200 chars of message body
  created_at:      unix timestamp
  approver:        user who approved/rejected (set on resolution)
}

Transitions:

pending --+-- approve reaction --> approved --> send + cleanup
          +-- reject reaction  --> rejected --> discard + notify agent
          +-- timeout          --> expired  --> discard + notify agent
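
The transitions above can be written down as a small lookup table. This is an illustrative sketch only: the `Status` enum, the event names (`approve`, `reject`, `timeout`), and both helper functions are assumptions made for the example, not code from agent-bridge.

```python
from enum import Enum


class Status(str, Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPIRED = "expired"


# Events that can move an approval out of "pending" (names are illustrative).
TRANSITIONS = {
    "approve": Status.APPROVED,
    "reject": Status.REJECTED,
    "timeout": Status.EXPIRED,
}


def next_status(current: Status, event: str) -> Status:
    """Return the status after `event`; terminal states never change."""
    if current is not Status.PENDING:
        return current
    return TRANSITIONS.get(event, current)


def should_send(status: Status) -> bool:
    """Only an approved message is sent; rejected and expired are discarded."""
    return status is Status.APPROVED
```

Because `Status` subclasses `str`, its members compare equal to the raw strings stored in the hash, so `Status.PENDING == "pending"` holds.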

Implementation: cowboy's agent-bridge

The approval protocol is fully implemented in OutboxService (pkgs/agent-bridge/agent_bridge/base.py).

Requesting approval

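import time
from uuid import uuid4

async def request_approval(self, msg: OutboxMessage) -> bool:
    """Hold msg until it is resolved; returns True only if approved."""
    approval_id = str(uuid4())
    # Record the pending approval so any service can poll it.
    self.redis.hset(f"approval:{approval_id}", mapping={
        "status": "pending",
        "source": self.source,
        "channel_id": msg.channel_id,
        "content_preview": msg.content[:200],
        "created_at": str(int(time.time())),
    })
    # Route the notification through the configured outbox (e.g. discord:outbox).
    # The approval_id field marks it as a system notification, so the
    # notifying outbox sends it without asking for approval itself.
    self.redis.xadd(f"{self.approval_notify}:outbox", {
        "channel_id": self.approval_notify_channel,
        "content": f"Approval needed [{self.source}]: ...",
        "approval_id": approval_id,
    })
    # Blocks this service's loop until resolution or timeout.
    return await self.wait_for_approval(approval_id)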
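
The notification text is elided in the snippet above. One plausible rendering, following the layout shown in the flow diagram, is sketched here. Both `preview` and `format_notification` are hypothetical helpers, and the exact wording used by agent-bridge may differ.

```python
def preview(content: str, limit: int = 200) -> str:
    """First `limit` characters of the body, matching content_preview."""
    return content[:limit]


def format_notification(source: str, channel_id: str, content: str) -> str:
    """Build a four-line approval request in the layout of the flow diagram."""
    return "\n".join([
        f"Approval needed [{source}]",
        f"To: {channel_id}",
        f"Preview: {preview(content)}",
        "React to approve/reject",
    ])
```

Truncating the preview keeps long message bodies out of the approval channel while still giving the approver enough context.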

Waiting for resolution

import asyncio
import time

async def wait_for_approval(self, approval_id: str) -> bool:
    """Poll the approval hash every 2 seconds until resolved or timed out."""
    key = f"approval:{approval_id}"
    deadline = time.time() + self.approval_timeout
    while time.time() < deadline:
        status = self.redis.hget(key, "status")
        # redis-py returns bytes unless the client uses decode_responses=True.
        if isinstance(status, bytes):
            status = status.decode()
        if status == "approved":
            self.redis.delete(key)
            return True
        if status in ("rejected", "expired"):
            self.redis.delete(key)
            return False
        await asyncio.sleep(2)
    # Timeout -> mark expired, clean up
    self.redis.hset(key, "status", "expired")
    self.redis.delete(key)
    return False

Resolving approval (static, called by ingest services)

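@staticmethod
def resolve_approval(r, source, external_id, status, approver=""):
    """Record a decision for the approval tied to external_id; False if unknown."""
    map_key = f"approval:{source}_map"
    approval_id = r.hget(map_key, external_id)
    if not approval_id:
        return False
    # redis-py returns bytes unless the client uses decode_responses=True.
    if isinstance(approval_id, bytes):
        approval_id = approval_id.decode()
    # The mapping is single-use, so drop it before anything else.
    r.hdel(map_key, external_id)
    key = f"approval:{approval_id}"
    if not r.exists(key):
        # The requester already timed out and deleted the hash;
        # writing now would leave an orphaned key behind.
        return False
    r.hset(key, mapping={"status": status, "approver": approver})
    return True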
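
On the ingest side, step 5 of the flow (map the reaction to a status, then resolve) might look like the sketch below. The emoji table, `status_for_emoji`, and `handle_reaction` are hypothetical; the source only says that an emoji maps to approved or rejected. The `resolve` argument stands in for `OutboxService.resolve_approval` with the Redis client already bound.

```python
from typing import Callable, Optional

# Hypothetical emoji table; adjust to whatever reactions your team uses.
EMOJI_STATUS = {
    "\N{WHITE HEAVY CHECK MARK}": "approved",
    "\N{CROSS MARK}": "rejected",
}

# Mirrors resolve_approval(r, source, external_id, status, approver)
# with the Redis client `r` already bound.
Resolver = Callable[[str, str, str, str], bool]


def status_for_emoji(emoji: str) -> Optional[str]:
    """Return the approval status for a reaction, or None if it is unrelated."""
    return EMOJI_STATUS.get(emoji)


def handle_reaction(resolve: Resolver, message_id: int, emoji: str, user: str) -> bool:
    """Resolve the approval attached to message_id, if the emoji is a decision."""
    status = status_for_emoji(emoji)
    if status is None:
        return False
    # The outbox stored the id as a string, so look it up as a string.
    return resolve("discord", str(message_id), status, user)
```

In a discord.py `on_raw_reaction_add` handler, the payload's `message_id` and `emoji` would supply the arguments. Reactions on messages that are not approval requests fall through harmlessly, because `resolve_approval` returns False when the message id is not in the map.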

Auto-tracking in the main loop

The run() method handles the notification-tracking handshake automatically:

async def run(self):
    while True:
        messages = self.read_batch()
        for msg in messages:
            # Messages WITH approval_id are system notifications --
            # skip approval for these (they ARE the approval request)
            if self.approval_required and not msg.approval_id:
                approved = await self.request_approval(msg)
                if not approved:
                    self.ack(msg)
                    continue

            result = await self.send(msg)
            if result.ok:
                # Auto-track: if this was a notification with approval_id
                # and send() returned an external_id, store the mapping
                if msg.approval_id and result.external_id:
                    self.redis.hset(
                        f"approval:{self.source}_map",
                        result.external_id,
                        msg.approval_id,
                    )
                self.ack(msg)

This means bridge service authors (agent-pkgs) only need to:

  1. Return SendResult(ok=True, external_id=str(sent_message.id)) from send()
  2. Call OutboxService.resolve_approval() in their ingest handler when a reaction/reply arrives
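
The first requirement can be sketched as follows. Everything here is a stand-in so the example runs on its own: `SendResult` and `OutboxMessage` are simplified local copies with only the fields this document mentions, `FakeChatClient` is a hypothetical platform client, and in the real package `ChatOutbox` would subclass `OutboxService`.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class SendResult:
    """Stand-in for agent-bridge's SendResult (only the fields named above)."""
    ok: bool
    external_id: Optional[str] = None


@dataclass
class OutboxMessage:
    """Stand-in carrying the fields used in this document."""
    channel_id: str
    content: str
    approval_id: Optional[str] = None


class FakeChatClient:
    """Hypothetical platform client; post() returns the new message's id."""

    def __init__(self) -> None:
        self._next_id = 1000

    async def post(self, channel_id: str, content: str) -> int:
        self._next_id += 1
        return self._next_id


class ChatOutbox:
    """What a bridge author writes: send() only, no approval logic."""

    def __init__(self, client: FakeChatClient) -> None:
        self.client = client

    async def send(self, msg: OutboxMessage) -> SendResult:
        try:
            sent_id = await self.client.post(msg.channel_id, msg.content)
        except Exception:
            # run() above does not ack a message whose send failed.
            return SendResult(ok=False)
        # Returning the platform's message id lets run() store the mapping.
        return SendResult(ok=True, external_id=str(sent_id))


result = asyncio.run(
    ChatOutbox(FakeChatClient()).send(OutboxMessage("42", "Approval needed", "abc"))
)
```

Note that `send()` never inspects `approval_id`. The base class decides whether to record the mapping, which keeps platform code free of approval concerns.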

Package Responsibilities

Concern                                  Package                 Library
---------------------------------------  ----------------------  ----------
request_approval(), wait_for_approval()  agent-bridge            cowboy
resolve_approval() (static)              agent-bridge            cowboy
Approval hash schema                     agent-bridge            cowboy
run() loop with auto-tracking            agent-bridge            cowboy
approval:discord_map management          discord-service ingest  agent-pkgs
on_raw_reaction_add handler              discord-service ingest  agent-pkgs
Return external_id from send()           discord-service outbox  agent-pkgs
DKIM sign + SMTP send after approval     email-service outbox    agent-pkgs

Redis Keys

approval:{uuid}              # HASH -- approval state (bridge-owned schema)
approval:{source}_map        # HASH -- external_msg_id -> approval_id (per-service)

The requesting bridge writes approval:{uuid} with status: pending. The notifying service stores the mapping in approval:{source}_map. The ingest service resolves the approval by writing status: approved or rejected. The requesting service polls until the status changes or the timeout passes.

Discord doesn't know about email. Email doesn't know about Discord. They share only the approval:{uuid} hash key format.
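
The four-step handshake on these keys can be traced with an in-memory stand-in for Redis. This is a sketch for illustration: `FakeRedis` implements only the three hash commands used here, the helper names `approval_key` and `map_key` are hypothetical, and the message id `555` is made up.

```python
from uuid import uuid4


class FakeRedis:
    """Tiny in-memory stand-in for the few hash commands used below."""

    def __init__(self) -> None:
        self.hashes: dict[str, dict[str, str]] = {}

    def hset(self, key: str, field: str, value: str) -> None:
        self.hashes.setdefault(key, {})[field] = value

    def hget(self, key: str, field: str):
        return self.hashes.get(key, {}).get(field)

    def hdel(self, key: str, field: str) -> None:
        self.hashes.get(key, {}).pop(field, None)


def approval_key(approval_id: str) -> str:
    return f"approval:{approval_id}"


def map_key(source: str) -> str:
    return f"approval:{source}_map"


r = FakeRedis()
approval_id = str(uuid4())

# 1. Requesting service (email) creates the pending approval.
r.hset(approval_key(approval_id), "status", "pending")
# 2. Notifying service (discord) records which message carries the request.
r.hset(map_key("discord"), "555", approval_id)
# 3. Ingest service resolves by external message id and drops the mapping.
found = r.hget(map_key("discord"), "555")
r.hset(approval_key(found), "status", "approved")
r.hdel(map_key("discord"), "555")
# 4. Requesting service polls the hash and sees the decision.
final_status = r.hget(approval_key(approval_id), "status")
```

Steps 1 and 4 touch only `approval:{uuid}`, while steps 2 and 3 are the only ones that touch the per-service map. That separation is what lets the requesting and notifying services stay unaware of each other.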

Nix Configuration

Approval options are part of the bridge declaration (nix/agents/bridges.nix):

services.agent.bridges.<name>.approval = {
  required = false;           # hold for manual approval?
  notify = "discord";         # which outbox to route approval requests to
  notify_channel = "...";     # channel ID within that outbox
  timeout = 3600;             # expire (treated as rejected) after N seconds
};

These flow into sources.json via pubsub.nix and are loaded by OutboxService.__init__() via load_config().
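
The layout of sources.json is not shown in this document, so the sketch below assumes one: a JSON object keyed by bridge name, each with an optional `approval` block that mirrors the Nix options. `ApprovalConfig` and `parse_approval` are hypothetical names, and the real `load_config()` may work differently.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ApprovalConfig:
    """Mirrors the Nix options; the empty notify_channel default is an assumption."""
    required: bool = False
    notify: str = "discord"
    notify_channel: str = ""
    timeout: int = 3600


def parse_approval(entry: dict) -> ApprovalConfig:
    """Read the approval block of one bridge entry, falling back to defaults."""
    raw = entry.get("approval", {})
    defaults = ApprovalConfig()
    return ApprovalConfig(
        required=bool(raw.get("required", defaults.required)),
        notify=str(raw.get("notify", defaults.notify)),
        notify_channel=str(raw.get("notify_channel", defaults.notify_channel)),
        timeout=int(raw.get("timeout", defaults.timeout)),
    )


# ASSUMED file shape: one object per bridge, keyed by bridge name.
SAMPLE = '{"email": {"approval": {"required": true, "notify_channel": "123", "timeout": 600}}}'
cfg = parse_approval(json.loads(SAMPLE)["email"])
```

A bridge entry with no `approval` block yields the defaults, so approval stays off unless it is requested explicitly.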

Adding a New Approval Channel

To use something other than Discord for approval responses (e.g. Slack, Telegram):

  1. The new service's outbox returns external_id from send() (auto-tracked by base class)
  2. The new service's ingest calls OutboxService.resolve_approval(r, source, ext_id, status, approver)
  3. Set approval.notify = "{service}" in the bridge config
  4. No changes needed to agent-bridge or the requesting service

This is a property of the two-library split: cowboy defines the protocol, agent-pkgs implements the per-platform details.