
Data Lake API

Internal API for querying the AINative data lakehouse. Agents use these endpoints to access their own performance data, platform metrics, and external data sources stored as Parquet files in MinIO (S3-compatible object storage).


Architecture

External Data / Platform DB
|
v
Celery Beat (scheduled tasks)
|
v
Celery Worker
|
v
PyArrow Table (in-memory)
|
v
Parquet (Snappy compression)
|
v
MinIO S3 (ainative-lake bucket)
|
v
DuckDB (query engine, read_parquet over S3)
|
v
JSON response to caller

Data Flow

  1. Collection/Export -- Celery tasks run on a Beat schedule (or are triggered manually). Each task queries a data source (Postgres tables, external APIs, etc.) and produces a PyArrow Table.
  2. Storage -- The PyArrow Table is serialized to Parquet with Snappy compression and written to MinIO at a deterministic path: raw/{partition}/date={YYYY-MM-DD}/data.parquet.
  3. Query -- The lake query endpoint accepts SQL, rewrites table names to read_parquet('s3://ainative-lake/raw/{partition}/date=*/*.parquet') globs, and executes via DuckDB with the httpfs extension pointed at MinIO.
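Steps 2 and 3 both derive a location from the partition name. The sketch below reproduces the two path templates from this section; `object_key` and `query_glob` are hypothetical helper names for illustration, not functions from the codebase.

```python
from datetime import date

BUCKET = "ainative-lake"  # bucket name from the architecture above


def object_key(partition: str, run_date: date) -> str:
    """Deterministic object key for one day's snapshot (step 2)."""
    # date.isoformat() yields YYYY-MM-DD
    return f"raw/{partition}/date={run_date.isoformat()}/data.parquet"


def query_glob(partition: str) -> str:
    """S3 glob that read_parquet() scans for a partition (step 3)."""
    return f"s3://{BUCKET}/raw/{partition}/date=*/*.parquet"


if __name__ == "__main__":
    print(object_key("platform/llm_token_usage", date(2025, 1, 31)))
    # raw/platform/llm_token_usage/date=2025-01-31/data.parquet
    print(query_glob("agent_run_log"))
    # s3://ainative-lake/raw/agent_run_log/date=*/*.parquet
```

Because the key depends only on the partition and the run date, re-running a task for the same day targets the same object, while the `date=*` wildcard lets a single query span every day written so far.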

Bucket Structure

ainative-lake/
├── raw/                      # Source-of-truth snapshots (append-only)
│   ├── agent_run_log/
│   ├── mcp_request_logs/
│   ├── platform/             # Platform intelligence layer
│   │   ├── llm_token_usage/
│   │   ├── agent_heartbeats/
│   │   └── ...
│   ├── external/             # External data sources
│   │   ├── slack/
│   │   ├── scc/
│   │   └── ...
│   └── business/             # Business app exports
│       ├── crm/
│       ├── commerce/
│       └── voice/
├── derived/                  # Computed aggregates
│   ├── intelligence_scores/
│   ├── velocity_metrics/
│   └── predictions/
└── exports/                  # One-off exports (TTL: 7 days)

Endpoints

POST /api/v1/internal/lake/query

Query lake partitions using SQL. Only SELECT queries are permitted. Table names in the SQL are automatically rewritten to read_parquet() calls against MinIO.

Request

{
  "sql": "SELECT agent_id, COUNT(*) as runs FROM agent_run_log GROUP BY agent_id",
  "limit": 100
}
| Field | Type | Required | Default | Constraints |
|---|---|---|---|---|
| sql | string | yes | -- | 10-4000 chars, SELECT only |
| limit | int | no | 100 | 1-1000 |
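A minimal standard-library client for this endpoint, offered as a sketch. The helper names are hypothetical, the base URL is the one used in the curl examples further down this page, and the local checks only mirror the constraints in the table above; the server stays authoritative.

```python
import json
import urllib.request

# Base URL from the curl examples on this page; adjust per environment.
BASE_URL = "https://ainative-browser-builder.up.railway.app"


def build_lake_query_request(sql, limit=100, api_key=None):
    """Build (but do not send) a POST to /api/v1/internal/lake/query."""
    # Fail fast on the documented constraints before any network call.
    if not 10 <= len(sql) <= 4000:
        raise ValueError("sql must be 10-4000 characters")
    if not 1 <= limit <= 1000:
        raise ValueError("limit must be between 1 and 1000")
    body = json.dumps({"sql": sql, "limit": limit}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if api_key:
        # Assumption: only needed if your deployment guards the query endpoint.
        headers["X-Internal-API-Key"] = api_key
    return urllib.request.Request(
        f"{BASE_URL}/api/v1/internal/lake/query",
        data=body,
        headers=headers,
        method="POST",
    )


def run_lake_query(sql, limit=100, api_key=None, timeout=30):
    """Send the request and decode the JSON response body."""
    req = build_lake_query_request(sql, limit, api_key)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)
```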

Response

{
  "columns": ["agent_id", "runs"],
  "rows": [
    ["aurora", 142],
    ["sage", 98]
  ],
  "row_count": 2,
  "source": "lake",
  "partition": "agent_run_log"
}
| Field | Type | Description |
|---|---|---|
| columns | list[str] | Column names from the result set |
| rows | list[list] | Row data as nested arrays |
| row_count | int | Number of rows returned |
| source | string | "lake" on success, "unavailable" if the query failed |
| partition | string / null | The matched partition name, or null |
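Rows come back as positional arrays, so most callers zip them with `columns`. A small sketch (the helper name is hypothetical) that also treats `"source": "unavailable"` as an error, per the table above:

```python
def rows_to_records(response: dict) -> list:
    """Zip `columns` with each row of a lake query response."""
    if response.get("source") != "lake":
        # Per the response table, "unavailable" signals that the query failed.
        raise RuntimeError(f"lake query failed (source={response.get('source')!r})")
    columns = response["columns"]
    return [dict(zip(columns, row)) for row in response["rows"]]


example = {
    "columns": ["agent_id", "runs"],
    "rows": [["aurora", 142], ["sage", 98]],
    "row_count": 2,
    "source": "lake",
    "partition": "agent_run_log",
}
print(rows_to_records(example))
# [{'agent_id': 'aurora', 'runs': 142}, {'agent_id': 'sage', 'runs': 98}]
```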

SQL Table Name Mapping

In your SQL, use the partition name with slashes replaced by underscores. The endpoint rewrites these to S3 glob paths automatically.

| Write in SQL | Resolves to |
|---|---|
| agent_run_log | read_parquet('s3://ainative-lake/raw/agent_run_log/date=*/*.parquet') |
| platform_llm_token_usage | read_parquet('s3://ainative-lake/raw/platform/llm_token_usage/date=*/*.parquet') |
| external_scc_edd_labor | read_parquet('s3://ainative-lake/raw/external/scc/edd_labor/date=*/*.parquet') |
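One way the rewrite could work, as a sketch: the real logic lives in lake_query.py and may differ. This version assumes exact, case-sensitive name matches and uses the `raw/` prefix from the Data Flow section for every partition. Like any plain regex rewrite, it would also touch a matching name inside a quoted string literal.

```python
import re

BUCKET = "ainative-lake"


def sql_name(partition: str) -> str:
    """Table name to use in SQL: the partition path with "/" replaced by "_"."""
    return partition.replace("/", "_")


def rewrite_table_names(sql: str, partitions: set) -> str:
    """Swap each allowed table name for its read_parquet() glob, in one pass."""
    by_name = {sql_name(p): p for p in partitions}
    if not by_name:
        return sql
    # \b on both sides: "agent_tasks" would not match inside "platform_agent_tasks",
    # because "_" is a word character. Sorting keeps the pattern deterministic.
    ordered = sorted(by_name, key=lambda n: (-len(n), n))
    pattern = re.compile(r"\b(" + "|".join(map(re.escape, ordered)) + r")\b")

    def to_glob(match: re.Match) -> str:
        partition = by_name[match.group(1)]
        return f"read_parquet('s3://{BUCKET}/raw/{partition}/date=*/*.parquet')"

    # A single re.sub pass never re-scans text it has already inserted.
    return pattern.sub(to_glob, sql)
```

Names that are not in the allowed set pass through unchanged, which is why an unknown table reaches DuckDB as-is and fails there.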

Example Queries

-- Agent run counts by status
SELECT agent_id, status, COUNT(*) as cnt
FROM agent_run_log
GROUP BY agent_id, status
ORDER BY cnt DESC

-- LLM token costs by model
SELECT model, SUM(total_tokens) as tokens, SUM(cost_usd) as cost
FROM platform_llm_token_usage
GROUP BY model
ORDER BY cost DESC

-- SCC labor market data
SELECT * FROM external_scc_edd_labor LIMIT 50

If a LIMIT clause is not present in the SQL, the endpoint appends LIMIT {limit} from the request body (default 100, max 1000).
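A sketch of the LIMIT fallback, assuming any `LIMIT <n>` already in the query counts as a LIMIT clause (the endpoint's exact detection rule is not documented here). The helper name is hypothetical, and this sketch leaves any explicit LIMIT in the SQL untouched.

```python
import re

DEFAULT_LIMIT = 100
MAX_LIMIT = 1000

# Assumption: any "LIMIT <n>" in the query counts as an existing LIMIT clause.
_LIMIT_RE = re.compile(r"\bLIMIT\s+\d+", re.IGNORECASE)


def apply_limit(sql: str, limit: int = DEFAULT_LIMIT) -> str:
    """Append LIMIT {limit} when the SQL has no LIMIT clause of its own."""
    if not 1 <= limit <= MAX_LIMIT:
        raise ValueError(f"limit must be between 1 and {MAX_LIMIT}")
    if _LIMIT_RE.search(sql):
        return sql
    # Drop a trailing semicolon so the appended clause stays inside the statement.
    return f"{sql.rstrip().rstrip(';').rstrip()} LIMIT {limit}"
```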


GET /api/v1/internal/lake/tool-definition

Returns the LAKE_QUERY_TOOL JSON schema for registering the lake query capability with ZeroMemory or any OpenAI-compatible tool-use system.

Headers

| Header | Required | Description |
|---|---|---|
| X-Internal-API-Key | yes | Must match the AINATIVE_INTERNAL_API_KEY env var |

Response

Returns the tool definition JSON that agents use to discover available lake queries and their parameters.
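The schema itself is defined in lake_tool.py and is not reproduced here. The sketch below fetches it and shows how a tool call could be forwarded to the query endpoint, assuming the tool's parameters are the same `sql` and `limit` fields the query endpoint accepts and that, as in OpenAI-style tool calls, the model supplies its arguments as a JSON string. Function names are hypothetical.

```python
import json
import urllib.request

BASE_URL = "https://ainative-browser-builder.up.railway.app"


def build_tool_definition_request(api_key: str) -> urllib.request.Request:
    """GET /api/v1/internal/lake/tool-definition with the internal API key."""
    return urllib.request.Request(
        f"{BASE_URL}/api/v1/internal/lake/tool-definition",
        headers={"X-Internal-API-Key": api_key},
        method="GET",
    )


def dispatch_lake_tool_call(arguments: str, run_query) -> dict:
    """Forward a model's tool-call arguments (a JSON string) to the lake.

    Assumption: the tool's parameters mirror the query request body
    (`sql`, optional `limit` defaulting to 100). `run_query` is any
    callable taking (sql, limit) and returning the response dict.
    """
    args = json.loads(arguments)
    return run_query(args["sql"], args.get("limit", 100))
```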


POST /api/v1/internal/tasks/trigger

Manually trigger a whitelisted Celery lake task without waiting for the Beat schedule. Admin-only.

Headers

| Header | Required | Description |
|---|---|---|
| X-Internal-API-Key | yes | Must match the AINATIVE_INTERNAL_API_KEY env var |

Request

{
  "task": "lake.export_agent_run_log",
  "kwargs": {}
}
| Field | Type | Required | Description |
|---|---|---|---|
| task | string | yes | Task name from the allowed list |
| kwargs | object | no | Optional keyword arguments for the task |

Response

{
  "task": "lake.export_agent_run_log",
  "task_id": "a1b2c3d4-...",
  "status": "queued"
}
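The same call from Python, as a standard-library sketch. Helper names are hypothetical, and the key is read from AINATIVE_INTERNAL_API_KEY, the same variable the header is checked against.

```python
import json
import os
import urllib.request

BASE_URL = "https://ainative-browser-builder.up.railway.app"


def build_trigger_request(task, api_key, task_kwargs=None):
    """Build (but do not send) POST /api/v1/internal/tasks/trigger."""
    body = json.dumps({"task": task, "kwargs": task_kwargs or {}}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/api/v1/internal/tasks/trigger",
        data=body,
        headers={
            "Content-Type": "application/json",
            "X-Internal-API-Key": api_key,
        },
        method="POST",
    )


def trigger_task(task, task_kwargs=None, timeout=30):
    """Queue a whitelisted lake task; returns the JSON body (task, task_id, status)."""
    api_key = os.environ["AINATIVE_INTERNAL_API_KEY"]
    req = build_trigger_request(task, api_key, task_kwargs)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)
```

How the server rejects a task name outside the whitelist is not specified on this page; with urllib, any 4xx or 5xx response surfaces as `urllib.error.HTTPError`.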

Allowed Partitions

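These are the only partition names that can appear in lake SQL queries. In SQL, write each name with slashes replaced by underscores (for example, platform/llm_token_usage becomes platform_llm_token_usage). Any other table name is not rewritten and causes a DuckDB error.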

Core Platform

| Partition | Description |
|---|---|
| agent_run_log | Daily agent execution logs |
| mcp_request_logs | API request logs from Kong |
| rlhf_signals | Reinforcement learning feedback |
| billing_events | Credit transactions and billing |
| package_stats | npm + PyPI download stats |
| github_events | Multi-repo GitHub activity |
| stripe_events | Stripe payment events |
| cli_telemetry | Cody CLI usage telemetry |

Platform Intelligence Layer

| Partition | Description |
|---|---|
| platform/llm_token_usage | LLM token consumption and costs |
| platform/agent_heartbeats | Agent heartbeat executions |
| platform/agent_resource_usage | Agent cloud resource metering |
| platform/billing_aggregations | MCP billing aggregations |
| platform/agent_instances | Agent instance registry |
| platform/agent_tasks | Agent task queue |
| platform/agent_workload | Agent workload distribution |
| platform/agent_traces | Agent execution traces |
| platform/a2a_interactions | Agent-to-agent communications |
| platform/developer_earnings | Echo developer earnings |
| platform/developer_payouts | Echo developer payouts |
| platform/dedicated_deployments | Dedicated deployment snapshots |
| platform/kong_metrics | Kong gateway metrics |
| platform/multimodal_usage | Image/audio/video API usage |
| platform/mcp_usage | MCP tool usage logs |
| platform/soc2_evidence | SOC2 compliance evidence |

Platform Extended

| Partition | Description |
|---|---|
| platform/ai_request_log | AI request log |
| platform/chat_completion_usage | Chat completion usage |
| platform/agent_swarm_sessions | Agent swarm session data |
| platform/agent_swarm_messages | Agent swarm messages |
| platform/agent_swarm_feedback | Agent swarm feedback |
| platform/audio_usage | Audio API usage |
| platform/ai_feedback | AI feedback signals |
| platform/agent_audit_logs | Agent audit logs |
| platform/zerodb_rlhf | ZeroDB RLHF signals |
| platform/user_onboarding | User onboarding funnels |
| platform/product_events | Product analytics events |
| platform/coding_sessions | Sandbox coding sessions |
| platform/project_executions | Project code executions |
| platform/video_views | Video view analytics |
| platform/live_streams | Live stream sessions |
| platform/zeromemory_stats | ZeroMemory usage statistics |
| platform/huggingface_usage | HuggingFace model usage |
| platform/tool_execution_logs | Tool execution logs |
| platform/webhook_deliveries | Webhook delivery logs |
| platform/prompt_usage | Prompt template usage |
| platform/search_queries | Search query logs |
| platform/agent_performance_metrics | Agent performance metrics |
| platform/agent_quality_trends | Agent quality trend data |
| platform/notification_logs | Notification delivery logs |
| platform/social_analytics | Social media analytics |
| platform/sprint_plans | Sprint planning data |
| platform/swarm_quotas | Swarm quota snapshots |

Predictions

| Partition | Description |
|---|---|
| derived/predictions/request_volume_forecast | Request volume forecasts |
| derived/predictions/agent_anomalies | Agent anomaly detection |
| derived/predictions/token_cost_forecast | Token cost projections |

External Data

| Partition | Description |
|---|---|
| external/slack | Slack channel statistics |
| external/email | Email delivery events |
| external/scc/edd_labor | CA EDD labor market data |
| external/scc/demographics | Census demographics |
| external/scc/fred_indicators | FRED economic indicators |
| external/scc/federal_awards | Federal awards (USAspending) |
| external/scc/census_business | Census business patterns |
| external/scc/sec_form_d | SEC Form D filings |
| external/scc/sec_filings | SEC public filings |
| external/scc/sec_xbrl | SEC XBRL financial data |
| external/scc/irs_soi | IRS Statistics of Income |
| external/scc/cdtfa_taxable_sales | CDTFA taxable sales data |
| external/scc/cdtfa_permits | CDTFA seller permits |
| external/luma/events | Luma event data |
| external/dealroom/sessions | Dealroom session data |

Business App Exports

| Partition | Description |
|---|---|
| business/crm/deals | ZeroPipeline CRM deals |
| business/crm/activities | ZeroPipeline CRM activities |
| business/crm/pipeline | ZeroPipeline pipeline stages |
| business/commerce/orders | ZeroCommerce orders |
| business/commerce/products | ZeroCommerce products |
| business/voice/calls | ZeroVoice call records |
| business/voice/sms | ZeroVoice SMS records |

Chatwoot

| Partition | Description |
|---|---|
| chatwoot/conversations | Support conversation exports |

Allowed Trigger Tasks

The task trigger endpoint accepts only whitelisted task names. The whitelisted tasks are grouped by category below; the list is not exhaustive.

SCC Economic Data Collection

  • lake.collect_scc_edd_labor
  • lake.collect_scc_census_demographics
  • lake.collect_scc_census_business
  • lake.collect_scc_fred_indicators
  • lake.collect_scc_federal_awards
  • lake.collect_scc_irs_soi
  • lake.collect_scc_cdtfa_taxable_sales
  • lake.collect_scc_cdtfa_permits

SEC EDGAR

  • lake.collect_sec_form_d / lake.collect_sec_form_d_backfill
  • lake.collect_sec_public_filings / lake.collect_sec_public_filings_backfill
  • lake.collect_sec_xbrl_financials

Platform Internal Exports

  • lake.export_agent_run_log
  • lake.export_mcp_request_logs
  • lake.export_billing_events
  • lake.export_rlhf_signals
  • lake.export_cli_telemetry
  • lake.export_stripe_events

Package / Repo Stats

  • lake.collect_package_stats
  • lake.collect_github_events

Communication Signals

  • lake.collect_slack_channel_stats
  • lake.collect_email_delivery_events

Intelligence Layer

  • lake.export_llm_token_usage
  • lake.export_agent_heartbeat_executions
  • lake.export_agent_resource_usage
  • lake.export_mcp_billing_aggregations
  • lake.export_agent_instances
  • lake.export_agent_tasks
  • lake.export_agent_workload
  • lake.export_agent_traces
  • lake.export_a2a_interactions
  • lake.export_developer_earnings
  • lake.export_developer_payouts
  • lake.export_dedicated_deployments
  • lake.export_kong_metrics
  • lake.export_multimodal_usage
  • lake.export_mcp_usage_logs
  • lake.export_soc2_evidence

Business App Exports

  • lake.export_zerocommerce_orders / lake.export_zerocommerce_products
  • lake.export_zeropipeline_deals / lake.export_zeropipeline_activities / lake.export_zeropipeline_stages
  • lake.export_zerovoice_calls / lake.export_zerovoice_sms
  • lake.export_opencap_stakeholders / lake.export_opencap_grants / lake.export_opencap_share_classes
  • lake.export_zeroinvoice_invoices / lake.export_zeroinvoice_payments

Predictive Analytics

  • lake.predict_request_volume_forecast
  • lake.predict_agent_anomalies
  • lake.predict_token_cost_forecast

Backfills

  • lake.backfill_mcp_request_logs
  • lake.backfill_llm_token_usage
  • lake.backfill_chatwoot_conversations
  • lake.backfill_audio_model_usage
  • lake.backfill_agent_audit_logs

Bucket Health

  • lake.ensure_bucket

For the complete list, see _ALLOWED_TASKS in src/backend/app/api/internal/task_trigger.py.


How to Add a New Export Task

Follow this pattern to add a new data source to the lake:

1. Create the Celery task

Add a new task in src/backend/app/tasks/ (or in an existing lake task module). The task should:

  • Query the data source for the target date window
  • Build a PyArrow Table from the results
  • Call lake_storage.write_parquet(table, partition_name, run_date)
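import pyarrow as pa

from app.services.lake.lake_storage import write_parquet

# celery_app is the project's Celery application instance; import it the
# same way the existing lake task modules do.
# fetch_data_from_source() is a placeholder for your own query logic.


@celery_app.task(name="lake.export_my_new_data")
def export_my_new_data(**kwargs):
    # 1. Query your data source for the target date window
    rows = fetch_data_from_source()

    # 2. Build a PyArrow table (one list per column)
    table = pa.table({
        "id": [r["id"] for r in rows],
        "value": [r["value"] for r in rows],
        "created_at": [r["created_at"] for r in rows],
    })

    # 3. Write to the lake. The signature listed above also takes run_date;
    #    pass it here if write_parquet does not default it.
    result = write_parquet(table, "my_new_partition")
    return result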

2. Register the partition

Add your partition name to _ALLOWED_PARTITIONS in src/backend/app/api/internal/lake_query.py:

_ALLOWED_PARTITIONS = {
    ...
    "my_new_partition",
}

3. Whitelist the task for manual triggering

Add the task name to _ALLOWED_TASKS in src/backend/app/api/internal/task_trigger.py:

_ALLOWED_TASKS = {
    ...
    "lake.export_my_new_data",
}

4. Schedule the task (optional)

Add the task to the Celery Beat schedule for recurring execution.
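This page does not say where the Beat schedule is defined. In standard Celery, a recurring entry is a dict under `beat_schedule`; the sketch below uses a hypothetical entry name and a daily interval.

```python
from datetime import timedelta

# Hypothetical entry name and interval. Merge into the project's existing
# beat_schedule rather than replacing it.
MY_NEW_EXPORT_SCHEDULE = {
    "lake-export-my-new-data-daily": {
        "task": "lake.export_my_new_data",  # must match the @celery_app.task name
        "schedule": timedelta(days=1),      # run once every 24 hours
    },
}

# Wherever the Celery app is configured (location not documented here):
# celery_app.conf.beat_schedule.update(MY_NEW_EXPORT_SCHEDULE)
```

For a fixed time of day, `celery.schedules.crontab(hour=..., minute=...)` can replace the timedelta.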

5. Test

Trigger the task manually to verify end-to-end:

curl -X POST https://ainative-browser-builder.up.railway.app/api/v1/internal/tasks/trigger \
-H "Content-Type: application/json" \
-H "X-Internal-API-Key: $AINATIVE_INTERNAL_API_KEY" \
-d '{"task": "lake.export_my_new_data"}'

Then query the data:

curl -X POST https://ainative-browser-builder.up.railway.app/api/v1/internal/lake/query \
-H "Content-Type: application/json" \
-d '{"sql": "SELECT * FROM my_new_partition", "limit": 10}'

Security

SQL Validation

All queries are validated before execution. The following patterns are blocked (case-insensitive):

| Blocked pattern | Reason |
|---|---|
| DROP | Prevents schema destruction |
| DELETE | Prevents data deletion |
| INSERT | Prevents data injection |
| UPDATE | Prevents data modification |
| CREATE | Prevents schema creation |
| ALTER | Prevents schema modification |
| TRUNCATE | Prevents table truncation |
| EXEC | Prevents command execution |
| SYSTEM | Prevents system calls |
| read_parquet( | Prevents direct Parquet path injection |
| .. | Prevents path traversal |
| file:// | Prevents local file access |
| http:// / https:// | Prevents remote file access |

Only SELECT queries are allowed. Any query missing a SELECT keyword is rejected.
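A sketch of these checks, not the actual validator in lake_query.py. It matches the keywords as whole words because a plain substring test would also reject harmless identifiers such as the `created_at` column in the export task example; whether the real validator draws the same line is an assumption worth checking. One consequence of whole-word matching: `EXECUTE` would not be caught by the `EXEC` rule.

```python
import re

# Keywords from the table above, matched as whole words (case-insensitive)
# so identifiers like created_at or updated_at are not rejected.
_BLOCKED_KEYWORDS = ("DROP", "DELETE", "INSERT", "UPDATE", "CREATE",
                     "ALTER", "TRUNCATE", "EXEC", "SYSTEM")
_KEYWORD_RE = re.compile(r"\b(" + "|".join(_BLOCKED_KEYWORDS) + r")\b", re.IGNORECASE)

# Literal patterns from the table, matched anywhere in the lowercased query.
_BLOCKED_SUBSTRINGS = ("read_parquet(", "..", "file://", "http://", "https://")


def validate_lake_sql(sql: str) -> None:
    """Raise ValueError if the query breaks the documented rules."""
    if not re.search(r"\bSELECT\b", sql, re.IGNORECASE):
        raise ValueError("only SELECT queries are allowed")
    match = _KEYWORD_RE.search(sql)
    if match:
        raise ValueError(f"blocked keyword: {match.group(1).upper()}")
    lowered = sql.lower()
    for token in _BLOCKED_SUBSTRINGS:
        if token in lowered:
            raise ValueError(f"blocked pattern: {token}")
```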

Row Limits

  • Default: 100 rows per query
  • Maximum: 1000 rows per query
  • If no LIMIT clause is present in the SQL, one is appended automatically

Rate Limiting

  • Maximum 10 lake query calls per agent per cron run (enforced via request context)
  • Task trigger endpoint requires X-Internal-API-Key header
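How the request context carries this count is not documented here. A minimal sketch of the idea, with a hypothetical class holding one counter per agent per cron run that is checked before each lake query call:

```python
from dataclasses import dataclass


@dataclass
class LakeQueryBudget:
    """Allowance of lake query calls for one agent during one cron run."""
    max_calls: int = 10
    used: int = 0

    def consume(self) -> None:
        """Record one call, or raise once the per-run budget is spent."""
        if self.used >= self.max_calls:
            raise RuntimeError(f"lake query budget exhausted ({self.max_calls} calls per run)")
        self.used += 1
```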

Authentication

  • Lake query endpoint: accessible to internal agents
  • Task trigger endpoint: requires X-Internal-API-Key header matching the AINATIVE_INTERNAL_API_KEY environment variable
  • Tool definition endpoint: requires X-Internal-API-Key header

Environment Variables

| Variable | Description | Example |
|---|---|---|
| MINIO_URL | MinIO endpoint (primary) | https://minio.example.com |
| MINIO_ENDPOINT | MinIO endpoint (fallback) | minio:9000 |
| MINIO_ACCESS_KEY | MinIO access key | -- |
| MINIO_SECRET_KEY | MinIO secret key | -- |
| MINIO_SECURE | Use SSL (true/false) | true |
| MINIO_REGION | MinIO region | us-east-1 |
| AINATIVE_INTERNAL_API_KEY | Internal API key for admin endpoints | -- |
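The Data Flow section says DuckDB reads MinIO through the httpfs extension, but the configuration code isn't shown. A sketch of how these variables could map onto DuckDB's standard S3 settings, assuming MINIO_URL wins over MINIO_ENDPOINT (as primary/fallback suggests) and that MINIO_SECURE and MINIO_REGION fall back to the example values above when unset:

```python
import os
from collections.abc import Mapping
from urllib.parse import urlparse


def _sql_str(value: str) -> str:
    """Quote a value as a SQL string literal (single quotes doubled)."""
    return "'" + value.replace("'", "''") + "'"


def duckdb_s3_settings(env: Mapping = os.environ) -> list:
    """Statements that point DuckDB's httpfs extension at MinIO."""
    if env.get("MINIO_URL"):
        # Primary: a full URL; s3_endpoint wants host[:port] without the scheme.
        endpoint = urlparse(env["MINIO_URL"]).netloc
    else:
        endpoint = env["MINIO_ENDPOINT"]  # fallback, already host:port
    # Assumptions: MINIO_SECURE defaults to "true", MINIO_REGION to "us-east-1".
    use_ssl = env.get("MINIO_SECURE", "true").strip().lower() == "true"
    region = env.get("MINIO_REGION", "us-east-1")
    return [
        "INSTALL httpfs",
        "LOAD httpfs",
        f"SET s3_endpoint = {_sql_str(endpoint)}",
        f"SET s3_access_key_id = {_sql_str(env['MINIO_ACCESS_KEY'])}",
        f"SET s3_secret_access_key = {_sql_str(env['MINIO_SECRET_KEY'])}",
        f"SET s3_region = {_sql_str(region)}",
        f"SET s3_use_ssl = {'true' if use_ssl else 'false'}",
        # MinIO is normally addressed path-style (endpoint/bucket/key).
        "SET s3_url_style = 'path'",
    ]
```

With the `duckdb` Python package, each statement can be executed on a `duckdb.connect()` connection before querying the `s3://ainative-lake/...` globs.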

Key Source Files

| File | Purpose |
|---|---|
| src/backend/app/api/internal/lake_query.py | Lake query endpoint and SQL validation |
| src/backend/app/api/internal/task_trigger.py | Manual task trigger endpoint |
| src/backend/app/services/lake/lake_storage.py | MinIO storage layer (write_parquet, ensure_bucket) |
| src/backend/app/services/lake/lake_tool.py | LAKE_QUERY_TOOL schema for agent tool registration |