Langfuse Integration Guide

Shekel includes built-in integration with Langfuse, an open-source LLM observability platform. Per-call spend streams to Langfuse metadata in real time, and budget-exceeded and fallback events are captured automatically, so you can see exactly where your budget went and when it broke.

Quick Start

Installation

# Install shekel with Langfuse support (quotes keep shells like zsh
# from expanding the brackets)
pip install "shekel[langfuse]"

Basic Setup

from langfuse import Langfuse
from shekel import budget
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter

# Initialize Langfuse client
lf = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com"  # or self-hosted URL
)

# Register the adapter (do this once at app startup)
adapter = LangfuseAdapter(
    client=lf,
    trace_name="my-app",  # Name for traces in Langfuse UI
    tags=["production", "api-v2"]  # Optional tags
)
AdapterRegistry.register(adapter)

# Use budgets as normal - costs automatically flow to Langfuse!
with budget(max_usd=5.00, name="user-query") as b:
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello!"}]
    )
    print(f"Spent: ${b.spent:.4f}")

Features

Feature #1: Per-Call Spend Streaming

Every LLM call automatically updates Langfuse metadata with current spend — so you can see budget burn in real time, not just at the end.

What's tracked:

  • shekel_spent: Total USD spent so far
  • shekel_limit: Budget limit (or null for track-only mode)
  • shekel_utilization: Fraction of budget used (0.0 to 1.0)
  • shekel_budget_name: Full hierarchical budget name
  • shekel_last_model: Model used for the last call

Example:

with budget(max_usd=10.00, name="research") as b:
    # Each call updates Langfuse metadata
    for query in queries:
        response = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": query}]
        )

In Langfuse UI:

  • Navigate to your trace
  • View the metadata panel
  • See real-time updates: shekel_spent, shekel_utilization, etc.

Feature #2: Nested Budget Mapping

Shekel's nested budgets automatically create a span hierarchy in Langfuse, making it easy to visualize budget allocation across workflow stages.

Example:

with budget(max_usd=10.00, name="workflow") as parent:
    # Parent budget → Langfuse trace

    with budget(max_usd=3.00, name="research") as child1:
        # Child budget → Langfuse span "workflow.research"
        do_research()

    with budget(max_usd=5.00, name="generation") as child2:
        # Sibling budget → Langfuse span "workflow.generation"
        generate_response()

In Langfuse UI:

  • Trace: "workflow" (parent budget)
  • Span: "workflow.research" (child 1)
  • Span: "workflow.generation" (child 2)
  • Each span has its own budget metadata
  • Waterfall view shows timing and cost allocation

Benefits:

  • Identify expensive workflow stages
  • Track budget utilization per stage
  • Debug nested budget violations
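The dotted span names shown above follow directly from the budget nesting; a minimal sketch of the naming scheme, inferred from the examples on this page rather than taken from Shekel's implementation:

```python
# Build a hierarchical span name by joining the names of all enclosing
# budgets with dots, e.g. ["workflow", "research"] -> "workflow.research".
# Inferred from the examples above; not Shekel's actual code.
def span_name(budget_stack: list[str]) -> str:
    return ".".join(budget_stack)

print(span_name(["workflow", "research"]))    # workflow.research
print(span_name(["workflow", "generation"]))  # workflow.generation
```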

Feature #3: Circuit Break Events

When a budget limit is exceeded, Shekel creates a WARNING event in Langfuse with full debugging context.

Event metadata:

  • budget_name: Name of the exceeded budget
  • spent: Total amount spent
  • limit: The budget limit
  • overage: Amount over budget (e.g., $0.50)
  • model: Model in use when the limit was hit
  • tokens: Token counts {input: N, output: M}
  • parent_remaining: Parent budget remaining (if nested)

Example:

try:
    with budget(max_usd=1.00, name="api-call") as b:
        # This will exceed the budget
        response = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Long query..."}]
        )
except BudgetExceededError as e:
    print(f"Budget exceeded: ${e.spent:.2f} > ${e.limit:.2f}")

In Langfuse UI:

  • Filter events by name:budget_exceeded and level:WARNING
  • See which budgets are being exceeded
  • Analyze token counts and model costs
  • Track overage patterns

Use cases:

  • Debug "bill shock" incidents
  • Identify inefficient prompts
  • Monitor production budget violations
  • Set up alerts for cost overruns
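The event fields listed above imply a simple payload shape. The sketch below is an illustrative reconstruction from that field list; the builder function is hypothetical, and only the field names come from this page.

```python
# Illustrative budget_exceeded event payload. Field names come from the
# "Event metadata" list above; budget_exceeded_event is hypothetical.
def budget_exceeded_event(budget_name, spent, limit, model, tokens,
                          parent_remaining=None):
    return {
        "name": "budget_exceeded",
        "level": "WARNING",
        "metadata": {
            "budget_name": budget_name,
            "spent": spent,
            "limit": limit,
            "overage": round(spent - limit, 6),  # amount past the limit
            "model": model,
            "tokens": tokens,  # {"input": N, "output": M}
            "parent_remaining": parent_remaining,
        },
    }

evt = budget_exceeded_event("api-call", 1.50, 1.00, "gpt-4o",
                            {"input": 1200, "output": 800})
print(evt["metadata"]["overage"])  # 0.5
```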

Feature #4: Fallback Annotations

When Shekel switches to a fallback model, it creates an INFO event and updates metadata to show the transition.

Event metadata:

  • from_model: Original model (e.g., "gpt-4o")
  • to_model: Fallback model (e.g., "gpt-4o-mini")
  • switched_at: Cost when the switch occurred
  • cost_primary: Total cost on the primary model
  • cost_fallback: Total cost on the fallback model
  • savings: Estimated savings from the fallback

Additional trace/span metadata:

  • shekel_fallback_active: true (persists across updates)
  • shekel_fallback_model: Current fallback model

Example:

with budget(max_usd=5.00, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"}, name="agent") as b:
    # Runs with gpt-4o until 80% of $5.00 ($4.00), then switches to
    # gpt-4o-mini; if spend still reaches $5.00, BudgetExceededError is raised
    run_agent_loop()

    if b.model_switched:
        print(f"Switched to {b.fallback['model']} at ${b.switched_at_usd:.2f}")

In Langfuse UI:

  • Filter events by name:fallback_activated and level:INFO
  • See which workflows trigger fallbacks
  • Analyze cost savings from the fallback strategy
  • Compare primary vs. fallback performance

Use cases:

  • Validate that the fallback strategy is working
  • Measure cost savings
  • Track when fallbacks occur in production
  • Optimize fallback thresholds
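The savings figure above can only ever be an estimate, since the primary model never actually ran after the switch. One plausible way to compute it is to price the fallback-phase tokens at both models' rates and take the difference; this formula is an assumption for illustration, not necessarily what Shekel does, and the prices below are illustrative, not current.

```python
# Estimate fallback savings: what the fallback-phase tokens WOULD have
# cost on the primary model, minus what they actually cost on the
# fallback. Prices are (input, output) per 1M tokens and illustrative only.
def estimated_savings(input_toks: int, output_toks: int,
                      primary_prices: tuple[float, float],
                      fallback_prices: tuple[float, float]) -> float:
    def cost(prices):
        in_price, out_price = prices
        return (input_toks * in_price + output_toks * out_price) / 1_000_000
    return cost(primary_prices) - cost(fallback_prices)

# e.g. gpt-4o at $2.50/$10.00 vs gpt-4o-mini at $0.15/$0.60 per 1M tokens
# (check your provider's current pricing)
print(round(estimated_savings(100_000, 50_000,
                              (2.50, 10.00), (0.15, 0.60)), 4))  # 0.705
```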

Configuration

Trace Names

Customize the trace name to group related operations:

# Per-user traces
adapter = LangfuseAdapter(
    client=lf,
    trace_name=f"user-{user_id}"
)

# Per-endpoint traces
adapter = LangfuseAdapter(
    client=lf,
    trace_name=f"api-{endpoint_name}"
)

Tags

Apply tags for filtering and organization:

adapter = LangfuseAdapter(
    client=lf,
    tags=[
        "production",
        "api-v2",
        f"region-{region}",
        f"tenant-{tenant_id}"
    ]
)

Multiple Adapters

Register multiple adapters to send data to different destinations:

from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter

# Langfuse for observability
langfuse_adapter = LangfuseAdapter(client=lf_client)
AdapterRegistry.register(langfuse_adapter)

# Custom adapter for internal metrics
custom_adapter = MyMetricsAdapter()
AdapterRegistry.register(custom_adapter)

# Both receive events automatically
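A custom adapter like MyMetricsAdapter above is assumed to implement the same hooks Shekel calls on the Langfuse adapter. The method names below (on_spend_update, on_budget_exceeded) are illustrative assumptions, not a documented interface; check shekel.integrations for the actual adapter contract.

```python
# Minimal sketch of a custom adapter that collects Shekel events into an
# in-memory list. Hook names are illustrative assumptions, not Shekel's
# documented interface.
class MyMetricsAdapter:
    def __init__(self):
        self.events = []

    def on_spend_update(self, budget_name, spent, limit):
        self.events.append(("spend", budget_name, spent, limit))

    def on_budget_exceeded(self, budget_name, spent, limit):
        self.events.append(("exceeded", budget_name, spent, limit))

adapter = MyMetricsAdapter()
adapter.on_spend_update("workflow", 1.25, 5.00)
print(len(adapter.events))  # 1
```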

Best Practices

1. Register Once at Startup

# ✅ Good: Register at app startup
def init_observability():
    lf = Langfuse(...)
    adapter = LangfuseAdapter(client=lf)
    AdapterRegistry.register(adapter)

init_observability()

# ❌ Bad: Don't register in hot paths
def handle_request():
    AdapterRegistry.register(LangfuseAdapter(...))  # NO!

2. Use Meaningful Budget Names

# ✅ Good: Descriptive names
with budget(max_usd=5.00, name="user-query-generation"):
    ...

with budget(max_usd=2.00, name="rag-retrieval"):
    ...

# ❌ Bad: Generic names
with budget(max_usd=5.00, name="budget1"):
    ...

3. Leverage Nested Budgets

# ✅ Good: Clear hierarchy
with budget(max_usd=20.00, name="agent-workflow") as parent:
    with budget(max_usd=5.00, name="planning") as plan:
        plan_actions()

    with budget(max_usd=10.00, name="execution") as execution:
        execute_plan()

    with budget(max_usd=5.00, name="reflection") as reflect:
        reflect_on_results()

4. Set Appropriate Fallbacks

# ✅ Good: Same provider fallback with at_pct threshold
with budget(max_usd=5.00, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"}):
    run_workflow()
# Switches to gpt-4o-mini at $4.00 (80%), raises BudgetExceededError at $5.00

5. Handle Exceptions Gracefully

# ✅ Good: Catch and log
try:
    with budget(max_usd=1.00, name="api-call") as b:
        response = call_llm()
except BudgetExceededError as e:
    logger.error(f"Budget exceeded: {e}")
    return fallback_response()

Performance

The Langfuse adapter is designed for zero-impact on your application:

  • Non-blocking: Events are emitted asynchronously (no API calls in hot path)
  • Lightweight: Minimal memory overhead (~100 bytes per event)
  • Resilient: Langfuse failures never break Shekel functionality
  • Fast: <1ms overhead per LLM call

Benchmark (on modern hardware):

  • Adapter event emission: <0.5ms per call
  • Langfuse API calls: background thread (non-blocking)
  • Memory: ~50 bytes per trace, ~30 bytes per span
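A common way to get the non-blocking behavior described above is a bounded queue drained by a background thread: the hot path only enqueues, and slow network I/O happens elsewhere. The sketch below shows that general pattern, not Shekel's actual implementation.

```python
import queue
import threading

class BackgroundEmitter:
    """Hot path does a non-blocking put; a daemon thread drains the
    queue and performs the (potentially slow) network send."""
    def __init__(self, send, maxsize=1000):
        self._q = queue.Queue(maxsize=maxsize)
        self._send = send
        threading.Thread(target=self._drain, daemon=True).start()

    def emit(self, event):  # called from the hot path
        try:
            self._q.put_nowait(event)  # never blocks the LLM call
        except queue.Full:
            pass  # drop the event rather than block

    def _drain(self):
        while True:
            event = self._q.get()
            self._send(event)
            self._q.task_done()

sent = []
emitter = BackgroundEmitter(send=sent.append)
emitter.emit({"shekel_spent": 0.01})
emitter._q.join()  # wait for pending events (akin to lf.flush())
print(sent)  # [{'shekel_spent': 0.01}]
```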

Troubleshooting

Events Not Appearing in Langfuse

Check 1: Verify Langfuse client is initialized correctly

from langfuse import Langfuse

lf = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com"
)

# Test connection
lf.trace(name="test").update(metadata={"test": "ok"})
lf.flush()  # Force sync

Check 2: Ensure adapter is registered before budget use

# ✅ Register BEFORE using budgets
AdapterRegistry.register(adapter)

with budget(...) as b:  # Events will flow
    ...

Check 3: Call lf.flush() at app shutdown

# Ensure all events are sent before exit
lf.flush()

Missing Nested Spans

Nested budgets must have unique names at each level:

# ✅ Good: Unique names
with budget(name="parent"):
    with budget(name="child1"):
        ...
    with budget(name="child2"):
        ...

# ❌ Bad: Duplicate names may cause issues
with budget(name="parent"):
    with budget(name="child"):
        ...
    with budget(name="child"):  # Same name!
        ...

Adapter Errors

If you see warnings like "Adapter X failed on Y", check:

  1. Langfuse package installed: pip install langfuse
  2. Credentials valid: Test lf.trace(name="test")
  3. Network access: Ensure app can reach Langfuse host
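The first two checks can be scripted. A small diagnostic sketch, assuming the LANGFUSE_* environment variable names used in the setup examples on this page:

```python
import importlib.util
import os

def diagnose() -> list[str]:
    """Return a list of likely problems with the Langfuse setup."""
    problems = []
    # Check 1: is the langfuse package importable at all?
    if importlib.util.find_spec("langfuse") is None:
        problems.append("langfuse package not installed (pip install langfuse)")
    # Check 2: are the credential env vars set?
    for var in ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY"):
        if not os.getenv(var):
            problems.append(f"{var} is not set")
    return problems

for problem in diagnose():
    print("-", problem)
```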

Adapter errors are logged but won't break Shekel:

import logging
logging.basicConfig(level=logging.WARNING)

# You'll see warnings if adapter fails, but Shekel continues
with budget(max_usd=5.00) as b:
    response = call_llm()
    print(f"Spent: ${b.spent}")  # Works regardless of adapter status
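That resilience typically comes from wrapping every adapter call in a try/except so one failing adapter is logged and skipped. The sketch below shows the generic pattern; the `handle` method name is an illustrative assumption, not Shekel's actual code.

```python
import logging

logger = logging.getLogger("shekel.integrations")

def emit_to_adapters(adapters, event):
    """Deliver an event to every adapter; a failing adapter is logged
    and skipped so it can never break the budget logic itself."""
    for adapter in adapters:
        try:
            adapter.handle(event)  # 'handle' is an illustrative name
        except Exception:
            logger.warning("Adapter %r failed on %r", adapter, event,
                           exc_info=True)

class Broken:
    def handle(self, event):
        raise RuntimeError("boom")

class Working:
    def __init__(self):
        self.seen = []
    def handle(self, event):
        self.seen.append(event)

ok = Working()
emit_to_adapters([Broken(), ok], {"shekel_spent": 0.02})
print(ok.seen)  # [{'shekel_spent': 0.02}]
```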

Examples

Complete Web API Example

import os

from fastapi import FastAPI, HTTPException
from langfuse import Langfuse
from openai import OpenAI
from shekel import budget, BudgetExceededError
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter

app = FastAPI()
openai_client = OpenAI()

# Initialize at startup
@app.on_event("startup")
def setup_observability():
    global lf  # module-level so the shutdown handler can flush it
    lf = Langfuse(
        public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
        secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
        host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
    )
    adapter = LangfuseAdapter(
        client=lf,
        trace_name="chat-api",
        tags=["production", os.getenv("ENV", "dev")]
    )
    AdapterRegistry.register(adapter)

@app.post("/chat")
async def chat(query: str, user_id: str):
    try:
        with budget(max_usd=0.50, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"}, name=f"user-{user_id}") as b:
            response = openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": query}]
            )
            return {
                "response": response.choices[0].message.content,
                "cost": b.spent,
                "model_switched": b.model_switched
            }
    except BudgetExceededError as e:
        raise HTTPException(status_code=429, detail=f"Budget exceeded: ${e.spent:.2f}")

@app.on_event("shutdown")
def cleanup():
    # Flush pending events
    lf.flush()

Multi-Stage Agent Example

from langfuse import Langfuse
from openai import OpenAI
from shekel import budget
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter

# Setup
lf = Langfuse(...)
adapter = LangfuseAdapter(client=lf, trace_name="agent-workflow")
AdapterRegistry.register(adapter)
openai_client = OpenAI()

def run_agent(user_query: str):
    """Multi-stage agent with nested budgets."""
    with budget(max_usd=10.00, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"}, name="agent") as agent_budget:

        # Stage 1: Planning
        with budget(max_usd=2.00, name="planning") as plan_budget:
            plan = openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a planning assistant."},
                    {"role": "user", "content": f"Create a plan for: {user_query}"}
                ]
            )
            print(f"Planning cost: ${plan_budget.spent:.4f}")

        # Stage 2: Research (can have multiple sub-budgets)
        with budget(max_usd=5.00, name="research") as research_budget:
            for topic in extract_topics(plan):
                with budget(max_usd=1.00, name=f"topic-{topic}") as topic_budget:
                    info = research_topic(topic)

        # Stage 3: Generation
        with budget(max_usd=3.00, name="generation") as gen_budget:
            response = openai_client.chat.completions.create(
                model="gpt-4o-mini" if agent_budget.model_switched else "gpt-4o",
                messages=[
                    {"role": "system", "content": "Generate final response."},
                    {"role": "user", "content": f"Plan: {plan}\nGenerate response."}
                ]
            )

        print(f"Total agent cost: ${agent_budget.spent:.4f}")
        return response.choices[0].message.content

# Run and view in Langfuse UI:
# - Trace: "agent-workflow"
#   - Span: "agent.planning"
#   - Span: "agent.research"
#     - Span: "agent.research.topic-1"
#     - Span: "agent.research.topic-2"
#   - Span: "agent.generation"

Migration from v0.2.3

If you're upgrading from Shekel v0.2.3, there are no breaking changes; just add the Langfuse integration:

# Your existing code (works the same)
with budget(max_usd=5.00) as b:
    response = call_llm()
    print(f"Spent: ${b.spent}")

# Add Langfuse (one-time setup)
from langfuse import Langfuse
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter

lf = Langfuse(...)
adapter = LangfuseAdapter(client=lf)
AdapterRegistry.register(adapter)

# Same code, now with observability!
with budget(max_usd=5.00) as b:
    response = call_llm()
    print(f"Spent: ${b.spent}")

Resources

  • Langfuse Docs: https://langfuse.com/docs
  • Shekel Repository: https://github.com/arieradle/shekel
  • Shekel Docs: https://arieradle.github.io/shekel/
  • Report Issues: https://github.com/arieradle/shekel/issues

License

This integration is part of Shekel and follows the same license (MIT).