verymodel-riak

Riak extensions for VeryModel

Usage: no npm install needed!

<script type="module">
  import verymodelRiak from 'https://cdn.skypack.dev/verymodel-riak';
</script>

README

verymodel-riak

Riak extensions for VeryModel

Code Climate

Examples

Using only default functionality

var VeryRiakModel = require('verymodel-riak').VeryRiakModel;

Define our fields

var MyDef = {
    first_name: {},
    last_name: { index: true },
    name: {
        private: true,
        derive: function () { return this.first_name + ' ' + this.last_name; }
    },
    age: { index: { integer: true } },
    gender: { index: true, private: true },
    city:           {},
    state:          { index: true},
    zip:            { index: { integer: true } },
    model:          { default: 'person', required: true, private: true, static: true},
    favorite_foods: { index: { isArray: true } }
};

Define our options

var MyOptions = {
    allKey:     'model',
    bucket:     "test:bucket"
};

Init our model factory

var MyModel = new VeryRiakModel(MyDef, MyOptions);

Create a model instance

var myInstance = MyModel.create({
    first_name:     'Bill',
    last_name:      'Jones',
    age:            40,
    gender:         'm',
    city:           'Atlanta',
    state:          'GA',
    zip:            30303,
    favorite_foods: ['pizza', 'fried chicken', 'applesauce', 'cake']
});


myInstance.save(); // stores the instance's data to Riak

myInstance.indexes will return:

[
    {key: 'last_name_bin', value: 'Jones'},
    {key: 'age_int', value: 40},
    {key: 'gender_bin', value: 'm'},
    {key: 'state_bin', value: 'GA'},
    {key: 'zip_int', value: 30303},
    {key: 'model_bin', value: 'person'},
    {key: 'favorite_foods_bin', value: 'pizza'},
    {key: 'favorite_foods_bin', value: 'fried chicken'},
    {key: 'favorite_foods_bin', value: 'applesauce'},
    {key: 'favorite_foods_bin', value: 'cake'}
]

myInstance.value will return:

{
    first_name: 'Bill',
    last_name: 'Jones',
    city: 'Atlanta',
    state: 'GA',
    zip: 30303
}

Defaults

var _       = require('underscore');
var indexes = require('./indexes');
var request = require('./request_helpers.js');
var streams = require('./streams');

module.exports = {

definition

    definition: {

id: The "id" field is where we'll store the Riak key. By default it will just be an unvalidated, public field

        id: {},

indexes: We'll need a way to retrieve all of the fields that should be indexed, so by default that will be all of the fields defined as indexes

        indexes: {
            private: true,
            derive: indexes.derive
        },

value: By default, all non-private fields that aren't the id are expected to be the value payload

        value: {
            private: true,
            derive: function () {
                var model = this.__verymeta.model;
                if (!model.options.values) model.options.values = _.compact(_.map(model.definition, function (def, key) {
                    var isKeyField = key === model.options.keyField;
                    var isAllKey = key === model.options.allKey;

By default we'll use all of the public fields except id

                    return (def.private || isKeyField || isAllKey) ? false : key;
                }));
                return _.pick(this, model.options.values);
            }
        }
    },

model methods

    methods: {

getAllKey: Allows us to share a bucket between different model types

        getAllKey: function () {
            if (this.options.allKey === '$bucket') {
                return {key: this.options.allKey, def: {default: this.getBucket()}};
            }
            var allKeyDef = this.options.allKey && this.definition[this.options.allKey],

default + required ensures that the allKey is always populated private ensures it's not stored as part of the object's data static ensures that the default value is not overwritten

                allKeyIsValid = allKeyDef && (allKeyDef.default &&
                                allKeyDef.required && allKeyDef.static);
            if (allKeyDef && allKeyIsValid) {
                return { key: this.options.allKey + '_bin', def: allKeyDef};
            }
        },

getBucket: Returns default bucket optionally appending an additional namespace

        getBucket: function (append) {
            var bucket = this.options.bucket;
            if (!bucket && this.options.riak) bucket = this.options.riak.bucket;
            if (bucket && !append) return bucket;
            if (bucket && append) return [bucket, append].join(this.options.namespaceSeparator||"::");
            throw new Error('Please set a Riak bucket via options.bucket');
        },

getLogger: Returns specified logger if defined or creates one and returns it

        getLogger: function _logger() {
            return this.options.logger || (this.options.logger = require('bucker').createNullLogger());
        },

getRequest: Builds Riak request objects. Signature varies according to type and developer preference. Supported request types are: del, get, index, mapreduce, and search.

  • Build request to get a single object by its key:
    • type and key: model.getRequest('get', 'my-riak-key') or
    • type and object: model.getRequest('get', {key: 'my-riak-key'})
  • Build request to get a list of keys for an index exact match:
    • type, index, key: model.getRequest('index', 'my_index_bin', 'foo') or
    • type and object: model.getRequest('index', {index: 'my_index_bin', key: 'foo'})
  • Build request to get a list of keys via an index range search:
    • type, index, min, max: model.getRequest('index', 'my_index_bin', 'bar', 'foo') or
    • type and object: model.getRequest('index', {index: 'my_index_bin', range_min: 'bar', range_max: 'foo'})
  • Build request for mapreduce:
    • type, index, key, query array: model.getRequest('mapreduce', 'my_index', 'my_key', […map/reduce phases…]) or
    • type and object: model.getRequest('mapreduce', {inputs: …my inputs…, query: […map/reduce phases…]}) or
    • type, inputs array, query array: model.getRequest('mapreduce', […my inputs…], […map/reduce phases…])
  • Build request to search:
    • type, index, q: model.getRequest('search', 'my_index', 'name:Bill') or
    • type and object: model.getRequest('search', {index: 'my_index', q: 'name:Bill'})
  • Finally, any type of request can be created according to the following format:
    • model.getRequest({ type: 'index', options: {index: 'my_index_bin', key: 'foo'}}) where type is any one of the types listed and options is the options object for that request type
        getRequest: function (type) {
            if (_.isObject(type) && type.type && type.options) {
                return request[type.type](this, [type.options]);
            }
            return request[type](this, _.rest(arguments));
        },

_indexQuery: Index query wrapper that returns a stream

        _indexQuery: function () {
            var args = _.rest(arguments, 0);
            args.unshift('index');
            var request = this.getRequest.apply(this, args);

Return the readable stream

            return this.getClient().getIndex(request);
        },

all: Streams or calls back with all instances of this model that are stored in Riak or an index-filtered set of them, depending on whether filtering args are passed. If the first argument is a function, it will be called with the result.

        all: function () {
            var args = _.rest(arguments, 0),
                streaming = typeof args[0] !== 'function',
                cb = !streaming ? args[0] : null,
                requestArgs = _.rest(args, (streaming ? 0 : 1)),
                bucket;
            if (args.length > 1 && typeof args[1] === 'object') {
                bucket = args[1].bucket;
            }
            var logger = this.getLogger();
            logger.debug('query prepared: %j, streaming: %s', requestArgs, streaming);

All stream handling is done via a Transform stream that receives our key stream and transmits instances

            var stream = this._indexQuery.apply(this, requestArgs),
                streamOpts = {model: this, bucket: bucket};

            return stream.pipe(new streams.KeyToValueStream(_.defaults({}, streamOpts)))
                         .pipe(new streams.InstanceStream(_.defaults({callback: cb}, streamOpts)));
        },

find: Simplifies index lookups and can be called with the values or options object signatures.

  • values: find('index', 'key_or_min', ['max',] [function (err, instances) {}])
  • options object (shown with range query—-substitute key for range_min/range_max for exact match): find({index: 'index', range_min: 'min', range_max: 'max'}, [function (err, instance)])
        find: function (index) {
            var args = _.rest(arguments),
                hasCb = typeof _.last(args) === 'function',
                cb = hasCb && args.pop();
            if (typeof index === 'string' && !index.match(/_(bin|int)$/)) {
                index = indexes.getName(index, this.definition[index]);
            }
            args.unshift(index);
            if (cb) {
                args.unshift(cb);
            }
            return this.all.apply(this, args);
        },

indexesToData: Reformats indexes from Riak so that they can be applied to model instances

        indexesToData: indexes.rehydrate,

replyToData: Reformats riak reply into the appropriate format to feed into an instance's loadData method

        replyToData: function (reply) {
            this.getLogger().debug('Function [replyToData]: %j', reply);
            if (!reply || !reply.content) {
                return {};
            }
            var content = (reply.content.length > 1) ? this.options.resolveSiblings(reply.content) : reply.content[0];

reformat our data for VeryModel

            var indexes = {};
            indexes = this.indexesToData(content.indexes);
            var data = _.extend(content.value, indexes);
            if (reply.key) {
                data[this.options.keyField] = reply.key;
            }
            if (reply.vclock) {
                data.vclock = reply.vclock;
            }
            return data;
        },
        _getQuery: function (id, bucket, cb) {
            var reqArgs = ['get', id];
            if (typeof bucket === 'function' && typeof cb === 'undefined') {
                cb = bucket;
                bucket = undefined;
            }
            if (bucket) reqArgs.push(bucket);
            this.getClient().get(this.getRequest.apply(this, reqArgs), cb);
        },
        _getInstance: function (id, reply, bucket) {

Resolve siblings, if necessary, or just grab our content

            var data = this.replyToData(reply);
            data[this.options.keyField] = id;
            if (bucket && this.definition.bucket) data.bucket = bucket;
            var instance = this.create(data);
            if (bucket && !instance.bucket) instance.bucket = bucket;
            return instance;
        },

load: Load an object's data from Riak and creates a model instance from it.

        load: function (id, bucket, cb) {
            var self = this;
            if (typeof bucket === 'function' && typeof cb === 'undefined') {
                cb = bucket;
                bucket = undefined;
            }
            this._getQuery(id, bucket, function (err, reply) {
                if (err||_.isEmpty(reply)) return cb(err||new Error('No matching key found.'));
                self._last = self._getInstance(id, reply, bucket);

Override default toJSON method to make more Hapi compatible

                if (typeof cb === 'function') {
                    cb(null, self._last);
                }
            });
        },

delete: Remove an instance from Riak (also exposed as remove for backwards compatibility)

        delete: function (id, bucket, cb) {
            var self = this;
            if (typeof bucket === 'function' && typeof cb === 'undefined') {
                cb = bucket;
                bucket = undefined;
            }
            this.getLogger().debug('request to delete account(%s)', id);
            this.getClient().del(this.getRequest('del', id, bucket), function (err) {
                if (err) return cb(err);
                self.getLogger().debug('successfully deleted account');
                cb();
            });
        },

backwards compatibility

        remove: module.exports.methods.delete
    },

options

    options: {
  • Default allKey is Riak's magic 'give me all the keys' index
        allKey: '$bucket',
  • Default key field is id
        keyField: 'id',
  • pagination is on by default to prevent overloading the server
        max_results: 100,
        paginate: true,
  • Default sibling handler is "last one wins"
        resolveSiblings: function (siblings) {
            return _.max(siblings, function (sibling) {
                return parseFloat(sibling.last_mod + '.' + sibling.last_mod_usecs);
            });
        }
    },

instanceMethods

    instanceMethods: {

delete: placeholder for an instance-level delete wrapper; the method body is currently empty, so calling it does nothing

        // NOTE(review): unimplemented stub. It accepts `callback` but neither
        // deletes anything nor invokes the callback.
        delete: function (callback) {},

prepare: Prepare a Riak request object from this instance.

        prepare: function () {
            var content = {
                value: JSON.stringify(this.value),
                content_type: 'application/json'
            };
            var indexes = this.indexes;
            if (indexes.length) content.indexes = indexes;
            var payload = {
                content: content,
                bucket: this.getBucket(),
                return_body: true
            };
            if (this.id) {
                payload.key = this.id;
            }
            if (this.vclock) {
                payload.vclock = this.vclock;
            }
            return payload;
        },

save: Put this instance to Riak. May be called with an options object, currently only for the purpose of passing { validate: false } to bypass validation

        save: function (cb, opts) {
            if (!opts || opts.validate !== false) {
                var errors = this.doValidate();
                if (errors.length) return cb(errors);
            }
            var self = this;
            var logger = this.getLogger();
            var payload = this.prepare();
            logger.debug('save payload: %j', payload);
            this.getClient().put(payload, function (err, reply) {
                logger.debug('riak put %s', (err == null ? 'succeeded' : ('failed: ' + err)));
                if (!err) {
                    if (!self.id && reply.key)  self.id = reply.key;
                    if (reply.vclock) self.vclock = reply.vclock;
                }
                if (reply.content.length > 1 && typeof cb !== 'boolean') {
                    self.loadData(self.__verymeta.model.replyToData(reply));

The boolean arg prevents a race condition when reply.content.length continues to be > 1

                    self.save(true);
                }
                if (typeof cb === 'function') {
                    cb(err, self);
                }
            });
        },

getClient: Proxy method to get the Riak client from model

        getClient: function () { return this.__verymeta.model.getClient(); },

getBucket: return instance-level bucket property or fall back to model's getBucket

        getBucket: function () {
            return (typeof this.bucket !== 'undefined') ? this.bucket : this.__verymeta.model.getBucket();
        },

getLogger: return this instance's logger (if defined) or fall back to model's getLogger

        getLogger: function () {
            return (typeof this.logger !== 'undefined') ? this.logger : this.__verymeta.model.getLogger();
        }
    }
};

Add some logging

// Replaces obj[name] (when it is a function) with a wrapper that writes a debug
// line through this.getLogger() and then calls the original with the same
// `this` and arguments.
var logify = function (obj, name) {
    var original = obj[name];
    if (typeof original !== 'function') return;
    obj[name] = function () {
        this.getLogger().debug('Function [%s]', name);
        return original.apply(this, arguments);
    };
};
// Model-level methods that get call logging.
['load', 'remove', 'find', 'all'].forEach(function (methodName) {
    logify(module.exports.methods, methodName);
});
// Instance-level methods that get call logging.
['save', 'prepare'].forEach(function (methodName) {
    logify(module.exports.instanceMethods, methodName);
});


Acknowledgements

- First and foremost, thanks to @fritzy for making VeryModel without which verymodel-riak would be non-existent or pointless.

- Contributors: