Langfuse Integration Guide¶
Shekel includes built-in integration with Langfuse, an open-source LLM observability platform. When a budget is exceeded or a fallback is triggered, Langfuse captures the event automatically. Between those moments, per-call spend streams to Langfuse metadata so you can see exactly where your budget went and when it broke.
Quick Start¶
Installation¶
Basic Setup¶
from langfuse import Langfuse
from shekel import budget
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter
# Initialize Langfuse client
lf = Langfuse(
public_key="pk-lf-...",
secret_key="sk-lf-...",
host="https://cloud.langfuse.com" # or self-hosted URL
)
# Register the adapter (do this once at app startup)
adapter = LangfuseAdapter(
client=lf,
trace_name="my-app", # Name for traces in Langfuse UI
tags=["production", "api-v2"] # Optional tags
)
AdapterRegistry.register(adapter)
# Use budgets as normal - costs automatically flow to Langfuse!
with budget(max_usd=5.00, name="user-query") as b:
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Hello!"}]
)
print(f"Spent: ${b.spent:.4f}")
Features¶
Feature #1: Per-Call Spend Streaming¶
Every LLM call automatically updates Langfuse metadata with current spend — so you can see budget burn in real time, not just at the end.
What's tracked:
- shekel_spent: Total USD spent so far
- shekel_limit: Budget limit (or null for track-only mode)
- shekel_utilization: Percentage of budget used (0.0 to 1.0)
- shekel_budget_name: Full hierarchical budget name
- shekel_last_model: Model used for last call
Example:
with budget(max_usd=10.00, name="research") as b:
# Each call updates Langfuse metadata
for query in queries:
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": query}]
)
In Langfuse UI:
- Navigate to your trace
- View metadata panel
- See real-time updates: shekel_spent, shekel_utilization, etc.
Feature #2: Nested Budget Mapping¶
Shekel's nested budgets automatically create a span hierarchy in Langfuse, making it easy to visualize budget allocation across workflow stages.
Example:
with budget(max_usd=10.00, name="workflow") as parent:
# Parent budget → Langfuse trace
with budget(max_usd=3.00, name="research") as child1:
# Child budget → Langfuse span "workflow.research"
do_research()
with budget(max_usd=5.00, name="generation") as child2:
# Sibling budget → Langfuse span "workflow.generation"
generate_response()
In Langfuse UI: - Trace: "workflow" (parent budget) - Span: "workflow.research" (child 1) - Span: "workflow.generation" (child 2) - Each span has its own budget metadata - Waterfall view shows timing and cost allocation
Benefits: - Identify expensive workflow stages - Track budget utilization per stage - Debug nested budget violations
Feature #3: Circuit Break Events¶
When a budget limit is exceeded, Shekel creates a WARNING event in Langfuse with full debugging context.
Event metadata:
- budget_name: Name of exceeded budget
- spent: Total amount spent
- limit: The budget limit
- overage: Amount over budget (e.g., $0.50)
- model: Model being used when limit hit
- tokens: Token counts {input: N, output: M}
- parent_remaining: Parent budget remaining (if nested)
Example:
try:
with budget(max_usd=1.00, name="api-call") as b:
# This will exceed the budget
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Long query..."}]
)
except BudgetExceededError as e:
print(f"Budget exceeded: ${e.spent:.2f} > ${e.limit:.2f}")
In Langfuse UI:
- Filter events by name:budget_exceeded and level:WARNING
- See which budgets are being exceeded
- Analyze token counts and model costs
- Track overage patterns
Use cases: - Debug "bill shock" incidents - Identify inefficient prompts - Monitor production budget violations - Set up alerts for cost overruns
Feature #4: Fallback Annotations¶
When Shekel switches to a fallback model, it creates an INFO event and updates metadata to show the transition.
Event metadata:
- from_model: Original model (e.g., "gpt-4o")
- to_model: Fallback model (e.g., "gpt-4o-mini")
- switched_at: Cost when switch occurred
- cost_primary: Total cost on primary model
- cost_fallback: Total cost on fallback model
- savings: Estimated savings from fallback
Additional trace/span metadata:
- shekel_fallback_active: true (persists across updates)
- shekel_fallback_model: Current fallback model
Example:
with budget(max_usd=5.00, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"}, name="agent") as b:
# Runs with gpt-4o until 80% of $5.00 ($4.00), then switches to gpt-4o-mini
run_agent_loop()
# Raises BudgetExceededError at $5.00
if b.model_switched:
print(f"Switched to {b.fallback['model']} at ${b.switched_at_usd:.2f}")
In Langfuse UI:
- Filter events by name:fallback_activated and level:INFO
- See which workflows trigger fallbacks
- Analyze cost savings from fallback strategy
- Compare primary vs. fallback performance
Use cases: - Validate fallback strategy is working - Measure cost savings - Track when fallbacks occur in production - Optimize fallback thresholds
Configuration¶
Trace Names¶
Customize the trace name to group related operations:
# Per-user traces
adapter = LangfuseAdapter(
client=lf,
trace_name=f"user-{user_id}"
)
# Per-endpoint traces
adapter = LangfuseAdapter(
client=lf,
trace_name=f"api-{endpoint_name}"
)
Tags¶
Apply tags for filtering and organization:
adapter = LangfuseAdapter(
client=lf,
tags=[
"production",
"api-v2",
f"region-{region}",
f"tenant-{tenant_id}"
]
)
Multiple Adapters¶
Register multiple adapters to send data to different destinations:
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter
# Langfuse for observability
langfuse_adapter = LangfuseAdapter(client=lf_client)
AdapterRegistry.register(langfuse_adapter)
# Custom adapter for internal metrics
custom_adapter = MyMetricsAdapter()
AdapterRegistry.register(custom_adapter)
# Both receive events automatically
Best Practices¶
1. Register Once at Startup¶
# ✅ Good: Register at app startup
def init_observability():
lf = Langfuse(...)
adapter = LangfuseAdapter(client=lf)
AdapterRegistry.register(adapter)
init_observability()
# ❌ Bad: Don't register in hot paths
def handle_request():
AdapterRegistry.register(LangfuseAdapter(...)) # NO!
2. Use Meaningful Budget Names¶
# ✅ Good: Descriptive names
with budget(max_usd=5.00, name="user-query-generation"):
...
with budget(max_usd=2.00, name="rag-retrieval"):
...
# ❌ Bad: Generic names
with budget(max_usd=5.00, name="budget1"):
...
3. Leverage Nested Budgets¶
# ✅ Good: Clear hierarchy
with budget(max_usd=20.00, name="agent-workflow") as parent:
with budget(max_usd=5.00, name="planning") as plan:
plan_actions()
with budget(max_usd=10.00, name="execution") as exec:
execute_plan()
with budget(max_usd=5.00, name="reflection") as reflect:
reflect_on_results()
4. Set Appropriate Fallbacks¶
# ✅ Good: Same provider fallback with at_pct threshold
with budget(max_usd=5.00, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"}):
run_workflow()
# Switches to gpt-4o-mini at $4.00 (80%), raises BudgetExceededError at $5.00
5. Handle Exceptions Gracefully¶
# ✅ Good: Catch and log
try:
with budget(max_usd=1.00, name="api-call") as b:
response = call_llm()
except BudgetExceededError as e:
logger.error(f"Budget exceeded: {e}")
return fallback_response()
Performance¶
The Langfuse adapter is designed for zero-impact on your application:
- Non-blocking: Events are emitted asynchronously (no API calls in hot path)
- Lightweight: Minimal memory overhead (~100 bytes per event)
- Resilient: Langfuse failures never break Shekel functionality
- Fast: <1ms overhead per LLM call
Benchmark (on modern hardware): - Adapter event emission: <0.5ms per call - Langfuse API calls: Background thread (non-blocking) - Memory: ~50 bytes per trace, ~30 bytes per span
Troubleshooting¶
Events Not Appearing in Langfuse¶
Check 1: Verify Langfuse client is initialized correctly
from langfuse import Langfuse
lf = Langfuse(
public_key="pk-lf-...",
secret_key="sk-lf-...",
host="https://cloud.langfuse.com"
)
# Test connection
lf.trace(name="test").update(metadata={"test": "ok"})
lf.flush() # Force sync
Check 2: Ensure adapter is registered before budget use
# ✅ Register BEFORE using budgets
AdapterRegistry.register(adapter)
with budget(...) as b: # Events will flow
...
Check 3: Call lf.flush() at app shutdown
Missing Nested Spans¶
Nested budgets must have unique names at each level:
# ✅ Good: Unique names
with budget(name="parent"):
with budget(name="child1"):
...
with budget(name="child2"):
...
# ❌ Bad: Duplicate names may cause issues
with budget(name="parent"):
with budget(name="child"):
...
with budget(name="child"): # Same name!
...
Adapter Errors¶
If you see warnings like "Adapter X failed on Y", check:
- Langfuse package installed:
pip install langfuse - Credentials valid: Test
lf.trace(name="test") - Network access: Ensure app can reach Langfuse host
Adapter errors are logged but won't break Shekel:
import logging
logging.basicConfig(level=logging.WARNING)
# You'll see warnings if adapter fails, but Shekel continues
with budget(max_usd=5.00) as b:
response = call_llm()
print(f"Spent: ${b.spent}") # Works regardless of adapter status
Examples¶
Complete Web API Example¶
from fastapi import FastAPI, HTTPException
from langfuse import Langfuse
from openai import OpenAI
from shekel import budget, BudgetExceededError
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter
app = FastAPI()
openai_client = OpenAI()
# Initialize at startup
@app.on_event("startup")
def setup_observability():
lf = Langfuse(
public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
)
adapter = LangfuseAdapter(
client=lf,
trace_name="chat-api",
tags=["production", os.getenv("ENV", "dev")]
)
AdapterRegistry.register(adapter)
@app.post("/chat")
async def chat(query: str, user_id: str):
try:
with budget(max_usd=0.50, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"}, name=f"user-{user_id}") as b:
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": query}]
)
return {
"response": response.choices[0].message.content,
"cost": b.spent,
"model_switched": b.model_switched
}
except BudgetExceededError as e:
raise HTTPException(status_code=429, detail=f"Budget exceeded: ${e.spent:.2f}")
@app.on_event("shutdown")
def cleanup():
# Flush pending events
lf.flush()
Multi-Stage Agent Example¶
from langfuse import Langfuse
from openai import OpenAI
from shekel import budget
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter
# Setup
lf = Langfuse(...)
adapter = LangfuseAdapter(client=lf, trace_name="agent-workflow")
AdapterRegistry.register(adapter)
openai_client = OpenAI()
def run_agent(user_query: str):
"""Multi-stage agent with nested budgets."""
with budget(max_usd=10.00, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"}, name="agent") as agent_budget:
# Stage 1: Planning
with budget(max_usd=2.00, name="planning") as plan_budget:
plan = openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a planning assistant."},
{"role": "user", "content": f"Create a plan for: {user_query}"}
]
)
print(f"Planning cost: ${plan_budget.spent:.4f}")
# Stage 2: Research (can have multiple sub-budgets)
with budget(max_usd=5.00, name="research") as research_budget:
for topic in extract_topics(plan):
with budget(max_usd=1.00, name=f"topic-{topic}") as topic_budget:
info = research_topic(topic)
# Stage 3: Generation
with budget(max_usd=3.00, name="generation") as gen_budget:
response = openai_client.chat.completions.create(
model="gpt-4o-mini" if agent_budget.model_switched else "gpt-4o",
messages=[
{"role": "system", "content": "Generate final response."},
{"role": "user", "content": f"Plan: {plan}\nGenerate response."}
]
)
print(f"Total agent cost: ${agent_budget.spent:.4f}")
return response.choices[0].message.content
# Run and view in Langfuse UI:
# - Trace: "agent-workflow"
# - Span: "agent.planning"
# - Span: "agent.research"
# - Span: "agent.research.topic-1"
# - Span: "agent.research.topic-2"
# - Span: "agent.generation"
Migration from v0.2.3¶
If you're upgrading from Shekel v0.2.3, no breaking changes! Just add the Langfuse integration:
# Your existing code (works the same)
with budget(max_usd=5.00) as b:
response = call_llm()
print(f"Spent: ${b.spent}")
# Add Langfuse (one-time setup)
from langfuse import Langfuse
from shekel.integrations import AdapterRegistry
from shekel.integrations.langfuse import LangfuseAdapter
lf = Langfuse(...)
adapter = LangfuseAdapter(client=lf)
AdapterRegistry.register(adapter)
# Same code, now with observability!
with budget(max_usd=5.00) as b:
response = call_llm()
print(f"Spent: ${b.spent}")
Resources¶
- Langfuse Docs: https://langfuse.com/docs
- Shekel Repository: https://github.com/arieradle/shekel
- Shekel Docs: https://arieradle.github.io/shekel/
- Report Issues: https://github.com/arieradle/shekel/issues
License¶
This integration is part of Shekel and follows the same license (MIT).