@targetprocess/balancer-core

Message balancer that allow to process messages grouped by partition key in a fair way

Usage no npm install needed!

<script type="module">
  import targetprocessBalancerCore from 'https://cdn.skypack.dev/@targetprocess/balancer-core';
</script>

README

Balancer core

Usage sample

import {Db, migrateDb, MessageBalancer, createMethodPerLevelLoggerAdapter, MessageCache, MessageStorage} from '@targetprocess/balancer-core'
import {Pool} from 'pg'
import {makeLogger} from 'loggerism'
import * as promClient from 'prom-client'

type MessageBalancerA = MessageBalancer<{data?: string; retry?: number}>

type MessageBalancerB = MessageBalancer<{data?: string}>

async function main() {
  const [balancerA, balancerB] = await createAndInitBalancers()

  balancerA.storeMessage({
    partitionKey: 'partition#1',
    content: Buffer.alloc(128),
    properties: {data: 'some arbitrary data'}
  })

  balancerB.storeMessage({
    partitionKey: 'partition#2',
    content: Buffer.alloc(128),
    properties: {data: 'some arbitrary data'}
  })

  balancerA.processNextMessage(async message => {
    try {
      const {partitionGroup, partitionKey} = message
      console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
      return {type: 'Ok'}
    } catch {
      const properties = {
        ...message.properties,
        retry: (message.properties?.retry || 0) + 1
      }
      // Push message back with updated properties
      return {type: 'Requeue', update: {properties}}
    }
  })

  balancerB.processNextMessage(async message => {
    try {
      const {partitionGroup, partitionKey} = message
      console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
      return {type: 'Ok'}
    } catch {
      // Push message back with no properties update
      return {type: 'Requeue'}
    }
  })
}

async function createAndInitBalancers(): Promise<[MessageBalancerA, MessageBalancerB]> {
  const pool = new Pool({
    connectionString: process.env.POSTGRES_CONNECTION_STRING,
    max: process.env.POSTGRES_POOL_MAX
  })

  pool.on('error', error => {
    // Handle error here
    console.error(error)
  })

  await migrateDb({pool})

  const logger = createMethodPerLevelLoggerAdapter(makeLogger({
    logLevel: process.env.LOG_LEVEL,
    handleExceptions: false
  }))

  const db = new Db({pool})
  const storage = new MessageStorage({
    db,
    logger,
    createMessagesDurationMetric: summaryMetric('create_messages_duration_in_ms'),
    updateMessageDurationMetric: summaryMetric('update_message_duration_in_ms'),
    removeMessagesDurationMetric: summaryMetric('remove_messages_duration_in_ms'),
    readMessagesDurationMetric: summaryMetric('read_messages_duration_in_ms'),
    readMessagesOrderedByIdDurationMetric: summaryMetric('read_messages_ordered_by_id_duration_in_ms')
  })
  const cache = new MessageCache({
    maxMessageSize: Number(process.env.MESSAGE_CACHE_MAX_MESSAGE_SIZE),
    maxSize: Number(process.env.MESSAGE_CACHE_MAX_SIZE),
    logger,
    messageCountMetric: gaugeMetric('cache_message_count'),
    messageSizeMetric: gaugeMetric('cache_message_size')
  })
  const balancerA = new MessageBalancer<{data?: string; retry?: number}>({
    partitionGroup: 'A',
    partitionSizeLimit: Number(process.env.PARTITION_SIZE_LIMIT),
    lockPartition: true,
    storage,
    cache,
    logger,
    endToEndMessageProcessingDurationMetric: summaryMetric('balancer_a_end_to_end_processing_duration_in_ms'),
    centrifugePartitionCountMetric: gaugeMetric('balancer_a_centrifuge_partition_count'),
    centrifugeMessageCountMetric: gaugeMetric('balancer_a_centrifuge_message_count')
  })
  const balancerB = new MessageBalancer<{data?: string}>({
    partitionGroup: 'B',
    partitionSizeLimit: Number(process.env.PARTITION_SIZE_LIMIT),
    lockPartition: true,
    storage,
    cache,
    logger,
    endToEndMessageProcessingDurationMetric: summaryMetric('balancer_b_end_to_end_processing_duration_in_ms'),
    centrifugePartitionCountMetric: gaugeMetric('balancer_b_centrifuge_partition_count'),
    centrifugeMessageCountMetric: gaugeMetric('balancer_b_centrifuge_message_count')
  })

  await balancerA.init()
  await balancerB.init()

  return [
    balancerA,
    balancerA
  ]
}

function summaryMetric(name: string) {
  return new promClient.Summary({
    name,
    help: 'Write it yourself',
    percentiles: [0.1, 0.5, 0.9, 0.99],
    maxAgeSeconds: 10 * 60,
    ageBuckets: 10
  })
}

function gaugeMetric(name: string) {
  return new promClient.Gauge({
    name,
    help: 'Write it yourself'
  })
}

main()