Every Noēsis episode produces structured metrics you can export to dashboards, alerting systems, and analysis tools.
## Available metrics

Each episode's `summary.json` file contains the following metrics:
| Metric | Description |
|---|---|
| `success` | 1 for success, 0 for failure |
| `plan_count` | Number of planning iterations |
| `act_count` | Number of tool/adapter invocations |
| `reflect_count` | Number of reflection passes |
| `veto_count` | Number of policy vetoes |
| `latencies.first_action_ms` | Time to first action, in milliseconds |
| `plan_adherence` | How closely execution matched the plan |
| `tool_coverage` | Percentage of planned tools actually used |
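For orientation, the `metrics` block might look roughly like the following; the values here are illustrative, not real output:

```python
# Illustrative shape of summary["metrics"]; values are made up for this example.
example_metrics = {
    "success": 1,
    "plan_count": 2,
    "act_count": 7,
    "reflect_count": 1,
    "veto_count": 0,
    "latencies": {"first_action_ms": 842},
    "plan_adherence": 0.92,
    "tool_coverage": 0.86,
}
```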
## Reading metrics

### CLI

```bash
noesis insight ep_abc123 -j | jq '.metrics'
```
### Python

```python
import noesis as ns

episode_id = ns.last()
summary = ns.summary.read(episode_id)
metrics = summary.get("metrics", {})

print(f"Success: {metrics.get('success')}")
print(f"Actions: {metrics.get('act_count')}")
print(f"Vetoes: {metrics.get('veto_count')}")
```
## Export to JSON files
Export all recent episodes to a JSON file:
```python
import json

import noesis as ns


def export_metrics(output_path: str, limit: int = 100):
    """Export metrics from recent episodes to JSON."""
    episodes = ns.list_runs(limit=limit)

    metrics_data = []
    for ep in episodes:
        summary = ns.summary.read(ep["episode_id"])
        metrics_data.append({
            "episode_id": ep["episode_id"],
            "timestamp": ep["timestamp"],
            "task": summary.get("task"),
            "metrics": summary.get("metrics", {}),
        })

    with open(output_path, "w") as f:
        json.dump(metrics_data, f, indent=2)

    return len(metrics_data)


# Export the last 100 episodes
count = export_metrics("./metrics_export.json")
print(f"Exported {count} episodes")
```
## Export to CSV
For spreadsheet analysis:
```python
import csv

import noesis as ns


def export_to_csv(output_path: str, limit: int = 100):
    """Export metrics to CSV for spreadsheet analysis."""
    episodes = ns.list_runs(limit=limit)

    fieldnames = [
        "episode_id",
        "timestamp",
        "task",
        "success",
        "plan_count",
        "act_count",
        "veto_count",
        "first_action_ms",
    ]

    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

        for ep in episodes:
            summary = ns.summary.read(ep["episode_id"])
            metrics = summary.get("metrics", {})
            writer.writerow({
                "episode_id": ep["episode_id"],
                "timestamp": ep["timestamp"],
                "task": summary.get("task", "")[:100],
                "success": metrics.get("success", 0),
                "plan_count": metrics.get("plan_count", 0),
                "act_count": metrics.get("act_count", 0),
                "veto_count": metrics.get("veto_count", 0),
                "first_action_ms": metrics.get("latencies", {}).get("first_action_ms", 0),
            })


export_to_csv("./metrics.csv")
```
## Export to Prometheus
Push metrics to Prometheus using the pushgateway:
```python
import noesis as ns
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway


def push_to_prometheus(episode_id: str, gateway: str = "localhost:9091"):
    """Push episode metrics to the Prometheus pushgateway."""
    summary = ns.summary.read(episode_id)
    metrics = summary.get("metrics", {})

    registry = CollectorRegistry()

    # Define gauges
    success = Gauge(
        "noesis_episode_success",
        "Episode success (1=success, 0=failure)",
        registry=registry,
    )
    actions = Gauge(
        "noesis_episode_actions",
        "Number of actions in episode",
        registry=registry,
    )
    vetoes = Gauge(
        "noesis_episode_vetoes",
        "Number of policy vetoes",
        registry=registry,
    )
    latency = Gauge(
        "noesis_first_action_latency_ms",
        "Latency to first action in milliseconds",
        registry=registry,
    )

    # Set values
    success.set(metrics.get("success", 0))
    actions.set(metrics.get("act_count", 0))
    vetoes.set(metrics.get("veto_count", 0))
    latency.set(metrics.get("latencies", {}).get("first_action_ms", 0))

    # Push to gateway
    push_to_gateway(gateway, job="noesis", registry=registry)


# Usage
episode_id = ns.run("my task", intuition=True)
push_to_prometheus(episode_id)
```
## Export to Datadog
Send metrics to Datadog:
```python
import noesis as ns
from datadog import initialize, statsd


def send_to_datadog(episode_id: str):
    """Send episode metrics to Datadog."""
    initialize(statsd_host="localhost", statsd_port=8125)

    summary = ns.summary.read(episode_id)
    metrics = summary.get("metrics", {})
    tags = [f"episode:{episode_id}"]

    # Send metrics
    statsd.gauge("noesis.success", metrics.get("success", 0), tags=tags)
    statsd.gauge("noesis.actions", metrics.get("act_count", 0), tags=tags)
    statsd.gauge("noesis.vetoes", metrics.get("veto_count", 0), tags=tags)
    statsd.histogram(
        "noesis.first_action_latency",
        metrics.get("latencies", {}).get("first_action_ms", 0),
        tags=tags,
    )


# Usage
episode_id = ns.run("my task", intuition=True)
send_to_datadog(episode_id)
```
## Export to BigQuery
For large-scale analysis:
```python
import noesis as ns
from google.cloud import bigquery


def export_to_bigquery(project_id: str, dataset: str, table: str, limit: int = 1000):
    """Export metrics to BigQuery for analysis."""
    client = bigquery.Client(project=project_id)
    table_id = f"{project_id}.{dataset}.{table}"

    episodes = ns.list_runs(limit=limit)
    rows = []
    for ep in episodes:
        summary = ns.summary.read(ep["episode_id"])
        metrics = summary.get("metrics", {})
        rows.append({
            "episode_id": ep["episode_id"],
            "timestamp": ep["timestamp"],
            "task": summary.get("task", ""),
            "success": metrics.get("success", 0),
            "plan_count": metrics.get("plan_count", 0),
            "act_count": metrics.get("act_count", 0),
            "veto_count": metrics.get("veto_count", 0),
            "first_action_ms": metrics.get("latencies", {}).get("first_action_ms", 0),
        })

    errors = client.insert_rows_json(table_id, rows)
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")
    return len(rows)
```
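A usage sketch with placeholder project, dataset, and table names; `insert_rows_json` streams into an existing table, so the target table and its schema must already exist:

```python
# Placeholder identifiers; substitute your own project, dataset, and table.
count = export_to_bigquery(
    project_id="my-gcp-project",
    dataset="noesis_metrics",
    table="noesis_episodes",
    limit=1000,
)
print(f"Inserted {count} rows")
```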
## Automated export hook
Run exports automatically after each episode:
```python
import noesis as ns


class MetricsExporter:
    """Hook to export metrics after each episode."""

    def __init__(self, exporters: list):
        self.exporters = exporters

    def __call__(self, episode_id: str):
        for exporter in self.exporters:
            try:
                exporter(episode_id)
            except Exception as e:
                print(f"Export failed: {e}")


# Configure exporters
exporter = MetricsExporter([
    lambda ep: push_to_prometheus(ep),
    lambda ep: send_to_datadog(ep),
])

# Run with export
episode_id = ns.run("my task", intuition=True)
exporter(episode_id)
```
## Grafana dashboard example

Once exported metrics are loaded into a SQL datasource (for example, via the BigQuery export above), you can build a Grafana dashboard with queries like the following. These queries assume an episode-level table named `noesis_episodes` and an event-level table named `noesis_events`:
```sql
-- Success rate over time
SELECT
  date_trunc('hour', timestamp) AS time,
  avg(success) AS success_rate
FROM noesis_episodes
GROUP BY 1
ORDER BY 1;

-- Veto count by policy
SELECT
  policy_id,
  count(*) AS veto_count
FROM noesis_events
WHERE phase = 'direction' AND status = 'blocked'
GROUP BY 1
ORDER BY 2 DESC;

-- Latency percentiles
SELECT
  percentile_cont(0.50) WITHIN GROUP (ORDER BY first_action_ms) AS p50,
  percentile_cont(0.95) WITHIN GROUP (ORDER BY first_action_ms) AS p95,
  percentile_cont(0.99) WITHIN GROUP (ORDER BY first_action_ms) AS p99
FROM noesis_episodes
WHERE timestamp > now() - interval '24 hours';
```
## Best practices

**Export asynchronously.** Don't block episode execution on metric exports; hand the work off to background workers or queues.

**Redact sensitive data.** Tasks may contain PII or secrets, so truncate or hash task text before exporting. The sketch below covers both of these practices.

**Set retention policies.** Decide how long to keep detailed per-episode metrics versus rolled-up aggregates.
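A minimal sketch of the first two practices combined, using a background worker thread and a hashed task field. The queue, worker, and `redact_task` helper are illustrative, not part of Noēsis:

```python
import hashlib
import queue
import threading

import noesis as ns

export_queue: "queue.Queue[str]" = queue.Queue()


def redact_task(task: str) -> str:
    """Replace raw task text with a short hash so PII never leaves the system."""
    return hashlib.sha256(task.encode()).hexdigest()[:12]


def export_worker():
    """Drain the queue in the background so episodes never wait on exports."""
    while True:
        episode_id = export_queue.get()
        summary = ns.summary.read(episode_id)
        record = {
            "episode_id": episode_id,
            "task_hash": redact_task(summary.get("task", "")),
            "metrics": summary.get("metrics", {}),
        }
        print(record)  # hand off to your exporter of choice instead
        export_queue.task_done()


threading.Thread(target=export_worker, daemon=True).start()

# After each episode, enqueue the ID and move on immediately.
episode_id = ns.run("my task", intuition=True)
export_queue.put(episode_id)
```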
## Next steps