Data Lake API
Internal API for querying the AINative data lakehouse. Agents use these endpoints to access their own performance data, platform metrics, and external data sources stored as Parquet files in MinIO (S3-compatible object storage).
Architecture
External Data / Platform DB
|
v
Celery Beat (scheduled tasks)
|
v
Celery Worker
|
v
PyArrow Table (in-memory)
|
v
Parquet (Snappy compression)
|
v
MinIO S3 (ainative-lake bucket)
|
v
DuckDB (query engine, read_parquet over S3)
|
v
JSON response to caller
Data Flow
- Collection/Export -- Celery tasks run on a Beat schedule (or are triggered manually). Each task queries a data source (Postgres tables, external APIs, etc.) and produces a PyArrow Table.
- Storage -- The PyArrow Table is serialized to Parquet with Snappy compression and written to MinIO at a deterministic path: raw/{partition}/date={YYYY-MM-DD}/data.parquet.
- Query -- The lake query endpoint accepts SQL, rewrites table names to read_parquet('s3://ainative-lake/raw/{partition}/date=*/*.parquet') globs, and executes via DuckDB with the httpfs extension pointed at MinIO.
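The path convention in the Storage and Query steps can be sketched with two small helpers. This is only an illustration of the layout described above; the function names are hypothetical and are not taken from the lake storage module.

```python
from datetime import date

BUCKET = "ainative-lake"  # bucket name as shown in the architecture diagram


def partition_object_key(partition: str, run_date: date) -> str:
    """Build the object key for one day's snapshot of a partition."""
    return f"raw/{partition}/date={run_date.isoformat()}/data.parquet"


def partition_glob(partition: str) -> str:
    """Build the S3 glob that covers every daily snapshot of a partition."""
    return f"s3://{BUCKET}/raw/{partition}/date=*/*.parquet"


print(partition_object_key("platform/llm_token_usage", date(2025, 1, 15)))
print(partition_glob("agent_run_log"))
```

Because the key depends only on the partition name and the run date, re-running a task for the same day targets the same object path.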
Bucket Structure
ainative-lake/
├── raw/ # Source-of-truth snapshots (append-only)
│ ├── agent_run_log/
│ ├── mcp_request_logs/
│ ├── platform/ # Platform intelligence layer
│ │ ├── llm_token_usage/
│ │ ├── agent_heartbeats/
│ │ └── ...
│ ├── external/ # External data sources
│ │ ├── slack/
│ │ ├── scc/
│ │ └── ...
│ └── business/ # Business app exports
│ ├── crm/
│ ├── commerce/
│ └── voice/
├── derived/ # Computed aggregates
│ ├── intelligence_scores/
│ ├── velocity_metrics/
│ └── predictions/
└── exports/ # One-off exports (TTL: 7 days)
Endpoints
POST /api/v1/internal/lake/query
Query lake partitions using SQL. Only SELECT queries are permitted. Table names in the SQL are automatically rewritten to read_parquet() calls against MinIO.
Request
{
  "sql": "SELECT agent_id, COUNT(*) as runs FROM agent_run_log GROUP BY agent_id",
  "limit": 100
}
| Field | Type | Required | Default | Constraints |
|---|---|---|---|---|
| sql | string | yes | -- | 10-4000 chars, SELECT only |
| limit | int | no | 100 | 1-1000 |
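A caller could check the field constraints from the table before sending a request. The sketch below is a client-side convenience under that assumption; the class name LakeQueryRequest is hypothetical, and it does not attempt the SELECT-only check, which the endpoint performs itself.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LakeQueryRequest:
    """Hypothetical client-side model of the request body."""

    sql: str
    limit: int = 100

    def __post_init__(self) -> None:
        # Constraints copied from the request table: 10-4000 chars, limit 1-1000.
        if not 10 <= len(self.sql) <= 4000:
            raise ValueError("sql must be 10-4000 characters")
        if not 1 <= self.limit <= 1000:
            raise ValueError("limit must be between 1 and 1000")

    def to_payload(self) -> dict:
        """Return the JSON-serializable request body."""
        return {"sql": self.sql, "limit": self.limit}


request = LakeQueryRequest(sql="SELECT agent_id FROM agent_run_log", limit=50)
print(request.to_payload())
```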
Response
{
  "columns": ["agent_id", "runs"],
  "rows": [
    ["aurora", 142],
    ["sage", 98]
  ],
  "row_count": 2,
  "source": "lake",
  "partition": "agent_run_log"
}
| Field | Type | Description |
|---|---|---|
| columns | list[str] | Column names from the result set |
| rows | list[list] | Row data as nested arrays |
| row_count | int | Number of rows returned |
| source | string | "lake" on success, "unavailable" if query failed |
| partition | string/null | The matched partition name, or null |
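Since rows arrive as nested arrays, callers will often want to pair each value with its column name. A minimal sketch follows; the helper name is hypothetical, and treating any response whose source is not "lake" as empty is an assumption about how a caller might handle the "unavailable" case.

```python
def rows_as_dicts(response: dict) -> list[dict]:
    """Convert a lake query response into a list of column-keyed dicts."""
    # Assumption: a failed query ("unavailable") is treated as having no rows.
    if response.get("source") != "lake":
        return []
    columns = response["columns"]
    return [dict(zip(columns, row)) for row in response["rows"]]


example = {
    "columns": ["agent_id", "runs"],
    "rows": [["aurora", 142], ["sage", 98]],
    "row_count": 2,
    "source": "lake",
    "partition": "agent_run_log",
}
print(rows_as_dicts(example))
# [{'agent_id': 'aurora', 'runs': 142}, {'agent_id': 'sage', 'runs': 98}]
```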
SQL Table Name Mapping
In your SQL, use the partition name with slashes replaced by underscores. The endpoint rewrites these to S3 glob paths automatically.
| Write in SQL | Resolves to |
|---|---|
| agent_run_log | read_parquet('s3://ainative-lake/raw/agent_run_log/date=*/*.parquet') |
| platform_llm_token_usage | read_parquet('s3://ainative-lake/raw/platform/llm_token_usage/date=*/*.parquet') |
| external_scc_edd_labor | read_parquet('s3://ainative-lake/raw/external/scc/edd_labor/date=*/*.parquet') |
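The mapping can be illustrated with a naive regex-based rewrite. This is a sketch of the idea only, not the code in lake_query.py: the function name and the three-entry partition set are illustrative, and a simple token match like this would also rewrite a column that happens to share a partition's name.

```python
import re

# Illustrative subset of the allowed partitions listed later in this document.
ALLOWED_PARTITIONS = {
    "agent_run_log",
    "platform/llm_token_usage",
    "external/scc/edd_labor",
}


def rewrite_tables(sql: str, partitions=ALLOWED_PARTITIONS) -> str:
    """Replace underscore-style table names with read_parquet() globs."""
    # Map the SQL-facing name (slashes -> underscores) back to the partition path.
    aliases = {p.replace("/", "_"): p for p in partitions}
    # Try longer names first so a short alias never shadows a longer one.
    ordered = sorted(aliases, key=len, reverse=True)
    pattern = re.compile(r"\b(" + "|".join(re.escape(a) for a in ordered) + r")\b")

    def _substitute(match: re.Match) -> str:
        partition = aliases[match.group(1)]
        return f"read_parquet('s3://ainative-lake/raw/{partition}/date=*/*.parquet')"

    return pattern.sub(_substitute, sql)


print(rewrite_tables("SELECT * FROM external_scc_edd_labor LIMIT 50"))
```

Names outside the set pass through unchanged, which matches the documented behaviour that unknown tables are not rewritten.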
Example Queries
-- Agent activity by status (all available dates; no date filter is applied)
SELECT agent_id, status, COUNT(*) as cnt
FROM agent_run_log
GROUP BY agent_id, status
ORDER BY cnt DESC
-- LLM token costs by model
SELECT model, SUM(total_tokens) as tokens, SUM(cost_usd) as cost
FROM platform_llm_token_usage
GROUP BY model
ORDER BY cost DESC
-- SCC labor market data
SELECT * FROM external_scc_edd_labor LIMIT 50
If a LIMIT clause is not present in the SQL, the endpoint appends LIMIT {limit} from the request body (default 100, max 1000).
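The LIMIT behaviour can be sketched as follows. The helper is hypothetical; clamping the value to the 1-1000 range and stripping a trailing semicolon are assumptions of this sketch, and the real endpoint may instead reject out-of-range values during request validation.

```python
import re

# Matches an existing "LIMIT <n>" clause anywhere in the statement.
_LIMIT_RE = re.compile(r"\blimit\s+\d+", re.IGNORECASE)


def apply_limit(sql: str, limit: int = 100) -> str:
    """Append LIMIT to a query that does not already have one."""
    # Assumption: clamp to the documented 1-1000 range instead of raising.
    limit = max(1, min(limit, 1000))
    statement = sql.strip().rstrip(";").rstrip()
    if _LIMIT_RE.search(statement):
        return statement
    return f"{statement} LIMIT {limit}"


print(apply_limit("SELECT * FROM agent_run_log"))
# SELECT * FROM agent_run_log LIMIT 100
```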
GET /api/v1/internal/lake/tool-definition
Returns the LAKE_QUERY_TOOL JSON schema for registering the lake query capability with ZeroMemory or any OpenAI-compatible tool-use system.
Headers
| Header | Required | Description |
|---|---|---|
| X-Internal-API-Key | yes | Must match AINATIVE_INTERNAL_API_KEY env var |
Response
Returns the tool definition JSON that agents use to discover available lake queries and their parameters.
POST /api/v1/internal/tasks/trigger
Manually trigger a whitelisted Celery lake task without waiting for the Beat schedule. Admin-only.
Headers
| Header | Required | Description |
|---|---|---|
| X-Internal-API-Key | yes | Must match AINATIVE_INTERNAL_API_KEY env var |
Request
{
  "task": "lake.export_agent_run_log",
  "kwargs": {}
}
| Field | Type | Required | Description |
|---|---|---|---|
| task | string | yes | Task name from the allowed list |
| kwargs | object | no | Optional keyword arguments for the task |
Response
{
  "task": "lake.export_agent_run_log",
  "task_id": "a1b2c3d4-...",
  "status": "queued"
}
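For callers that prefer Python over curl, the trigger request can be built with the standard library alone. The helper name and the example base URL below are placeholders; only the path, header name, and body fields come from this document.

```python
import json
import os
import urllib.request


def build_trigger_request(base_url, task, kwargs=None, api_key=None):
    """Build (but do not send) a POST request for the task trigger endpoint."""
    # Fall back to the environment variable named in the Headers table.
    api_key = api_key or os.environ.get("AINATIVE_INTERNAL_API_KEY", "")
    body = json.dumps({"task": task, "kwargs": kwargs or {}}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/internal/tasks/trigger",
        data=body,
        headers={
            "Content-Type": "application/json",
            "X-Internal-API-Key": api_key,
        },
        method="POST",
    )


# "https://lake.internal.example" is a placeholder host, not a real deployment.
req = build_trigger_request(
    "https://lake.internal.example", "lake.export_agent_run_log", api_key="secret"
)
print(req.get_method(), req.full_url)
# To send it: urllib.request.urlopen(req)
```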
Allowed Partitions
These are the only partition names that can appear in lake SQL queries. Any other table name will not be rewritten and will cause a DuckDB error.
Core Platform
| Partition | Description |
|---|---|
| agent_run_log | Daily agent execution logs |
| mcp_request_logs | API request logs from Kong |
| rlhf_signals | Reinforcement learning feedback |
| billing_events | Credit transactions and billing |
| package_stats | npm + PyPI download stats |
| github_events | Multi-repo GitHub activity |
| stripe_events | Stripe payment events |
| cli_telemetry | Cody CLI usage telemetry |
Platform Intelligence Layer
| Partition | Description |
|---|---|
| platform/llm_token_usage | LLM token consumption and costs |
| platform/agent_heartbeats | Agent heartbeat executions |
| platform/agent_resource_usage | Agent cloud resource metering |
| platform/billing_aggregations | MCP billing aggregations |
| platform/agent_instances | Agent instance registry |
| platform/agent_tasks | Agent task queue |
| platform/agent_workload | Agent workload distribution |
| platform/agent_traces | Agent execution traces |
| platform/a2a_interactions | Agent-to-agent communications |
| platform/developer_earnings | Echo developer earnings |
| platform/developer_payouts | Echo developer payouts |
| platform/dedicated_deployments | Dedicated deployment snapshots |
| platform/kong_metrics | Kong gateway metrics |
| platform/multimodal_usage | Image/audio/video API usage |
| platform/mcp_usage | MCP tool usage logs |
| platform/soc2_evidence | SOC2 compliance evidence |
Platform Extended
| Partition | Description |
|---|---|
| platform/ai_request_log | AI request log |
| platform/chat_completion_usage | Chat completion usage |
| platform/agent_swarm_sessions | Agent swarm session data |
| platform/agent_swarm_messages | Agent swarm messages |
| platform/agent_swarm_feedback | Agent swarm feedback |
| platform/audio_usage | Audio API usage |
| platform/ai_feedback | AI feedback signals |
| platform/agent_audit_logs | Agent audit logs |
| platform/zerodb_rlhf | ZeroDB RLHF signals |
| platform/user_onboarding | User onboarding funnels |
| platform/product_events | Product analytics events |
| platform/coding_sessions | Sandbox coding sessions |
| platform/project_executions | Project code executions |
| platform/video_views | Video view analytics |
| platform/live_streams | Live stream sessions |
| platform/zeromemory_stats | ZeroMemory usage statistics |
| platform/huggingface_usage | HuggingFace model usage |
| platform/tool_execution_logs | Tool execution logs |
| platform/webhook_deliveries | Webhook delivery logs |
| platform/prompt_usage | Prompt template usage |
| platform/search_queries | Search query logs |
| platform/agent_performance_metrics | Agent performance metrics |
| platform/agent_quality_trends | Agent quality trend data |
| platform/notification_logs | Notification delivery logs |
| platform/social_analytics | Social media analytics |
| platform/sprint_plans | Sprint planning data |
| platform/swarm_quotas | Swarm quota snapshots |
Predictions
| Partition | Description |
|---|---|
| derived/predictions/request_volume_forecast | Request volume forecasts |
| derived/predictions/agent_anomalies | Agent anomaly detection |
| derived/predictions/token_cost_forecast | Token cost projections |
External Data
| Partition | Description |
|---|---|
| external/slack | Slack channel statistics |
| external/email | Email delivery events |
| external/scc/edd_labor | CA EDD labor market data |
| external/scc/demographics | Census demographics |
| external/scc/fred_indicators | FRED economic indicators |
| external/scc/federal_awards | Federal awards (USAspending) |
| external/scc/census_business | Census business patterns |
| external/scc/sec_form_d | SEC Form D filings |
| external/scc/sec_filings | SEC public filings |
| external/scc/sec_xbrl | SEC XBRL financial data |
| external/scc/irs_soi | IRS Statistics of Income |
| external/scc/cdtfa_taxable_sales | CDTFA taxable sales data |
| external/scc/cdtfa_permits | CDTFA seller permits |
| external/luma/events | Luma event data |
| external/dealroom/sessions | Dealroom session data |
Business App Exports
| Partition | Description |
|---|---|
| business/crm/deals | ZeroPipeline CRM deals |
| business/crm/activities | ZeroPipeline CRM activities |
| business/crm/pipeline | ZeroPipeline pipeline stages |
| business/commerce/orders | ZeroCommerce orders |
| business/commerce/products | ZeroCommerce products |
| business/voice/calls | ZeroVoice call records |
| business/voice/sms | ZeroVoice SMS records |
Chatwoot
| Partition | Description |
|---|---|
| chatwoot/conversations | Support conversation exports |
Allowed Trigger Tasks
The task trigger endpoint accepts only whitelisted task names. The tasks below are grouped by category:
SCC Economic Data Collection
- lake.collect_scc_edd_labor
- lake.collect_scc_census_demographics
- lake.collect_scc_census_business
- lake.collect_scc_fred_indicators
- lake.collect_scc_federal_awards
- lake.collect_scc_irs_soi
- lake.collect_scc_cdtfa_taxable_sales
- lake.collect_scc_cdtfa_permits
SEC EDGAR
- lake.collect_sec_form_d / lake.collect_sec_form_d_backfill
- lake.collect_sec_public_filings / lake.collect_sec_public_filings_backfill
- lake.collect_sec_xbrl_financials
Platform Internal Exports
- lake.export_agent_run_log
- lake.export_mcp_request_logs
- lake.export_billing_events
- lake.export_rlhf_signals
- lake.export_cli_telemetry
- lake.export_stripe_events
Package / Repo Stats
- lake.collect_package_stats
- lake.collect_github_events
Communication Signals
- lake.collect_slack_channel_stats
- lake.collect_email_delivery_events
Intelligence Layer
- lake.export_llm_token_usage
- lake.export_agent_heartbeat_executions
- lake.export_agent_resource_usage
- lake.export_mcp_billing_aggregations
- lake.export_agent_instances
- lake.export_agent_tasks
- lake.export_agent_workload
- lake.export_agent_traces
- lake.export_a2a_interactions
- lake.export_developer_earnings
- lake.export_developer_payouts
- lake.export_dedicated_deployments
- lake.export_kong_metrics
- lake.export_multimodal_usage
- lake.export_mcp_usage_logs
- lake.export_soc2_evidence
Business App Exports
- lake.export_zerocommerce_orders / lake.export_zerocommerce_products
- lake.export_zeropipeline_deals / lake.export_zeropipeline_activities / lake.export_zeropipeline_stages
- lake.export_zerovoice_calls / lake.export_zerovoice_sms
- lake.export_opencap_stakeholders / lake.export_opencap_grants / lake.export_opencap_share_classes
- lake.export_zeroinvoice_invoices / lake.export_zeroinvoice_payments
Predictive Analytics
- lake.predict_request_volume_forecast
- lake.predict_agent_anomalies
- lake.predict_token_cost_forecast
Backfills
- lake.backfill_mcp_request_logs
- lake.backfill_llm_token_usage
- lake.backfill_chatwoot_conversations
- lake.backfill_audio_model_usage
- lake.backfill_agent_audit_logs
Bucket Health
- lake.ensure_bucket
For the complete list, see _ALLOWED_TASKS in src/backend/app/api/internal/task_trigger.py.
How to Add a New Export Task
Follow this pattern to add a new data source to the lake:
1. Create the Celery task
Add a new task in src/backend/app/tasks/ (or in an existing lake task module). The task should:
- Query the data source for the target date window
- Build a PyArrow Table from the results
- Call lake_storage.write_parquet(table, partition_name, run_date)
import pyarrow as pa

from app.services.lake.lake_storage import write_parquet

# celery_app is the project's existing Celery application instance.
@celery_app.task(name="lake.export_my_new_data")
def export_my_new_data(**kwargs):
    # 1. Query your data source (fetch_data_from_source is a placeholder)
    rows = fetch_data_from_source()
    # 2. Build PyArrow table, one column per field
    table = pa.table({
        "id": [r["id"] for r in rows],
        "value": [r["value"] for r in rows],
        "created_at": [r["created_at"] for r in rows],
    })
    # 3. Write to lake under the partition name registered in step 2
    result = write_parquet(table, "my_new_partition")
    return result
2. Register the partition
Add your partition name to _ALLOWED_PARTITIONS in src/backend/app/api/internal/lake_query.py:
_ALLOWED_PARTITIONS = {
    ...
    "my_new_partition",
}
3. Whitelist the task for manual triggering
Add the task name to _ALLOWED_TASKS in src/backend/app/api/internal/task_trigger.py:
_ALLOWED_TASKS = {
    ...
    "lake.export_my_new_data",
}
4. Schedule the task (optional)
Add the task to the Celery Beat schedule for recurring execution.
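A Beat entry for the new task might look like the sketch below. The entry name and the daily interval are assumptions chosen for illustration; Celery's beat_schedule setting accepts a timedelta (or a crontab) as the schedule value, and the dict would be merged into wherever this project already defines its schedule.

```python
from datetime import timedelta

# Hypothetical Beat entry. In a Celery app this mapping is typically assigned to
# (or merged into) celery_app.conf.beat_schedule.
LAKE_BEAT_SCHEDULE = {
    "lake-export-my-new-data-daily": {
        "task": "lake.export_my_new_data",  # must match the name= given to the task
        "schedule": timedelta(days=1),      # assumed interval: once per day
        "kwargs": {},
    },
}

for name, entry in LAKE_BEAT_SCHEDULE.items():
    print(name, "->", entry["task"], "every", entry["schedule"])
```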
5. Test
Trigger the task manually to verify end-to-end:
curl -X POST https://ainative-browser-builder.up.railway.app/api/v1/internal/tasks/trigger \
  -H "Content-Type: application/json" \
  -H "X-Internal-API-Key: $AINATIVE_INTERNAL_API_KEY" \
  -d '{"task": "lake.export_my_new_data"}'
Then query the data:
curl -X POST https://ainative-browser-builder.up.railway.app/api/v1/internal/lake/query \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT * FROM my_new_partition", "limit": 10}'
Security
SQL Validation
All queries are validated before execution. The following patterns are blocked (case-insensitive):
| Blocked Pattern | Reason |
|---|---|
| DROP | Prevents schema destruction |
| DELETE | Prevents data deletion |
| INSERT | Prevents data injection |
| UPDATE | Prevents data modification |
| CREATE | Prevents schema creation |
| ALTER | Prevents schema modification |
| TRUNCATE | Prevents table truncation |
| EXEC | Prevents command execution |
| SYSTEM | Prevents system calls |
| read_parquet( | Prevents direct parquet path injection |
| .. | Prevents path traversal |
| file:// | Prevents local file access |
| http:// / https:// | Prevents remote file access |
Only SELECT queries are allowed. Any query missing a SELECT keyword is rejected.
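The checks above can be approximated with a small validator. This is a sketch under assumptions, not the validation code in lake_query.py: it matches keywords on word boundaries (so a column such as created_at is not caught by CREATE), whereas the real implementation may use plain substring matching, and the function name is hypothetical.

```python
import re

BLOCKED_KEYWORDS = (
    "DROP", "DELETE", "INSERT", "UPDATE", "CREATE",
    "ALTER", "TRUNCATE", "EXEC", "SYSTEM",
)
BLOCKED_SUBSTRINGS = ("read_parquet(", "..", "file://", "http://", "https://")


def validate_sql(sql: str) -> tuple[bool, str]:
    """Return (ok, reason); reason is empty when the query is accepted."""
    lowered = sql.lower()
    if not re.search(r"\bselect\b", lowered):
        return False, "query must contain SELECT"
    for keyword in BLOCKED_KEYWORDS:
        # Assumption: whole-word, case-insensitive match.
        if re.search(rf"\b{keyword.lower()}\b", lowered):
            return False, f"blocked keyword: {keyword}"
    for fragment in BLOCKED_SUBSTRINGS:
        if fragment in lowered:
            return False, f"blocked pattern: {fragment}"
    return True, ""


print(validate_sql("SELECT agent_id FROM agent_run_log"))
print(validate_sql("SELECT * FROM read_parquet('s3://other-bucket/x.parquet')"))
```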
Row Limits
- Default: 100 rows per query
- Maximum: 1000 rows per query
- If no LIMIT clause is present in the SQL, one is appended automatically
Rate Limiting
- Maximum 10 lake query calls per agent per cron run (enforced via request context)
- Task trigger endpoint requires X-Internal-API-Key header
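The per-run query cap can be pictured as a simple counter that is created at the start of each cron run. The class below is a hypothetical illustration; the platform enforces the limit through its request context, whose internals are not described here.

```python
class LakeQueryBudget:
    """Tracks lake query calls for one agent within one cron run."""

    def __init__(self, max_calls: int = 10) -> None:
        self.max_calls = max_calls
        self.calls = 0

    def try_acquire(self) -> bool:
        """Consume one call if any remain; return False once the cap is hit."""
        if self.calls >= self.max_calls:
            return False
        self.calls += 1
        return True


budget = LakeQueryBudget()
results = [budget.try_acquire() for _ in range(11)]
print(results.count(True), "allowed,", results.count(False), "rejected")
# 10 allowed, 1 rejected
```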
Authentication
- Lake query endpoint: accessible to internal agents
- Task trigger endpoint: requires X-Internal-API-Key header matching the AINATIVE_INTERNAL_API_KEY environment variable
- Tool definition endpoint: requires X-Internal-API-Key header
Environment Variables
| Variable | Description | Example |
|---|---|---|
| MINIO_URL | MinIO endpoint (primary) | https://minio.example.com |
| MINIO_ENDPOINT | MinIO endpoint (fallback) | minio:9000 |
| MINIO_ACCESS_KEY | MinIO access key | -- |
| MINIO_SECRET_KEY | MinIO secret key | -- |
| MINIO_SECURE | Use SSL (true/false) | true |
| MINIO_REGION | MinIO region | us-east-1 |
| AINATIVE_INTERNAL_API_KEY | Internal API key for admin endpoints | -- |
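One plausible reading of the primary/fallback relationship between MINIO_URL and MINIO_ENDPOINT is sketched below. The helper name, the scheme selection from MINIO_SECURE, and the default of "true" when it is unset are all assumptions of this sketch rather than documented behaviour of the storage layer.

```python
import os
from typing import Mapping, Optional


def resolve_minio_endpoint(env: Optional[Mapping[str, str]] = None) -> str:
    """Pick the MinIO endpoint URL, preferring MINIO_URL over MINIO_ENDPOINT."""
    env = os.environ if env is None else env
    url = env.get("MINIO_URL")
    if url:
        return url.rstrip("/")
    endpoint = env.get("MINIO_ENDPOINT")
    if not endpoint:
        raise RuntimeError("Neither MINIO_URL nor MINIO_ENDPOINT is set")
    # Assumption: MINIO_SECURE decides the scheme for the host:port fallback.
    secure = env.get("MINIO_SECURE", "true").strip().lower() == "true"
    scheme = "https" if secure else "http"
    return f"{scheme}://{endpoint}"


print(resolve_minio_endpoint({"MINIO_ENDPOINT": "minio:9000", "MINIO_SECURE": "false"}))
# http://minio:9000
```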
Key Source Files
| File | Purpose |
|---|---|
| src/backend/app/api/internal/lake_query.py | Lake query endpoint and SQL validation |
| src/backend/app/api/internal/task_trigger.py | Manual task trigger endpoint |
| src/backend/app/services/lake/lake_storage.py | MinIO storage layer (write_parquet, ensure_bucket) |
| src/backend/app/services/lake/lake_tool.py | LAKE_QUERY_TOOL schema for agent tool registration |