pg-logical-replication

PostgreSQL Logical Replication client - logical WAL replication streaming

Usage: no npm install needed!

<script type="module">
  import pgLogicalReplication from 'https://cdn.skypack.dev/pg-logical-replication';
</script>

README

pg-logical-replication

  • PostgreSQL Logical Replication client for node.js

1. Install

$ npm install pg-logical-replication

2. LogicalReplication

new LogicalReplication( object config ) : Stream
// Load the module; its export is used as the LogicalReplication constructor.
var LogicalReplication = require('pg-logical-replication');
// Create a stream instance from a config object.
var stream = new LogicalReplication({/*config*/});

3. Stream

3-1. Method - Stream.getChanges

  • Start WAL streaming of changes.
stream.getChanges( /*string*/ slotName, /*string*/ uptoLsn, /*object*/ option, /*function(err)*/ initialErrorCallback );
  • uptoLsn can be null; the minimum value is "0/00000000".
  • option can contain any of the following optional properties
    • standbyMessageTimeout : maximum seconds between keepalive messages (default: 10)
    • includeXids : bool (default: false)
    • includeTimestamp : bool (default: false)
    • queryOptions : object containing decoder specific options (optional)
      • 'include-types': false
      • 'filter-tables': 'foo.bar'

3-2. Method - Stream.stop

  • Stop WAL streaming.
stream.stop();

3-3. Event - Stream.on('data')

  • Raised when new data is streamed from the PostgreSQL server.
stream.on('data', (/*object*/ msg)=>{/*...*/});
  • msg contains lsn (string), log (buffer)

3-4. Event - Stream.on('error')

  • Raised when an error occurs or the connection is lost.
stream.on('error', (/*object*/ err)=>{/*...*/});

4. Plugin

4-1. test_decoding output

  • If you are using test_decoding, this plugin will be useful.
// Load the parser plugin for test_decoding output.
var PluginTestDecoding = LogicalReplication.LoadPlugin('output/test_decoding');
// msg.log is a buffer (see the 'data' event), so decode it to a UTF-8 string before parsing.
PluginTestDecoding.parse(msg.log.toString('utf8'));

Example

/*
 * Test procedure
 * 1. Create a logical replication slot with the test_decoding output plugin:
 *    SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
 * 2. Launch Node.js with this file: node test.js
 * 3. Modify data in the database.
 */
var LogicalReplication = require('pg-logical-replication');
var PluginTestDecoding = LogicalReplication.LoadPlugin('output/test_decoding');

//Connection parameter : https://github.com/brianc/node-postgres/wiki/Client#parameters
var connInfo = {};

//LSN of the last message received on the stream; null until the first message arrives.
//It is handed to getChanges() every time streaming is (re)started.
var lastLsn = null;

var stream = (new LogicalReplication(connInfo))
    .on('data', function(msg) {
        //Keep the previous value when a message carries no LSN.
        lastLsn = msg.lsn || lastLsn;

        var log = (msg.log || '').toString('utf8');
        try {
            console.log(PluginTestDecoding.parse(log));
            //TODO: DO SOMETHING. eg) replicate to other dbms(pgsql, mysql, ...)
        } catch (e) {
            console.trace(log, e);
        }
    }).on('error', function(err) {
        console.trace('Error #2', err);
        //Restart streaming after one second.
        setTimeout(proc, 1000);
    });

/**
 * Start (or restart) WAL streaming on 'test_slot', passing the last seen LSN.
 *
 * This must be a function declaration rather than a named function expression
 * inside an IIFE: the 'error' handler above schedules `proc` too, and the name
 * of a function expression is only visible inside that function's own body.
 */
function proc() {
    stream.getChanges('test_slot', lastLsn, {
        includeXids: false, //default: false
        includeTimestamp: false, //default: false
    }, function(err) {
        if (err) {
            console.trace('Logical replication initialize error', err);
            //Initial start failed; try again in one second.
            setTimeout(proc, 1000);
        }
    });
}

proc();

PostgreSQL side

  • postgresql.conf
wal_level = logical
max_wal_senders = [bigger than 1]
max_replication_slots = [bigger than 1]
  • Create logical replication slot
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
  • Delete logical replication slot
SELECT pg_drop_replication_slot('test_slot');