Deployment Guide

Two typical deployment scenarios for the Engine API are:

  1. Analyze a batch of data from a data store
  2. Analyze a stream of data in real-time

The components in a typical end-to-end system look like this:

[Architecture diagram]

Analyze a batch of data

A simple connector to analyze a batch of data from a data store needs to:

  1. Create an Engine API job
  2. Query the data store
  3. Stream results from the data store to the Engine API job

For example, if time series data is stored in PostgreSQL as follows:

dbname=# select time,value from time_series_points where time_series_id=1395 order by time;
          time          | value
------------------------+-------
 2011-03-01 05:01:00+00 |    825
 2011-03-01 05:02:00+00 |    513
 2011-03-01 05:03:00+00 |    480
 2011-03-01 05:04:00+00 |    492
 2011-03-01 05:05:00+00 |    473
...
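
For reference, here is a hypothetical schema consistent with that query (the table and column names come from the query above; the column types are assumptions):

# Create the assumed table layout; adjust the types to match your data
psql dbname -c "
CREATE TABLE time_series_points (
    time_series_id integer     NOT NULL,
    time           timestamptz NOT NULL,
    value          numeric     NOT NULL
);
CREATE INDEX ON time_series_points (time_series_id, time);"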

Here is a simple shell script that implements these steps, streaming query results from a PostgreSQL database to the Engine API:

#!/bin/sh

PRELERT_API_HOST=localhost

# Create job and record JobId
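# The analysisConfig requests hourly buckets (bucketSpan is in seconds) with a
# single detector reporting the maximum of the "value" field in each bucket.
# The dataDescription tells the engine how to parse the delimited input.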
PRELERT_JOB_ID=$(curl -s -X POST -H 'Content-Type: application/json' "http://$PRELERT_API_HOST:8080/engine/v2/jobs" -d '{
    "analysisConfig" : {
        "bucketSpan":3600,
        "detectors" :[{"function":"max","fieldName":"value"}]
    },
    "dataDescription" : {
        "fieldDelimiter":",",
        "timeField":"time",
        "timeFormat":"yyyy-MM-dd HH:mm:ssX"
    }
}' | awk -F'"' '{ print $4; }')

echo "Created analysis job $PRELERT_JOB_ID"

echo "Querying PostgreSQL and streaming results to Engine API"

# Query the database and stream the results to the Engine API. -A selects
# unaligned output, -F, makes it comma-delimited and -P footer=off suppresses
# the "(N rows)" footer so it is not uploaded as a spurious data record.
psql -F, -A -P footer=off -c "select time,value from time_series_points where time_series_id=1395 order by time;" dbname | \
curl -s -X POST -T - "http://$PRELERT_API_HOST:8080/engine/v2/data/$PRELERT_JOB_ID"

echo "Done."

# Close job
curl -X POST "http://$PRELERT_API_HOST:8080/engine/v2/data/$PRELERT_JOB_ID/close"
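
The script above ignores HTTP failures. A more defensive variant of the upload step (a sketch; -f is a standard curl flag that makes it exit non-zero on HTTP 4xx/5xx responses) would be:

psql -F, -A -P footer=off -c "select time,value from time_series_points where time_series_id=1395 order by time;" dbname | \
curl -f -s -X POST -T - "http://$PRELERT_API_HOST:8080/engine/v2/data/$PRELERT_JOB_ID" \
    || { echo "Upload to job $PRELERT_JOB_ID failed" >&2; exit 1; }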

A simple results processor that retrieves the results and converts them to CSV format can be appended to the script:

# Get the results and print them to stdout as CSV
curl -s "http://$PRELERT_API_HOST:8080/engine/v2/results/$PRELERT_JOB_ID/buckets?take=1000000" | python -c "
import json, sys

# The buckets endpoint wraps its results in a 'documents' array
obj = json.load(sys.stdin)
buckets = obj['documents']

# One row per bucket: the bucket timestamp and its anomaly score
print('date,anomalyScore')
for bucket in buckets:
    print('{0},{1}'.format(bucket['timestamp'], bucket['anomalyScore']))
"