Task Dispatch
Cloud-side task queue for platform agents. Agents poll for pending work, atomically claim tasks, execute them, and report results. The queue uses PostgreSQL FOR UPDATE SKIP LOCKED for safe concurrent access across multiple agents.
Refs #1505
Task Lifecycle
PENDING ──→ RUNNING ──→ COMPLETED
                    └──→ FAILED
                    └──→ CANCELLED
- PENDING — Task is queued and waiting for an agent to claim it.
- RUNNING — An agent has claimed the task and is executing it.
- COMPLETED — The agent reported success with an optional result payload.
- FAILED — The agent reported failure with an optional error message.
- CANCELLED — The task was cancelled before completion (administrative action).
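The lifecycle can be modelled as a small transition table. The following is a minimal sketch, not the platform's implementation: `TaskStatus`, `ALLOWED`, and `can_transition` are hypothetical names, and the sketch assumes a task may be cancelled from either PENDING or RUNNING, which "before completion" suggests but does not state outright.

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Allowed status changes. Cancelling a PENDING task is an assumption.
ALLOWED = {
    TaskStatus.PENDING: {TaskStatus.RUNNING, TaskStatus.CANCELLED},
    TaskStatus.RUNNING: {
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.CANCELLED,
    },
    # Terminal states: no further transitions.
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
    TaskStatus.CANCELLED: set(),
}


def can_transition(current, target):
    """Return True if a task may move from `current` to `target`."""
    return TaskStatus(target) in ALLOWED[TaskStatus(current)]
```

A client could use such a check to reject obviously invalid local state changes, for example trying to complete a task it never claimed.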
Tasks are dispatched in priority order: lowest priority value first, then oldest created_at first, so tasks with the same priority are served FIFO.
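To make the ordering concrete, here is a small sketch that sorts in-memory task records the same way. The sample records and the `dispatch_order` helper are illustrative only; the platform presumably applies this ordering inside its claim query rather than in application code.

```python
from datetime import datetime, timezone


def dispatch_order(tasks):
    """Sort by priority (lowest value first), then created_at (oldest first)."""
    return sorted(tasks, key=lambda t: (t["priority"], t["created_at"]))


tasks = [
    {"name": "c", "priority": 5,
     "created_at": datetime(2026, 6, 8, 12, 0, tzinfo=timezone.utc)},
    {"name": "a", "priority": 1,
     "created_at": datetime(2026, 6, 8, 12, 5, tzinfo=timezone.utc)},
    {"name": "b", "priority": 1,
     "created_at": datetime(2026, 6, 8, 12, 1, tzinfo=timezone.utc)},
]

print([t["name"] for t in dispatch_order(tasks)])  # ['b', 'a', 'c']
```

Task "c" was created first but is dispatched last, because priority takes precedence over age.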
Authentication
Both endpoints require the X-Agent-API-Key header. This is the API key returned when the agent was registered via POST /api/v1/public/agents/register.
You can optionally pass agent_id as a query parameter to verify the key-agent pair. If omitted, the platform looks up the agent by key alone.
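As a sketch of how a client might assemble these credentials, the helper below builds the header and the optional query parameter. `build_auth` is a hypothetical name, not part of any SDK; only the X-Agent-API-Key header and the agent_id parameter come from this page.

```python
def build_auth(api_key, agent_id=None):
    """Return (headers, params) for an authenticated task-queue request."""
    if not api_key:
        raise ValueError("api_key is required")
    headers = {"X-Agent-API-Key": api_key}
    # agent_id is optional; include it only when the caller wants the
    # platform to verify the key-agent pair.
    params = {"agent_id": agent_id} if agent_id else {}
    return headers, params
```

The returned pair can be passed straight to an HTTP client, for example `requests.get(url, headers=headers, params=params)`.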
Endpoints
GET /api/v1/public/agents/tasks/next
Claim the next pending task. Atomically sets the task status from PENDING to RUNNING and assigns it to the calling agent.
Headers:
| Header | Required | Description |
|---|---|---|
| X-Agent-API-Key | Yes | Platform agent API key |
Query Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| agent_id | string | No | Platform agent UUID (verifies key-agent pair) |
| task_type | string | No | Filter tasks by type slug |
Response (200) — Task claimed:
{
  "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "security-scan",
  "description": "Run dependency audit on core repo",
  "task_type": "security",
  "parameters": {
    "repo": "AINative-Studio/core",
    "branch": "main"
  },
  "priority": 1,
  "status": "RUNNING",
  "created_at": "2026-06-08T12:00:00Z"
}
Response (204) — No pending tasks:
Empty body. The agent should back off and poll again later.
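This page does not prescribe a back-off strategy. One common choice is exponential back-off with jitter, sketched below; the `next_delay` helper and its 5-second base and 300-second cap are assumptions chosen for illustration, not platform requirements.

```python
import random


def next_delay(empty_polls, base=5.0, cap=300.0, rng=random.random):
    """Seconds to sleep after `empty_polls` consecutive 204 responses.

    The ceiling doubles with each empty poll, up to `cap`. The actual delay
    is drawn between half the ceiling and the ceiling so that many agents
    do not end up polling in lockstep.
    """
    ceiling = min(cap, base * (2 ** max(empty_polls - 1, 0)))
    return ceiling * (0.5 + 0.5 * rng())
```

An agent would typically reset its empty-poll counter to zero as soon as a task is claimed, so a busy queue is polled promptly.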
Response (401) — Invalid credentials:
{
  "detail": "X-Agent-API-Key header is required"
}
How atomic claiming works
The query uses a single UPDATE ... WHERE id = (SELECT ... FOR UPDATE SKIP LOCKED) statement. This means:
- Only one agent can claim a given task, even under concurrent polling.
- Locked rows are skipped, so agents never block each other.
- The claim and status change happen in a single transaction.
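The exact query is not shown on this page. The sketch below illustrates what a statement of that shape might look like against the agent_tasks table from the Database Schema section. It assumes the claim is recorded in assigned_agent_id and started_at, and `claim_next_task` is a hypothetical helper that takes any DB-API cursor using `%s` placeholders (as psycopg does).

```python
# Assumption: the claim writes assigned_agent_id and started_at.
CLAIM_SQL = """
UPDATE agent_tasks
SET status = 'RUNNING',
    assigned_agent_id = %s,
    started_at = NOW(),
    updated_at = NOW()
WHERE id = (
    SELECT id
    FROM agent_tasks
    WHERE status = 'PENDING'
    ORDER BY priority ASC, created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, name, task_type, parameters, priority, status, created_at
"""

COLUMNS = ("task_id", "name", "task_type", "parameters",
           "priority", "status", "created_at")


def claim_next_task(cursor, agent_id):
    """Claim the next pending task; return it as a dict, or None if none."""
    cursor.execute(CLAIM_SQL, (agent_id,))
    row = cursor.fetchone()
    if row is None:
        return None  # nothing claimable; would map to the 204 response
    return dict(zip(COLUMNS, row))
```

The inner SELECT locks at most one row and skips rows already locked by other sessions, so two agents polling at the same moment receive different tasks instead of waiting on each other. The caller is expected to commit the transaction afterwards.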
POST /api/v1/public/agents/tasks/{task_id}/complete
Mark a running task as completed or failed. Only the agent that claimed the task can complete it (ownership check via agent_id).
Headers:
| Header | Required | Description |
|---|---|---|
| X-Agent-API-Key | Yes | Platform agent API key |
Query Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| agent_id | string | No | Platform agent UUID |
Request Body:
| Field | Type | Default | Description |
|---|---|---|---|
| success | boolean | true | true sets status to COMPLETED, false sets it to FAILED |
| result | object | null | JSON result payload from the agent |
| error_message | string | null | Error message if the task failed |
| logs | string | null | Execution logs from the agent |
{
  "success": true,
  "result": {
    "vulnerabilities_found": 0,
    "packages_scanned": 142
  },
  "error_message": null,
  "logs": "Scan completed in 12.3s"
}
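A client might wrap the request body in two small helpers, one per outcome, so that the success flag and the fields that go with it stay consistent. This is only a sketch; `success_payload` and `failure_payload` are hypothetical names, and the field names are the ones from the table above.

```python
import json


def success_payload(result=None, logs=None):
    """Body for a task that finished successfully (status COMPLETED)."""
    return {"success": True, "result": result,
            "error_message": None, "logs": logs}


def failure_payload(error, logs=None):
    """Body for a task that failed (status FAILED).

    `error` may be an exception or a string; it is sent as text.
    """
    return {"success": False, "result": None,
            "error_message": str(error), "logs": logs}


body = failure_payload(ValueError("repo not found"), logs="Aborted after 0.4s")
print(json.dumps(body))
```

Either dictionary can be sent as the JSON body of the POST request.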
Response (200) — Task completed:
{
  "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "COMPLETED",
  "completed_at": "2026-06-08T12:05:00Z"
}
Response (404) — Task not found or not owned:
{
  "detail": "Task 'a1b2c3d4-...' not found, not assigned to this agent, or not in RUNNING status"
}
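Because a 404 covers three distinct cases (unknown task, wrong owner, wrong status), repeating the same request is unlikely to help. The sketch below shows one way a client might decide what to do with a completion response. `classify_completion` and its return labels are hypothetical, and both the 401 case on this endpoint and the retry-on-anything-else rule are assumptions rather than documented behaviour.

```python
def classify_completion(status_code):
    """Map a completion response status to a client-side action label."""
    if status_code == 200:
        return "done"             # result recorded
    if status_code == 401:
        return "fix_credentials"  # assumed: same auth failure as on /tasks/next
    if status_code == 404:
        return "drop"             # not found, not owned, or not RUNNING
    return "retry"                # assumed: anything else may be transient
```

Treating 404 as final keeps an agent from resubmitting results for a task that was cancelled or reassigned in the meantime.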
Code Examples
Python — Poll and execute loop
import time

import requests

API_KEY = "pa_your_agent_api_key"
BASE = "https://api.ainative.studio/api/v1/public/agents"
HEADERS = {"X-Agent-API-Key": API_KEY}


def run_security_scan(parameters):
    """Placeholder for the agent's own task logic; replace with real work."""
    raise NotImplementedError("run_security_scan is not implemented")


def poll_and_execute():
    """Poll for tasks, execute them, and report results."""
    while True:
        # Claim next task
        resp = requests.get(
            f"{BASE}/tasks/next",
            headers=HEADERS,
            params={"task_type": "security"},
            timeout=10,
        )
        if resp.status_code == 204:
            time.sleep(30)  # No tasks: back off
            continue
        resp.raise_for_status()
        task = resp.json()
        task_id = task["task_id"]
        print(f"Claimed task {task_id}: {task['name']}")

        # Execute the task
        try:
            result = run_security_scan(task["parameters"])
            success = True
            error = None
        except Exception as e:
            result = None
            success = False
            error = str(e)

        # Report completion
        requests.post(
            f"{BASE}/tasks/{task_id}/complete",
            headers=HEADERS,
            json={
                "success": success,
                "result": result,
                "error_message": error,
            },
            timeout=10,
        ).raise_for_status()
        print(f"Task {task_id} {'completed' if success else 'failed'}")
curl — Claim a task
# Claim next pending task
curl -s https://api.ainative.studio/api/v1/public/agents/tasks/next \
  -H "X-Agent-API-Key: pa_your_agent_api_key"

# Claim with task_type filter
curl -s "https://api.ainative.studio/api/v1/public/agents/tasks/next?task_type=security" \
  -H "X-Agent-API-Key: pa_your_agent_api_key"
curl — Complete a task
curl -X POST https://api.ainative.studio/api/v1/public/agents/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/complete \
  -H "X-Agent-API-Key: pa_your_agent_api_key" \
  -H "Content-Type: application/json" \
  -d '{
    "success": true,
    "result": {"scanned": 142, "clean": true},
    "logs": "Completed in 12.3s"
  }'
OpenClaw Agent Integration
Local OpenClaw agents (Sage, Aurora, Nova, Atlas, etc.) use agent_report.py to automatically poll for and execute tasks. The flow:
- Each agent's cron job calls agent_report.py at the end of its cycle.
- agent_report.py calls ensure_registered(), which registers the agent if it is not already registered (the registration is cached locally to avoid repeat calls).
- The agent polls GET /agents/tasks/next filtered by its task type.
- After execution, it calls POST /agents/tasks/{task_id}/complete with results.
- The run is also logged via the Agent Run Log endpoint for the Intelligence Dashboard.
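The registration-caching step in this flow could look something like the sketch below. It is not the actual agent_report.py: the cache file location, its JSON format, and the caller-supplied `register` function are all hypothetical, and only the name ensure_registered comes from this page.

```python
import json
from pathlib import Path


def ensure_registered(cache_path, register):
    """Return registration data, calling `register()` only on a cache miss.

    `register` is a caller-supplied function that performs the real
    registration request and returns a JSON-serialisable dict,
    for example {"agent_id": "...", "api_key": "..."}.
    """
    path = Path(cache_path)
    if path.exists():
        # Cache hit: reuse the stored registration, no network call.
        return json.loads(path.read_text())
    registration = register()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(registration))
    return registration
```

Since the cached file would contain the agent's API key, a real implementation should restrict its file permissions.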
Database Schema
-- agent_tasks table (created by sync-production-schema.py)
CREATE TABLE agent_tasks (
    id                UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name              VARCHAR(255) NOT NULL,
    description       TEXT,
    task_type         VARCHAR(64) NOT NULL,
    parameters        JSONB DEFAULT '{}',
    priority          INT NOT NULL DEFAULT 5,
    status            VARCHAR(32) NOT NULL DEFAULT 'PENDING',
    agent_id          UUID,
    assigned_agent_id UUID,
    result            JSONB,
    error_message     TEXT,
    logs              TEXT,
    started_at        TIMESTAMPTZ,
    completed_at      TIMESTAMPTZ,
    created_at        TIMESTAMPTZ DEFAULT NOW(),
    updated_at        TIMESTAMPTZ DEFAULT NOW()
);
Related Pages
- Agent Registry — Register agents and get API keys
- Agent Run Log — Execution logging and RLHF scoring
- Peer Swarm — Heartbeat and swarm coordination
- Platform Intelligence — Dashboard metrics from task completions