PipeProc
Multi-process log processing for nodejs
Table of Contents
- Intro
- Example
- Installing
- Status
- Process management
- Committing logs
- Read API
- Procs
- SystemProcs
- LiveProcs
- Waiting for procs to complete
- GC
- Typings
- Tests
- Meta
- Contributing
Intro
PipeProc is a data processing system that can be embedded in nodejs applications (eg. electron).
It runs in a separate process and can be used to off-load processing logic from the main “thread” in a structured manner.
Underneath it uses a structured commit log and a “topic” abstraction to categorize logs.
Inspired by Apache Kafka and Redis streams.
In practice it is a totally different kind of system since it is meant to be run embedded in the main application as a single instance node.
Another key difference is that it also handles the execution of the processing logic by itself and not only the stream pipelining.
It does this by using processors: custom-written modules/functions that can be plugged into the system, consume topic streams, execute custom logic and push the results to another topic, thus creating a processing pipeline.
Example
const {PipeProc} = require("pipeproc");
const pipeProcClient = PipeProc();
pipeProcClient.spawn().then(function() {
    //commit a log to topic "my_topic"
    //the topic is created if it does not exist
    pipeProcClient.commit({
        topic: "my_topic",
        body: {greeting: "hello"}
    }).then(function(id) {
        console.log(id);
        //1518951480106-0
        //{timestamp}-{sequenceNumber}
    });
});
Installing
npm install --save pipeproc
Status
Process management
spawn
Spawn the node and connect to it.
If there is a need to spawn multiple nodes on the same host you can use the namespace option with a custom name.
If a custom namespace is used, all clients that will connect() to it will need to provide it.
The node can also use TCP connections instead by setting the host and the port in the tcp settings.
Clients (local and remote) can then connect() to it by providing the same host and port.
A socket address can also be used instead of a namespace or tcp options, for example:
ipc:///tmp/mysocket
tcp://127.0.0.1:9999
TLS is also available when using TCP. A cert, key, and ca should be provided for both server and client.
Any client that also needs to connect() should provide its client keys.
If the node is spawned with TLS, only secure connections will be allowed.
spawn(
    options?: {
        //use a different ipc namespace
        namespace?: string,
        //use a tcp socket
        tcp?: {
            host: string,
            port: number
        },
        //tls settings
        tls?: {
            server: {
                key: string;
                cert: string;
                ca: string;
            },
            client: {
                key: string;
                cert: string;
                ca: string;
            }
        },
        //use a socket address directly
        socket?: string,
        //use an in-memory store instead of the disk adapter
        memory?: boolean,
        //set the location of the underlying store (if memory is false)
        location?: string,
        //the number of workers (processes) to use (check the systemProc section below), set to 0 for no workers, defaults to 1
        workers?: number,
        //the number of processors that can be run concurrently by each worker, defaults to 1
        workerConcurrency?: number,
        //restart any worker that reaches X systemProc executions, useful to mitigate memory leaks, defaults to 0 (no restarts)
        workerRestartAfter?: number,
        //tune the garbage collector settings (check the gc section below)
        gc?: {minPruneTime?: number, interval?: number} | boolean
    }
): Promise<string>;
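For example, a node could be spawned over TCP with an in-memory store; the host, port and worker count below are just illustrative values:
pipeProcClient.spawn({
    memory: true,
    workers: 2,
    tcp: {host: "127.0.0.1", port: 9999}
}).then(function(status) {
    console.log(status);
});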
connect
Connect to an already spawned node.
Use case: connect to the same PipeProc instance from a different process (eg. electron renderer) or to a remote instance (only when using TCP).
connect(
    options?: {
        //use a different ipc namespace
        namespace?: string,
        //connect to a tcp socket
        tcp?: {
            host: string,
            port: number
        },
        //tls settings
        tls?: {
            key: string;
            cert: string;
            ca: string;
        } | false,
        //use a socket address directly
        socket?: string,
        //specify a connection timeout, defaults to 1000ms
        timeout?: number
    }
): Promise<string>;
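A client in another process (or on another machine) could then connect to the node spawned above; the host, port and timeout values are illustrative and must match the spawn settings:
pipeProcClient.connect({
    tcp: {host: "127.0.0.1", port: 9999},
    timeout: 2000
}).then(function(status) {
    console.log(status);
});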
shutdown
Gracefully close the PipeProc instance.
shutdown(): Promise<string>;
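For example, once all work is done (the resolved value is whatever status string the node reports):
pipeProcClient.shutdown().then(function(status) {
    console.log(status);
});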
Committing logs
This is how you add logs to a topic.
The topic will be created implicitly when its first log is committed.
Multiple logs can be committed in a batch, either to the same topic or to different topics. In that case the write is an atomic operation: either all logs are successfully written or all of them fail.
commit examples
Add a single log to a topic:
pipeProcClient.commit({
    topic: "my_topic",
    body: {greeting: "hello"}
}).then(function(id) {
    console.log(id);
    //=> 1518951480106-0
});
commit() will return the id(s) of the log(s) committed.
Ids follow a format of {timestamp}-{sequenceNumber} where timestamp is the time the log was committed in milliseconds and the sequence number is an auto-incrementing integer (starting from 0) indicating the log's position in its topic.
The log's body can be an arbitrarily nested javascript object.
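Since the id format is fixed, an id can be split back into its parts on the client side if needed; a minimal sketch (the parseLogId helper is illustrative, not part of the API):
function parseLogId(id) {
    const [timestamp, sequenceNumber] = id.split("-");
    return {
        timestamp: Number(timestamp),
        sequenceNumber: Number(sequenceNumber)
    };
}
parseLogId("1518951480106-0");
//=> {timestamp: 1518951480106, sequenceNumber: 0}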
Adding multiple logs to the same topic:
pipeProcClient.commit([{
    topic: "my_topic",
    body: {
        myData: "some data"
    }
}, {
    topic: "my_topic",
    body: {
        myData: "more data"
    }
}]).then(function(ids) {
    console.log(ids);
    //=> ["1518951480106-0", "1518951480106-1"]
});
Notice that the timestamps are the same since the two logs were inserted at the same time, but the sequence numbers are different and auto-increment.
Adding multiple logs to different topics:
pipeProcClient.commit([{
    topic: "my_topic",
    body: {
        myData: "some data"
    }
}, {
    topic: "another_topic",
    body: {
        myData: "some data for another topic"
    }
}]).then(function(ids) {
    console.log(ids);
    //=> ["1518951480106-0", "1518951480106-0"]
});
As before, the timestamps are the same (since they were committed at the same time) but the sequence numbers are both 0
since these two logs are the first logs committed in their respective topics.
Read API
range
Get a slice of a topic.
range signature
range(
    topic: string,
    options?: {
        start?: string,
        end?: string,
        limit?: number,
        exclusive?: boolean
    }
): Promise<{id: string, body: object}[]>;
range examples
pipeProcClient.range("my_topic", {
    start: "1518951480106-0",
    end: "1518951480107-10"
})
//timestamps only
pipeProcClient.range("my_topic", {
    start: "1518951480106",
    end: "1518951480107"
})
//from beginning to end
pipeProcClient.range("my_topic")
//from a specific timestamp to the end
pipeProcClient.range("my_topic", {
    start: "1518951480106"
})
//with a limit
pipeProcClient.range("my_topic", {
    start: "1518951480106",
    limit: 5
})
//by sequence id
pipeProcClient.range("my_topic", {
    start: ":5",
    end: ":15"
}) //=> [5..15]
//by sequence id, exclusive
pipeProcClient.range("my_topic", {
    start: ":5",
    end: ":15",
    exclusive: true
}) //=> [6..14]
//returns a Promise that resolves to an array of logs
[{
    id: "1518951480106-0",
    body: {
        myData: "hello"
    }
}]
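Since range() accepts explicit start ids together with a limit and the exclusive flag, a topic can also be read in pages. Here is a sketch of that pattern, assuming exclusive also applies when only start is given (the page size of 100 is illustrative):
async function readAll(topic) {
    const logs = [];
    let lastId = null;
    while (true) {
        const options = {limit: 100};
        if (lastId) {
            //resume right after the last id we have already seen
            options.start = lastId;
            options.exclusive = true;
        }
        const page = await pipeProcClient.range(topic, options);
        if (page.length === 0) break;
        logs.push(...page);
        lastId = page[page.length - 1].id;
    }
    return logs;
}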
revrange
Ranges through the topic in reverse order.
start and end should also be inverted (start >= end).
The API is the same as range().
eg. to get the latest log
pipeProcClient.revrange("my_topic", {
    limit: 1
})
length
Get the total number of logs in a topic.
pipeProcClient.length("my_topic").then(function(length) {
    console.log(length);
});
Procs
Procs are the way to consistently process logs of a topic.
Let's start with an example and explain as we go along.
//let's add some logs
await pipeProcClient.commit([{
    topic: "numbers",
    body: {myNumber: 1}
}, {
    topic: "numbers",
    body: {myNumber: 2}
}]);
//run a proc on the "numbers" topic
const log = await pipeProcClient.proc("numbers", {
    name: "my_proc",
    offset: ">"
});
//=> log = {id: "1518951480106-0", body: {myNumber: 1}}
try {
    //process the log
    const incrementedNumber = log.body.myNumber + 1;
    //ack the operation and commit the result to a different topic
    await pipeProcClient.ackCommit("my_proc", {
        topic: "incremented_numbers",
        body: {myIncrementedNumber: incrementedNumber}
    });
} catch (err) {
    //something went wrong in our processing, the proc should be reclaimed
    console.error(err);
    pipeProcClient.reclaim("my_proc");
}
Procs are the way to consistently fetch logs from a topic, process them and commit the results in a safe and serial manner.
So, what's going on in the above example?
- first we commit two logs to our "numbers" topic
- then we create a proc named "my_proc" for the "numbers" topic with an offset of ">" (fetch the next log after the last acked one; since nothing has been acked yet it starts from the very beginning of the topic, see more below)
- the proc returns a single log (the first log we committed)
- we do some processing (incrementing the number)
- we then acknowledge the operation and commit our result to a different topic
- we also catch errors from our processing and the ack; in that case the proc must be reclaimed
If everything goes well, the next time we call the proc it will fetch us our second log 1518951480106-1.
If something goes wrong and reclaim() is called, the proc will be "reset" and will fetch the first log again.
Until we call ack() (or ackCommit() in this case) to move on or reclaim() to reset, the proc will not fetch us any new logs.
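Putting it all together, a proc is usually driven in a loop until the topic has no new logs; here is a minimal sketch based on the calls above (the processNumbers wrapper and the stop-on-null check are illustrative):
async function processNumbers() {
    while (true) {
        //fetch the next unacked log (resolves to null when there is nothing new, see the signature below)
        const log = await pipeProcClient.proc("numbers", {
            name: "my_proc",
            offset: ">"
        });
        if (!log) break;
        try {
            const incrementedNumber = log.body.myNumber + 1;
            //ack the operation and commit the result to a different topic
            await pipeProcClient.ackCommit("my_proc", {
                topic: "incremented_numbers",
                body: {myIncrementedNumber: incrementedNumber}
            });
        } catch (err) {
            //processing failed, reset the proc so the same log is fetched again
            console.error(err);
            await pipeProcClient.reclaim("my_proc");
        }
    }
}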
Here is the whole proc signature:
proc(
    //the topic this proc is for
    topic: string,
    options: {
        //the proc name
        name: string,
        //the proc offset (see below)
        offset: string,
        //how many logs to fetch
        count?: number,
        //reclaim settings, see below
        maxReclaims?: number,
        reclaimTimeout?: number,
        onMaxReclaimsReached?: "disable" | "continue"
    }
): Promise<null | {id: string, body: object} | {id: string, body: object}[]>;
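Judging from the signature, when count is greater than 1 the proc presumably resolves to an array of logs (or null when there is nothing new to fetch). A sketch of a batch fetch; the count and reclaim values are illustrative, and it assumes ack() takes the proc name just like ackCommit() does:
const logs = await pipeProcClient.proc("numbers", {
    name: "my_batch_proc",
    offset: ">",
    count: 10,
    maxReclaims: 3,
    onMaxReclaimsReached: "disable"
});
if (logs) {
    for (const log of logs) {
        console.log(log.id, log.body);
    }
    //acknowledge the whole batch
    await pipeProcClient.ack("my_batch_proc");
}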
offsets
Offsets are how you position the proc at a specific point in the topic.
>
fetch the next log after the latest acked log for this proc. If no logs have been acked yet, it will start from the beginning of the topic.