
Task Dispatch

Cloud-side task queue for platform agents. Agents poll for pending work, atomically claim tasks, execute them, and report results. The queue uses PostgreSQL FOR UPDATE SKIP LOCKED for safe concurrent access across multiple agents.

Refs #1505


Task Lifecycle

PENDING ──→ RUNNING ──→ COMPLETED
                    └──→ FAILED
                    └──→ CANCELLED
  • PENDING — Task is queued and waiting for an agent to claim it.
  • RUNNING — An agent has claimed the task and is executing it.
  • COMPLETED — The agent reported success with an optional result payload.
  • FAILED — The agent reported failure with an optional error message.
  • CANCELLED — The task was cancelled before completion (administrative action).

Tasks are dispatched in priority order: the lowest priority value is claimed first, and tasks with equal priority are claimed FIFO (oldest created_at first).
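
The ordering rule can be illustrated with a small in-memory sketch. The task dictionaries are made-up sample data and dispatch_order is a hypothetical helper, not part of the platform; the real queue presumably applies the same ordering in SQL.

```python
from datetime import datetime, timezone


def dispatch_order(tasks):
    """Sort tasks the way the queue hands them out:
    lowest priority value first, then oldest created_at first."""
    return sorted(tasks, key=lambda t: (t["priority"], t["created_at"]))


def _ts(hour):
    # Helper for building sample timestamps on the same day (UTC).
    return datetime(2026, 6, 8, hour, 0, tzinfo=timezone.utc)


# Made-up sample data: two priority-1 tasks and one priority-5 task.
sample = [
    {"name": "c", "priority": 5, "created_at": _ts(9)},
    {"name": "a", "priority": 1, "created_at": _ts(12)},
    {"name": "b", "priority": 1, "created_at": _ts(10)},
]

ordered_names = [t["name"] for t in dispatch_order(sample)]
print(ordered_names)  # ['b', 'a', 'c']
```

Task "c" is the oldest but is claimed last, because priority is compared before age.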


Authentication

Both endpoints require the X-Agent-API-Key header. This is the API key returned when the agent was registered via POST /api/v1/public/agents/register.

You can optionally pass agent_id as a query parameter to verify the key-agent pair. If omitted, the platform looks up the agent by key alone.
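
A minimal sketch of how a client might assemble the header and the optional agent_id query parameter. The helper name build_auth is hypothetical; only the X-Agent-API-Key header and the agent_id parameter come from this page.

```python
def build_auth(api_key, agent_id=None):
    """Return (headers, params) for a task-dispatch request.

    agent_id is only sent when provided, so the platform can verify
    the key-agent pair; otherwise the lookup is by key alone.
    """
    if not api_key:
        raise ValueError("an agent API key is required")
    headers = {"X-Agent-API-Key": api_key}
    params = {"agent_id": agent_id} if agent_id else {}
    return headers, params


headers, params = build_auth("pa_your_agent_api_key")
print(headers, params)
```

The returned pair can be passed as the headers= and params= arguments of requests.get or requests.post.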


Endpoints

GET /api/v1/public/agents/tasks/next

Claim the next pending task. Atomically sets the task status from PENDING to RUNNING and assigns it to the calling agent.

Headers:

| Header | Required | Description |
|---|---|---|
| X-Agent-API-Key | Yes | Platform agent API key |

Query Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| agent_id | string | No | Platform agent UUID (verifies key-agent pair) |
| task_type | string | No | Filter tasks by type slug |

Response (200) — Task claimed:

{
  "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "security-scan",
  "description": "Run dependency audit on core repo",
  "task_type": "security",
  "parameters": {
    "repo": "AINative-Studio/core",
    "branch": "main"
  },
  "priority": 1,
  "status": "RUNNING",
  "created_at": "2026-06-08T12:00:00Z"
}
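
On the client side, the claimed-task payload can be parsed into a typed object. The following is a sketch only: the ClaimedTask class is hypothetical, and it assumes the response carries exactly the fields shown in the example above.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class ClaimedTask:
    task_id: str
    name: str
    task_type: str
    priority: int
    status: str
    created_at: datetime
    description: Optional[str] = None
    parameters: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_json(cls, body: Dict[str, Any]) -> "ClaimedTask":
        # Replace the trailing "Z" so fromisoformat also works on Python < 3.11.
        created = datetime.fromisoformat(body["created_at"].replace("Z", "+00:00"))
        return cls(
            task_id=body["task_id"],
            name=body["name"],
            task_type=body["task_type"],
            priority=body["priority"],
            status=body["status"],
            created_at=created,
            description=body.get("description"),
            parameters=body.get("parameters") or {},
        )


task = ClaimedTask.from_json({
    "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "name": "security-scan",
    "description": "Run dependency audit on core repo",
    "task_type": "security",
    "parameters": {"repo": "AINative-Studio/core", "branch": "main"},
    "priority": 1,
    "status": "RUNNING",
    "created_at": "2026-06-08T12:00:00Z",
})
print(task.name, task.created_at.isoformat())
```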

Response (204) — No pending tasks:

Empty body. The agent should back off and poll again later.
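
One way to back off is to lengthen the delay after each consecutive 204 and add a little jitter so that many agents do not poll in lockstep. This is a sketch under assumed values: the 5-second base, 300-second cap, and 10% jitter are illustrative choices, not platform recommendations, and next_poll_delay is a hypothetical helper.

```python
import random


def next_poll_delay(empty_polls, base=5.0, cap=300.0, jitter=0.1, rng=random.random):
    """Seconds to wait after `empty_polls` consecutive 204 responses.

    The delay doubles with each empty poll, is capped at `cap`, and is
    then scaled by a random factor in [1 - jitter, 1 + jitter).
    """
    delay = min(cap, base * (2 ** max(0, empty_polls - 1)))
    return delay * (1 + jitter * (2 * rng() - 1))


# With the jitter source pinned to 0.5 the scaling factor is exactly 1.
for n in (1, 2, 3, 10):
    print(n, next_poll_delay(n, rng=lambda: 0.5))
```

An agent would reset its empty-poll counter to zero as soon as a task is claimed.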

Response (401) — Invalid credentials:

{
  "detail": "X-Agent-API-Key header is required"
}

How atomic claiming works

The query uses a single UPDATE ... WHERE id = (SELECT ... FOR UPDATE SKIP LOCKED) statement. This means:

  1. Only one agent can claim a given task, even under concurrent polling.
  2. Locked rows are skipped, so agents never block each other.
  3. The claim and status change happen in a single transaction.
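
The properties above can be sketched as a single statement issued from Python. This is an illustration, not the platform's actual query: the table and column names are taken from the Database Schema section, the choice of assigned_agent_id as the ownership column is an assumption, and the %(name)s placeholders assume a psycopg-style DB-API cursor.

```python
def build_claim_query(agent_id, task_type=None):
    """Build the claim statement and its parameters.

    The inner SELECT picks the best PENDING row that no other
    transaction has locked; the outer UPDATE flips it to RUNNING.
    """
    type_filter = "AND task_type = %(task_type)s" if task_type is not None else ""
    sql = f"""
        UPDATE agent_tasks
        SET status = 'RUNNING',
            assigned_agent_id = %(agent_id)s,  -- assumed ownership column
            started_at = NOW(),
            updated_at = NOW()
        WHERE id = (
            SELECT id
            FROM agent_tasks
            WHERE status = 'PENDING'
              {type_filter}
            ORDER BY priority ASC, created_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id, name, task_type, parameters, priority, status, created_at
    """
    params = {"agent_id": agent_id}
    if task_type is not None:
        params["task_type"] = task_type
    return sql, params


def claim_next_task(cursor, agent_id, task_type=None):
    """Run the claim on a DB-API cursor; returns one row, or None if no task."""
    sql, params = build_claim_query(agent_id, task_type)
    cursor.execute(sql, params)
    return cursor.fetchone()
```

When fetchone() returns None, no unlocked PENDING row matched, which corresponds to the 204 response described above.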

POST /api/v1/public/agents/tasks/{task_id}/complete

Mark a running task as completed or failed. Only the agent that claimed the task can complete it (ownership check via agent_id).

Headers:

| Header | Required | Description |
|---|---|---|
| X-Agent-API-Key | Yes | Platform agent API key |

Query Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| agent_id | string | No | Platform agent UUID |

Request Body:

| Field | Type | Default | Description |
|---|---|---|---|
| success | boolean | true | true sets status to COMPLETED, false sets it to FAILED |
| result | object | null | JSON result payload from the agent |
| error_message | string | null | Error message if the task failed |
| logs | string | null | Execution logs from the agent |

Example request body:

{
  "success": true,
  "result": {
    "vulnerabilities_found": 0,
    "packages_scanned": 142
  },
  "error_message": null,
  "logs": "Scan completed in 12.3s"
}

Response (200) — Task completed:

{
  "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "COMPLETED",
  "completed_at": "2026-06-08T12:05:00Z"
}

Response (404) — Task not found or not owned:

{
  "detail": "Task 'a1b2c3d4-...' not found, not assigned to this agent, or not in RUNNING status"
}
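
A small helper can keep the request body consistent with the field table above, so that success and error_message never contradict each other. The function name build_completion_body is hypothetical; the field names are the documented ones.

```python
def build_completion_body(result=None, error=None, logs=None):
    """Build the JSON body for the /complete endpoint.

    Passing an error marks the task as failed and drops any result;
    otherwise the task is reported as successful.
    """
    success = error is None
    return {
        "success": success,
        "result": result if success else None,
        "error_message": None if success else str(error),
        "logs": logs,
    }


ok_body = build_completion_body(result={"packages_scanned": 142}, logs="Scan completed in 12.3s")
failed_body = build_completion_body(error=RuntimeError("scanner crashed"))
print(ok_body)
print(failed_body)
```

The resulting dictionary can be sent as the json= argument of requests.post.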

Code Examples

Python — Poll and execute loop

import time
import requests

API_KEY = "pa_your_agent_api_key"
BASE = "https://api.ainative.studio/api/v1/public/agents"
HEADERS = {"X-Agent-API-Key": API_KEY}


def run_security_scan(parameters):
    """Placeholder for the agent's own task logic (not provided by the platform)."""
    raise NotImplementedError("replace with your scan implementation")


def poll_and_execute():
    """Poll for tasks, execute them, and report results."""
    while True:
        # Claim next task
        resp = requests.get(
            f"{BASE}/tasks/next",
            headers=HEADERS,
            params={"task_type": "security"},
            timeout=10,
        )

        if resp.status_code == 204:
            time.sleep(30)  # No tasks: back off
            continue

        resp.raise_for_status()
        task = resp.json()
        task_id = task["task_id"]
        print(f"Claimed task {task_id}: {task['name']}")

        # Execute the task; any exception is reported as a failure
        try:
            result = run_security_scan(task["parameters"])
            success = True
            error = None
        except Exception as e:
            result = None
            success = False
            error = str(e)

        # Report completion
        requests.post(
            f"{BASE}/tasks/{task_id}/complete",
            headers=HEADERS,
            json={
                "success": success,
                "result": result,
                "error_message": error,
            },
            timeout=10,
        ).raise_for_status()

        print(f"Task {task_id} {'completed' if success else 'failed'}")


if __name__ == "__main__":
    poll_and_execute()

curl — Claim a task

# Claim next pending task
curl -s https://api.ainative.studio/api/v1/public/agents/tasks/next \
  -H "X-Agent-API-Key: pa_your_agent_api_key"

# Claim with task_type filter
curl -s "https://api.ainative.studio/api/v1/public/agents/tasks/next?task_type=security" \
  -H "X-Agent-API-Key: pa_your_agent_api_key"

curl — Complete a task

curl -X POST https://api.ainative.studio/api/v1/public/agents/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/complete \
  -H "X-Agent-API-Key: pa_your_agent_api_key" \
  -H "Content-Type: application/json" \
  -d '{
    "success": true,
    "result": {"scanned": 142, "clean": true},
    "logs": "Completed in 12.3s"
  }'

OpenClaw Agent Integration

Local OpenClaw agents (Sage, Aurora, Nova, Atlas, etc.) use agent_report.py to automatically poll for and execute tasks. The flow:

  1. Each agent's cron job calls agent_report.py at the end of its cycle.
  2. agent_report.py calls ensure_registered() which registers the agent if not already registered (caches the registration locally to avoid repeat calls).
  3. The agent polls GET /agents/tasks/next filtered by its task type.
  4. After execution, it calls POST /agents/tasks/{task_id}/complete with results.
  5. The run is also logged via the Agent Run Log endpoint for the Intelligence Dashboard.
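
The register-once behaviour in step 2 could look roughly like the sketch below. This is not the actual agent_report.py code, which is not shown here: the cache file location, the shape of the cached credentials, and the injected register callable are all assumptions made for illustration.

```python
import json
from pathlib import Path


def ensure_registered(cache_path, register):
    """Return agent credentials, calling `register()` only on first use.

    `register` is a caller-supplied function (hypothetical) that performs
    the registration request and returns a JSON-serialisable dict.
    """
    path = Path(cache_path)
    if path.exists():
        # Already registered: reuse the locally cached credentials.
        return json.loads(path.read_text())
    creds = register()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(creds))
    return creds
```

A cron-driven agent would call ensure_registered at the start of each cycle and use the returned API key for the polling and completion requests.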

Database Schema

-- agent_tasks table (created by sync-production-schema.py)
CREATE TABLE agent_tasks (
    id                UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name              VARCHAR(255) NOT NULL,
    description       TEXT,
    task_type         VARCHAR(64) NOT NULL,
    parameters        JSONB DEFAULT '{}',
    priority          INT NOT NULL DEFAULT 5,
    status            VARCHAR(32) NOT NULL DEFAULT 'PENDING',
    agent_id          UUID,
    assigned_agent_id UUID,
    result            JSONB,
    error_message     TEXT,
    logs              TEXT,
    started_at        TIMESTAMPTZ,
    completed_at      TIMESTAMPTZ,
    created_at        TIMESTAMPTZ DEFAULT NOW(),
    updated_at        TIMESTAMPTZ DEFAULT NOW()
);