Connection adapters

Connection adapters allow you to make Better SSE sessions compatible with any protocol, framework or runtime environment.

Where sessions manage the request and response lifecycle, the formatting of data and other SSE-specific behaviour, connection adapters implement the underlying connection itself, such as extracting request headers and query parameters, sending response headers and data chunks, and reporting when the connection closes.
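
The split of responsibilities described above can be sketched with a tiny in-memory example. Every name here (SketchConnection, MemoryConnection, SketchSession) is hypothetical and is not part of the Better SSE API, and the JSON serialization of the payload is an assumption of this sketch. The session side formats the event, and the connection side only moves bytes:

```typescript
// Hypothetical sketch: these names are illustrative, NOT the Better SSE API.

// The transport side: knows how to talk to one concrete framework or runtime.
interface SketchConnection {
  url: URL
  sendHead(): void
  sendChunk(chunk: string): void
}

// An in-memory transport that simply records what it is asked to send.
class MemoryConnection implements SketchConnection {
  url = new URL("http://localhost/sse?room=1")
  headSent = false
  chunks: string[] = []

  sendHead(): void {
    this.headSent = true
  }

  sendChunk(chunk: string): void {
    this.chunks.push(chunk)
  }
}

// The session side: owns the SSE wire format and never touches transport details.
class SketchSession {
  private connection: SketchConnection

  constructor(connection: SketchConnection) {
    this.connection = connection
    this.connection.sendHead()
  }

  push(data: unknown, eventName = "message"): void {
    // SSE framing: "field: value" lines terminated by a blank line.
    const payload = JSON.stringify(data)
    this.connection.sendChunk(`event: ${eventName}\ndata: ${payload}\n\n`)
  }
}

const memoryConnection = new MemoryConnection()
const sketchSession = new SketchSession(memoryConnection)
sketchSession.push("Hello world!")
```

Swapping MemoryConnection for another implementation of the same interface would change how bytes reach the client without touching the formatting logic, which is the idea behind connection adapters.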

Better SSE ships with several built-in connection adapters which are used internally but can also be extended to add or augment functionality.

When a session is created, it will automatically determine which connection adapter to use based on the arguments provided.
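
One plausible way to perform this kind of detection is with instanceof checks against the runtime classes. The sketch below is an assumption for illustration, not a description of the library's internals, and guessAdapter and AdapterName are hypothetical names:

```typescript
import { IncomingMessage } from "node:http"
import { Http2ServerRequest } from "node:http2"

type AdapterName = "fetch" | "node-http1" | "node-http2-compat" | "unknown"

// Hypothetical helper: guesses an adapter from the runtime class of the request.
function guessAdapter(req: unknown): AdapterName {
  if (req instanceof Request) return "fetch"
  if (req instanceof IncomingMessage) return "node-http1"
  if (req instanceof Http2ServerRequest) return "node-http2-compat"
  return "unknown"
}

// A genuine Fetch API request is recognised by its class.
const fromFetch = guessAdapter(new Request("http://localhost/sse"))

// A look-alike object with the right shape but the wrong class is not.
const fromLookAlike = guessAdapter({ url: "/sse", headers: {} })
```

Under this assumption, an object that merely mimics the shape of a request without extending the real class would not be matched, so the adapter has to be chosen explicitly.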

When you provide an instance of Request (and optionally Response) from the Fetch API, such as when using Hono:

import { createResponse } from "better-sse"
app.get("/sse", async (c) =>
  createResponse(c.req.raw, (session) => {
    session.push("Hello world!")
  })
)

The session will use the built-in FetchConnection adapter, equivalent to the following:

import { createResponse, FetchConnection } from "better-sse"
app.get("/sse", async (c) => {
  const connection = new FetchConnection(c.req.raw, null)
  return createResponse(connection, (session) => {
    session.push("Hello world!")
  })
})

When you provide instances of IncomingMessage and ServerResponse from the Node HTTP/1 API, such as when using Express:

import { createSession } from "better-sse"
app.get("/sse", async (req, res) => {
  const session = await createSession(req, res)
  session.push("Hello world!")
})

The session will use the built-in NodeHttp1Connection adapter, equivalent to the following:

import { createSession, NodeHttp1Connection } from "better-sse"
app.get("/sse", async (req, res) => {
  const connection = new NodeHttp1Connection(req, res)
  const session = await createSession(connection)
  session.push("Hello world!")
})

When you provide instances of Http2ServerRequest and Http2ServerResponse from the Node HTTP/2 Compatibility API, such as when using createServer:

import { createServer } from "node:http2"
import { createSession } from "better-sse"
createServer(async (req, res) => {
  // Only open an SSE session for requests to the /sse path
  if (req.url === "/sse") {
    const session = await createSession(req, res)
    session.push("Hello world!")
  }
})

The session will use the built-in NodeHttp2CompatConnection adapter, equivalent to the following:

import { createServer } from "node:http2"
import { createSession, NodeHttp2CompatConnection } from "better-sse"
createServer(async (req, res) => {
  // Only open an SSE session for requests to the /sse path
  if (req.url === "/sse") {
    const connection = new NodeHttp2CompatConnection(req, res)
    const session = await createSession(connection)
    session.push("Hello world!")
  }
})

You can create a custom connection adapter from scratch by extending the Connection class directly, or by using any of the built-in adapters as a base.

If you want to publish your adapter as an npm-compatible package, remember to add better-sse as a peer dependency in your package.json file:

package.json
{
  "name": "my-custom-connection-adapter",
  "version": "0.1.0",
  ...
  "peerDependencies": {
    "better-sse": ">=0.16.0"
  }
}

Let’s create a custom connection adapter that abstracts away the implementation details of SSE streaming with Koa.

To begin, create a class that extends from the base Connection class and takes a Koa Context object as its first argument:

KoaConnection.ts
import { Connection } from "better-sse"
import type { Context } from "koa"
class KoaConnection extends Connection {
  constructor(private ctx: Context) {
    super()
  }
}

Next, we will implement the url property that the session will use to parse the query string of the request URL. Thankfully, Koa provides an instance of a WHATWG URL object natively:

KoaConnection.ts
class KoaConnection extends Connection {
  url: URL
  constructor(private ctx: Context) {
    super()
    this.url = ctx.URL
  }
}

Then, we will implement the request property that the session will use to parse the request headers and detect connection closure:

KoaConnection.ts
class KoaConnection extends Connection {
  private controller: AbortController
  url: URL
  request: Request
  constructor(private ctx: Context) {
    super()
    this.url = ctx.URL
    this.controller = new AbortController()
    this.request = new Request(this.url, {
      method: ctx.request.method ?? Connection.constants.REQUEST_METHOD,
      signal: this.controller.signal,
    })
    Connection.applyHeaders(ctx.headers, this.request.headers)
  }
}

Here, we create an AbortController with its signal attached to the request.

We also copy the request method across, falling back to the default method from the static Connection.constants properties if it is undefined.

Additionally, we use the built-in Connection.applyHeaders static utility method to copy the request headers from Koa’s Context#headers to the request headers.
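
To make the mechanics concrete, here is a minimal standalone sketch of the same idea using only Fetch API globals. The copyHeaders function is a hypothetical stand-in written for this example and is not the implementation of Connection.applyHeaders. The URL and header values are made up:

```typescript
// Node-style headers: values may be a string, a list of strings or undefined.
type NodeStyleHeaders = Record<string, string | string[] | undefined>

// Hypothetical stand-in for a header-copying utility (not the library's code).
function copyHeaders(from: NodeStyleHeaders, to: Headers): void {
  for (const [headerName, headerValue] of Object.entries(from)) {
    if (headerValue === undefined) continue
    const values = Array.isArray(headerValue) ? headerValue : [headerValue]
    for (const value of values) {
      to.append(headerName, value)
    }
  }
}

const sketchController = new AbortController()
const sketchRequest = new Request("http://localhost/sse?room=1", {
  method: "GET",
  signal: sketchController.signal,
})
copyHeaders(
  { "last-event-id": "41", accept: ["text/event-stream"] },
  sketchRequest.headers
)

// Anything observing the request's signal learns about the closure.
let sawAbort = false
sketchRequest.signal.addEventListener("abort", () => {
  sawAbort = true
})

// Aborting the controller is how the adapter reports "the connection closed".
sketchController.abort()
```

The request's own signal follows the controller's signal, so a consumer only needs a reference to the request to be notified.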


Next, we will implement the response property that the session will use to read the response status code and headers:

KoaConnection.ts
class KoaConnection extends Connection {
  private controller: AbortController
  url: URL
  request: Request
  response: Response
  constructor(private ctx: Context) {
    super()
    this.url = ctx.URL
    this.controller = new AbortController()
    this.request = new Request(this.url, {
      method: ctx.request.method ?? Connection.constants.REQUEST_METHOD,
      signal: this.controller.signal,
    })
    Connection.applyHeaders(ctx.headers, this.request.headers)
    ctx.status = Connection.constants.RESPONSE_CODE
    ctx.set(Connection.constants.RESPONSE_HEADERS)
    this.response = new Response(null, {
      status: Connection.constants.RESPONSE_CODE,
      headers: Connection.constants.RESPONSE_HEADERS,
    })
  }
}

Again, we use the built-in Connection.constants static properties to set default values for the status code and headers.

We set the response body to null as Koa currently does not support streaming Fetch response bodies. Instead, we will implement the response stream using a Node stream.
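
As a small illustration of why a body-less Response is still useful, the sketch below builds one and reads its metadata back. The status code and header values are assumed for the example. In the adapter they come from Connection.constants, whose actual values are not shown here:

```typescript
// Assumed values for illustration only; the adapter takes these from
// Connection.constants rather than hard-coding them.
const assumedStatusCode = 200
const assumedHeaders = {
  "Content-Type": "text/event-stream",
  "Cache-Control": "no-cache",
}

// With a null body, the Response acts purely as a container for the
// status code and headers, which can be read back at any time.
const headOnlyResponse = new Response(null, {
  status: assumedStatusCode,
  headers: assumedHeaders,
})

const readBackStatus = headOnlyResponse.status
const readBackType = headOnlyResponse.headers.get("Content-Type")
```

The actual event data never passes through this object, which is why the body can be delivered by a different mechanism.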


To finish off our constructor definition, we create a Node PassThrough stream that simply passes written data directly through to its output and set it to be the response body:

KoaConnection.ts
...
import { PassThrough } from "node:stream"
class KoaConnection extends Connection {
  private controller: AbortController
  private stream: PassThrough
  url: URL
  request: Request
  response: Response
  constructor(private ctx: Context) {
    super()
    this.url = ctx.URL
    this.controller = new AbortController()
    this.request = new Request(this.url, {
      method: ctx.request.method ?? Connection.constants.REQUEST_METHOD,
      signal: this.controller.signal,
    })
    Connection.applyHeaders(ctx.headers, this.request.headers)
    ctx.status = Connection.constants.RESPONSE_CODE
    ctx.set(Connection.constants.RESPONSE_HEADERS)
    this.response = new Response(null, {
      status: Connection.constants.RESPONSE_CODE,
      headers: Connection.constants.RESPONSE_HEADERS,
    })
    this.stream = new PassThrough()
    this.stream.once("close", this.onClose)
    ctx.body = this.stream
  }
  private onClose = () => {
    this.controller.abort()
  }
}

When the stream closes we trigger the abort signal to notify the session that the connection has been terminated.
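
The behaviour of the PassThrough stream can be tried in isolation, without Koa. This is a minimal sketch using only Node built-ins, and the variable names are made up for the example:

```typescript
import { PassThrough } from "node:stream"

const closeController = new AbortController()
const passthrough = new PassThrough()
passthrough.setEncoding("utf8")

// Mirror the adapter: when the stream closes, abort the signal.
passthrough.once("close", () => {
  closeController.abort()
})

// Whatever is written on the writable side becomes readable,
// unchanged, on the readable side.
passthrough.write("data: hello\n\n")
const echoedChunk = passthrough.read()

// Simulate the client going away. The "close" event, and therefore
// the abort, fires asynchronously after destroy() returns.
passthrough.destroy()
```

In the adapter, Koa pipes the readable side to the HTTP response, so destroying or ending that pipeline is what eventually triggers the close event.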


Now we will implement the first abstract method, sendHead, which the session calls to send the response status code, headers and any necessary preamble data to the client. Here it uses Koa’s Context#flushHeaders method:

KoaConnection.ts
class KoaConnection extends Connection {
  ...
  sendHead() {
    this.ctx.flushHeaders()
  }
}

Next is the sendChunk method that writes the given data to the stream:

KoaConnection.ts
class KoaConnection extends Connection {
  ...
  sendHead() {
    this.ctx.flushHeaders()
  }
  sendChunk(chunk: string) {
    this.stream.write(chunk)
  }
}

Finally, the cleanup method removes our event listeners when the connection is closed:

KoaConnection.ts
class KoaConnection extends Connection {
  ...
  sendHead() {
    this.ctx.flushHeaders()
  }
  sendChunk(chunk: string) {
    this.stream.write(chunk)
  }
  cleanup() {
    this.stream.removeListener("close", this.onClose)
  }
}
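
The cleanup step relies on onClose being a stable function reference, which is why it is declared as an arrow-function property rather than created inline. A minimal standalone sketch of that pattern, with a hypothetical ListenerOwner class standing in for the adapter:

```typescript
import { PassThrough } from "node:stream"

// Hypothetical stand-in for the adapter, reduced to listener bookkeeping.
class ListenerOwner {
  readonly stream = new PassThrough()
  closeCount = 0

  // An arrow-function property keeps a single reference bound to this
  // instance, so the exact same function can be removed later.
  private onClose = () => {
    this.closeCount += 1
  }

  attach(): void {
    this.stream.once("close", this.onClose)
  }

  cleanup(): void {
    // removeListener also removes listeners that were added with once().
    this.stream.removeListener("close", this.onClose)
  }
}

const listenerOwner = new ListenerOwner()
const baselineCount = listenerOwner.stream.listenerCount("close")
listenerOwner.attach()
const attachedCount = listenerOwner.stream.listenerCount("close")
listenerOwner.cleanup()
const cleanedCount = listenerOwner.stream.listenerCount("close")
```

Had the listener been registered as an inline closure, there would be no reference to pass to removeListener and the listener could not be detached.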

By default, Koa logs errors produced by streams to the console. A stream that closes prematurely usually indicates an error, but with SSE it is expected, as the onus is on the client to close the connection stream that the server opens.

As such, we can add additional functionality to suppress logging errors when an SSE stream is closed “prematurely”:

KoaConnection.ts
class KoaConnection extends Connection {
  ...
  static addListeners = (app: Koa) => {
    app.on("error", (err, ctx?: Context) => {
      if (
        err.code === "ERR_STREAM_PREMATURE_CLOSE" &&
        ctx?.response.get("Content-Type") === "text/event-stream"
      ) {
        return
      }
      app.onerror(err)
    })
  }
}
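
The condition inside the listener above can also be read as a standalone predicate, which makes it easy to check in isolation. The isExpectedSseClose name is hypothetical and the sample errors are constructed by hand for the example:

```typescript
// Hypothetical helper extracted from the error listener for illustration.
function isExpectedSseClose(
  err: { code?: string },
  contentType: string | undefined
): boolean {
  return (
    err.code === "ERR_STREAM_PREMATURE_CLOSE" &&
    contentType === "text/event-stream"
  )
}

// Hand-built errors that mimic the shape of Node's stream errors.
const prematureClose = Object.assign(new Error("Premature close"), {
  code: "ERR_STREAM_PREMATURE_CLOSE",
})
const otherFailure = Object.assign(new Error("Connection reset"), {
  code: "ECONNRESET",
})

// Suppressed: premature close on an SSE response.
const suppressSse = isExpectedSseClose(prematureClose, "text/event-stream")
// Still logged: premature close on a non-SSE response.
const suppressJson = isExpectedSseClose(prematureClose, "application/json")
// Still logged: a different error on an SSE response.
const suppressOther = isExpectedSseClose(otherFailure, "text/event-stream")
```

Both conditions must hold, so genuine stream failures on other routes and unrelated errors on SSE routes continue to reach the default handler.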

The final completed code for the Koa adapter is given below:

KoaConnection.ts
import { PassThrough } from "node:stream"
import { Connection } from "better-sse"
import Koa, { type Context } from "koa"
class KoaConnection extends Connection {
  // Suppress error logging for SSE streams that the client closes
  static addListeners = (app: Koa) => {
    app.on("error", (err, ctx?: Context) => {
      if (
        err.code === "ERR_STREAM_PREMATURE_CLOSE" &&
        ctx?.response.get("Content-Type") === "text/event-stream"
      ) {
        return
      }
      app.onerror(err)
    })
  }
  private controller: AbortController
  private stream: PassThrough
  url: URL
  request: Request
  response: Response
  constructor(private ctx: Context) {
    super()
    this.url = ctx.URL
    this.controller = new AbortController()
    this.request = new Request(this.url, {
      method: ctx.request.method ?? Connection.constants.REQUEST_METHOD,
      signal: this.controller.signal,
    })
    Connection.applyHeaders(ctx.headers, this.request.headers)
    ctx.status = Connection.constants.RESPONSE_CODE
    ctx.set(Connection.constants.RESPONSE_HEADERS)
    this.response = new Response(null, {
      status: Connection.constants.RESPONSE_CODE,
      headers: Connection.constants.RESPONSE_HEADERS,
    })
    this.stream = new PassThrough()
    this.stream.once("close", this.onClose)
    ctx.body = this.stream
  }
  // Notify the session that the connection has been terminated
  private onClose = () => {
    this.controller.abort()
  }
  sendHead = () => {
    this.ctx.flushHeaders()
  }
  sendChunk = (chunk: string) => {
    this.stream.write(chunk)
  }
  cleanup = () => {
    this.stream.removeListener("close", this.onClose)
  }
}
export { KoaConnection }

It can be used as follows:

server.ts
import Koa from "koa"
import Router from "@koa/router"
import { createSession } from "better-sse"
import { KoaConnection } from "./KoaConnection"
const app = new Koa()
const router = new Router()
KoaConnection.addListeners(app)
router.get("/sse", async (ctx) => {
  const connection = new KoaConnection(ctx)
  const session = await createSession(connection)
  session.push("Hello world!")
})
app.use(router.routes()).use(router.allowedMethods())
app.listen(3000)

For frameworks that implement full compatibility with one of the built-in connection adapters but do not extend from the actual underlying base class, such as Ultimate Express, the session will not be able to determine which connection adapter to use automatically:

import express from "ultimate-express"
import { createSession } from "better-sse"
const app = express()
app.get("/sse", async (req, res) => {
  const session = await createSession(req, res) // SseError thrown!
})

In this case, you can simply create an instance of the appropriate built-in connection adapter yourself and pass it to the session to skip the detection mechanism:

import express from "ultimate-express"
import { createSession, NodeHttp1Connection } from "better-sse"
const app = express()
app.get("/sse", async (req, res) => {
  // Construct the adapter explicitly to bypass automatic detection
  const connection = new NodeHttp1Connection(req, res)
  const session = await createSession(connection)
})