Rodabase

Transactional, replicable document store for Node.js and browsers. Built on LevelDB.

$ npm install rodabase

License

MIT

API

API stable; documentation in progress.

rodabase(path, [options])

var rodabase = require('rodabase');

var roda = rodabase('./db');

roda(name)

.put(id, doc, [tx], [cb])

Create a new document, or update an existing one, by specifying its id and the document doc.

Optionally bind to a transaction instance tx.

roda('users').put('bob', { foo: 'bar' }, function(err, doc){
  //example doc:
  //{
  //  "_id": "bob",
  //  "foo": "bar",
  //  "_rev": "5U42CUvHEz"
  //}
});

.post(doc, [tx], [cb])

Create a new document doc with an auto-generated _id. The auto-generated _id is a unique, URL-safe, time-sorted string. Optionally bind to a transaction instance tx.

roda('users').post({ foo: 'bar' }, function(err, doc){
  //example doc:
  //{
  //  "_id": "FZBJIBTCaEJk8924J0A",
  //  "foo": "bar",
  //  "_rev": "5U42CUvHF"
  //}
});
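
To illustrate what "unique, URL-safe, time-sorted" means in practice, here is a minimal sketch of one way such an id could be generated. It is not Rodabase's actual algorithm: timeSortedId and pad are hypothetical helpers, and the sketch only guarantees uniqueness within a single process.

```javascript
// Hypothetical helper, NOT Rodabase's id generator.
// Fixed-width base-36 timestamp followed by a fixed-width counter, so that
// plain string comparison matches creation order.
var lastTime = 0;
var counter = 0;

function pad(str, width) {
  while (str.length < width) str = '0' + str;
  return str;
}

function timeSortedId(now) {
  if (now === undefined) now = Date.now();
  if (now <= lastTime) {
    // Same millisecond (or the clock went backwards): keep the last
    // timestamp and bump the counter so ids stay unique and ordered.
    now = lastTime;
    counter++;
  } else {
    lastTime = now;
    counter = 0;
  }
  // Only characters 0-9 and a-z are produced, which are URL-safe.
  return pad(now.toString(36), 9) + pad(counter.toString(36), 4);
}
```

Because both parts are zero-padded to a fixed width, sorting the ids as strings sorts them by creation time.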

.get(id, [tx], [cb])

Retrieve a document specified by id. If the id does not exist, the callback receives a notFound error. Optionally bind to a transaction instance tx.

roda('users').get('bob', function(err, doc){
  if(err){
    if(err.notFound){
      //document does not exist
      return;
    }
    //I/O or other errors
    return;
  }
  //handle document here
});

.del(id, [tx], [cb])

Delete a document specified by id. If the document does not exist, the callback receives a notFound error. Optionally bind to a transaction instance tx.

Transaction

Transactions in Rodabase guarantee linearizable consistency for local operations, which avoids unexpected behavior and simplifies application development.

LevelDB supports atomic batched operations, and durability is configurable via LevelDB's sync option. Rodabase leverages level-transactions for two-phase locking and snapshot isolation, which makes it ACID compliant.

roda.transaction()

Creates a new transaction instance. The get(), put(), del(), and getBy() methods can be bound to the transaction instance to perform operations in a sequential, atomic, isolated manner.

//Transactional get and put
var tx = roda.transaction();
roda('users').get('bob', tx, function(err, doc){
  if(!doc)
    return tx.rollback(new Error('not exists'));

  doc.count++;

  //only persists if the commit succeeds
  roda('users').put('bob', doc, tx);
  roda('foo').put('bar', { hello: 'world' }, tx);
});
tx.commit(function(err){
  //err [Error: not exists] if 'bob' not found
});
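
The all-or-nothing behaviour of commit and rollback can be illustrated without a database. The sketch below is an in-memory model only: MemoryTx and its store argument are hypothetical names, and it does not model locking or snapshot isolation, so it should not be read as how level-transactions works internally.

```javascript
// Hypothetical in-memory illustration of buffered, atomic writes.
// NOT the level-transactions implementation.
function MemoryTx(store) {
  this.store = store;   // plain object acting as the committed state
  this.pending = [];    // buffered writes, invisible until commit
  this.error = null;    // set by rollback()
}

// Buffer a write; nothing touches the store yet.
MemoryTx.prototype.put = function (key, value) {
  this.pending.push({ key: key, value: value });
};

// Discard buffered writes and remember why.
MemoryTx.prototype.rollback = function (err) {
  this.error = err || new Error('rolled back');
  this.pending = [];
};

// Apply every buffered write, or none if the transaction was rolled back.
MemoryTx.prototype.commit = function (cb) {
  if (this.error) return cb(this.error);
  var store = this.store;
  this.pending.forEach(function (op) {
    store[op.key] = op.value;
  });
  this.pending = [];
  cb(null);
};
```

As in the example above, a rollback issued before commit surfaces its error in the commit callback, and none of the buffered writes reach the store.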

Hooks

.use('validate', [hook...])

The validate hook is triggered when putting a document. It is invoked at the beginning of a write operation, so the result can be validated and changed before the document is locked.

Context object consists of the following properties:

  • result: Result document before locking.

var people = roda('people');

people.use('validate', function(ctx, next){
  if(typeof ctx.result.name !== 'string')
    return next(new Error('Name must be a string.'));

  //modify result
  ctx.result.name = ctx.result.name.toUpperCase();

  next();
});

people.post({ name: 123 }, function(err, val){
  //Error: Name must be a string.
});
people.put('foo', { name: 'bar' }, function(err, val){
  //val.name === 'BAR'
});
people.del('foo'); //will not trigger validate
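
The (ctx, next) signature follows the familiar middleware pattern. As a rough sketch of how a chain of such hooks can be run in order, stopping at the first error, consider the following. runHooks and validators are hypothetical names used for illustration; Rodabase's internal hook runner may differ.

```javascript
// Hypothetical helper, not Rodabase internals: runs hooks one after another
// and stops at the first hook that passes an error to next().
function runHooks(hooks, ctx, done) {
  var i = 0;
  function next(err) {
    if (err) return done(err);
    var hook = hooks[i++];
    if (!hook) return done(null, ctx.result); // all hooks passed
    hook(ctx, next);
  }
  next();
}

var validators = [
  // Reject documents whose name is not a string.
  function (ctx, next) {
    if (typeof ctx.result.name !== 'string')
      return next(new Error('Name must be a string.'));
    next();
  },
  // Normalise the name before the document is written.
  function (ctx, next) {
    ctx.result.name = ctx.result.name.toUpperCase();
    next();
  }
];
```

Calling runHooks(validators, { result: { name: 'bar' } }, cb) passes a document with name 'BAR' to cb, while a non-string name short-circuits with the validation error.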

.use('diff', [hook...])

The diff hook is triggered when putting or deleting a document. It is invoked once the document is locked, so both the current and the resulting state of the document are accessible. It also exposes the transaction instance, which makes it a powerful mechanism for use cases such as enforcing data integrity and permissions, creating arbitrary triggers, and versioning patterns.

Context object consists of the following properties:

  • current: Current state of document. null if this is an insert.
  • result: Resulting document. null if this is a delete.
  • transaction: Transaction instance.

var data = roda('data');
var logs = roda('logs');

data.use('diff', function(ctx, next){
  var from = ctx.current ? ctx.current.n : 0;
  var to = ctx.result ? ctx.result.n : 0;

  //Transaction works across sections
  logs.post({ delta: to - from }, ctx.transaction);

  next();
});

var tx = roda.transaction();

data.put('bob', { n: 6 }, tx);
data.put('bob', { n: 8 }, tx);
data.put('bob', { n: 9 }, tx);
data.del('bob', tx);

tx.commit(function(){
  logs.readStream().pluck('delta').toArray(...); //[6, 2, 1, -9]
});
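
To see where the logged deltas come from, the same arithmetic can be replayed in plain JavaScript, with no database involved. The names delta, writes and deltas are illustrative only; null stands for the delete.

```javascript
// Same calculation as the diff hook above, applied to an in-memory sequence.
function delta(current, result) {
  var from = current ? current.n : 0; // null current means insert
  var to = result ? result.n : 0;     // null result means delete
  return to - from;
}

var writes = [{ n: 6 }, { n: 8 }, { n: 9 }, null]; // three puts, then a delete
var current = null;
var deltas = [];

writes.forEach(function (result) {
  deltas.push(delta(current, result));
  current = result; // the result of one write is the current state of the next
});

console.log(deltas); // [ 6, 2, 1, -9 ]
```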

.use('conflict', [hook...])

Indexes

Rodabase supports secondary indexes defined by mapper functions. Indexes are calculated transactionally, so results can be retrieved right after the callback of a successful write.

.registerIndex(name, mapper)

Register an index named name using mapper function.

mapper is called with the document object and an emit function: function(doc, emit){}.

emit takes the arguments emit(key, [doc], [unique]) and must be called synchronously within the mapper:

  • key index key. Unlike _id, key can be an arbitrary object for sorting, such as a String, Number, or Date, or an Array for prefixing. null and undefined keys are not allowed.
  • doc object, optionally specify the mapped document object.
  • unique boolean. If true, key must be unique within the index; a write that would duplicate it calls back with an exists error. Default false.

//Non unique index
roda('users').registerIndex('age', function(doc, emit){
  emit(doc.age); //can be non-unique
});

//Unique index
roda('users').registerIndex('email', function(doc, emit){
  emit(doc.email, true); //unique
});

//Multiple emits, Array prefixed
roda('posts').registerIndex('tag', function(doc, emit){
  if( Array.isArray(doc.tags) )
    doc.tags.forEach(function(tag){
      emit([tag, doc.updated]);
    });
});

//Conditional emit, sorted by updated
roda('posts').registerIndex('recent', function(doc, emit){
  if(doc.active) emit(doc.updated);
});
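
As a rough mental model of what a mapper produces, the sketch below builds a sorted, in-memory list of index entries from a set of documents. buildIndex, compareKeys and tagMapper are hypothetical helpers; the sketch ignores the optional doc and unique arguments, assumes keys at the same array position share a type, and does not reflect how Rodabase encodes or stores keys.

```javascript
// Hypothetical illustration of mapper/emit; not Rodabase's storage format.

// Compare two keys element by element; scalars are treated as 1-element arrays.
function compareKeys(a, b) {
  a = [].concat(a);
  b = [].concat(b);
  var n = Math.min(a.length, b.length);
  for (var i = 0; i < n; i++) {
    if (a[i] < b[i]) return -1;
    if (a[i] > b[i]) return 1;
  }
  return a.length - b.length;
}

// Run the mapper over every document and collect what it emits.
function buildIndex(docs, mapper) {
  var entries = [];
  docs.forEach(function (doc) {
    mapper(doc, function emit(key) {
      if (key === null || key === undefined)
        throw new Error('index key must not be null or undefined');
      entries.push({ key: key, id: doc._id });
    });
  });
  return entries.sort(function (x, y) {
    return compareKeys(x.key, y.key);
  });
}

// Same shape as the 'tag' mapper above: one entry per tag, sorted by updated.
function tagMapper(doc, emit) {
  if (Array.isArray(doc.tags))
    doc.tags.forEach(function (tag) {
      emit([tag, doc.updated]);
    });
}
```

A document with several tags yields several entries, and a document for which the mapper never calls emit simply does not appear in the index.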

.rebuildIndex([tag], [cb])

Indexes need to be rebuilt when registerIndex() is called after documents have already been committed, or when a mapper function has changed.

rebuildIndex() rebuilds all registered indexes within the roda section. Optionally specify tag so that indexes are only rebuilt when tag has changed.

users.rebuildIndex('1.1', function(){
  //indexes 1.1 rebuilt successfully.
});

.readStream([options])

Obtain a ReadStream of the Roda section by calling the readStream() method. You can specify range options to control the range of documents that are streamed. options accepts the following properties:

  • gt (greater than), gte (greater than or equal) define the lower bound of _id or _key to be streamed. When reverse: true the order will be reversed, but the documents streamed will be the same.
  • lt (less than), lte (less than or equal) define the upper bound of _id or _key to be streamed. When reverse: true the order will be reversed, but the documents streamed will be the same.
  • reverse boolean, default false, set true to reverse stream output.
  • limit number, limit the number of results. Default no limit.
  • index define index to be used. Default indexed by _id.
  • prefix define string or array prefix of _id or _key to be streamed. Default no prefix.

var JSONStream = require('JSONStream'); //JSON transform stream

//Streams consumption
roda('stuffs').readStream()
  .pipe(JSONStream.stringify())
  .pipe(process.stdout); //pipe to console

app.get('/api/stuffs', function(req, res){
  roda('stuffs').readStream()
    .pipe(JSONStream.stringify())
    .pipe(res); //pipe to express response
});

roda('files').readStream({
  prefix: '/foo/' //String prefix
}).toArray(function(list){
  //possible output:
  //[{
  //  "_id": "/foo/bar",
  //  "_rev": "5U42CUvHEz",
  //  ...
  //},{
  //  "_id": "/foo/boo",
  //  "_rev": "5U42CUvHF",
  //  ...
  //},...]
});

roda('users').readStream({
  index: 'age', 
  gte: 15 //users of age at least 15
}).pipe(...); 

roda('posts').readStream({
  index: 'tag', 
  prefix: ['foo'], //Array prefix
  gt: Date.now() - 1000 * 60 * 60, //since last hour
  reverse: true
}).toArray(function(list){
  //possible output:
  //[{
  //  _key: ['foo', 1437203371250],
  //  tags: ['foo', 'bar', 'hello'],
  //  ...
  //}, {
  //  _key: ['foo', 1437203321128],
  //  tags: ['world', 'foo'],
  //  ...
  //},...]
});
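
The way the range options combine can be sketched on a plain array of string keys. selectRange is a hypothetical helper, not part of Rodabase. It makes two assumptions that are worth flagging: limit is applied after reverse, and the bounds are compared against the whole key, whereas the 'tag' example above suggests Rodabase applies gt relative to the prefix.

```javascript
// Hypothetical illustration of gt/gte/lt/lte/prefix/reverse/limit on string keys.
// Assumptions: limit is applied after reverse; bounds compare against whole keys.
function selectRange(keys, opts) {
  opts = opts || {};
  var out = keys.slice().sort().filter(function (k) {
    if (opts.prefix !== undefined && k.indexOf(opts.prefix) !== 0) return false;
    if (opts.gt !== undefined && !(k > opts.gt)) return false;
    if (opts.gte !== undefined && !(k >= opts.gte)) return false;
    if (opts.lt !== undefined && !(k < opts.lt)) return false;
    if (opts.lte !== undefined && !(k <= opts.lte)) return false;
    return true;
  });
  // reverse changes the order only, not which keys are selected
  if (opts.reverse) out.reverse();
  if (typeof opts.limit === 'number') out = out.slice(0, opts.limit);
  return out;
}

var ids = ['/foo/boo', '/bar/x', '/foo/bar', '/zoo'];
console.log(selectRange(ids, { prefix: '/foo/' }));
// [ '/foo/bar', '/foo/boo' ]
```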

.getBy(index, key, [tx], [cb])

Retrieve a uniquely indexed document specified by index and key. Only available for indexes with the unique flag. If the key does not exist, the callback receives a notFound error. Optionally bind to a transaction instance tx.

//email index
roda('users').registerIndex('email', function(doc, emit){
  emit(doc.email, true); //unique email index
});

//Transactional
var tx = roda.transaction();

roda('users')
  .put('foo', { email: 'foo@bar.com', age: 167 }, tx)
  .getBy('email', 'foo@bar.com', tx, function(err, doc){
     //example doc:
     //{
     //  _id: 'foo',
     //  _key: 'foo@bar.com',
     //  _rev: '5U42CUvHEz',
     //  email: 'foo@bar.com',
     //  age: 167
     //}
  })
  .del('foo', tx)
  .getBy('email', 'foo@bar.com', tx, function(err, doc){
     //notFound error
  });

tx.commit(...);

Replication

Rodabase supports multi-master replication that preserves Causal+ consistency, that is, causal consistency with convergent conflict handling. The implementation loosely follows the COPS-CD approach as presented in the article: Don’t Settle for Eventual: Scalable Causal Consistency for Wide-Area Storage with COPS.

  • Maintaining partial ordering that respects potential causality, using Lamport clocks.
  • Keeping track of nearest gets-from dependency for each write.
  • Replication queue that commits write only when causal dependencies have been satisfied.
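
The third point, holding back a replicated write until the write it depends on has been committed, can be sketched as follows. This is a simplified model and not Rodabase's replication code: CausalQueue is a hypothetical name, each write is assumed to carry a rev identifier and at most one from dependency, and conflict handling is left out entirely.

```javascript
// Hypothetical sketch of a causal replication queue; not Rodabase internals.
// Each write is assumed to look like { rev: 'id', from: 'id-or-null' }.
function CausalQueue(apply) {
  this.apply = apply;                 // called once a write is safe to commit
  this.applied = Object.create(null); // revisions already committed
  this.waiting = [];                  // writes with an unmet dependency
}

CausalQueue.prototype.receive = function (write) {
  this.waiting.push(write);
  // Keep scanning until no waiting write has become committable.
  var progressed = true;
  while (progressed) {
    progressed = false;
    for (var i = 0; i < this.waiting.length; i++) {
      var w = this.waiting[i];
      if (!w.from || this.applied[w.from]) {
        this.waiting.splice(i, 1);
        this.applied[w.rev] = true;
        this.apply(w);
        progressed = true;
        break; // restart the scan, earlier entries may now be unblocked
      }
    }
  }
};
```

Writes that arrive out of order stay in the waiting list and are committed in causal order as soon as their dependency shows up.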

.replicateStream([options])

Rodabase exposes its replication mechanism as a Node.js duplex stream, which makes it transport-agnostic.

The example below shows browser-server replication using Shoe (SockJS).

Browser:

var shoe = require('shoe');

var rodabase = require('rodabase');
var roda = rodabase('db');
var posts = roda('posts');

var stream = shoe('/posts');
var repl = posts.replicateStream();

repl.pipe(stream).pipe(repl);

//do stuffs
posts.post({ hello: 'world' });
posts.liveStream().each(...);

Server:

var shoe = require('shoe');
var http = require('http');

var rodabase = require('rodabase');
var roda = rodabase('./db');
var posts = roda('posts');

var server = http.createServer();
server.listen(9999);

var sock = shoe(function (stream) {
  var repl = posts.replicateStream();
  repl.pipe(stream).pipe(repl);
});
sock.install(server, '/posts');

//do stuffs
posts.post({ foo: 'bar' });
posts.liveStream().each(...);

Extras

Special Fields

Special fields are reserved for identifying the state of documents:

  • _rev (revision) current revision of document that resembles a Lamport clock. Consists of two parts:
    • mid - ID of roda() section.
    • seq - Lamport timestamp that increments based on causal dependencies.
  • _from (gets from) nearest gets-from dependency. Generated on write operation from a replicated document.
  • _after (write after) seq of previous local write for keeping track of execution order.
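
For readers unfamiliar with Lamport timestamps, the sketch below shows the textbook rule that the seq part resembles: increment on every local write, and jump past any larger timestamp seen on a replicated write. LamportClock is a hypothetical name, and the sketch makes no claim about how Rodabase encodes mid and seq into the _rev string.

```javascript
// Textbook Lamport clock, for illustration only; not Rodabase's _rev encoding.
function LamportClock(mid) {
  this.mid = mid; // identifier of the local section or replica
  this.seq = 0;   // logical timestamp
}

// Local write: advance the clock by one.
LamportClock.prototype.tick = function () {
  return ++this.seq;
};

// Replicated write received: move past whichever timestamp is larger.
LamportClock.prototype.receive = function (remoteSeq) {
  this.seq = Math.max(this.seq, remoteSeq) + 1;
  return this.seq;
};

var a = new LamportClock('A');
var b = new LamportClock('B');
a.tick();                   // A writes, seq 1
var sent = a.tick();        // A writes again, seq 2
var got = b.receive(sent);  // B sees A's write and moves to seq 3
console.log(sent, got);     // 2 3
```

Any write B makes afterwards gets a timestamp greater than 3, so it is ordered after the write from A that it may depend on.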