@x12/rx-pool

Rx Generic Pool

Usage (no npm install needed!)

<script type="module">
  import x12RxPool from 'https://cdn.skypack.dev/@x12/rx-pool';
</script>

README

@x12/rx-pool

/**
 * Pool of resources of type `T` whose borrowing API is exposed through
 * Observables and pipeable operators, alongside the original Generic Pool
 * methods.
 */
export declare class RxPool<T> {
  /** The underlying Generic Pool (noted in the original as `Pool<T>`). */
  readonly pool: GenericPool; // Pool<T>
  constructor(factory: PoolFactory, poolOpts?: PoolOption);

  // Observable-based methods
  /** Acquires a `T` from the connection pool. */
  acquire(priority?: number): Observable<T>;
  /**
   * Hands a resource to `cb` and emits what `cb` produces.
   * NOTE(review): presumably acquires before and releases after the callback,
   * like Generic Pool's `use` — confirm against the implementation.
   */
  use<U>(cb: (resource: T) => U | PromiseLike<U> | Observable<U>): Observable<U>;

  // The methods below return operators (OperatorFunction)
  /** Added method: uses the `T` to run a query. */
  run(cb: (resource: T) => any): OperatorFunction<T, T>;
  /** Releases the resource so that it can be used again. */
  release(): OperatorFunction<T, T>;
  /** Destroys the resource; the connection will be re-created. */
  destroy(): OperatorFunction<T, T>;

  // The methods below are the original Generic Pool methods
  release(resource: T): PromiseLike<void>;
  destroy(resource: T): PromiseLike<void>;
  start(): void;
  drain(): PromiseLike<void>;
  clear(): PromiseLike<void>;
  isBorrowedResource(resource: T): boolean;
}

Demo:

import { RxPool, PoolFactory, PoolOption } from '@x12/rx-pool';
import { MongoClient } from 'mongodb';

// Factory telling the pool how to create, dispose of, and health-check MongoDB clients.
const poolFactory: PoolFactory = {
  // Builds a client and returns the promise produced by connecting it.
  create() {
    return new MongoClient('url', {
      /* mongodb options */
    }).connect();
  },
  // Disposes of a client by closing it.
  destroy(client) {
    return client.close();
  },
  // Resolves to true when an admin ping via the 'test' database succeeds,
  // and to false when that ping rejects.
  validate(client) {
    const admin = client.db('test').admin();
    const alive = admin.ping().then(() => true);
    return alive.catch(() => false);
  }
};
// The demo passes an empty options object.
const poolOpts: PoolOption = {};

// Pool of MongoClient instances built from the factory and options above.
const pool = new RxPool<MongoClient>(poolFactory, poolOpts);

// Pipeline: acquire a client, log the result of an admin ping, then release the client.
const s$ = pool.acquire().pipe(
  pool.run(async (client) => {
    const pingResult = await client.db('test').admin().ping();
    console.log(pingResult);
  }),
  pool.release()
);

// Nothing runs until subscription; log once the pipeline completes.
s$.subscribe({
  complete: () => {
    console.log('complete');
  }
});