graphql-sse

Zero-dependency, HTTP/1 safe, simple, GraphQL over Server-Sent Events Protocol server and client

Usage no npm install needed!

<script type="module">
  import graphqlSse from 'https://cdn.skypack.dev/graphql-sse';
</script>

README


graphql-sse

Zero-dependency, HTTP/1 safe, simple, GraphQL over Server-Sent Events Protocol server and client.

Continuous integration graphql-sse


Getting started

Install

yarn add graphql-sse

Create a GraphQL schema

import { GraphQLSchema, GraphQLObjectType, GraphQLString } from 'graphql';

/**
 * Construct a GraphQL schema and define the necessary resolvers.
 *
 * type Query {
 *   hello: String
 * }
 * type Subscription {
 *   greetings: String
 * }
 */
const schema = new GraphQLSchema({
  query: new GraphQLObjectType({
    name: 'Query',
    fields: {
      hello: {
        type: GraphQLString,
        resolve: () => 'world',
      },
    },
  }),
  subscription: new GraphQLObjectType({
    name: 'Subscription',
    fields: {
      greetings: {
        type: GraphQLString,
        subscribe: async function* () {
          for (const hi of ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo']) {
            yield { greetings: hi };
          }
        },
      },
    },
  }),
});

Start the server

With http
import http from 'http';
import { createHandler } from 'graphql-sse';

// Create the GraphQL over SSE handler
const handler = createHandler({
  schema, // from the previous step
});

// Create a HTTP server using the handler on `/graphql/stream`
const server = http.createServer((req, res) => {
  if (req.url.startsWith('/graphql/stream')) return handler(req, res);
  return res.writeHead(404).end();
});

server.listen(4000);
console.log('Listening to port 4000');
With http2

Browsers might complain about self-signed SSL/TLS certificates. Help can be found on StackOverflow.

$ openssl req -x509 -newkey rsa:2048 -nodes -sha256 -subj '/CN=localhost' \
  -keyout localhost-privkey.pem -out localhost-cert.pem
import fs from 'fs';
import http2 from 'http2';
import { createHandler } from 'graphql-sse';

// Create the GraphQL over SSE handler
const handler = createHandler({
  schema, // from the previous step
});

// Create a HTTP/2 server using the handler on `/graphql/stream`
const server = http2.createSecureServer(
  {
    key: fs.readFileSync('localhost-privkey.pem'),
    cert: fs.readFileSync('localhost-cert.pem'),
  },
  (req, res) => {
    if (req.url.startsWith('/graphql/stream')) return handler(req, res);
    return res.writeHead(404).end();
  },
);

server.listen(4000);
console.log('Listening to port 4000');
With express
import express from 'express'; // yarn add express
import { createHandler } from 'graphql-sse';

// Create the GraphQL over SSE handler
const handler = createHandler({ schema });

// Create an express app serving all methods on `/graphql/stream`
const app = express();
app.use('/graphql/stream', handler);

app.listen(4000);
console.log('Listening to port 4000');
With fastify
import Fastify from 'fastify'; // yarn add fastify
import { createHandler } from 'graphql-sse';

// Create the GraphQL over SSE handler
const handler = createHandler({ schema });

// Create a fastify instance serving all methods on `/graphql/stream`
const fastify = Fastify();
fastify.all('/graphql/stream', (req, res) =>
  handler(
    req.raw,
    res.raw,
    req.body, // fastify reads the body for you
  ),
);

fastify.listen(4000);
console.log('Listening to port 4000');

Use the client

import { createClient } from 'graphql-sse';

const client = createClient({
  // singleConnection: true, use "single connection mode" instead of the default "distinct connection mode"
  url: 'http://localhost:4000/graphql/stream',
});

// query
(async () => {
  const result = await new Promise((resolve, reject) => {
    let result;
    client.subscribe(
      {
        query: '{ hello }',
      },
      {
        next: (data) => (result = data),
        error: reject,
        complete: () => resolve(result),
      },
    );
  });

  expect(result).toEqual({ hello: 'world' });
})();

// subscription
(async () => {
  const onNext = () => {
    /* handle incoming values */
  };

  let unsubscribe = () => {
    /* complete the subscription */
  };

  await new Promise((resolve, reject) => {
    unsubscribe = client.subscribe(
      {
        query: 'subscription { greetings }',
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

  expect(onNext).toBeCalledTimes(5); // we say "Hi" in 5 languages
})();

Recipes

🔗 Client usage with Promise
import { createClient, SubscribePayload } from 'graphql-sse';

const client = createClient({
  url: 'http://hey.there:4000/graphql/stream',
});

async function execute<T>(payload: SubscribePayload) {
  return new Promise<T>((resolve, reject) => {
    let result: T;
    client.subscribe<T>(payload, {
      next: (data) => (result = data),
      error: reject,
      complete: () => resolve(result),
    });
  });
}

// use
(async () => {
  try {
    const result = await execute({
      query: '{ hello }',
    });
    // complete
    // next = result = { data: { hello: 'Hello World!' } }
  } catch (err) {
    // error
  }
})();
🔗 Client usage with AsyncIterator
import { createClient, SubscribePayload } from 'graphql-sse';

const client = createClient({
  url: 'http://iterators.ftw:4000/graphql/stream',
});

function subscribe<T>(payload: SubscribePayload): AsyncGenerator<T> {
  let deferred: {
    resolve: (done: boolean) => void;
    reject: (err: unknown) => void;
  } | null = null;
  const pending: T[] = [];
  let throwMe: unknown = null,
    done = false;
  const dispose = client.subscribe<T>(payload, {
    next: (data) => {
      pending.push(data);
      deferred?.resolve(false);
    },
    error: (err) => {
      throwMe = err;
      deferred?.reject(throwMe);
    },
    complete: () => {
      done = true;
      deferred?.resolve(true);
    },
  });
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    async next() {
      if (done) return { done: true, value: undefined };
      if (throwMe) throw throwMe;
      if (pending.length) return { value: pending.shift()! };
      return (await new Promise<boolean>(
        (resolve, reject) => (deferred = { resolve, reject }),
      ))
        ? { done: true, value: undefined }
        : { value: pending.shift()! };
    },
    async throw(err) {
      throw err;
    },
    async return() {
      dispose();
      return { done: true, value: undefined };
    },
  };
}

(async () => {
  const subscription = subscribe({
    query: 'subscription { greetings }',
  });
  // subscription.return() to dispose

  for await (const result of subscription) {
    // next = result = { data: { greetings: 5x } }
  }
  // complete
})();
🔗 Client usage with Observable
import { Observable } from 'relay-runtime';
// or
import { Observable } from '@apollo/client/core';
// or
import { Observable } from 'rxjs';
// or
import Observable from 'zen-observable';
// or any other lib which implements Observables as per the ECMAScript proposal: https://github.com/tc39/proposal-observable

const client = createClient({
  url: 'http://graphql.loves:4000/observables',
});

function toObservable(operation) {
  return new Observable((observer) =>
    client.subscribe(operation, {
      next: (data) => observer.next(data),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    }),
  );
}

const observable = toObservable({ query: `subscription { ping }` });

const subscription = observable.subscribe({
  next: (data) => {
    expect(data).toBe({ data: { ping: 'pong' } });
  },
});

// ⏱

subscription.unsubscribe();
🔗 Client usage with Relay
import { GraphQLError } from 'graphql';
import {
  Network,
  Observable,
  RequestParameters,
  Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';

const subscriptionsClient = createClient({
  url: 'http://i.love:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    if (!session) return {};
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});

// yes, both fetch AND subscribe can be handled in one implementation
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    if (!operation.text) {
      return sink.error(new Error('Operation text cannot be empty'));
    }
    return subscriptionsClient.subscribe(
      {
        operationName: operation.name,
        query: operation.text,
        variables,
      },
      sink,
    );
  });
}

export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);
🔗 Client usage with urql
import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createWSClient } from 'graphql-sse';

const sseClient = createWSClient({
  url: 'http://its.urql:4000/graphql/stream',
});

const client = createClient({
  url: '/graphql/stream',
  exchanges: [
    ...defaultExchanges,
    subscriptionExchange({
      forwardSubscription(operation) {
        return {
          subscribe: (sink) => {
            const dispose = sseClient.subscribe(operation, sink);
            return {
              unsubscribe: dispose,
            };
          },
        };
      },
    }),
  ],
});
🔗 Client usage with Apollo
import {
  ApolloLink,
  Operation,
  FetchResult,
  Observable,
} from '@apollo/client/core';
import { print, GraphQLError } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';

class SSELink extends ApolloLink {
  private client: Client;

  constructor(options: ClientOptions) {
    super();
    this.client = createClient(options);
  }

  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      return this.client.subscribe<FetchResult>(
        { ...operation, query: print(operation.query) },
        {
          next: sink.next.bind(sink),
          complete: sink.complete.bind(sink),
          error: sink.error.bind(sink),
        },
      );
    });
  }
}

const link = new SSELink({
  url: 'http://where.is:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    if (!session) return {};
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});
🔗 Client usage for HTTP/1 (aka. single connection mode)
import { createClient } from 'graphql-sse';

const client = createClient({
  singleConnection: true, // this is literally it 😄
  url: 'http://use.single:4000/connection/graphql/stream',
  // lazy: true (default) -> connect on first subscribe and disconnect on last unsubscribe
  // lazy: false -> connect as soon as the client is created
});

// The client will now run in a "single connection mode" mode. Meaning,
// a single SSE connection will be used to transmit all operation results
// while separate HTTP requests will be issued to dictate the behaviour.
🔗 Client usage with custom retry timeout strategy
import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';

const url = 'http://i.want.retry:4000/control/graphql/stream';

const client = createClient({
  url,
  retryWait: async function waitForServerHealthyBeforeRetry() {
    // if you have a server healthcheck, you can wait for it to become
    // healthy before retrying after an abrupt disconnect (most commonly a restart)
    await waitForHealthy(url);

    // after the server becomes ready, wait for a second + random 1-4s timeout
    // (avoid DDoSing yourself) and try connecting again
    await new Promise((resolve) =>
      setTimeout(resolve, 1000 + Math.random() * 3000),
    );
  },
});
🔗 Client usage in browser
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>GraphQL over Server-Sent Events</title>
    <script
      type="text/javascript"
      src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
    ></script>
  </head>
  <body>
    <script type="text/javascript">
      const client = graphqlSse.createClient({
        url: 'http://umdfor.the:4000/win/graphql/stream',
      });

      // consider other recipes for usage inspiration
    </script>
  </body>
</html>
🔗 Client usage in Node
const ws = require('ws'); // yarn add ws
const fetch = require('node-fetch'); // yarn add node-fetch
const { AbortController } = require('node-abort-controller'); // (node < v15) yarn add node-abort-controller
const Crypto = require('crypto');
const { createClient } = require('graphql-sse');

const client = createClient({
  url: 'http://no.browser:4000/graphql/stream',
  fetchFn: fetch,
  abortControllerImpl: AbortController, // node < v15
  /**
   * Generates a v4 UUID to be used as the ID.
   * Reference: https://gist.github.com/jed/982883
   */
  generateID: () =>
    ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
      (c ^ (Crypto.randomBytes(1)[0] & (15 >> (c / 4)))).toString(16),
    ),
});

// consider other recipes for usage inspiration
🔗 Server handler usage with custom authentication
import { createHandler } from 'graphql-sse';
import {
  schema,
  getOrCreateTokenFromCookies,
  customAuthenticationTokenDiscovery,
  processAuthorizationHeader,
} from './my-graphql';

const handler = createHandler({
  schema,
  authenticate: async (req, res) => {
    let token = req.headers['x-graphql-event-stream-token'];
    if (token) {
      // When the client is working in a "single connection mode"
      // all subsequent requests for operations will have the
      // stream token set in the `X-GraphQL-Event-Stream-Token` header.
      //
      // It is considered safe to accept the header token always
      // because if a stream reservation does not exist, or is already
      // fulfilled, the handler itself will reject the request.
      //
      // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
      return Array.isArray(token) ? token.join('') : token;
    }

    // It is necessary to generate a unique token when dealing with
    // clients that operate in the "single connection mode". The process
    // of generating the token is completely up to the implementor.
    token = getOrCreateTokenFromCookies(req);
    // or
    token = processAuthorizationHeader(req.headers['authorization']);
    // or
    token = await customAuthenticationTokenDiscovery(req);

    // Using the response argument the implementor may respond to
    // authentication issues however he sees fit.
    if (!token) return res.writeHead(401, 'Unauthorized').end();

    // Clients that operate in "distinct connections mode" dont
    // need a unique stream token. It is completely ok to simply
    // return an empty string for authenticated clients.
    //
    // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode
    if (req.method === 'POST' && req.headers.accept === 'text/event-stream') {
      // "distinct connections mode" requests an event-stream with a POST
      // method. These two checks, together with the lack of `X-GraphQL-Event-Stream-Token`
      // header, are sufficient for accurate detection.
      return ''; // return token; is OK too
    }

    // On the other hand, clients operating in "single connection mode"
    // need a unique stream token which will be provided alongside the
    // incoming event stream request inside the `X-GraphQL-Event-Stream-Token` header.
    //
    // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
    return token;
  },
});

// use `handler` with your favourite http library
🔗 Server handler usage with dynamic schema
import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';

const handler = createHandler({
  schema: async (req, executionArgsWithoutSchema) => {
    // will be called on every subscribe request
    // allowing you to dynamically supply the schema
    // using the depending on the provided arguments
    const isAdmin = await checkIsAdmin(req);
    if (isAdmin) return getDebugSchema(req, executionArgsWithoutSchema);
    return schema;
  },
});

// use `handler` with your favourite http library
🔗 Server handler usage with custom context value
import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';

const handler = createHandler({
  schema,
  // or static context by supplying the value direcly
  context: async (req, args) => {
    return getDynamicContext(req, args);
  },
});

// use `handler` with your favourite http library
🔗 Server handler usage with custom execution arguments
import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema, myValidationRules } from './my-graphql';

const handler = createHandler({
  onSubscribe: async (req, _res, params) => {
    const schema = await getSchema(req);

    const args = {
      schema,
      operationName: params.operationName,
      document: parse(params.query),
      variableValues: params.variables,
    };

    return args;
  },
});

// use `handler` with your favourite http library
🔗 Server handler and client usage with persisted queries
// 🛸 server

import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql-schema';

// a unique GraphQL execution ID used for representing
// a query in the persisted queries store. when subscribing
// you should use the `SubscriptionPayload.query` to transmit the id
type QueryID = string;

const queriesStore: Record<QueryID, ExecutionArgs> = {
  iWantTheGreetings: {
    schema, // you may even provide different schemas in the queries store
    document: parse('subscription Greetings { greetings }'),
  },
};

const handler = createHandler(
  {
    onSubscribe: (req, res, params) => {
      const persistedQuery = queriesStore[params.extensions?.persistedQuery];
      if (persistedQuery) {
        return {
          ...persistedQuery,
          variableValues: params.variables, // use the variables from the client
        };
      }

      // for extra security only allow the queries from the store
      return res.writeHead(404, 'Query Not Found').end();
    },
  },
  wsServer,
);

// use `handler` with your favourite http library
// 📺 client

import { createClient } from 'graphql-sse';

const client = createClient({
  url: 'http://persisted.graphql:4000/queries',
});

(async () => {
  const onNext = () => {
    /**/
  };

  await new Promise((resolve, reject) => {
    client.subscribe(
      {
        query: '', // query field is required, but you can leave it empty for persisted queries
        extensions: {
          persistedQuery: 'iWantTheGreetings',
        },
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

  expect(onNext).toBeCalledTimes(5); // greetings in 5 languages
})();

Documentation

Check the docs folder out for TypeDoc generated documentation.

How does it work?

Read about the exact transport intricacies used by the library in the GraphQL over Server-Sent Events Protocol document.

Want to help?

File a bug, contribute with code, or improve documentation? Read up on our guidelines for contributing and drive development with yarn test --watch away!