Decoupling AI Plan Generation from the Combat Loop with Redis Queues
For the first few months of AI Arena's internal testing, matches had a deeply unsatisfying rhythm. The combat engine would fire two AI calls at the start of each planning cycle, then hang — sometimes for 10, sometimes for 25 seconds — before the robots finally moved. Players could see the arena but couldn't interact with it. The match felt frozen.
The root cause was architectural: AI plan generation was synchronous, embedded directly in the combat tick loop. Fixing it required rethinking how the combat engine and AI workers relate to each other.
The Problem: Blocking the Tick Loop
Our initial architecture looked like this:
# games/robot_combat/combat/engine.py (before)
async def tick(self, state: CombatState) -> CombatState:
    # For each robot that needs a new plan...
    for robot in state.robots_needing_plans():
        plan = await self.ai_service.generate_plan(robot, state)
        robot.apply_plan(plan)
    # Advance physics, resolve collisions, etc.
    state = self._advance_physics(state)
    return state
generate_plan makes a real API call to Anthropic, Google, or OpenAI. That call takes anywhere from 5 to 30 seconds depending on model and server load. During that entire window, tick is suspended — no physics, no collision detection, no UI updates. The match is frozen.
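With two robots awaited one after the other, as in the loop above, the worst case was two back-to-back waits: up to 60 seconds per planning cycle. Issuing both calls concurrently (for example with asyncio.gather) would only cut that to the slower of the two, still up to 30 seconds, because the tick cannot proceed until both calls return.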
The fundamental problem is that the combat engine and AI inference operate on completely different timescales. The engine wants to tick at 10 Hz. AI inference takes seconds. These two loops should not be in the same call stack.
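The timescale mismatch is easy to reproduce in isolation. The sketch below is a minimal illustration, not the engine code: `fake_llm_call` is a hypothetical stand-in that sleeps instead of calling a provider, and the delays are scaled down from seconds to milliseconds. It suggests that awaiting both calls concurrently only shortens the stall to the slower call; the tick is still suspended for that whole time.

```python
import asyncio
import time


async def fake_llm_call(delay_s: float) -> str:
    # Hypothetical stand-in for a provider call; only the latency matters here.
    await asyncio.sleep(delay_s)
    return f"plan after {delay_s:.2f}s"


async def blocking_tick(delays: list[float]) -> float:
    """Return how long one tick stalls when it awaits every plan concurrently."""
    started = time.perf_counter()
    await asyncio.gather(*(fake_llm_call(d) for d in delays))
    # Physics would only run from this point on.
    return time.perf_counter() - started


stall = asyncio.run(blocking_tick([0.05, 0.10]))
tick_budget = 1 / 10  # a 10 Hz engine has 100 ms per tick
print(f"stalled {stall:.3f}s against a tick budget of {tick_budget:.1f}s")
```

With real LLM latencies in place of the scaled-down sleeps, the stall would exceed the tick budget by roughly two orders of magnitude.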
The Architecture: Fire-and-Forget with Redis
The solution is conceptually simple: the combat engine should never wait for AI. It publishes a request and immediately moves on. AI workers process requests asynchronously and make results available. The engine picks up results opportunistically on the next tick.
Combat Engine               Redis                       AI Worker
     │                        │                             │
     │─ publish plan request ►│                             │
     │                        │◄── consume request ─────────│
     │  (continues ticking)   │                             │
     │                        │  [calls LLM API...]         │
     │                        │                             │
     │                        │◄── store plan + notify ─────│
     │◄── pick up plan ───────│                             │
     │                        │                             │
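The flow in the diagram can be sketched without Redis at all. In the toy version below, an `asyncio.Queue` stands in for the request list and a plain dict stands in for the plan store; the names `worker`, `engine`, and the scaled-down timings are assumptions made for illustration only. The point it tries to show is that the engine keeps ticking while the "LLM call" is in flight and picks the plan up on a later tick.

```python
import asyncio

TICK_SECONDS = 0.005  # scaled-down tick interval
LLM_SECONDS = 0.05    # scaled-down "LLM" latency


async def worker(requests: asyncio.Queue, plans: dict[str, str]) -> None:
    # Consume one request, "think", then store the result for the engine.
    robot_id = await requests.get()
    await asyncio.sleep(LLM_SECONDS)  # stand-in for the LLM call
    plans[robot_id] = "advance"


async def engine(requests: asyncio.Queue, plans: dict[str, str]) -> tuple[int, str]:
    requests.put_nowait("robot-1")  # publish and move on
    ticks_while_waiting = 0
    while "robot-1" not in plans:   # opportunistic pickup, never a blocking wait
        ticks_while_waiting += 1    # physics would advance here
        await asyncio.sleep(TICK_SECONDS)
    return ticks_while_waiting, plans.pop("robot-1")


async def main() -> tuple[int, str]:
    requests: asyncio.Queue = asyncio.Queue()
    plans: dict[str, str] = {}
    worker_task = asyncio.create_task(worker(requests, plans))
    result = await engine(requests, plans)
    await worker_task
    return result


ticks, plan = asyncio.run(main())
print(f"engine ticked {ticks} times before picking up plan {plan!r}")
```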
Two-Tier Priority Queue
Not all plan requests are equal. A robot executing its default strategy can wait. But when a player injects a neurohack (a sabotage prompt that corrupts the enemy AI's next decision) or sends a real-time tactical directive, that input should jump the queue.
We use two Redis lists as a priority queue:
# ai_engine/worker/queue.py
from redis.asyncio import Redis  # assumes redis-py's asyncio client

from ai_engine.worker.schemas import PlanRequest

QUEUE_NORMAL = "ai:plan:requests"
QUEUE_PRIORITY = "ai:plan:requests:priority"


class PlanRequestPublisher:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def enqueue(self, request: PlanRequest) -> None:
        payload = request.model_dump_json()
        queue = QUEUE_PRIORITY if request.has_player_input else QUEUE_NORMAL
        await self.redis.rpush(queue, payload)


class PlanRequestConsumer:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def consume(self) -> PlanRequest | None:
        # BLPOP scans the keys in order: priority queue first, then normal.
        # It returns None if both stay empty for the whole timeout.
        result = await self.redis.blpop(
            [QUEUE_PRIORITY, QUEUE_NORMAL],
            timeout=1.0,
        )
        if result is None:
            return None
        _, payload = result
        return PlanRequest.model_validate_json(payload)
BLPOP with multiple keys checks them in the order given and pops from the first non-empty list, so a waiting priority request is always taken before any normal one. This gives player interactions sub-second queue pickup while background requests process at full throughput.
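The ordering rule can be modelled in a few lines of plain Python. The sketch below is an in-memory imitation of the key scan only (no blocking, no Redis); `pop_first_non_empty` is a hypothetical helper written for this illustration, with deques standing in for the two lists.

```python
from __future__ import annotations

from collections import deque

QUEUE_NORMAL = "ai:plan:requests"
QUEUE_PRIORITY = "ai:plan:requests:priority"


def pop_first_non_empty(queues: dict[str, deque], keys: list[str]) -> tuple[str, str] | None:
    """Mimic BLPOP's key scan (without the blocking): first non-empty key wins."""
    for key in keys:
        if queues[key]:
            return key, queues[key].popleft()
    return None


queues = {QUEUE_NORMAL: deque(), QUEUE_PRIORITY: deque()}
queues[QUEUE_NORMAL].extend(["default-1", "default-2"])  # RPUSH appends to the tail
queues[QUEUE_PRIORITY].append("neurohack-1")             # arrives last...

order = []
while (item := pop_first_non_empty(queues, [QUEUE_PRIORITY, QUEUE_NORMAL])) is not None:
    order.append(item[1])
print(order)  # ['neurohack-1', 'default-1', 'default-2']
```

One consequence of strict ordering is worth keeping in mind: a sustained stream of priority requests would starve the normal queue, since normal requests are only popped when the priority list is empty.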
Lightweight Requests: Only IDs Travel the Wire
An important design decision: plan requests carry minimal data. They contain robot IDs, match ID, and any player-supplied input (neurohack text, tactical prompts). They do not carry a snapshot of combat state.
# ai_engine/worker/schemas.py
from datetime import datetime

from pydantic import BaseModel


class PlanRequest(BaseModel):
    match_id: str
    robot_id: str
    requesting_robot_player_id: str
    player_prompt: str | None = None   # tactical directive
    neurohack_text: str | None = None  # sabotage injection
    enqueued_at: datetime
    has_player_input: bool = False     # True routes the request to the priority queue


class PlanReadyNotification(BaseModel):
    match_id: str
    robot_id: str
    plan_key: str  # Redis key where the plan lives
Workers read live combat state from Redis at processing time. This eliminates a class of stale-data bugs: if the battle state changes between request publication and worker pickup, the worker sees the current state, not the snapshot from when the request was created. In a fast-moving match, that difference can be significant.
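To make the stale-data point concrete, here is a small sketch in which a nested dict stands in for the Redis-backed state store. The `Request` dataclass, the `hp` field, and `load_robot` are hypothetical names chosen for the example; they are not the project's schema.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Request:
    # Hypothetical slimmed-down request: identifiers only, no combat state.
    match_id: str
    robot_id: str


# Stand-in for the Redis-backed state store, keyed by match ID.
live_state = {"match-1": {"robot-1": {"hp": 100}}}

request = Request(match_id="match-1", robot_id="robot-1")
snapshot_hp = live_state["match-1"]["robot-1"]["hp"]  # what a snapshot would have frozen

# The match moves on while the request sits in the queue.
live_state["match-1"]["robot-1"]["hp"] = 35


def load_robot(req: Request) -> dict | None:
    """Look up the robot at processing time; None means the match is gone."""
    match = live_state.get(req.match_id)
    return None if match is None else match.get(req.robot_id)


robot = load_robot(request)
print(snapshot_hp, robot["hp"])  # 100 35
```

A worker planning from the snapshot would reason about a robot at full health; reading at processing time sees the damaged one. The same lookup also covers the case where the match has already ended.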
The Worker Event Loop
Workers run as independent processes — separate Docker containers in production, separate processes locally. They consume the queue, call the LLM, and store results:
# ai_engine/worker/runner.py
import asyncio
import logging

logger = logging.getLogger(__name__)


class AIWorkerRunner:
    # Wiring of self.redis, self.state_store and self.client_factory is omitted here.
    def __init__(self, config: WorkerConfig):
        self._max_concurrent = config.max_concurrent_calls  # default: 4
        self._semaphore = asyncio.Semaphore(self._max_concurrent)
        self._shutdown = asyncio.Event()
        # Strong references: the event loop only keeps weak references to tasks
        self._tasks: set[asyncio.Task] = set()

    async def run(self) -> None:
        consumer = PlanRequestConsumer(self.redis)
        logger.info("ai_worker_started", extra={"concurrency": self._max_concurrent})
        while not self._shutdown.is_set():
            request = await consumer.consume()
            if request is None:
                continue
            task = asyncio.create_task(self._process_with_semaphore(request))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _process_with_semaphore(self, request: PlanRequest) -> None:
        async with self._semaphore:
            await self._process(request)

    async def _process(self, request: PlanRequest) -> None:
        # Load live state from Redis
        state = await self.state_store.get_combat_state(request.match_id)
        if state is None:
            return  # Match ended, discard

        robot = state.get_robot(request.robot_id)
        ai_client = self.client_factory.get(robot.ai_model)
        plan = await ai_client.generate_plan(state, request)

        # Store plan with 30s TTL, long enough for the engine to pick up
        plan_key = f"ai:plan:{request.match_id}:{request.robot_id}"
        await self.redis.setex(plan_key, 30, plan.model_dump_json())

        # Notify the combat engine
        notification = PlanReadyNotification(
            match_id=request.match_id,
            robot_id=request.robot_id,
            plan_key=plan_key,
        )
        await self.redis.publish(
            f"ai:plan:ready:{request.match_id}",
            notification.model_dump_json(),
        )

        # Emit billing event: a single queue push; billing runs in a separate consumer
        await self._emit_usage_event(request, plan.token_usage)
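The semaphore caps each worker process at 4 concurrent LLM calls by default. Without it, a spike of simultaneous plan requests (say, at match start, when all robots initialize) would flood the LLM API with parallel calls, exhausting rate limits and causing cascading failures. One subtlety: run() pops requests before the semaphore is acquired, so requests that arrive while all slots are busy wait inside the worker in arrival order, and the priority ordering applies only to requests still sitting in Redis.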
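The effect of the cap can be checked with a self-contained experiment. The sketch below launches a burst of fake calls and records the highest number in flight at once; `run_burst` and the 10 ms sleep are stand-ins invented for the illustration, not part of the worker.

```python
import asyncio

MAX_CONCURRENT = 4  # mirrors the default quoted above


async def run_burst(n_requests: int) -> int:
    """Launch n_requests fake calls at once and return the peak concurrency seen."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    in_flight = 0
    peak = 0

    async def fake_llm_call() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the provider round trip
            in_flight -= 1

    await asyncio.gather(*(fake_llm_call() for _ in range(n_requests)))
    return peak


peak = asyncio.run(run_burst(10))
print(peak)  # 4
```

Ten requests are started together, but no more than four ever hold the semaphore at the same time; the rest queue up behind it in the order they asked.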
The Combat Engine: Opportunistic Pickup
On the engine side, the tick loop no longer waits on any LLM call. The only awaits left are short Redis round trips:
# games/robot_combat/combat/engine.py (after)
from datetime import datetime, timezone

async def tick(self, state: CombatState) -> CombatState:
    # Check for any AI plans that arrived since last tick
    for robot in state.robots:
        plan_key = f"ai:plan:{state.match_id}:{robot.id}"
        raw = await self.redis.get(plan_key)
        if raw:
            plan = RobotPlan.model_validate_json(raw)
            robot.apply_plan(plan)
            await self.redis.delete(plan_key)
        # If robot has no current plan and none is generating, request one
        elif not robot.has_active_plan and not await self._is_generating(robot):
            await self.plan_publisher.enqueue(
                PlanRequest(
                    match_id=state.match_id,
                    robot_id=robot.id,
                    # Both fields below are required by the schema; robot.player_id
                    # is an assumed attribute name for the robot's owner.
                    requesting_robot_player_id=robot.player_id,
                    enqueued_at=datetime.now(timezone.utc),
                )
            )
            await self._mark_generating(robot)
    # Physics always advances regardless of AI state
    state = self._advance_physics(state)
    return state
Robots with pending plans execute their last known action (or idle safely). Robots whose plans arrive mid-match apply them immediately on the next tick. The physics simulation never waits.
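The tick loop relies on `_is_generating` and `_mark_generating`, which are not shown. One plausible way to back them, offered as an assumption rather than the actual implementation, is a per-robot marker key written with `SET ... NX EX`, so that the marker expires on its own if a worker dies mid-request. The key name, the 45-second TTL, and the `InMemoryRedis` stand-in (which mimics only the `set(..., nx=True, ex=...)` call of redis-py's asyncio client) are all hypothetical.

```python
from __future__ import annotations

import asyncio
import time

GENERATING_TTL_S = 45  # assumed: a bit longer than the slowest expected LLM call


class InMemoryRedis:
    """Tiny stand-in for the one redis-py call used below; not a real client."""

    def __init__(self) -> None:
        self._expiry: dict[str, float] = {}

    async def set(self, key: str, value: str, nx: bool = False, ex: int | None = None):
        now = time.monotonic()
        if nx and self._expiry.get(key, 0.0) > now:
            return None  # key already present, which is how SET NX reports a loss
        self._expiry[key] = (now + ex) if ex is not None else float("inf")
        return True


def generating_key(match_id: str, robot_id: str) -> str:
    return f"ai:plan:generating:{match_id}:{robot_id}"  # hypothetical key name


async def try_mark_generating(redis, match_id: str, robot_id: str) -> bool:
    """Return True only for the caller that wins the right to enqueue a request."""
    won = await redis.set(generating_key(match_id, robot_id), "1", nx=True, ex=GENERATING_TTL_S)
    return bool(won)


async def demo() -> tuple[bool, bool]:
    redis = InMemoryRedis()
    first = await try_mark_generating(redis, "match-1", "robot-1")
    second = await try_mark_generating(redis, "match-1", "robot-1")
    return first, second


print(asyncio.run(demo()))  # (True, False)
```

Under this scheme the engine would also delete the marker when it applies a plan, so the next planning cycle can start immediately instead of waiting for the TTL.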
Billing: An Independent Consumer
AI token usage needs to be tracked and charged back to the player's credit balance. This could have been done synchronously inside the worker — call the billing API, deduct credits, then continue. But billing failures should never interrupt plan generation, and billing DB writes don't need to happen at LLM-call latency.
Instead, workers emit lightweight usage events to a separate queue:
# In the worker, after a plan is generated
# (requires: from datetime import datetime, timezone)
async def _emit_usage_event(self, request: PlanRequest, usage: TokenUsage) -> None:
    event = UsageEvent(
        match_id=request.match_id,
        robot_id=request.robot_id,
        player_id=request.requesting_robot_player_id,
        model=usage.model,
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        cached_tokens=usage.cached_tokens,
        # Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
        timestamp=datetime.now(timezone.utc),
    )
    await self.redis.rpush("ai:usage:events", event.model_dump_json())
A separate billing loop, running as its own asyncio task with its own DB connection pool, consumes this queue and writes to the billing DB. Plan generation and billing accounting are completely decoupled: a billing DB hiccup doesn't slow down AI calls, and a burst of AI calls doesn't overwhelm the billing DB, because events simply accumulate in the queue until the consumer catches up.
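The consumer side is not shown in this post, so the following is only a sketch of its general shape. An `asyncio.Queue` stands in for the `ai:usage:events` list, a dict of running totals stands in for the billing DB, and the `None` sentinel exists purely so the demo can terminate; all of these are assumptions for illustration.

```python
import asyncio
import json
from collections import defaultdict


async def billing_loop(events: asyncio.Queue, totals: dict[str, int]) -> None:
    """Drain usage events until a None sentinel arrives, summing tokens per player."""
    while True:
        raw = await events.get()
        if raw is None:
            return
        event = json.loads(raw)
        # A real consumer would write to the billing DB here and retry on failure.
        totals[event["player_id"]] += event["input_tokens"] + event["output_tokens"]


async def main() -> dict[str, int]:
    events: asyncio.Queue = asyncio.Queue()
    totals: dict[str, int] = defaultdict(int)
    consumer = asyncio.create_task(billing_loop(events, totals))

    # Worker side: emitting an event is a cheap push, independent of billing speed.
    for player_id, tokens_in, tokens_out in [("p1", 900, 120), ("p2", 700, 80), ("p1", 950, 130)]:
        events.put_nowait(json.dumps(
            {"player_id": player_id, "input_tokens": tokens_in, "output_tokens": tokens_out}
        ))
    events.put_nowait(None)  # sentinel, only needed to end this demo

    await consumer
    return dict(totals)


totals = asyncio.run(main())
print(totals)  # {'p1': 2100, 'p2': 780}
```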
Graceful Shutdown
One operational challenge with async workers is graceful shutdown. If a worker process is SIGTERM'd mid-inference, we don't want to abort the LLM call and leave a robot plan-less in the middle of a match.
async def shutdown(self) -> None:
    logger.info("worker_shutdown_requested")
    self._shutdown.set()
    # Wait up to 60s for in-flight tasks to complete
    pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    if pending:
        logger.info("waiting_for_inflight_tasks", extra={"count": len(pending)})
        await asyncio.wait(pending, timeout=60.0)
    logger.info("worker_shutdown_complete")
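The SIGTERM handler sets the shutdown event (stopping new request consumption) but lets in-flight _process coroutines complete normally. In practice, LLM calls complete within 30 seconds, well within the 60-second drain window. The container's stop grace period has to be at least as long as that window; Docker's default is 10 seconds before it sends SIGKILL.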
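The drain behaviour can be exercised in miniature. The sketch below is a simplified variant, not the worker's code: `DrainingWorker` is a hypothetical class that tracks its own tasks in a set instead of calling `asyncio.all_tasks()`, which also avoids waiting on unrelated tasks that share the event loop. Sleeps stand in for in-flight LLM calls.

```python
import asyncio


class DrainingWorker:
    def __init__(self) -> None:
        self._shutdown = asyncio.Event()
        self._tasks: set[asyncio.Task] = set()
        self.completed: list[str] = []

    def submit(self, name: str, seconds: float) -> None:
        if self._shutdown.is_set():
            raise RuntimeError("worker is shutting down")
        task = asyncio.create_task(self._process(name, seconds))
        self._tasks.add(task)                        # keep a strong reference
        task.add_done_callback(self._tasks.discard)  # forget it once finished

    async def _process(self, name: str, seconds: float) -> None:
        await asyncio.sleep(seconds)  # stand-in for an in-flight LLM call
        self.completed.append(name)

    async def shutdown(self, drain_timeout: float) -> int:
        """Stop accepting work, wait for in-flight tasks, return how many timed out."""
        self._shutdown.set()
        if not self._tasks:
            return 0
        _, pending = await asyncio.wait(set(self._tasks), timeout=drain_timeout)
        return len(pending)


async def main() -> tuple[list[str], int]:
    worker = DrainingWorker()
    worker.submit("plan-a", 0.02)
    worker.submit("plan-b", 0.03)
    still_pending = await worker.shutdown(drain_timeout=1.0)
    return worker.completed, still_pending


print(asyncio.run(main()))  # (['plan-a', 'plan-b'], 0)
```

Both in-flight tasks finish during the drain, and the returned count makes it possible to log how many were abandoned if the timeout is hit.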
The Result
After deploying the async architecture, match playback became smooth regardless of AI latency. The combat engine ticks at full 10 Hz. Robots may briefly repeat their last action while waiting for a fresh plan, but the arena never freezes.
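More importantly, the system scales horizontally. Each additional worker process adds its own batch of concurrent LLM calls, so a second worker roughly doubles plan throughput and further workers shorten the queue under load, as long as the providers' rate limits have headroom. The combat engine doesn't know or care how many workers are running; it just reads from Redis.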
The two-tier priority queue keeps player inputs responsive even under high system load. Neurohacks and tactical prompts jump ahead of every queued background request and are typically picked up by a worker within a fraction of a second of submission, giving players a sense of real-time influence over the match.
This architecture pattern — fire-and-forget requests, asynchronous workers, opportunistic result pickup — has become the foundation for every AI-driven feature we've built since.