loopback-connector-influxdata

Loopback Connector for InfluxDB

Usage (no npm install needed!)

<script type="module">
  import loopbackConnectorInfluxdata from 'https://cdn.skypack.dev/loopback-connector-influxdata';
</script>

README

loopback-connector-influxdb

Loopback InfluxDB Connector

Downsampling

To use downsampling, create a boot script (LoopBack 3) containing:

  // Boot-time downsampling setup. Any error thrown or rejected inside this
  // block is logged by the matching catch clause instead of propagating.
  try {
    // Connector of the datasource registered under the name `points`.
    const influxConnector = app.datasources.points.connector;
    const models = app.models;
    // Start from an empty registry; the code below stores retention policy
    // names on it, keyed by their duration string.
    influxConnector.retentionPolicies = {};

    // Value later compared with each rule's duration to decide which retention
    // policy is flagged as the default one.
    // NOTE(review): presumably the shortest duration found across the models'
    // downsampling rules — confirm against the connector implementation.
    const minDuration = await influxConnector.getMinimumDuration(models);

    /**
     * Builds the name and the InfluxQL statement of the continuous query that
     * downsamples one model from one retention policy into the next one.
     *
     * The SELECT clauses are emitted in the key order of `rule.properties`,
     * regardless of the order in which `parseCQFunction` resolves. A property
     * whose aggregation cannot be parsed is logged and left out of the query.
     *
     * @param {string} modelName - Measurement name, also used as prefix of the query name.
     * @param {object} rule - Downsampling rule. `rule.properties` maps a property name to
     *   the aggregation spec handed to `influxConnector.parseCQFunction`; `rule.group` is
     *   the GROUP BY expression, either a string or an array of strings.
     * @param {string} nextDuration - Key of the destination policy in `influxConnector.retentionPolicies`.
     * @param {string} duration - Key of the source policy in `influxConnector.retentionPolicies`.
     * @returns {Promise<{cqName: string, query: string}|null>} The query descriptor, or `null`
     *   when it could not be built (the reason is logged).
     */
    const buildContinuousQuery = async (modelName, rule, nextDuration, duration) => {
      try {
        // Produces `<aggregation>("<property>") AS "<property>"`, or null on failure.
        const buildSelectClause = async propertyName => {
          try {
            const aggregateFunction = await influxConnector.parseCQFunction(
              rule.properties[propertyName],
            );
            const quotedProperty = `"${propertyName}"`;
            // The quoted property is inserted after the first space-separated token and
            // the tokens are glued back together without spaces.
            // NOTE(review): assumes parseCQFunction returns something like `mean( )` — confirm.
            const tokens = aggregateFunction.split(' ');
            tokens.splice(1, 0, quotedProperty);
            return `${tokens.join('')} AS ${quotedProperty}`;
          } catch (error) {
            console.error('boot:initializeDownSampling:aggregateProps:err', error);
            return null;
          }
        };

        // Promise.all keeps the input order, so the clause order is deterministic
        // even when the aggregations resolve out of order.
        const resolvedClauses = await Promise.all(
          Object.keys(rule.properties).map(buildSelectClause),
        );
        const selectClauses = resolvedClauses.filter(clause => clause !== null);

        // A SELECT without any field is not a valid statement; report it here
        // instead of sending a malformed query to the database.
        if (selectClauses.length === 0) {
          throw new Error(
            `no valid aggregation found for model "${modelName}" (duration ${duration})`,
          );
        }

        const groupBy = Array.isArray(rule.group) ? rule.group.join(',') : rule.group;
        const query = [
          `SELECT ${selectClauses.join(', ')}`,
          `INTO "${influxConnector.retentionPolicies[nextDuration]}"."${modelName}"`,
          `FROM "${influxConnector.retentionPolicies[duration]}"."${modelName}"`,
          `GROUP BY ${groupBy}`,
        ].join(' ');

        return { cqName: `${modelName}_cq_${duration}_to_${groupBy}`, query };
      } catch (error) {
        console.error('boot:initializeDownSampling:buildContinuousQuery:err', error);
        return null;
      }
    };

    // One async task per model. Models without a `downSampling` setting resolve
    // to null; the others resolve to the array of results returned by
    // `createContinuousQuery` (null for every duration that produced no query).
    const promises = Object.keys(models).map(async modelName => {
      const model = models[modelName];
      if (!(model && model.settings && model.settings.downSampling)) {
        return null;
      }
      const dsRules = model.settings.downSampling;

      // Create one retention policy per rule, named after the rule's duration.
      // A failure is logged and yields null, so the remaining rules still proceed.
      const rpPolicies = await Promise.all(
        dsRules.map(async dsRule => {
          try {
            const rpName = `rp_${dsRule.duration}`;
            await influxConnector.client.createRetentionPolicy(rpName, {
              duration: dsRule.duration,
              replication: 1,
              isDefault: dsRule.duration === minDuration,
            });
            // Registered only after a successful creation, so a failed policy
            // never becomes the source or target of a continuous query.
            influxConnector.retentionPolicies[dsRule.duration] = rpName;
            return rpName;
          } catch (error) {
            console.error(
              'boot:initializeDownSampling:initializeDownSampling:err',
              error,
            );
            return null;
          }
        }),
      );

      console.log('boot:initializeDownSampling:rpPolicies', rpPolicies);

      // Additional policy registered under the '0s' duration key.
      await influxConnector.client.createRetentionPolicy('rp_forever', {
        duration: '0s',
        replication: 1,
        isDefault: false,
      });
      influxConnector.retentionPolicies['0s'] = 'rp_forever';

      const sortedDurations = await influxConnector.sortDurations(
        Object.keys(influxConnector.retentionPolicies),
      );

      // Format and create the continuous queries: each duration that has a rule
      // on this model is downsampled into the policy that follows it in the
      // sorted list.
      const continuousQueries = await Promise.all(
        sortedDurations.map(async (duration, i, inputArray) => {
          try {
            // The last entry has no following policy to write into.
            if (i >= inputArray.length - 1) {
              return null;
            }
            const dsRule = dsRules.find(rule => rule.duration === duration);
            if (!dsRule) {
              return null;
            }
            const nextDuration = inputArray[i + 1];
            const msg = await buildContinuousQuery(modelName, dsRule, nextDuration, duration);
            console.log('boot:initializeDownSampling:continuousQueries', msg);

            // buildContinuousQuery returns null (after logging the cause) when
            // it cannot build the query; there is nothing to create in that case.
            if (!msg) {
              return null;
            }
            // `return await` keeps a rejection inside this try/catch.
            return await influxConnector.client.createContinuousQuery(msg.cqName, msg.query);
          } catch (error) {
            console.error(
              'loopback',
              'boot:initializeDownSampling:continuousQueries:err',
              error,
            );
            return null;
          }
        }),
      );
      return continuousQueries;
    });

    // Wait for all per-model setups; the first rejection jumps to the catch below.
    await Promise.all(promises);
  } catch (error) {
    // Log only: the error is not rethrown from here.
    console.error('loopback', 'boot:initializeDownSampling:err', error);
  }