Balancer core
Usage sample
import {Db, migrateDb, MessageBalancer, createMethodPerLevelLoggerAdapter, MessageCache, MessageStorage} from '@targetprocess/balancer-core'
import {Pool} from 'pg'
import {makeLogger} from 'loggerism'
import * as promClient from 'prom-client'
/** Custom message properties used with balancer A: optional payload note plus a retry counter. */
type MessagePropertiesA = {data?: string; retry?: number}
/** Custom message properties used with balancer B: optional payload note only. */
type MessagePropertiesB = {data?: string}

/** Balancer whose messages carry {@link MessagePropertiesA}. */
type MessageBalancerA = MessageBalancer<MessagePropertiesA>
/** Balancer whose messages carry {@link MessagePropertiesB}. */
type MessageBalancerB = MessageBalancer<MessagePropertiesB>
/**
 * Sample entry point: creates two balancers, stores one message in each,
 * then asks each balancer to process its next message.
 *
 * Balancer A's handler shows how to requeue a message with updated
 * properties (a retry counter); balancer B's handler requeues the message
 * without touching its properties.
 *
 * The store/process calls are awaited so that a failure surfaces as a
 * rejection of `main()` instead of an unhandled floating promise.
 * NOTE(review): this assumes `storeMessage` and `processNextMessage` return
 * promises; awaiting a non-promise value is harmless.
 */
async function main() {
  const [balancerA, balancerB] = await createAndInitBalancers()

  // Store both sample messages concurrently and wait until both calls settle
  // before asking the balancers to process anything.
  await Promise.all([
    balancerA.storeMessage({
      partitionKey: 'partition#1',
      content: Buffer.alloc(128),
      properties: {data: 'some arbitrary data'}
    }),
    balancerB.storeMessage({
      partitionKey: 'partition#2',
      content: Buffer.alloc(128),
      properties: {data: 'some arbitrary data'}
    })
  ])

  // Both processing calls are started together so that neither balancer
  // waits for the other one.
  await Promise.all([
    balancerA.processNextMessage(async message => {
      try {
        // Real message handling goes here; the sample only logs.
        const {partitionGroup, partitionKey} = message
        console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
        return {type: 'Ok'}
      } catch {
        const properties = {
          ...message.properties,
          // `??` only falls back when the counter is missing (null/undefined).
          retry: (message.properties?.retry ?? 0) + 1
        }
        // Push message back with updated properties
        return {type: 'Requeue', update: {properties}}
      }
    }),
    balancerB.processNextMessage(async message => {
      try {
        // Real message handling goes here; the sample only logs.
        const {partitionGroup, partitionKey} = message
        console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
        return {type: 'Ok'}
      } catch {
        // Push message back with no properties update
        return {type: 'Requeue'}
      }
    })
  ])
}
/**
 * Builds the shared infrastructure (pg pool, logger, Db, message storage,
 * message cache) and two balancers on top of it, one for partition group
 * "A" and one for partition group "B".
 *
 * Configuration is read from environment variables:
 * POSTGRES_CONNECTION_STRING, POSTGRES_POOL_MAX, LOG_LEVEL,
 * MESSAGE_CACHE_MAX_MESSAGE_SIZE, MESSAGE_CACHE_MAX_SIZE and
 * PARTITION_SIZE_LIMIT.
 *
 * @returns a tuple `[balancerA, balancerB]`, both already initialised via `init()`.
 */
async function createAndInitBalancers(): Promise<[MessageBalancerA, MessageBalancerB]> {
  // Environment variables are strings; the pool size limit is numeric, so it
  // is converted explicitly. When the variable is unset, `max` is left
  // undefined rather than turned into NaN.
  const poolMax = process.env.POSTGRES_POOL_MAX
  const pool = new Pool({
    connectionString: process.env.POSTGRES_CONNECTION_STRING,
    max: poolMax === undefined ? undefined : Number(poolMax)
  })
  pool.on('error', error => {
    // Handle error here
    console.error(error)
  })

  // Run the schema migration before any component touches the database.
  await migrateDb({pool})

  const logger = createMethodPerLevelLoggerAdapter(makeLogger({
    logLevel: process.env.LOG_LEVEL,
    handleExceptions: false
  }))

  const db = new Db({pool})

  // Storage, cache and logger are shared by both balancers below.
  const storage = new MessageStorage({
    db,
    logger,
    createMessagesDurationMetric: summaryMetric('create_messages_duration_in_ms'),
    updateMessageDurationMetric: summaryMetric('update_message_duration_in_ms'),
    removeMessagesDurationMetric: summaryMetric('remove_messages_duration_in_ms'),
    readMessagesDurationMetric: summaryMetric('read_messages_duration_in_ms'),
    readMessagesOrderedByIdDurationMetric: summaryMetric('read_messages_ordered_by_id_duration_in_ms')
  })
  const cache = new MessageCache({
    maxMessageSize: Number(process.env.MESSAGE_CACHE_MAX_MESSAGE_SIZE),
    maxSize: Number(process.env.MESSAGE_CACHE_MAX_SIZE),
    logger,
    messageCountMetric: gaugeMetric('cache_message_count'),
    messageSizeMetric: gaugeMetric('cache_message_size')
  })

  // Each balancer gets its own partition group and its own metric names.
  const balancerA = new MessageBalancer<{data?: string; retry?: number}>({
    partitionGroup: 'A',
    partitionSizeLimit: Number(process.env.PARTITION_SIZE_LIMIT),
    lockPartition: true,
    storage,
    cache,
    logger,
    endToEndMessageProcessingDurationMetric: summaryMetric('balancer_a_end_to_end_processing_duration_in_ms'),
    centrifugePartitionCountMetric: gaugeMetric('balancer_a_centrifuge_partition_count'),
    centrifugeMessageCountMetric: gaugeMetric('balancer_a_centrifuge_message_count')
  })
  const balancerB = new MessageBalancer<{data?: string}>({
    partitionGroup: 'B',
    partitionSizeLimit: Number(process.env.PARTITION_SIZE_LIMIT),
    lockPartition: true,
    storage,
    cache,
    logger,
    endToEndMessageProcessingDurationMetric: summaryMetric('balancer_b_end_to_end_processing_duration_in_ms'),
    centrifugePartitionCountMetric: gaugeMetric('balancer_b_centrifuge_partition_count'),
    centrifugeMessageCountMetric: gaugeMetric('balancer_b_centrifuge_message_count')
  })

  // Initialise sequentially, A first, as in the original sample.
  await balancerA.init()
  await balancerB.init()

  // Return each balancer in its own slot (the second slot previously
  // returned balancerA again, so balancerB was never handed to the caller).
  return [
    balancerA,
    balancerB
  ]
}
/**
 * Creates a prom-client `Summary` metric with the given name.
 *
 * Every summary created here uses the same settings: the 0.1 / 0.5 / 0.9 /
 * 0.99 percentiles, a `maxAgeSeconds` of ten minutes and 10 age buckets.
 * The help text is a placeholder meant to be replaced by the user.
 *
 * @param name metric name passed to prom-client
 * @returns the newly constructed summary
 */
function summaryMetric(name: string) {
  const quantiles = [0.1, 0.5, 0.9, 0.99]
  const windowSeconds = 10 * 60
  const bucketCount = 10

  return new promClient.Summary({
    name,
    help: 'Write it yourself',
    percentiles: quantiles,
    maxAgeSeconds: windowSeconds,
    ageBuckets: bucketCount
  })
}
/**
 * Creates a prom-client `Gauge` metric with the given name.
 * The help text is a placeholder meant to be replaced by the user.
 *
 * @param name metric name passed to prom-client
 * @returns the newly constructed gauge
 */
function gaugeMetric(name: string) {
  const help = 'Write it yourself'
  return new promClient.Gauge({name, help})
}
// Start the sample. The promise returned by main() is handled explicitly so
// that a startup failure is logged and the process exits with a non-zero
// code instead of relying on the runtime's unhandled-rejection behaviour.
main().catch(error => {
  console.error(error)
  process.exit(1)
})