@financial-times/athena-aggregation

Data-driven aggregation via AWS Athena

athena-aggregation

Generic, data-driven AWS Athena aggregation

How does it work?

Aggregation is the process of taking (loading) a number of sources of data, combining and summarising (transforming) them, and then outputting (exporting) new data that is of interest to the user.

Each individual pass through this load/transform/export process is known as an aggregation. To meet all of a customer's needs, many aggregations may need to be performed.

This library can only handle data in JSON format; both the data sources and the exported output must be JSON. Further, these JSON files must be stored in one or more folders within one or more S3 buckets in a single AWS account.

The whole aggregation process relies on AWS Athena.

The first step is the creation of an Athena database. This has to be done manually (via a CREATE DATABASE command) because each set of aggregations needs to be kept in isolation (you don't want one system's data interfering with another's). This library does not provide an API for the CREATE DATABASE command.
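
For example, a one-off statement like the following, run in the Athena query editor, is enough (the database name here is just a placeholder):

   CREATE DATABASE my_system_aggregations;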

The source JSON is mapped into Athena tables, which then become available to SQL SELECT statements that generate the output.

This library uses yaml configuration files to remove the manual effort required to execute the mappings and create the tables.

You will need to identify the location of an S3 bucket that will be used to hold these configuration files. You will also need to identify the location of the S3 bucket that will be used to hold the output of the resulting aggregations.

Each of the table configuration files used to map source JSON data contains the command required to map a single source of data, within a single folder of a single S3 bucket, into a single Athena table within the chosen Athena database. As you are going to need many of these table configuration files, this library expects them to exist in a folder named Athena_DDL/tables within your chosen configuration bucket.

Once the source data has been mapped it can be read by aggregations (SQL commands) to create new data. This data will be output as JSON into your chosen output folder. Each aggregation is defined in a yaml calculation file as a SQL SELECT command which combines and summarises data from one or more tables. As you are going to need many of these calculation configuration files, this library expects them to exist in a folder named Athena_DDL/calculations within your chosen configuration bucket.

When you call the library it runs all the mappings to create/recreate the tables followed by all the calculations to generate the aggregated data.

Using the Library

A generic definition of aggregation can be expressed as:

  • an input bucket which contains raw data to be stored and aggregated
  • an output bucket into which aggregations will be stored
  • a configuration bucket which contains ...
    • yaml configuration files for athena tables; each containing a single query which defines the DDL for a table creation. The name of the table will be the name of the yaml file.
    • yaml configuration files for aggregations; each containing a file-naming convention, an output format and a query to generate results. The results go into a folder of the output bucket named after the yaml file.
  • an athena database which maps input data and executes aggregation commands

This version only implements:

  • defining a database, config, input and output quartet where config is assumed to contain the Athena config files in /tables and /calculations folders
  • a function which reads the {config}/tables configuration files to configure the athena tables
  • a function which reads the {config}/calculations configuration files and runs the aggregation queries, writing their output to S3
  • tests for athenaConfig and aggregate using assume-test-role

To avoid issues with multiple deployments of Athena and/or local execution, the names of the input and output buckets need to be able to include tokens. To achieve this, two additional parameters may be passed:

  • user - the person running the aggregation.
  • account - the AWS account ID in which the execution is running.

All six parameters can then be used within the text of the yaml files to provide unique bucket, folder and file names.
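
As an illustration (the bucket layout and folder names below are invented), a table definition could use these tokens to point at an account- and user-specific location:

 query: >
  CREATE EXTERNAL TABLE {database}.{filename} (
   systemcode string
  )
  ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
  LOCATION
   's3://{input}-{account}/{user}/individual/systems/'

When run, the LOCATION resolves to a bucket and folder unique to that account and user, so multiple deployments and local runs do not collide.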

Known issues

  • The yaml configuration files must be named with a yaml file extension, not yml or any other casing.
  • The configuration bucket must contain both a tables and a calculations folder; no variation in casing.
  • The SQL in the yaml configuration files must be defined using the multi-line syntax (the >) and must be indented to avoid confusion with yaml keys.
  • The configuration of the output filenames is not consistent in its use of .json: filename requires an explicit .json; filenamePrefix will automatically acquire a .json extension.
  • There is no separation of the definition of the real input tables (which could be derived from the format of the input json) from the intermediate tables that are used to store aggregations (which cannot be derived).
  • The aggregations don't always create output folders if they don't already exist, and will not create the output bucket.

Configuration Files

Both the tables and aggregations are defined in yaml files.

Table files just contain a single query parameter, which must be a valid CREATE EXTERNAL TABLE command.

Aggregation files contain up to five parameters, as follows (see the sketch after this list):

  • filename/filenamePrefix: Either a precise name for the output file of the aggregation or the name of a field from the aggregation which will be used as the output filename
  • archive: Optional field which contains the name of the folder (within the output bucket) that will be used to store historic copies of output files from this aggregation.
  • outputFormat: Either JsonObject, JsonList or JsonArray to match with Athena's concepts of a single json object, a list of objects or an array of objects.
  • priority: Optional field which defines the sequence in which the aggregations will run. Lower numbers run first. Defaults to 999. All aggregations with the same priority run in parallel.
  • query: A valid SQL SELECT statement to run to achieve the desired aggregation.
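
As a sketch (the file name systems_summary.yaml, the priority, the archive folder and the output field names below are invented; the source table and its columns match the systemscores example later in this README), a calculation file using all five parameters might look like this:

systems_summary.yaml

filenamePrefix: systemCode
archive: archive_systems
outputFormat: JsonObject
priority: 10
query: >
 SELECT
  systemcode AS "systemCode:string",
  avg(coalesce(weightedscore, 0.0)) AS "averageScore:float"
 FROM {database}.systemscores
 GROUP BY systemcode

With outputFormat JsonObject and one output row per system, this would write one <systemCode>.json file per system into {output}/systems/summary/ (the folder being derived from the underscores in the calculation file's name), with daily copies kept under {output}/archive/systems/.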

Athena is only able to map/read rows of data; it will not read a JSON array or any other multi-line formatting. When Athena reads a file it expects each line in that file to contain a complete JSON object (JsonObject), with no wrapping across multiple lines. Each array needs to be reformatted as a set of individual JSON objects, one per line (JsonList), as illustrated below. However, these formatting constraints only apply to Athena. Any client of the aggregated results would be expected to understand the concept of a JSON array, so feel free to use JsonArray as an output format for those clients.
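
For illustration (field names taken from the systemscores example later in this README), the line-oriented formats Athena can read, and the array format reserved for output to clients:

 JsonObject - a single complete object on one line:
  {"systemcode": "sys1", "weightedscore": 0.9}

 JsonList - one complete object per line:
  {"systemcode": "sys1", "weightedscore": 0.9}
  {"systemcode": "sys2", "weightedscore": 0.7}

 JsonArray - a conventional JSON array; suitable as an output format only, Athena will not read it:
  [{"systemcode": "sys1", "weightedscore": 0.9}, {"systemcode": "sys2", "weightedscore": 0.7}]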

Note that both table and aggregation yaml files allow the use of six tokens whose value will be substituted at runtime:

  • {account}: The ID of the AWS account
  • {user}: The name of the user
  • {database}: The name of the database
  • {input}: The name of the input bucket
  • {output}: The name of the output bucket
  • {filename}: The name of the file being processed (no directory prefix or file extension)

Also note that the names of the aggregation yaml files will be used to define the folder within the output bucket into which the results of the aggregation are written. Any underscores in the name are assumed to be folder separators. For example, the output of the average_group.yaml aggregation will be written to {output}/average/group/.

Aggregation output options (filename/filenamePrefix and outputFormat)

You can either output the aggregation to a single fixed filename, or to multiple files, each named after the value of the field identified by filenamePrefix.

Naming option   outputFormat  SQL constraints         Effect
filename        JsonObject    One row of data         filename.json will contain one row of data
filename        JsonList      Multiple rows of data   filename.json will contain multiple rows of data
filenamePrefix  JsonObject    One row of data         A single ${data[filenamePrefix]}.json containing one row of data
filenamePrefix  JsonObject    Multiple rows of data   Many ${data[filenamePrefix]}.json files, each containing one row of data
filenamePrefix  JsonArray     Multiple rows of data   Many ${data[filenamePrefix]}.json files, each containing one array of data

All Athena aggregation output is written to the {output} bucket; it is therefore available to any client that has access to that bucket.

If you define an archive property for an aggregation (in addition to its filename/filenamePrefix and outputFormat) then daily historic copies of the output files will be retained in the location identified by the archive property. As above, underscores in the property are assumed to be folder separators, so an archive of archive_team will trigger the creation of archive files in {output}/archive/team. Those archive files will be named with a date prefix of the form yyyy-mm-dd-aggregation.json, e.g. 2019-03-18-groups.json.

Aggregation query parameter

The query parameter is optional in a calculation file. You can:

  • Leave it blank, meaning the file will not be processed for aggregation (see the sketch after this list), or
  • Supply it in the format described below, which will add the corresponding file's query to the batch of aggregations.
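
For example (a hypothetical calculation file that has been parked rather than deleted), leaving the query value empty means the file is skipped:

filename: groups.json
outputFormat: JsonObject
query: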

Using the query parameter

You must supply a syntactically correct SQL SELECT command as the query parameter. This SELECT statement is run during the aggregation process to produce your desired output. You may use any SQL syntax permitted by the Presto SQL engine, including WHERE, JOIN, sub-SELECT, GROUP BY, ORDER BY, UNION and LIMIT.

You must name your output fields using the 'AS' syntax or you will be unable to access them. Further, your output will consist only of string fields unless you explicitly state the output format of each non-string field, as follows:

  • strings to be suffixed with :string. e.g. SELECT name AS "name:string"
  • integers to be suffixed with :integer. e.g. SELECT count(name) AS "nameCount:integer"
  • booleans to be suffixed with :boolean. e.g. SELECT flag AS "active:boolean"
  • floats to be suffixed with :float. e.g. SELECT sum(cost) AS "total:float"
  • string arrays to be suffixed with :stringArray. e.g. SELECT array_agg(name) AS "names:stringArray"
  • integer arrays to be suffixed with :integerArray. e.g. SELECT array_agg(counter) AS "counts:integerArray"
  • boolean arrays to be suffixed with :booleanArray. e.g. SELECT array_agg(flag) AS "flags:booleanArray"
  • float arrays to be suffixed with :floatArray. e.g. SELECT array_agg(cost) AS "costs:floatArray"

If you do not provide a type suffix :string will be assumed.

Note that the type suffix shown above is not written into the resulting output column name; it's just used to cast the output into the required format. Columns are named using only the prefix.
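
For instance (the value shown is invented), a column defined with an :integer suffix appears in the output JSON under the prefix alone, cast to an integer:

 SELECT count(name) AS "nameCount:integer"
 -- appears in the output as: {"nameCount": 42}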

Nested arrays

The Array suffix may appear multiple times within a single output name to allow for the handling of nested arrays.

For example:

  • A simple string from SELECT name AS "name:string"
    • results in the raw "name1"
    • becoming "name1"
  • An array of strings from SELECT array_agg(name) AS "names:stringArray"
    • results in the raw "[name1,name2]"
    • becoming [ "name1", "name2" ]
  • A nested array of strings from SELECT array_agg(array_agg(name)) AS "nestednames:stringArrayArray"
    • results in the raw "[[name1,name2],[name3,name4]]"
    • becoming [ [ "name1", "name2" ], [ "name3", "name4" ] ]

Example Table definition:

systemscores.yaml

Defines a table named systemscores (by using the filename token) which maps the format of all the JSON files contained in the {input}/individual/systems/ folder.

query: >

   CREATE EXTERNAL TABLE {database}.{filename} (
    systemcode string,
    servicetier string,
    scores struct <
     name: int,description: int,primaryURL: int,serviceTier: int>,
    totalCounts struct <
     ok: int,info: int,warning: int,error: int,critical: int,successes: int,failings: int
    >,
    lastupdated string,
    weightedscore float
   )
   ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
   WITH SERDEPROPERTIES (
    'serialization.format' = '1'
   )
   LOCATION
    's3://{input}/individual/systems/'
   TBLPROPERTIES ('has_encrypted_data'='false')

Example aggregation definition:

average_groups.yaml

Produces some averages from the data in the groupAggregate table (a table that was probably populated by a prior aggregation). The output is a single JSON object which is placed in {output}/average/groups/groups.json

filename: groups.json
outputFormat: JsonObject
query: >
 SELECT
      avg(coalesce(percentages_ok,0.0)) as "percentages_ok:float",
      avg(coalesce(percentages_info,0.0)) as "percentages_info:float",
      avg(coalesce(percentages_warning,0.0)) as "percentages_warning:float",
      avg(coalesce(percentages_error,0.0)) as "percentages_error:float",
      avg(coalesce(percentages_critical,0.0)) as "percentages_critical:float",
      avg(coalesce(weightedscore,0)) as "weightedScore:float",
      avg(coalesce(scores_name,0)) as "scores_name:integer",
      avg(coalesce(scores_description,0)) as "scores_description:integer",
      sum(coalesce(totalCounts_ok,0)) as "totalCounts_ok:integer",
      sum(coalesce(totalCounts_info,0)) as "totalCounts_info:integer",
      sum(coalesce(totalCounts_warning,0)) as "totalCounts_warning:integer",
      sum(coalesce(totalCounts_error,0)) as "totalCounts_error:integer",
      sum(coalesce(totalCounts_critical,0)) as "totalCounts_critical:integer"
     FROM {database}.groupAggregate

Example Code:

// Assumed import: the exact export shape of @financial-times/athena-aggregation may differ.
import AthenaAggregation from '@financial-times/athena-aggregation';

const instance = new AthenaAggregation({
    account: '510688331160',    // AWS account ID; also available as the {account} token
    user: 'example.user',       // person running the aggregation; the {user} token
    athenaDatabase: 'sampledb', // the manually created Athena database; the {database} token
    configurationBucket: 'system-operability-score-reports.510688331160/athenaDDL', // holds the tables and calculations folders
    inputBucket: 'an-input',    // the {input} token
    outputBucket: 'system-operability-score-reports.510688331160/geoffdemo', // the {output} token
});
const configuration = await instance.configureTables(); // create/recreate the Athena tables
const aggregation = await instance.aggregateData();     // run the calculations and write the output to S3

Example Code with token expansion:

const instance = new AthenaAggregation({
    account: '510688331160',
    user: 'example.user',
    athenaDatabase: 'sampledb',
    configurationBucket: 'system-operability-score-reports{.account}/athenaDDL',
    inputBucket: 'an-input',
    outputBucket: 'system-operability-score-reports{.account}/geoffdemo',
});
const configuration = await instance.configureTables();
const aggregation = await instance.aggregateData();

Please note that in the above we can use the value of account in the bucket names to create account-specific naming. Further, please note the use of the dot within the token; this special syntax enables the dot to appear in the result when the token is expanded.
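
So, with the account value shown above, the tokenised bucket name expands to the literal name used in the first example:

 'system-operability-score-reports{.account}/geoffdemo'
  -> 'system-operability-score-reports.510688331160/geoffdemo'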