Data Lake API
Internal API for querying the AINative data lakehouse. Agents use these endpoints to access their own performance data, platform metrics, and external data sources stored as Parquet files in MinIO (S3-compatible object storage).
Architecture
External Data / Platform DB
|
v
Celery Beat (scheduled tasks)
|
v
Celery Worker
|
v
PyArrow Table (in-memory)
|
v
Parquet (Snappy compression)
|
v
MinIO S3 (ainative-lake bucket)
|
v
DuckDB (query engine, read_parquet over S3)
|
v
JSON response to caller
Data Flow
- Collection/Export -- Celery tasks run on a Beat schedule (or are triggered manually). Each task queries a data source (Postgres tables, external APIs, etc.) and produces a PyArrow Table.
- Storage -- The PyArrow Table is serialized to Parquet with Snappy compression and written to MinIO at a deterministic path: raw/{partition}/date={YYYY-MM-DD}/data.parquet.
- Query -- The lake query endpoint accepts SQL, rewrites table names to read_parquet('s3://ainative-lake/raw/{partition}/date=*/*.parquet') globs, and executes the query via DuckDB with the httpfs extension pointed at MinIO.
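The deterministic path used in the Storage step can be sketched as a small helper. The function name `lake_object_key` is hypothetical and not taken from the codebase; only the path layout comes from the description above.

```python
from datetime import date


def lake_object_key(partition: str, run_date: date) -> str:
    """Build the object key for one daily snapshot (hypothetical helper)."""
    # Layout from the Storage step: raw/{partition}/date={YYYY-MM-DD}/data.parquet
    return f"raw/{partition}/date={run_date.isoformat()}/data.parquet"


print(lake_object_key("platform/llm_token_usage", date(2026, 6, 1)))
```

The `date=` prefix follows the Hive-style `key=value` directory convention, which is what lets the query step match every day of a partition with a single `date=*` glob.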
Bucket Structure
ainative-lake/
├── raw/ # Source-of-truth snapshots (append-only)
│ ├── agent_run_log/
│ ├── mcp_request_logs/
│ ├── platform/ # Platform intelligence layer
│ │ ├── llm_token_usage/
│ │ ├── agent_heartbeats/
│ │ └── ...
│ ├── external/ # External data sources
│ │ ├── slack/
│ │ ├── scc/
│ │ └── ...
│ └── business/ # Business app exports
│ ├── crm/
│ ├── commerce/
│ └── voice/
├── derived/ # Computed aggregates
│ ├── intelligence_scores/
│ ├── velocity_metrics/
│ └── predictions/
└── exports/ # One-off exports (TTL: 7 days)
Endpoints
POST /api/v1/internal/lake/query
Query lake partitions using SQL. Only SELECT queries are permitted. Table names in the SQL are automatically rewritten to read_parquet() calls against MinIO.
Request
{
"sql": "SELECT agent_id, COUNT(*) as runs FROM agent_run_log GROUP BY agent_id",
"limit": 100
}
| Field | Type | Required | Default | Constraints |
|---|---|---|---|---|
| sql | string | yes | -- | 10-4000 chars, SELECT only |
| limit | int | no | 100 | 1-1000 |
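A caller may want to check these constraints before sending a request. The sketch below is a client-side convenience only; `build_query_payload` is a hypothetical name, and the server remains the authority on validation.

```python
import json


def build_query_payload(sql: str, limit: int = 100) -> str:
    """Check the documented constraints and return a JSON request body."""
    if not 10 <= len(sql) <= 4000:
        raise ValueError("sql must be 10-4000 characters")
    if not 1 <= limit <= 1000:
        raise ValueError("limit must be between 1 and 1000")
    return json.dumps({"sql": sql, "limit": limit})


print(build_query_payload("SELECT agent_id FROM agent_run_log", limit=10))
```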
Response
{
"columns": ["agent_id", "runs"],
"rows": [
["aurora", 142],
["sage", 98]
],
"row_count": 2,
"source": "lake",
"partition": "agent_run_log"
}
| Field | Type | Description |
|---|---|---|
| columns | list[str] | Column names from the result set |
| rows | list[list] | Row data as nested arrays |
| row_count | int | Number of rows returned |
| source | string | "lake" on success, "unavailable" if query failed |
| partition | string/null | The matched partition name, or null |
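Because rows arrive as nested arrays, callers often pair them with the column names. A minimal sketch, assuming the response shape documented above; `rows_to_dicts` is a hypothetical helper name.

```python
def rows_to_dicts(response: dict) -> list[dict]:
    """Turn a lake query response into a list of {column: value} records."""
    # A failed query reports source == "unavailable"; treat that as no data.
    if response.get("source") != "lake":
        return []
    columns = response["columns"]
    return [dict(zip(columns, row)) for row in response["rows"]]


example = {
    "columns": ["agent_id", "runs"],
    "rows": [["aurora", 142], ["sage", 98]],
    "row_count": 2,
    "source": "lake",
    "partition": "agent_run_log",
}
print(rows_to_dicts(example))
```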
SQL Table Name Mapping
In your SQL, use the partition name with slashes replaced by underscores. The endpoint rewrites these to S3 glob paths automatically.
| Write in SQL | Resolves to |
|---|---|
| agent_run_log | read_parquet('s3://ainative-lake/raw/agent_run_log/date=*/*.parquet') |
| platform_llm_token_usage | read_parquet('s3://ainative-lake/raw/platform/llm_token_usage/date=*/*.parquet') |
| external_scc_edd_labor | read_parquet('s3://ainative-lake/raw/external/scc/edd_labor/date=*/*.parquet') |
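The mapping can be illustrated with a small rewrite function. This is a sketch of the idea, not the endpoint's actual code: the names `sql_name`, `parquet_source`, and `rewrite_tables` are hypothetical, and `PARTITIONS` is an illustrative subset of the allowed list.

```python
import re

# Illustrative subset of allowed partitions (assumption for this sketch).
PARTITIONS = {"agent_run_log", "platform/llm_token_usage", "external/scc/edd_labor"}


def sql_name(partition: str) -> str:
    """Partition path -> name to write in SQL (slashes become underscores)."""
    return partition.replace("/", "_")


def parquet_source(partition: str) -> str:
    """Partition path -> read_parquet() call over all daily snapshots."""
    return f"read_parquet('s3://ainative-lake/raw/{partition}/date=*/*.parquet')"


def rewrite_tables(sql: str, partitions=PARTITIONS) -> str:
    """Replace known SQL table names with their read_parquet() sources."""
    # Partition names themselves contain underscores, so the reverse mapping
    # needs a lookup table; it cannot be derived by string replacement alone.
    by_sql_name = {sql_name(p): p for p in partitions}
    names = sorted(by_sql_name, key=len, reverse=True)
    pattern = re.compile(r"\b(" + "|".join(map(re.escape, names)) + r")\b")
    return pattern.sub(lambda m: parquet_source(by_sql_name[m.group(1)]), sql)


print(rewrite_tables("SELECT * FROM external_scc_edd_labor LIMIT 50"))
```

A plain regex like this would also rewrite a column or string literal that happens to match a table name, so a production rewriter would need to be stricter.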
Example Queries
-- Agent activity by agent and status (across all available dates)
SELECT agent_id, status, COUNT(*) as cnt
FROM agent_run_log
GROUP BY agent_id, status
ORDER BY cnt DESC
-- LLM token costs by model
SELECT model, SUM(total_tokens) as tokens, SUM(cost_usd) as cost
FROM platform_llm_token_usage
GROUP BY model
ORDER BY cost DESC
-- SCC labor market data
SELECT * FROM external_scc_edd_labor LIMIT 50
If a LIMIT clause is not present in the SQL, the endpoint appends LIMIT {limit} from the request body (default 100, max 1000).
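The LIMIT behaviour can be sketched as follows. The function name `apply_limit`, the regex used to detect an existing LIMIT, and the choice to clamp out-of-range values rather than reject them are all assumptions made for illustration.

```python
import re

# Matches "LIMIT <number>" anywhere in the statement, case-insensitively.
_LIMIT_RE = re.compile(r"\blimit\s+\d+", re.IGNORECASE)


def apply_limit(sql: str, limit: int = 100) -> str:
    """Append LIMIT {limit} when the SQL does not already contain one."""
    limit = max(1, min(limit, 1000))  # keep within the documented 1-1000 range
    if _LIMIT_RE.search(sql):
        return sql
    # Drop trailing whitespace and a trailing semicolon before appending.
    return f"{sql.rstrip().rstrip(';')} LIMIT {limit}"


print(apply_limit("SELECT model FROM platform_llm_token_usage"))
```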
GET /api/v1/internal/lake/tool-definition
Returns the LAKE_QUERY_TOOL JSON schema for registering the lake query capability with ZeroMemory or any OpenAI-compatible tool-use system.
Headers
| Header | Required | Description |
|---|---|---|
| X-Internal-API-Key | yes | Must match AINATIVE_INTERNAL_API_KEY env var |
Response
Returns the tool definition JSON that agents use to discover available lake queries and their parameters.
POST /api/v1/internal/tasks/trigger
Manually trigger a whitelisted Celery lake task without waiting for the Beat schedule. Admin-only.
Headers
| Header | Required | Description |
|---|---|---|
| X-Internal-API-Key | yes | Must match AINATIVE_INTERNAL_API_KEY env var |
Request
{
"task": "lake.export_agent_run_log",
"kwargs": {}
}
| Field | Type | Required | Description |
|---|---|---|---|
| task | string | yes | Task name from the allowed list |
| kwargs | object | no | Optional keyword arguments for the task |
Response
{
"task": "lake.export_agent_run_log",
"task_id": "a1b2c3d4-...",
"status": "queued"
}
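For callers working in Python rather than curl, the trigger request could be assembled with the standard library as sketched below. `build_trigger_request` is a hypothetical helper, and the base URL is whatever host serves the internal API in your environment.

```python
import json
import urllib.request


def build_trigger_request(base_url: str, api_key: str, task: str, kwargs=None):
    """Build (but do not send) a POST request for the task trigger endpoint."""
    body = json.dumps({"task": task, "kwargs": kwargs or {}}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/internal/tasks/trigger",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-Internal-API-Key": api_key,
        },
    )


req = build_trigger_request("https://example.invalid", "secret", "lake.export_agent_run_log")
print(req.get_method(), req.full_url)
```

Sending it is then a matter of `urllib.request.urlopen(req)` and reading `task_id` from the JSON response.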
Allowed Partitions
These are the only partition names that can appear in lake SQL queries. Any other table name will not be rewritten and will cause a DuckDB error.
Core Platform
| Partition | Description |
|---|---|
| agent_run_log | Daily agent execution logs |
| mcp_request_logs | API request logs from Kong |
| rlhf_signals | Reinforcement learning feedback |
| billing_events | Credit transactions and billing |
| package_stats | npm + PyPI download stats |
| github_events | Multi-repo GitHub activity |
| stripe_events | Stripe payment events |
| cli_telemetry | Cody CLI usage telemetry |
Platform Intelligence Layer
| Partition | Description |
|---|---|
| platform/llm_token_usage | LLM token consumption and costs |
| platform/agent_heartbeats | Agent heartbeat executions |
| platform/agent_resource_usage | Agent cloud resource metering |
| platform/billing_aggregations | MCP billing aggregations |
| platform/agent_instances | Agent instance registry |
| platform/agent_tasks | Agent task queue |
| platform/agent_workload | Agent workload distribution |
| platform/agent_traces | Agent execution traces |
| platform/a2a_interactions | Agent-to-agent communications |
| platform/developer_earnings | Echo developer earnings |
| platform/developer_payouts | Echo developer payouts |
| platform/dedicated_deployments | Dedicated deployment snapshots |
| platform/kong_metrics | Kong gateway metrics |
| platform/multimodal_usage | Image/audio/video API usage |
| platform/mcp_usage | MCP tool usage logs |
| platform/soc2_evidence | SOC2 compliance evidence |
Platform Extended
| Partition | Description |
|---|---|
| platform/ai_request_log | AI request log |
| platform/chat_completion_usage | Chat completion usage |
| platform/agent_swarm_sessions | Agent swarm session data |
| platform/agent_swarm_messages | Agent swarm messages |
| platform/agent_swarm_feedback | Agent swarm feedback |
| platform/audio_usage | Audio API usage |
| platform/ai_feedback | AI feedback signals |
| platform/agent_audit_logs | Agent audit logs |
| platform/zerodb_rlhf | ZeroDB RLHF signals |
| platform/user_onboarding | User onboarding funnels |
| platform/product_events | Product analytics events |
| platform/coding_sessions | Sandbox coding sessions |
| platform/project_executions | Project code executions |
| platform/video_views | Video view analytics |
| platform/live_streams | Live stream sessions |
| platform/zeromemory_stats | ZeroMemory usage statistics |
| platform/huggingface_usage | HuggingFace model usage |
| platform/tool_execution_logs | Tool execution logs |
| platform/webhook_deliveries | Webhook delivery logs |
| platform/prompt_usage | Prompt template usage |
| platform/search_queries | Search query logs |
| platform/agent_performance_metrics | Agent performance metrics |
| platform/agent_quality_trends | Agent quality trend data |
| platform/notification_logs | Notification delivery logs |
| platform/social_analytics | Social media analytics |
| platform/sprint_plans | Sprint planning data |
| platform/swarm_quotas | Swarm quota snapshots |
Predictions
| Partition | Description |
|---|---|
| derived/predictions/request_volume_forecast | Request volume forecasts |
| derived/predictions/agent_anomalies | Agent anomaly detection |
| derived/predictions/token_cost_forecast | Token cost projections |
External Data
| Partition | Description |
|---|---|
| external/slack | Slack channel statistics |
| external/email | Email delivery events |
| external/scc/edd_labor | CA EDD labor market data |
| external/scc/demographics | Census demographics |
| external/scc/fred_indicators | FRED economic indicators |
| external/scc/federal_awards | Federal awards (USAspending) |
| external/scc/census_business | Census business patterns |
| external/scc/sec_form_d | SEC Form D filings |
| external/scc/sec_filings | SEC public filings |
| external/scc/sec_xbrl | SEC XBRL financial data |
| external/scc/irs_soi | IRS Statistics of Income |
| external/scc/cdtfa_taxable_sales | CDTFA taxable sales data |
| external/scc/cdtfa_permits | CDTFA seller permits |
| external/luma/events | Luma event data |
| external/dealroom/sessions | Dealroom session data |
Business App Exports
| Partition | Description |
|---|---|
| business/crm/deals | ZeroPipeline CRM deals |
| business/crm/activities | ZeroPipeline CRM activities |
| business/crm/pipeline | ZeroPipeline pipeline stages |
| business/commerce/orders | ZeroCommerce orders |
| business/commerce/products | ZeroCommerce products |
| business/voice/calls | ZeroVoice call records |
| business/voice/sms | ZeroVoice SMS records |
Chatwoot
| Partition | Description |
|---|---|
| chatwoot/conversations | Support conversation exports |
Sentinel Lakehouse (cross-bucket)
These partitions live in the sentinel-lakehouse MinIO bucket with the path structure {partition}/YYYY/MM/DD/batch_*.parquet. The query engine handles this transparently, so use the same SQL table name syntax. Schema evolution is handled via union_by_name=True.
| Partition | Description | Files |
|---|---|---|
| cable_route | TeleGeography submarine cable routes + landing stations | ~1,800 files |
| ais | AIS vessel tracking (ship positions, heading, speed) | ~1,900 files |
| solar_wind | NOAA SWPC solar wind speed, density, Bz | ~600 files |
| satellite_tle | Satellite orbital elements (N2YO) | ~550 files |
| weather | Open-Meteo weather observations | ~340 files |
| ocean_monitoring | CENCOOS ocean temperature, waves | ~360 files |
| space_weather_alert | NOAA space weather alerts with severity | ~80 files |
| ransomware | Ransomwatch ransomware group victim posts | ~225 files |
| govt_advisory | CISA KEV + CERT advisories | ~230 files |
| internet_infrastructure | PeeringDB IXP data | ~230 files |
| as_topology | CAIDA AS rank + relationships | ~160 files |
| exploit_intel | EPSS scores + ExploitDB + Nuclei templates | ~86 files |
| satellite_position | Satellite positions (N2YO) | ~266 files |
| network_traceroute | RIPE Atlas traceroute measurements | ~220 files |
| censorship | OONI censorship measurements | ~125 files |
| seismic | USGS earthquake feed | ~29 files |
| bgp_routing | RIPE RIS BGP routing data | ~29 files |
The table above lists a subset. There are 40 sensor partitions in total, holding 8,500+ Parquet files that cover maritime, space, cyber, geophysical, and network domains. The full list is in _SENTINEL_PARTITIONS in src/backend/app/api/internal/lake_query.py.
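To illustrate how the Sentinel layout differs from the main bucket, the sketch below builds a DuckDB source expression for one sensor partition. The exact glob and the helper name `sentinel_source` are assumptions; only the bucket name, path structure, and the use of union_by_name come from this section.

```python
def sentinel_source(partition: str) -> str:
    """Build a read_parquet() expression for a sentinel-lakehouse partition."""
    # One wildcard per date component: {partition}/YYYY/MM/DD/batch_*.parquet
    glob = f"s3://sentinel-lakehouse/{partition}/*/*/*/batch_*.parquet"
    # union_by_name aligns files by column name, so files written before and
    # after a schema change can be read together.
    return f"read_parquet('{glob}', union_by_name = true)"


print(f"SELECT COUNT(*) FROM {sentinel_source('seismic')}")
```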
Sentinel Correlation Engine
The Sentinel Correlation Engine runs every 15 minutes and performs cross-domain anomaly detection over lakehouse data, then walks the knowledge graph to find impacted infrastructure.
Anomaly Detectors
| Detector | Metrics | Threshold | Description |
|---|---|---|---|
| Solar wind | speed_km_s, bz_nt | 2σ deviation, Bz < -10 nT | Geomagnetic storm conditions |
| Seismic | magnitude | M5.0+ | Significant earthquakes near infrastructure |
| BGP routing | total_prefixes | >30% drop | Prefix count anomalies per ASN |
| Ransomware | victim_posts | >10 posts/group | Ransomware group activity spikes |
| Space weather alerts | severity | Moderate+ | NOAA alert severity classification |
| Predictive (EWMA) | speed trend, activity trend | Forecast > threshold | Early warning before anomalies develop |
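The thresholds in this table can be expressed as small predicate functions. The sketch below is illustrative only: the function names are hypothetical, treating the two solar wind conditions as alternatives (OR) is an assumption, and the EWMA smoothing factor is an arbitrary example value.

```python
from statistics import mean, pstdev


def solar_wind_anomaly(speed_history, speed_now, bz_now) -> bool:
    """Flag a 2-sigma speed deviation or a strongly southward Bz (< -10 nT)."""
    mu, sigma = mean(speed_history), pstdev(speed_history)
    deviates = sigma > 0 and abs(speed_now - mu) > 2 * sigma
    return deviates or bz_now < -10


def bgp_prefix_drop(previous: int, current: int) -> bool:
    """Flag a drop of more than 30% in an ASN's announced prefix count."""
    if previous <= 0:
        return False
    return (previous - current) / previous > 0.30


def ewma(values, alpha: float = 0.3) -> float:
    """Exponentially weighted moving average; recent values weigh more."""
    smoothed = values[0]
    for value in values[1:]:
        smoothed = alpha * value + (1 - alpha) * smoothed
    return smoothed


history = [400, 410, 390, 405, 395]  # speed_km_s samples, made up for the demo
print(solar_wind_anomaly(history, speed_now=450, bz_now=-2))
print(bgp_prefix_drop(previous=1000, current=650))
```

A predictive check would compare `ewma(recent_values)` against the same thresholds, so a rising trend can be flagged before a single reading crosses the line.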
Impact Traversal
When an anomaly is detected, the engine walks the knowledge graph:
Solar storm → satellite constellation → submarine cables → IXPs → landing stations
Earthquake → nearby cables → nearby stations → co-located stations
CVE advisory → ransomware group → victims → affected products → vendors
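A traversal like the ones above can be sketched as a bounded breadth-first search. The toy graph, the `impacted_nodes` name, and the choice of BFS are assumptions for illustration; the engine's actual traversal strategy is not described here.

```python
from collections import deque

# Toy adjacency list mirroring the solar storm chain above (illustrative only).
GRAPH = {
    "solar_storm": ["satellite_constellation"],
    "satellite_constellation": ["submarine_cable"],
    "submarine_cable": ["ixp"],
    "ixp": ["landing_station"],
    "landing_station": [],
}


def impacted_nodes(graph, start, max_hops=5):
    """Return {node: hop_count} for everything reachable within max_hops."""
    hops = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if hops[node] == max_hops:
            continue  # do not expand beyond the hop budget
        for neighbor in graph.get(node, []):
            if neighbor not in hops:
                hops[neighbor] = hops[node] + 1
                queue.append(neighbor)
    del hops[start]  # the anomaly source itself is not an impacted node
    return hops


print(impacted_nodes(GRAPH, "solar_storm"))
```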
Knowledge Graph Stats
- 6,368 entities across 17 types (vessels, satellites, cables, IXPs, CVEs, ransomware groups, etc.)
- 6,196 edges across 17 predicates (lands_at, near_cable, exploited_by_ransomware, etc.)
- 62.6% connectivity — multi-hop traversals up to 5 hops verified
- 319 critical infrastructure nodes identified by degree centrality
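Degree centrality, the measure used to flag critical nodes, counts how many edges touch each node. The sketch below uses the standard normalization by n - 1; the function name and sample edges are hypothetical, and the cutoff that makes a node "critical" is not stated here, so none is applied.

```python
from collections import Counter


def degree_centrality(edges):
    """Compute normalized degree centrality from (subject, predicate, object) edges."""
    degree = Counter()
    for subject, _predicate, obj in edges:
        degree[subject] += 1
        degree[obj] += 1
    n = len(degree)
    if n < 2:
        return {node: 0.0 for node in degree}
    # Divide by the maximum possible degree in a simple graph of n nodes.
    return {node: count / (n - 1) for node, count in degree.items()}


edges = [
    ("cable_a", "lands_at", "station_1"),
    ("cable_b", "lands_at", "station_1"),
    ("cable_c", "lands_at", "station_1"),
]
scores = degree_centrality(edges)
print(max(scores, key=scores.get))
```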
Allowed Trigger Tasks
The task trigger endpoint accepts only whitelisted task names. The tasks below are organized by category:
SCC Economic Data Collection
- lake.collect_scc_edd_labor
- lake.collect_scc_census_demographics
- lake.collect_scc_census_business
- lake.collect_scc_fred_indicators
- lake.collect_scc_federal_awards
- lake.collect_scc_irs_soi
- lake.collect_scc_cdtfa_taxable_sales
- lake.collect_scc_cdtfa_permits
SEC EDGAR
- lake.collect_sec_form_d / lake.collect_sec_form_d_backfill
- lake.collect_sec_public_filings / lake.collect_sec_public_filings_backfill
- lake.collect_sec_xbrl_financials
Platform Internal Exports
- lake.export_agent_run_log
- lake.export_mcp_request_logs
- lake.export_billing_events
- lake.export_rlhf_signals
- lake.export_cli_telemetry
- lake.export_stripe_events
Package / Repo Stats
- lake.collect_package_stats
- lake.collect_github_events
Communication Signals
- lake.collect_slack_channel_stats
- lake.collect_email_delivery_events
Intelligence Layer
- lake.export_llm_token_usage
- lake.export_agent_heartbeat_executions
- lake.export_agent_resource_usage
- lake.export_mcp_billing_aggregations
- lake.export_agent_instances
- lake.export_agent_tasks
- lake.export_agent_workload
- lake.export_agent_traces
- lake.export_a2a_interactions
- lake.export_developer_earnings
- lake.export_developer_payouts
- lake.export_dedicated_deployments
- lake.export_kong_metrics
- lake.export_multimodal_usage
- lake.export_mcp_usage_logs
- lake.export_soc2_evidence
Business App Exports
- lake.export_zerocommerce_orders / lake.export_zerocommerce_products
- lake.export_zeropipeline_deals / lake.export_zeropipeline_activities / lake.export_zeropipeline_stages
- lake.export_zerovoice_calls / lake.export_zerovoice_sms
- lake.export_opencap_stakeholders / lake.export_opencap_grants / lake.export_opencap_share_classes
- lake.export_zeroinvoice_invoices / lake.export_zeroinvoice_payments
Predictive Analytics
- lake.predict_request_volume_forecast
- lake.predict_agent_anomalies
- lake.predict_token_cost_forecast
Backfills
- lake.backfill_mcp_request_logs
- lake.backfill_llm_token_usage
- lake.backfill_chatwoot_conversations
- lake.backfill_audio_model_usage
- lake.backfill_agent_audit_logs
Bucket Health
- lake.ensure_bucket
Sentinel OS
- lake.ingest_cable_routes_to_graph -- Sentinel lakehouse → knowledge graph entity/edge sync
- sentinel.correlation_engine -- Cross-domain anomaly detection + graph impact traversal
- graph.compute_centrality -- Degree centrality + critical node identification
- graph.memory_feedback_loop -- Boost edge confidence from confirmed correlation insights
- graph.resolve_entities -- Fuzzy match sensor entity IDs to graph canonical names
- connector.health_check -- Monitor all 40 sentinel partitions for data freshness
- sentinel.stream_consumer -- Real-time RedPanda → instant graph upsert
- lakehouse.schema_registry_scan -- Track Parquet schema evolution across partitions
For the complete list, see _ALLOWED_TASKS in src/backend/app/api/internal/task_trigger.py.
How to Add a New Export Task
Follow this pattern to add a new data source to the lake:
1. Create the Celery task
Add a new task in src/backend/app/tasks/ (or in an existing lake task module). The task should:
- Query the data source for the target date window
- Build a PyArrow Table from the results
- Call lake_storage.write_parquet(table, partition_name, run_date)
from app.services.lake.lake_storage import write_parquet
import pyarrow as pa

# celery_app is the project's Celery application instance; import it from wherever it is defined.
@celery_app.task(name="lake.export_my_new_data")
def export_my_new_data(**kwargs):
    # 1. Query your data source (fetch_data_from_source is a placeholder for your own query)
    rows = fetch_data_from_source()
    # 2. Build PyArrow table
    table = pa.table({
        "id": [r["id"] for r in rows],
        "value": [r["value"] for r in rows],
        "created_at": [r["created_at"] for r in rows],
    })
    # 3. Write to lake
    result = write_parquet(table, "my_new_partition")
    return result
2. Register the partition
Add your partition name to _ALLOWED_PARTITIONS in src/backend/app/api/internal/lake_query.py:
_ALLOWED_PARTITIONS = {
...
"my_new_partition",
}
3. Whitelist the task for manual triggering
Add the task name to _ALLOWED_TASKS in src/backend/app/api/internal/task_trigger.py:
_ALLOWED_TASKS = {
...
"lake.export_my_new_data",
}
4. Schedule the task (optional)
Add the task to the Celery Beat schedule for recurring execution.
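A schedule entry might look like the sketch below. The entry key and the daily interval are illustrative assumptions, and where the project keeps its Beat configuration is not specified here. Celery accepts a `timedelta` (or a crontab object) as the `schedule` value.

```python
from datetime import timedelta

# Hypothetical Beat entry: run the new export once a day.
BEAT_SCHEDULE_ENTRY = {
    "export-my-new-data-daily": {
        "task": "lake.export_my_new_data",  # must match the name= given to the task
        "schedule": timedelta(days=1),
        "kwargs": {},
    },
}

# In the module that configures Celery (assuming the app object is named
# celery_app, as in step 1), the entry would be merged into the schedule:
# celery_app.conf.beat_schedule.update(BEAT_SCHEDULE_ENTRY)

print(BEAT_SCHEDULE_ENTRY["export-my-new-data-daily"]["schedule"])
```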
5. Test
Trigger the task manually to verify end-to-end:
curl -X POST https://ainative-browser-builder.up.railway.app/api/v1/internal/tasks/trigger \
-H "Content-Type: application/json" \
-H "X-Internal-API-Key: $AINATIVE_INTERNAL_API_KEY" \
-d '{"task": "lake.export_my_new_data"}'
Then query the data:
curl -X POST https://ainative-browser-builder.up.railway.app/api/v1/internal/lake/query \
-H "Content-Type: application/json" \
-d '{"sql": "SELECT * FROM my_new_partition", "limit": 10}'
Public Analytics Endpoint
GET /api/v1/public/platform/lake
Historical platform analytics from the data lakehouse. Returns aggregate metrics for agent runs, API usage, model costs, and package downloads. No authentication required -- all data is aggregate and non-sensitive.
Responses are cached for 5 minutes.
Query Parameters
| Parameter | Type | Default | Constraints | Description |
|---|---|---|---|---|
| days | int | 30 | 1-90 | Number of days to analyse |
Request
curl "https://api.ainative.studio/api/v1/public/platform/lake?days=30"
Response
{
"agent_runs_total": 4280,
"agent_success_rate_pct": 94.2,
"agent_trend": [
{"date": "2026-06-01", "success_rate": 93.5, "total_runs": 312}
],
"improving": true,
"api_requests_total": 128450,
"api_cost_credits_total": 2847.50,
"api_error_rate_pct": 1.2,
"api_avg_latency_ms": 245.8,
"top_models": [
{"model": "qwen3-coder-flash", "requests": 45200, "cost_credits": 890.40}
],
"api_daily_volume": [
{"date": "2026-06-01", "value": 4500.0}
],
"package_downloads_total": 18420,
"top_packages": [
{"name": "zerodb-mcp", "ecosystem": "pypi", "downloads": 6200}
],
"days": 30,
"generated_at": "2026-06-10T12:00:00+00:00",
"source": "lake"
}
| Field | Type | Description |
|---|---|---|
| agent_runs_total | int | Total agent executions in the period |
| agent_success_rate_pct | float | Percentage of successful agent runs |
| agent_trend | AgentRunTrend[] | Daily success rate and run counts |
| improving | bool | Whether the quality trend is improving |
| api_requests_total | int | Total API requests |
| api_cost_credits_total | float | Total credit cost |
| api_error_rate_pct | float | Error rate percentage |
| api_avg_latency_ms | float | Average response latency in ms |
| top_models | ModelUsage[] | Top 10 models by request count |
| api_daily_volume | DayVolume[] | Daily request volume |
| package_downloads_total | int | Total package downloads |
| top_packages | PackageEntry[] | Top 10 packages by downloads |
| days | int | Requested analysis window |
| generated_at | string | ISO 8601 generation timestamp |
| source | string | "lake", "lake_empty" (no Parquet files), or "unavailable" (error) |
This endpoint never returns HTTP errors -- it degrades gracefully to source: "unavailable" with zeroed metrics when the lakehouse is unreachable.
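Since the HTTP status does not signal failure, a client has to branch on the source field instead. A minimal sketch, assuming the response shape above; `summarize_lake_status` is a hypothetical helper.

```python
def summarize_lake_status(payload: dict) -> str:
    """Describe a public lake analytics response based on its source field."""
    source = payload.get("source")
    if source == "lake":
        return f"{payload['agent_runs_total']} agent runs over {payload['days']} days"
    if source == "lake_empty":
        return "lakehouse reachable, but no Parquet files yet"
    # "unavailable" (or anything unexpected): metrics are zeroed, not real.
    return "lakehouse unavailable; metrics are zeroed placeholders"


print(summarize_lake_status({"source": "lake", "agent_runs_total": 4280, "days": 30}))
```

Checking the field before charting or alerting avoids mistaking zeroed placeholder metrics for a genuine drop in activity.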
Refs #3356 #3369
Security
SQL Validation
All queries are validated before execution. The following patterns are blocked (case-insensitive):
| Blocked Pattern | Reason |
|---|---|
| DROP | Prevents schema destruction |
| DELETE | Prevents data deletion |
| INSERT | Prevents data injection |
| UPDATE | Prevents data modification |
| CREATE | Prevents schema creation |
| ALTER | Prevents schema modification |
| TRUNCATE | Prevents table truncation |
| EXEC | Prevents command execution |
| SYSTEM | Prevents system calls |
| read_parquet( | Prevents direct parquet path injection |
| .. | Prevents path traversal |
| file:// | Prevents local file access |
| http:// / https:// | Prevents remote file access |
Only SELECT queries are allowed. Any query missing a SELECT keyword is rejected.
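A validator along these lines might look like the sketch below. It is not the code in lake_query.py: `validate_sql` is a hypothetical name, and matching keywords on word boundaries is an assumption, chosen so that column names such as created_at are not rejected for containing CREATE.

```python
import re

_BLOCKED_KEYWORDS = (
    "DROP", "DELETE", "INSERT", "UPDATE", "CREATE",
    "ALTER", "TRUNCATE", "EXEC", "SYSTEM",
)
_BLOCKED_LITERALS = ("read_parquet(", "..", "file://", "http://", "https://")
_KEYWORD_RE = re.compile(r"\b(" + "|".join(_BLOCKED_KEYWORDS) + r")\b", re.IGNORECASE)


def validate_sql(sql: str) -> None:
    """Raise ValueError if the query breaks any documented rule."""
    lowered = sql.lower()
    if not re.search(r"\bselect\b", lowered):
        raise ValueError("only SELECT queries are allowed")
    match = _KEYWORD_RE.search(sql)
    if match:
        raise ValueError(f"blocked keyword: {match.group(1).upper()}")
    for literal in _BLOCKED_LITERALS:
        if literal in lowered:
            raise ValueError(f"blocked pattern: {literal}")


validate_sql("SELECT created_at FROM agent_run_log")  # passes silently
```

Pattern blocking of this kind is a coarse filter; it can still reject legitimate queries, for example one that filters on a string value containing a URL.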
Row Limits
- Default: 100 rows per query
- Maximum: 1000 rows per query
- If no LIMIT clause is present in the SQL, one is appended automatically
Rate Limiting
- Maximum 10 lake query calls per agent per cron run (enforced via request context)
- Task trigger endpoint requires X-Internal-API-Key header
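The per-run query budget can be pictured as a counter keyed by agent and run. This is a sketch of the idea only: the class name, the in-memory storage, and the run identifier are assumptions, and the real enforcement happens in the request context.

```python
from collections import defaultdict

MAX_QUERIES_PER_RUN = 10  # documented limit per agent per cron run


class LakeQueryBudget:
    """Track lake query calls per (agent, run) pair."""

    def __init__(self, max_calls: int = MAX_QUERIES_PER_RUN):
        self.max_calls = max_calls
        self._calls = defaultdict(int)

    def try_acquire(self, agent_id: str, run_id: str) -> bool:
        """Record one call; return False once the budget is spent."""
        key = (agent_id, run_id)
        if self._calls[key] >= self.max_calls:
            return False
        self._calls[key] += 1
        return True


budget = LakeQueryBudget()
results = [budget.try_acquire("aurora", "run-1") for _ in range(11)]
print(results.count(True), results[-1])
```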
Authentication
- Lake query endpoint: accessible to internal agents
- Task trigger endpoint: requires X-Internal-API-Key header matching the AINATIVE_INTERNAL_API_KEY environment variable
- Tool definition endpoint: requires X-Internal-API-Key header
Environment Variables
| Variable | Description | Example |
|---|---|---|
| MINIO_URL | MinIO endpoint (primary) | https://minio.example.com |
| MINIO_ENDPOINT | MinIO endpoint (fallback) | minio:9000 |
| MINIO_ACCESS_KEY | MinIO access key | -- |
| MINIO_SECRET_KEY | MinIO secret key | -- |
| MINIO_SECURE | Use SSL (true/false) | true |
| MINIO_REGION | MinIO region | us-east-1 |
| AINATIVE_INTERNAL_API_KEY | Internal API key for admin endpoints | -- |
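The primary and fallback endpoint variables suggest a resolution order like the one sketched below. How the storage layer actually combines them is not documented here, so the function name, the scheme handling, and the default for MINIO_SECURE are assumptions.

```python
def resolve_minio_endpoint(env) -> str:
    """Pick the MinIO endpoint URL: MINIO_URL first, then MINIO_ENDPOINT."""
    url = env.get("MINIO_URL")
    if url:
        return url.rstrip("/")
    endpoint = env.get("MINIO_ENDPOINT")
    if not endpoint:
        raise RuntimeError("set MINIO_URL or MINIO_ENDPOINT")
    # Assumption: MINIO_SECURE defaults to "true" and selects the scheme.
    secure = env.get("MINIO_SECURE", "true").lower() == "true"
    return f"{'https' if secure else 'http'}://{endpoint}"


print(resolve_minio_endpoint({"MINIO_ENDPOINT": "minio:9000", "MINIO_SECURE": "false"}))
```

In application code the mapping passed in would typically be `os.environ`.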
Key Source Files
| File | Purpose |
|---|---|
| src/backend/app/api/internal/lake_query.py | Lake query endpoint and SQL validation |
| src/backend/app/api/internal/task_trigger.py | Manual task trigger endpoint |
| src/backend/app/services/lake/lake_storage.py | MinIO storage layer (write_parquet, ensure_bucket) |
| src/backend/app/services/lake/lake_tool.py | LAKE_QUERY_TOOL schema for agent tool registration |