Stream API

Subscribe to wide events flowing through evlog, in-process, with createStreamDrain — sync listeners, async iterators, and a ring buffer.

createStreamDrain() exposes the events flowing through evlog as an in-process pub/sub. It's the primitive any local consumer can subscribe to — dashboards, devtools, CLIs, custom UIs — without re-implementing a drain.

Scope: the stream lives inside a single Node / Bun / Deno process. It sees events emitted from that process only — like the filesystem drain — and is not a cross-process bus. That means it works perfectly during local development, on long-lived self-hosted servers, and inside containers. On serverless platforms (Vercel Functions, Cloudflare Workers, AWS Lambda…), each invocation is a separate isolate, so a subscriber in one invocation will not see events emitted from another. Use a real broker (Redis Streams, NATS, Pub/Sub…) when you need cross-instance fan-out.
import { createStreamDrain } from 'evlog/stream'

const stream = createStreamDrain({ buffer: 200 })

// Register as a normal evlog drain (Nitro hook or plugin runner):
nitroApp.hooks.hook('evlog:drain', stream.drain)

Subscribing

Two consumption styles are supported.

Sync listener

const unsubscribe = stream.subscribe((event) => {
  if (event.level === 'error') notifyOps(event)
})

// Later:
unsubscribe()

Listener errors are caught and logged — they never affect other subscribers or the drain.
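This isolation boils down to invoking each listener inside its own try/catch. A minimal self-contained sketch of the pattern (illustrative only, not evlog's actual code — `emitToAll` is a hypothetical name):

```typescript
// Each listener runs in its own try/catch, so one throwing subscriber
// cannot prevent delivery to the others or break the drain.
type Listener<T> = (event: T) => void

function emitToAll<T>(listeners: Listener<T>[], event: T): number {
  let delivered = 0
  for (const listener of listeners) {
    try {
      listener(event)
      delivered++
    } catch {
      // Swallowed here for brevity; evlog logs the error instead.
    }
  }
  return delivered
}
```

A listener that throws mid-fan-out simply reduces the delivered count; subscribers registered after it still receive the event.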

Async iterator

for await (const event of stream.events()) {
  console.log(event.timestamp, event.action ?? event.message)
  if (shouldStop(event)) break  // breaking cleanly unsubscribes
}

Each call to events() returns a fresh independent iterator. Past buffered events are not replayed; pair with stream.recent() to seed history.
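An async iterator over a push source like this is, at heart, a queue plus a wake-up promise. The following is a generic sketch of that push-to-pull pattern (hypothetical `pushSource` helper; evlog's internals may differ):

```typescript
// Push-to-pull adapter: push() feeds values in, the async generator
// yields them in order; close() ends iteration once the queue drains.
function pushSource<T>() {
  const queue: T[] = []
  let wake: (() => void) | null = null
  let closed = false
  return {
    push(value: T) {
      queue.push(value)
      wake?.() // resume a waiting consumer, if any
    },
    close() {
      closed = true
      wake?.()
    },
    async *events(): AsyncGenerator<T> {
      while (true) {
        while (queue.length > 0) yield queue.shift()!
        if (closed) return
        // Park until the next push() or close().
        await new Promise<void>(resolve => { wake = resolve })
        wake = null
      }
    },
  }
}
```

Because each `events()` call above builds its own generator state, iterators stay independent — the same property the real API guarantees.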

Replay buffer

stream.recent() returns a defensive copy of the most recent events (oldest first). The default buffer holds 500 events; pass buffer: 0 to disable, or set it explicitly:

const stream = createStreamDrain({ buffer: 1000 })

const initial = stream.recent()
for (const past of initial) seedDashboard(past)

stream.subscribe(liveEvent => updateDashboard(liveEvent))
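The replay buffer is effectively a fixed-capacity FIFO that evicts its oldest entry on overflow. A minimal sketch of that behavior (illustrative, not evlog's implementation):

```typescript
// Fixed-capacity replay buffer: keeps the newest `capacity` events;
// recent() returns a defensive copy, oldest first; capacity 0 disables it.
class ReplayBuffer<T> {
  private items: T[] = []
  constructor(private capacity: number) {}

  push(event: T) {
    if (this.capacity <= 0) return // buffer: 0 disables replay
    this.items.push(event)
    if (this.items.length > this.capacity) this.items.shift() // evict oldest
  }

  recent(): T[] {
    return [...this.items] // defensive copy: mutations don't leak back
  }
}
```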

Backpressure

A slow async-iterator consumer never blocks the drain. Each iterator has a per-subscriber queue (default 1000); when it overflows, the oldest queued events are dropped and stream.droppedCount increments.
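The drop-oldest policy can be sketched as a bounded queue: enqueueing never blocks the producer, and overflow evicts the stalest entry while a counter records the loss (hypothetical names; evlog's internals may differ):

```typescript
// Bounded per-subscriber queue with drop-oldest overflow handling.
class BoundedQueue<T> {
  private items: T[] = []
  droppedCount = 0
  constructor(private limit = 1000) {}

  enqueue(event: T) {
    this.items.push(event)
    if (this.items.length > this.limit) {
      this.items.shift() // drop oldest so the drain never blocks
      this.droppedCount++
    }
  }

  dequeue(): T | undefined {
    return this.items.shift()
  }
}
```

Dropping the oldest (rather than the newest) favors recency: a consumer that catches up sees the latest events, at the cost of a gap it can detect via the counter.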

Filter

Events that fail the optional filter predicate are neither buffered nor delivered:

const errors = createStreamDrain({
  filter: event => event.level === 'error' || event.status >= 500,
})

Default singleton

When several pieces of code in the same process need to share a single stream — typically a framework integration that wires the drain on one side and a consumer (UI, route, plugin) on the other — use the singleton accessors instead of passing references around:

import { getDefaultStream, setDefaultStream } from 'evlog/stream'

// Lazily creates a singleton on first call
const stream = getDefaultStream({ buffer: 500 })

// Reset (mostly useful in tests)
setDefaultStream(null)

Going further

  • Network bridge — expose this stream over HTTP (Server-Sent Events) so a browser tab, a CLI, or any external consumer can subscribe. See the SSE bridge.
  • Recipes — concrete patterns built on top of the stream + SSE bridge: minimal devtool, curl + jq inspection, replay-then-live, consumer-side filtering. See Recipes.