Every Noēsis episode produces structured metrics you can export to dashboards, alerting systems, and analysis tools.

Available metrics

The summary.json file contains these metrics:
| Metric | Description |
| --- | --- |
| success | 1 for success, 0 for failure |
| plan_count | Number of planning iterations |
| act_count | Number of tool/adapter invocations |
| reflect_count | Number of reflection passes |
| veto_count | Number of policy vetoes |
| latencies.first_action_ms | Time to first action, in milliseconds |
| plan_adherence | How closely execution matched the plan |
| tool_coverage | Percentage of planned tools actually used |
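
An illustrative metrics block from summary.json (values are examples only; showing plan_adherence and tool_coverage as 0–1 ratios is an assumption, not a guarantee):
{
  "success": 1,
  "plan_count": 2,
  "act_count": 7,
  "reflect_count": 1,
  "veto_count": 0,
  "latencies": {
    "first_action_ms": 412
  },
  "plan_adherence": 0.86,
  "tool_coverage": 0.75
}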

Reading metrics

CLI

noesis insight ep_abc123 -j | jq '.metrics'

Python

import noesis as ns

episode_id = ns.last()
summary = ns.summary.read(episode_id)
metrics = summary.get("metrics", {})

print(f"Success: {metrics.get('success')}")
print(f"Actions: {metrics.get('act_count')}")
print(f"Vetoes: {metrics.get('veto_count')}")

Export to JSON files

Export all recent episodes to a JSON file:
import json
import noesis as ns


def export_metrics(output_path: str, limit: int = 100):
    """Export metrics from recent episodes to JSON."""
    episodes = ns.list_runs(limit=limit)
    
    metrics_data = []
    for ep in episodes:
        summary = ns.summary.read(ep["episode_id"])
        metrics_data.append({
            "episode_id": ep["episode_id"],
            "timestamp": ep["timestamp"],
            "task": summary.get("task"),
            "metrics": summary.get("metrics", {}),
        })
    
    with open(output_path, "w") as f:
        json.dump(metrics_data, f, indent=2)
    
    return len(metrics_data)


# Export last 100 episodes
count = export_metrics("./metrics_export.json")
print(f"Exported {count} episodes")

Export to CSV

For spreadsheet analysis:
import csv
import noesis as ns


def export_to_csv(output_path: str, limit: int = 100):
    """Export metrics to CSV for spreadsheet analysis."""
    episodes = ns.list_runs(limit=limit)
    
    fieldnames = [
        "episode_id",
        "timestamp",
        "task",
        "success",
        "plan_count",
        "act_count",
        "veto_count",
        "first_action_ms",
    ]
    
    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        
        for ep in episodes:
            summary = ns.summary.read(ep["episode_id"])
            metrics = summary.get("metrics", {})
            
            writer.writerow({
                "episode_id": ep["episode_id"],
                "timestamp": ep["timestamp"],
                "task": summary.get("task", "")[:100],
                "success": metrics.get("success", 0),
                "plan_count": metrics.get("plan_count", 0),
                "act_count": metrics.get("act_count", 0),
                "veto_count": metrics.get("veto_count", 0),
                "first_action_ms": metrics.get("latencies", {}).get("first_action_ms", 0),
            })


export_to_csv("./metrics.csv")
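
To sanity-check the export, you can load it back with pandas (an extra dependency, not required by Noēsis):
import pandas as pd

df = pd.read_csv("./metrics.csv")

# Quick aggregates over the exported episodes
print(f"Success rate: {df['success'].mean():.2%}")
print(f"p95 first-action latency: {df['first_action_ms'].quantile(0.95):.0f} ms")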

Export to Prometheus

Push metrics to Prometheus using the pushgateway:
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
import noesis as ns


def push_to_prometheus(episode_id: str, gateway: str = "localhost:9091"):
    """Push episode metrics to Prometheus pushgateway."""
    summary = ns.summary.read(episode_id)
    metrics = summary.get("metrics", {})
    
    registry = CollectorRegistry()
    
    # Define gauges
    success = Gauge(
        "noesis_episode_success",
        "Episode success (1=success, 0=failure)",
        registry=registry,
    )
    actions = Gauge(
        "noesis_episode_actions",
        "Number of actions in episode",
        registry=registry,
    )
    vetoes = Gauge(
        "noesis_episode_vetoes",
        "Number of policy vetoes",
        registry=registry,
    )
    latency = Gauge(
        "noesis_first_action_latency_ms",
        "Latency to first action in milliseconds",
        registry=registry,
    )
    
    # Set values
    success.set(metrics.get("success", 0))
    actions.set(metrics.get("act_count", 0))
    vetoes.set(metrics.get("veto_count", 0))
    latency.set(metrics.get("latencies", {}).get("first_action_ms", 0))
    
    # Push to gateway
    push_to_gateway(gateway, job="noesis", registry=registry)


# Usage
episode_id = ns.run("my task", intuition=True)
push_to_prometheus(episode_id)

Export to Datadog

Send metrics to Datadog:
from datadog import initialize, statsd
import noesis as ns


def send_to_datadog(episode_id: str):
    """Send episode metrics to Datadog."""
    initialize(statsd_host="localhost", statsd_port=8125)
    
    summary = ns.summary.read(episode_id)
    metrics = summary.get("metrics", {})
    tags = [f"episode:{episode_id}"]
    
    # Send metrics
    statsd.gauge("noesis.success", metrics.get("success", 0), tags=tags)
    statsd.gauge("noesis.actions", metrics.get("act_count", 0), tags=tags)
    statsd.gauge("noesis.vetoes", metrics.get("veto_count", 0), tags=tags)
    statsd.histogram(
        "noesis.first_action_latency",
        metrics.get("latencies", {}).get("first_action_ms", 0),
        tags=tags,
    )


# Usage
episode_id = ns.run("my task", intuition=True)
send_to_datadog(episode_id)

Export to BigQuery

For large-scale analysis:
from google.cloud import bigquery
import noesis as ns


def export_to_bigquery(project_id: str, dataset: str, table: str, limit: int = 1000):
    """Export metrics to BigQuery for analysis."""
    client = bigquery.Client(project=project_id)
    table_id = f"{project_id}.{dataset}.{table}"
    
    episodes = ns.list_runs(limit=limit)
    rows = []
    
    for ep in episodes:
        summary = ns.summary.read(ep["episode_id"])
        metrics = summary.get("metrics", {})
        
        rows.append({
            "episode_id": ep["episode_id"],
            "timestamp": ep["timestamp"],
            "task": summary.get("task", ""),
            "success": metrics.get("success", 0),
            "plan_count": metrics.get("plan_count", 0),
            "act_count": metrics.get("act_count", 0),
            "veto_count": metrics.get("veto_count", 0),
            "first_action_ms": metrics.get("latencies", {}).get("first_action_ms", 0),
        })
    
    errors = client.insert_rows_json(table_id, rows)
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")
    
    return len(rows)
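

# Usage (hypothetical project, dataset, and table names; the target table
# must already exist with a matching schema)
count = export_to_bigquery("my-project", "noesis_analytics", "episodes")
print(f"Inserted {count} rows")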

Automated export hook

Run exports automatically after each episode:
import noesis as ns


class MetricsExporter:
    """Hook to export metrics after each episode."""
    
    def __init__(self, exporters: list):
        self.exporters = exporters
    
    def __call__(self, episode_id: str):
        for exporter in self.exporters:
            try:
                exporter(episode_id)
            except Exception as e:
                print(f"Export failed: {e}")


# Configure exporters
exporter = MetricsExporter([
    push_to_prometheus,
    send_to_datadog,
])

# Run with export
episode_id = ns.run("my task", intuition=True)
exporter(episode_id)

Grafana dashboard example

If you load metrics into a SQL datastore, you can build a Grafana dashboard with queries like these (PostgreSQL syntax; they assume episode metrics and events have been loaded into noesis_episodes and noesis_events tables):
-- Success rate over time
SELECT
  date_trunc('hour', timestamp) as time,
  avg(success) as success_rate
FROM noesis_episodes
GROUP BY 1
ORDER BY 1

-- Veto count by policy
SELECT
  policy_id,
  count(*) as veto_count
FROM noesis_events
WHERE phase = 'direction' AND status = 'blocked'
GROUP BY 1
ORDER BY 2 DESC

-- Latency percentiles
SELECT
  percentile_cont(0.50) WITHIN GROUP (ORDER BY first_action_ms) as p50,
  percentile_cont(0.95) WITHIN GROUP (ORDER BY first_action_ms) as p95,
  percentile_cont(0.99) WITHIN GROUP (ORDER BY first_action_ms) as p99
FROM noesis_episodes
WHERE timestamp > now() - interval '24 hours'
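
If you push through the Prometheus pushgateway instead, similar panels can be built with PromQL against the gauges defined earlier. Note that repeated pushes under the same job label overwrite one another, so per-episode history needs a grouping key; treat these queries as a sketch:
# Success rate over the last hour
avg_over_time(noesis_episode_success[1h])

# p95 first-action latency over 24 hours
quantile_over_time(0.95, noesis_first_action_latency_ms[24h])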

Best practices

Export asynchronously. Don’t block episode execution on metric exports; hand them to background workers or queues (see the sketch after this list).
Redact sensitive data. Task text may contain PII or secrets. Truncate or hash it before exporting.
Set retention policies. Decide how long to keep per-episode metrics versus aggregates.
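
A minimal sketch of the first two practices, reusing push_to_prometheus from above (the queue size and hash length are illustrative choices):
import hashlib
import queue
import threading

import noesis as ns

# Bounded queue so a slow exporter backend cannot grow memory without limit
export_queue: "queue.Queue[str]" = queue.Queue(maxsize=1000)


def redact_task(task: str) -> str:
    """Replace task text with a short stable hash.

    Use this wherever task text leaves your environment
    (e.g., in the CSV or BigQuery exports above).
    """
    return hashlib.sha256(task.encode()).hexdigest()[:16]


def export_worker():
    """Drain episode IDs and export them off the critical path."""
    while True:
        episode_id = export_queue.get()
        try:
            push_to_prometheus(episode_id)
        except Exception as e:
            print(f"Export failed for {episode_id}: {e}")
        finally:
            export_queue.task_done()


threading.Thread(target=export_worker, daemon=True).start()

# The episode returns immediately; the export happens in the background
export_queue.put(ns.run("my task", intuition=True))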

Next steps