
Overview

Emitters define where metric events are sent after each LLM API call. The SDK provides several built-in emitters and supports custom implementations. An emitter is any callable that accepts a MetricEvent and returns None:
from typing import Callable
from aden import MetricEvent

MetricEmitter = Callable[[MetricEvent], None]
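
A minimal sketch of wiring a custom emitter into the SDK, assuming instrument and MeterOptions (used later on this page) are importable from aden:
from aden import MetricEvent, MeterOptions, instrument

def log_latency(event: MetricEvent) -> None:
    # Minimal emitter: report which model was called and how long it took.
    print(f"{event.provider}/{event.model} took {event.latency_ms:.0f} ms")

# Every instrumented LLM call now produces one MetricEvent for this emitter.
instrument(MeterOptions(emit_metric=log_latency))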

Console Emitter

Logs metrics to the console. Perfect for development and debugging.
from aden import create_console_emitter

emitter = create_console_emitter(
    level="info",  # "debug", "info", "warning", "error"
    pretty=True,   # Formatted JSON output
)
Output:
{
  "provider": "openai",
  "model": "gpt-4o",
  "input_tokens": 15,
  "output_tokens": 42,
  "latency_ms": 523,
  "timestamp": "2024-01-15T10:30:00.000Z"
}

Batch Emitter

Collects events and flushes them in batches for efficiency.
from dataclasses import asdict

from aden import MetricEvent, create_batch_emitter

def send_batch(events: list[MetricEvent]) -> None:
    import requests

    # MetricEvent is a dataclass; convert each event to a plain dict so the
    # batch serializes cleanly as JSON.
    requests.post("https://your-backend.com/metrics", json=[asdict(e) for e in events])

emitter = create_batch_emitter(
    handler=send_batch,
    batch_size=50,           # Flush after 50 events
    flush_interval_ms=5000,  # Or every 5 seconds
)

Multi Emitter

Send metrics to multiple destinations simultaneously.
from aden import create_multi_emitter, create_console_emitter, create_file_emitter

emitter = create_multi_emitter([
    create_console_emitter(pretty=True),
    create_file_emitter(log_dir="./metrics"),
    custom_backend_emitter,
])

Filtered Emitter

Conditionally emit events based on criteria.
from aden import create_filtered_emitter

emitter = create_filtered_emitter(
    emitter=backend_emitter,
    filter_fn=lambda event: event.model in ["gpt-4o", "claude-3-5-sonnet"],
)
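
The filter_fn can encode any policy. As an illustrative sketch, probabilistic sampling that forwards roughly 10% of events (the 0.10 rate is arbitrary):
import random

from aden import create_filtered_emitter

# Illustrative sampling policy: keep roughly 10% of events.
sampled_emitter = create_filtered_emitter(
    emitter=backend_emitter,
    filter_fn=lambda event: random.random() < 0.10,
)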

Transform Emitter

Modify events before emission.
import os
from dataclasses import replace

from aden import MetricEvent, create_transform_emitter

def add_metadata(event: MetricEvent) -> MetricEvent:
    # dataclasses.replace copies the event and overrides only the metadata
    # field (unpacking event.__dict__ would clash with the metadata keyword).
    return replace(
        event,
        metadata={
            **(event.metadata or {}),
            "environment": os.environ.get("ENV", "dev"),
            "service": "my-app",
        },
    )

emitter = create_transform_emitter(
    emitter=backend_emitter,
    transform_fn=add_metadata,
)

File Emitter

Write metrics to JSON Lines files.
from aden import create_file_emitter

emitter = create_file_emitter(
    log_dir="./meter_logs",  # Directory for log files
)

# Creates files like: ./meter_logs/metrics-2024-01-15.jsonl
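
The JSON Lines output is easy to post-process. A small sketch, assuming each line is one serialized MetricEvent with the usage fields shown in the schema below:
import json
from pathlib import Path

total_tokens = 0
for line in Path("./meter_logs/metrics-2024-01-15.jsonl").read_text().splitlines():
    if not line.strip():
        continue
    record = json.loads(line)
    usage = record.get("usage") or {}
    total_tokens += usage.get("total_tokens", 0)

print(f"Total tokens logged on 2024-01-15: {total_tokens}")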

Memory Emitter

Store events in memory. Useful for testing.
from aden import create_memory_emitter

emitter = create_memory_emitter()

# After some API calls...
print(emitter.events)
# [MetricEvent(...), MetricEvent(...)]

# Clear events
emitter.clear()
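
In a test this makes assertions straightforward. A sketch, assuming instrument and MeterOptions are importable from aden and that call_my_llm_feature is your own instrumented code under test:
from aden import MeterOptions, create_memory_emitter, instrument

def test_records_one_event_per_call():
    emitter = create_memory_emitter()
    instrument(MeterOptions(emit_metric=emitter))

    # Hypothetical function under test that makes one LLM API call.
    call_my_llm_feature()

    assert len(emitter.events) == 1
    assert emitter.events[0].provider == "openai"

    emitter.clear()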

Noop Emitter

Discards all events. Useful for performance testing.
from aden import create_noop_emitter

emitter = create_noop_emitter()

Custom Emitter

Implement your own emitter:
from aden import MetricEvent

def custom_emitter(event: MetricEvent) -> None:
    # analytics, db, and alerting below are placeholders for your own clients.

    # Send to your analytics service
    analytics.track("llm_call", {
        "provider": event.provider,
        "model": event.model,
        "tokens": event.usage.total_tokens if event.usage else 0,
    })

    # Store in database
    db.insert("metrics", event.__dict__)

    # Alert on errors
    if event.error:
        alerting.notify("LLM Error", event.error)

Async Custom Emitter

import asyncio

async def async_emitter(event: MetricEvent) -> None:
    await asyncio.gather(
        send_to_analytics(event),
        store_in_db(event),
    )

# Use with async_emit=True
instrument(MeterOptions(
    emit_metric=async_emitter,
    async_emit=True,
))

Control Agent Emitter

Send metrics to the Aden control server:
import os

from aden import ControlAgentOptions, create_control_agent, create_control_agent_emitter

agent = create_control_agent(ControlAgentOptions(
    api_key=os.environ["ADEN_API_KEY"],
    server_url=os.environ.get("ADEN_API_URL"),
))

emitter = create_control_agent_emitter(agent)

Combining Emitters

Create sophisticated pipelines:
import os

from aden import (
    MetricEvent,
    create_batch_emitter,
    create_console_emitter,
    create_filtered_emitter,
    create_multi_emitter,
    create_transform_emitter,
)

emitters = []

# Always log to console in development
if os.environ.get("ENV") == "development":
    emitters.append(create_console_emitter(pretty=True))

# Send all events to primary backend
emitters.append(
    create_batch_emitter(
        handler=lambda batch: send_to_primary(batch),
        batch_size=100,
    )
)

# Send only errors to monitoring
emitters.append(
    create_filtered_emitter(
        emitter=lambda e: send_to_monitoring(e),
        filter_fn=lambda e: e.error is not None,
    )
)

# Send anonymized data to analytics
emitters.append(
    create_transform_emitter(
        emitter=lambda e: send_to_analytics(e),
        transform_fn=lambda e: MetricEvent(
            provider=e.provider,
            model=e.model,
            stream=e.stream,
            timestamp=e.timestamp,
            latency_ms=e.latency_ms,
            usage=e.usage,
            # Remove identifiable info
            trace_id="",
            span_id="",
            metadata=None,
        ),
    )
)

final_emitter = create_multi_emitter(emitters)
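
The combined emitter is then registered like any single emitter; a sketch, assuming instrument and MeterOptions are importable from aden:
from aden import MeterOptions, instrument

instrument(MeterOptions(emit_metric=final_emitter))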

MetricEvent Schema

Every emitter receives events with this structure:
@dataclass
class MetricEvent:
    # Identity
    trace_id: str
    span_id: str

    # Request details
    provider: str  # "openai", "anthropic", "gemini"
    model: str
    stream: bool
    timestamp: str  # ISO 8601

    # Performance
    latency_ms: float
    request_id: str | None = None
    error: str | None = None

    # Usage
    usage: NormalizedUsage | None = None

    # Tool tracking
    tool_calls: list[ToolCallMetric] | None = None

    # Rate limits
    rate_limit: RateLimitInfo | None = None

    # Custom
    metadata: dict[str, Any] | None = None


@dataclass
class NormalizedUsage:
    input_tokens: int
    output_tokens: int
    total_tokens: int
    reasoning_tokens: int = 0
    cached_tokens: int = 0
    accepted_prediction_tokens: int = 0
    rejected_prediction_tokens: int = 0
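
As an example of consuming these fields, a hedged sketch of an emitter that reports per-call cache efficiency (the wording and output format are illustrative):
from aden import MetricEvent

def cache_report_emitter(event: MetricEvent) -> None:
    # Report what fraction of the prompt was served from the provider cache.
    if event.usage is None or event.usage.input_tokens == 0:
        return
    cached_ratio = event.usage.cached_tokens / event.usage.input_tokens
    print(f"{event.model}: {cached_ratio:.0%} of input tokens were cached")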

Next Steps