Observers
Observers let you react to an agent's progress in real time without changing the agent's run loop. The streaming runtime emits structured events at every meaningful boundary: model start/finish, each token, each tool call's start and end, each loop iteration.
The trait
#[async_trait]
pub trait AgentLoopObserver: Send + Sync {
    async fn on_event(&self, event: &AgentEvent);
}
That's it. One method, async, fully typed events.
Installing an observer
let observer: Arc<dyn AgentLoopObserver> = Arc::new(MyObserver::new());
agent.set_observer(observer);
Or at construction time:
let agent = StreamingToolLoopAgent::new(/* ... */)
    .with_observer(observer);
Atomic swap
The observer is held behind an RwLock<Arc<dyn AgentLoopObserver>>, so you
can replace it at any time without rebuilding the agent:
// Hot-swap during a long-running session
agent.set_observer(Arc::new(VerboseObserver::new()));
// Drop back to a no-op
agent.clear_observer();
Swaps complete atomically — in-flight events use the observer that was installed when the event was emitted; subsequent events use the new one.
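The swap semantics can be illustrated with a minimal, std-only sketch. Everything here is a stand-in: the synchronous `Observer` trait, the `Quiet` and `Verbose` types, and `ObserverSlot` are hypothetical names, not part of the library. The sketch also snapshots the `Arc` and releases the lock before dispatch, whereas the real runtime may hold the read lock while `on_event` runs.

```rust
use std::sync::{Arc, RwLock};

// Simplified, synchronous stand-in for the async AgentLoopObserver trait.
trait Observer: Send + Sync {
    fn name(&self) -> &'static str;
}

struct Quiet;
struct Verbose;

impl Observer for Quiet {
    fn name(&self) -> &'static str {
        "quiet"
    }
}

impl Observer for Verbose {
    fn name(&self) -> &'static str {
        "verbose"
    }
}

// Hypothetical holder mirroring the RwLock<Arc<dyn ...>> layout.
struct ObserverSlot {
    inner: RwLock<Arc<dyn Observer>>,
}

impl ObserverSlot {
    fn new(initial: Arc<dyn Observer>) -> Self {
        Self { inner: RwLock::new(initial) }
    }

    // Replace the observer; the write lock is held only for the pointer swap.
    fn set(&self, next: Arc<dyn Observer>) {
        *self.inner.write().unwrap() = next;
    }

    // Snapshot the currently installed observer by cloning the Arc.
    fn current(&self) -> Arc<dyn Observer> {
        let guard = self.inner.read().unwrap();
        Arc::clone(&*guard)
    }
}

// Returns (observer seen by an in-flight event, observer seen afterwards).
fn swap_demo() -> (&'static str, &'static str) {
    let slot = ObserverSlot::new(Arc::new(Quiet));
    let in_flight = slot.current(); // taken before the swap
    slot.set(Arc::new(Verbose));
    (in_flight.name(), slot.current().name())
}
```

The snapshot taken before the swap keeps pointing at the old observer, while any lookup after the swap sees the new one.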
Built-ins
- NoOpObserver: installed by default. Drops every event.
- PrintObserver (behind the tracing feature): logs events via tracing::info.
- ChannelObserver: forwards every event into a tokio::sync::mpsc channel for use with custom UIs or long-haul observation pipelines.
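To make the channel-forwarding idea concrete, here is a rough std-only sketch. It is not the library's ChannelObserver: it uses `std::sync::mpsc` in place of `tokio::sync::mpsc`, a synchronous `on_event`, and a hypothetical trimmed-down `Event` enum and `ForwardingObserver` type.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical, trimmed-down event type; the real AgentEvent is richer.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Iteration(usize),
    ToolCalled(String),
}

// Synchronous stand-in for a channel-forwarding observer.
struct ForwardingObserver {
    tx: Sender<Event>,
}

impl ForwardingObserver {
    // Build the observer together with the receiving end for the consumer.
    fn new() -> (Self, Receiver<Event>) {
        let (tx, rx) = channel();
        (Self { tx }, rx)
    }

    fn on_event(&self, event: &Event) {
        // Ignore send errors: a dropped receiver should not fail the agent loop.
        let _ = self.tx.send(event.clone());
    }
}

// Forward two events, then drain them on the consumer side.
fn forward_demo() -> Vec<Event> {
    let (observer, rx) = ForwardingObserver::new();
    observer.on_event(&Event::Iteration(1));
    observer.on_event(&Event::ToolCalled("search".to_string()));
    drop(observer); // dropping the only sender ends the receiver's iteration
    rx.iter().collect()
}
```

The consumer side (a UI or a logging pipeline) only ever sees the receiver, so it stays decoupled from the agent loop.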
Building your own
use forge::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};

// Counts loop iterations and tool calls as they happen.
struct CountingObserver {
    iterations: AtomicUsize,
    tool_calls: AtomicUsize,
}

#[async_trait]
impl AgentLoopObserver for CountingObserver {
    async fn on_event(&self, event: &AgentEvent) {
        match event {
            AgentEvent::Iteration { .. } => {
                // Relaxed ordering is enough for independent counters.
                self.iterations.fetch_add(1, Ordering::Relaxed);
            }
            AgentEvent::ToolCalled { .. } => {
                self.tool_calls.fetch_add(1, Ordering::Relaxed);
            }
            // Ignore every other event kind.
            _ => {}
        }
    }
}
Observer + tool invocation symmetry
Every AgentEvent::ToolCalled / AgentEvent::ToolFinished pair your observer
sees corresponds one-to-one with a ToolInvocationRecord in the final
AgentOutput. The observer is real-time and the records are post-hoc, but the
two views are guaranteed to be symmetrical.
This means you can:
- Drive a live status panel from the observer
- Persist a complete invocation log from output.tool_invocations()
- Reconstruct the same view either way
See Agent events for the full taxonomy.
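As a rough illustration of what the symmetry lets you check, the sketch below compares a captured event list against a list of records. The `Event` and `Record` types and the `call_id` field are hypothetical stand-ins; the real AgentEvent and ToolInvocationRecord shapes may differ.

```rust
use std::collections::HashSet;

// Hypothetical, trimmed-down stand-ins; the field names are assumptions.
enum Event {
    ToolCalled { call_id: u32 },
    ToolFinished { call_id: u32 },
    Other,
}

struct Record {
    call_id: u32,
}

// True when the calls seen live and the post-hoc records describe the same
// set of invocations, with exactly one record per call.
fn views_match(events: &[Event], records: &[Record]) -> bool {
    let mut started = HashSet::new();
    let mut finished = HashSet::new();
    for event in events {
        match event {
            Event::ToolCalled { call_id } => {
                started.insert(*call_id);
            }
            Event::ToolFinished { call_id } => {
                finished.insert(*call_id);
            }
            Event::Other => {}
        }
    }
    let recorded: HashSet<u32> = records.iter().map(|r| r.call_id).collect();
    started == finished && finished == recorded && recorded.len() == records.len()
}
```

A check like this could run in a test harness, with the event list captured by an observer and the record list taken from the final output.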
Bridging to the harness
The harness-sdk-forge bridge will (in a follow-up release) expose an
observer that translates AgentEvents into harness_sdk::AutonomyEvents for
the terminal's activity feed. The current alpha translates the stream chunks
directly; full observer bridging tracks
harness-sdk-forge#1.
Performance
Observer dispatch happens on the agent's task. If your observer awaits I/O
(writing to disk, posting to a webhook), wrap the work in tokio::spawn:
async fn on_event(&self, event: &AgentEvent) {
    let event = event.clone();
    let client = self.http_client.clone();
    tokio::spawn(async move {
        let _ = client.post(URL).json(&event).send().await;
    });
}
The RwLock is read-locked for the duration of on_event. Long-running
observers should not block — a writer (a set_observer call) will be parked
until the read completes.
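One possible variation on the hand-off, assuming it is acceptable to drop events under load, is a bounded queue drained by a separate worker. The sketch below is std-only and synchronous: `BoundedObserver` is a hypothetical name, events are plain strings, and `std::sync::mpsc::sync_channel` stands in for whatever async channel a real observer would use.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// Hypothetical observer that never blocks the caller: events go into a
// bounded queue and are dropped when the queue is full.
struct BoundedObserver {
    tx: SyncSender<String>,
}

impl BoundedObserver {
    fn new(capacity: usize) -> (Self, Receiver<String>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx }, rx)
    }

    // Returns false when the event was dropped (queue full or consumer gone).
    fn on_event(&self, event: &str) -> bool {
        match self.tx.try_send(event.to_owned()) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
        }
    }
}

// Push three events into a queue of capacity two without draining it.
fn backpressure_demo() -> (Vec<bool>, Vec<String>) {
    let (observer, rx) = BoundedObserver::new(2);
    let accepted: Vec<bool> = ["a", "b", "c"]
        .iter()
        .map(|e| observer.on_event(e))
        .collect();
    drop(observer); // close the queue so the drain below terminates
    (accepted, rx.iter().collect())
}
```

Because `try_send` returns immediately, `on_event` stays short regardless of how slow the consumer is; the trade-off is that events arriving while the queue is full are lost.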
Next
- Agent events — full event reference
- Streaming — the underlying chunk stream