In this lab, we will generate simulated POS data that we will ingest as it happens. We will also process this data in near real-time and detect anomalies on the fly. And finally, we will store this data, along with the generated anomaly scores and associated explanations (why a value was considered anomalous along with attribution scores that show which specific column caused it to be flagged as an anomaly) for our records and historic analysis.
A note on the KPI metric: The retail_kpi_metric column is meant to be any calculated value that makes sense for your business and that you want to maximize (or minimize). You can define multiple such KPIs (Key Performance Indicators). |
---|
When simulating this data, we artificially introduce anomalous values for this KPI. Most values fall between 80 and 95, but about 3% of the time the value is below 10.
COL_timestamp | store_id | workstation_id | operator_id | item_id | quantity | regular_sales_unit_price | retail_price_modifier | retail_kpi_metric |
---|---|---|---|---|---|---|---|---|
2019-08-31T10:40:05.0 | store_36 | pos_2 | cashier_75 | item_1098 | 5 | 64.42 | 5.83 | 87 |
2019-09-27T17:12:33.0 | store_43 | pos_10 | cashier_175 | item_4159 | 5 | 50.25 | 7.68 | 85 |
… | … | … | … | … | … | … | … | … |
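As a rough illustration of the simulation described above, the following is a minimal sketch of how one such record could be generated. It is not the workshop's gen_pos_log_stream.rb script: the constant names, the method name, and the value ranges for every column other than retail_kpi_metric are assumptions made for this example.

```ruby
require 'json'

# Hypothetical constants; the real script's values may differ.
ANOMALY_RATE = 0.03     # inject an anomalous KPI about 3% of the time
NORMAL_KPI   = (80..95) # typical KPI values
ANOMALY_KPI  = (0..9)   # anomalous KPI values ("below 10")

# Build one simulated PoS record as a Hash.
# `rng` is injectable so that a seeded generator gives reproducible runs.
def simulated_pos_record(rng = Random.new, now = Time.now)
  anomalous = rng.rand < ANOMALY_RATE
  {
    # One fractional-second digit, matching the sample rows (e.g. "...:05.0").
    'COL_timestamp'            => now.strftime('%Y-%m-%dT%H:%M:%S.%1N'),
    'store_id'                 => "store_#{rng.rand(1..50)}",      # assumed range
    'workstation_id'           => "pos_#{rng.rand(1..10)}",        # assumed range
    'operator_id'              => "cashier_#{rng.rand(1..200)}",   # assumed range
    'item_id'                  => "item_#{rng.rand(1..5000)}",     # assumed range
    'quantity'                 => rng.rand(1..10),                 # assumed range
    'regular_sales_unit_price' => rng.rand(1.0..100.0).round(2),   # assumed range
    'retail_price_modifier'    => rng.rand(0.0..10.0).round(2),    # assumed range
    'retail_kpi_metric'        => rng.rand(anomalous ? ANOMALY_KPI : NORMAL_KPI)
  }
end

puts JSON.generate(simulated_pos_record)
```

Passing a seeded generator, for example `simulated_pos_record(Random.new(42))`, makes the output repeatable, which helps when comparing anomaly scores across runs.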
To ingest data, we’ll first set up an Amazon Kinesis Data Stream to which we can send our generated data. To set this up:
Point your browser to https://console.aws.amazon.com/kinesis
Note the region that you default to, which in the screenshot below is ‘Oregon’. Any of the 6 regions (as of this writing) where Amazon Forecast is supported is fine, as long as you stay in the same region across all labs.
Click on ‘Get Started’
Click on ‘Create data stream’
For ‘Kinesis stream name’, enter a descriptive name such as ‘RetailDataStream’. (Note the restrictions on which characters can be used in the name, shown right below the textbox.)
Note: You’re free to name this stream anything you prefer, but if you change it, be sure to also change line no. 5 of the gen_pos_log_stream.rb script in the /src directory so that STREAM_NAME uses the same name:
STREAM_NAME = 'RetailDataStream'
For ‘Number of shards*’ enter ‘2’
Click ‘Create Kinesis Streams’. This will take around 10 to 15 seconds after which, the Kinesis Data Stream should have been created. You should see a success message like this:
We have successfully created an Amazon Kinesis Stream resource that can ingest our data.
Now that we’ve created the Kinesis Data Stream to which we can send data, we’ll run our simulation script, which generates the PoS data and sends it to the stream that we just created.
To run this script, switch to the browser tab where you have the Cloud9 IDE open. Go to the Cloud9 IDE terminal and run:
cd ~/environment/retail/lab1/src
gem install bundler
bundle install
mkdir config
touch config/aws.yml
Replace ACCESS_KEY_ID, SECRET_ACCESS_KEY, and SESSION_TOKEN with what you copied into your notepad at the beginning and then run:
Note: The whitespace after each colon below is critical, so preserve it when you copy-paste each of the values. Otherwise, the file isn’t valid YAML and the script will fail to parse the values. |
---|
echo "access_key_id: ACCESS_KEY_ID" >> config/aws.yml
echo "secret_access_key: SECRET_ACCESS_KEY" >> config/aws.yml
echo "session_token: SESSION_TOKEN" >> config/aws.yml
Note: Did you remember to replace ACCESS_KEY_ID, SECRET_ACCESS_KEY, and SESSION_TOKEN above? And preserve the whitespace after the colon? |
---|
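To see why the space after each colon matters, here is a minimal sketch of a check that could be run against config/aws.yml before starting the generator. The method name parse_aws_config and the constant REQUIRED_KEYS are hypothetical and are not part of the workshop script; only the three key names come from the commands above.

```ruby
require 'yaml'

# Keys written to config/aws.yml by the echo commands in this lab.
REQUIRED_KEYS = %w[access_key_id secret_access_key session_token].freeze

# Parse the YAML text of the credentials file and fail early with a
# readable message instead of letting the generator fail later.
def parse_aws_config(yaml_text)
  config = YAML.safe_load(yaml_text)

  # Without the space after the colon, "key:value" is read as one plain
  # string rather than a key/value mapping, so the result is not a Hash.
  unless config.is_a?(Hash)
    raise ArgumentError, 'expected "key: value" lines; is the space after each colon missing?'
  end

  missing = REQUIRED_KEYS.reject { |key| config[key].is_a?(String) && !config[key].empty? }
  raise ArgumentError, "missing or empty keys: #{missing.join(', ')}" unless missing.empty?

  config
end

# Usage, once config/aws.yml exists:
#   config = parse_aws_config(File.read('config/aws.yml'))
```

If only some lines are missing the space, the YAML parser itself may raise a syntax error before this check runs; either way the problem surfaces before any data is sent.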
OPTIONAL: If you’re using the AWS account provided via Event Engine, ignore this. But if you’re running in your own account and selected a different region, make sure you edit the script and change us-west-2 on line 15 to the region you prefer.
As mentioned earlier, the gen_pos_log_stream.rb script artificially introduces anomalous values for the KPI metric. Most values are between 80 and 95, but about 3% of the time we inject an anomalous value below 10.
Execute the script by running:
ruby gen_pos_log_stream.rb
Wait for the script to start running and then switch back to the AWS console to continue with the steps below.
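For orientation, sending one record to the stream could look roughly like the sketch below. This is not the contents of gen_pos_log_stream.rb. It assumes that records are serialized as JSON and that store_id is used as the partition key, neither of which is confirmed by this lab. The helper send_pos_record is hypothetical; the client is passed in and only needs to respond to put_record the way Aws::Kinesis::Client from the aws-sdk-kinesis gem does.

```ruby
require 'json'

STREAM_NAME = 'RetailDataStream'

# Send one PoS record (a Hash) to a Kinesis Data Stream.
# `client` is any object with a put_record(stream_name:, data:, partition_key:)
# method, such as an Aws::Kinesis::Client instance.
def send_pos_record(client, record, stream_name: STREAM_NAME)
  client.put_record(
    stream_name: stream_name,
    data: JSON.generate(record),             # assumption: JSON payload
    partition_key: record.fetch('store_id')  # assumption: partition by store
  )
end

# With the aws-sdk-kinesis gem installed and credentials configured:
#   require 'aws-sdk-kinesis'
#   client = Aws::Kinesis::Client.new(region: 'us-west-2')
#   send_pos_record(client, { 'store_id' => 'store_36', 'retail_kpi_metric' => 87 })
```

Passing the client in as an argument keeps the helper easy to exercise with a stand-in object before pointing it at a real stream.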
We will now create an Amazon Kinesis Data Analytics App (SQL-based) that we will connect to the above Kinesis Data Stream to allow us to process the data we are ingesting.
Click on ‘Data Analytics’ in the left hand tab.
You may see either the screen above or the one below, depending on whether or not you have already created an analytics app in the past.
Click on ‘Create Kinesis stream’ or ‘Create application’ depending on the screen.
For ‘Application name’, enter something descriptive. The ‘Description’ is optional. Leave the rest of the options as-is.
Then click on ‘Create application’
Click on ‘Connect streaming data’ to connect the Kinesis Data Stream we created in Step A with the Kinesis Data Analytics application.
For the ‘Kinesis data stream’ drop-down, choose the Kinesis Data Stream name that we just created
Scroll down until you see this and click on ‘Discover schema’
Now look at the discovered schema, the column names, and the associated data types. You’ll notice that Kinesis Data Analytics has discovered most of the column types correctly, with one exception: the very first column, COL_timestamp, has been classified as VARCHAR instead of TIMESTAMP. We deliberately introduced an uncommon TIMESTAMP format to trip it up. However, this is easy to fix.
To fix the schema, click on the ‘Edit schema’ button.
Update the ‘Column type’ value of COL_timestamp to TIMESTAMP from the drop-down options.
Click on ‘Save schema and update stream samples’. This takes around 10 to 15 seconds. Once the stream samples are updated, scroll down and you will notice the corrected schema as shown:
Click on ‘Exit (done)’ to exit the screen
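The timestamp layout used in the sample rows (date, a literal T, time, then a single fractional-second digit and no time zone) can be parsed explicitly if you ever need to handle these values outside of Kinesis Data Analytics. The sketch below assumes that layout; the constant and method names are hypothetical.

```ruby
require 'time'

# Layout of COL_timestamp in the sample rows, e.g. "2019-08-31T10:40:05.0".
# %N matches the fractional-second digits after the dot.
POS_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%N'

# Parse a PoS timestamp string into a Time object.
# The input carries no time zone, so the result is in the local zone.
def parse_pos_timestamp(text)
  Time.strptime(text, POS_TIMESTAMP_FORMAT)
end

parsed = parse_pos_timestamp('2019-08-31T10:40:05.0')
puts parsed.strftime('%Y-%m-%d %H:%M:%S')
```

Time.strptime raises ArgumentError when the text does not match the format, which makes malformed timestamps easy to spot.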
In this step, we’ll configure the Kinesis Data Analytics SQL to process, on the fly, the data that we’re now ingesting.
Click on ‘Go to SQL editor’
After the Kinesis Data Analytics application starts up successfully, you should see the data start to stream in.
Here, we’ll add some streaming SQL to process the ingested PoS data on the fly. We’ll experiment with two different streaming SQL statements.
For our first experiment, copy the streaming SQL from the file retail_pos_analytics_kpi_attribution.sql into the SQL editor window. The SQL is also provided here for convenience:
--
-- Creates a stream with a subset of the data that we are ingesting,
-- which is our way of ensuring that we only run anomaly detection on
-- the numeric columns that we pick.
--
CREATE OR REPLACE STREAM "RETAIL_KPI_ANOMALY_DETECTION_STREAM" (
"store_id" varchar(8),
"workstation_id" varchar(8),
"operator_id" varchar(16),
"item_id" varchar(16),
"retail_kpi_metric" integer,
"ANOMALY_SCORE" double,
"ANOMALY_EXPLANATION" varchar(512)
);
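--
-- Compute an anomaly score for each record in the input stream. The
-- anomaly detection algorithm considers ALL numeric columns and
-- ignores the rest. The arguments after the CURSOR are, in order:
-- numberOfTrees (100), subSampleSize (256), timeDecay (100000),
-- shingleSize (1), and withDirectionality (false).
--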
CREATE OR REPLACE PUMP "RETAIL_KPI_ANOMALY_DETECTION_STREAM_PUMP" AS
INSERT INTO "RETAIL_KPI_ANOMALY_DETECTION_STREAM"
SELECT STREAM "store_id",
"workstation_id",
"operator_id",
"item_id",
"retail_kpi_metric",
ANOMALY_SCORE,
ANOMALY_EXPLANATION
FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION (
CURSOR( SELECT STREAM "store_id",
"workstation_id",
"operator_id",
"item_id",
"retail_kpi_metric"
FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, false)
);
--
-- Create a destination stream that combines (JOINs) all the values in
-- the source stream along with the anomaly values in the anomaly stream
-- which we will then store for historic records.
--
CREATE OR REPLACE STREAM "DESTINATION_STREAM" (
"COL_timestamp" timestamp,
"store_id" varchar(8),
"workstation_id" varchar(8),
"operator_id" varchar(16),
"item_id" varchar(16),
"quantity" integer,
"regular_sales_unit_price" real,
"retail_price_modifier" real,
"retail_kpi_metric" integer,
"ANOMALY_SCORE" double
--"ANOMALY_EXPLANATION" varchar(512)
);
CREATE OR REPLACE PUMP "DESTINATION_STREAM_PUMP" AS
INSERT INTO "DESTINATION_STREAM"
SELECT STREAM "SOURCE_STREAM"."COL_timestamp",
"SOURCE_STREAM"."store_id",
"SOURCE_STREAM"."workstation_id",
"SOURCE_STREAM"."operator_id",
"SOURCE_STREAM"."item_id",
"SOURCE_STREAM"."quantity",
"SOURCE_STREAM"."regular_sales_unit_price",
"SOURCE_STREAM"."retail_price_modifier",
"SOURCE_STREAM"."retail_kpi_metric",
"ANOMALY_STREAM"."ANOMALY_SCORE"
--"ANOMALY_STREAM"."ANOMALY_EXPLANATION"
FROM "SOURCE_SQL_STREAM_001" AS "SOURCE_STREAM"
JOIN "RETAIL_KPI_ANOMALY_DETECTION_STREAM" AS "ANOMALY_STREAM"
ON "SOURCE_STREAM"."store_id" = "ANOMALY_STREAM"."store_id"
AND "SOURCE_STREAM"."workstation_id" = "ANOMALY_STREAM"."workstation_id"
AND "SOURCE_STREAM"."operator_id" = "ANOMALY_STREAM"."operator_id"
AND "SOURCE_STREAM"."item_id" = "ANOMALY_STREAM"."item_id";
Paste the above SQL into the SQL editor window.
Click on the ‘Save and run SQL’ button.
You will need to wait a few tens of seconds for the results to start streaming. While waiting for this to update…
Useful Terms: It might also be helpful to understand the simple concepts of a STREAM and a PUMP. |
---|
STREAM: An in-application stream works like a table that you can query using SQL statements, but it’s called a stream because it represents continuous data flow.
PUMP: A pump is a continuously running insert query that inserts data from one in-application stream to another in-application stream.
See the docs here for a more detailed explanation: https://docs.aws.amazon.com/kinesisanalytics/latest/dev/streaming-sql-concepts.html
Once the results start streaming in, you will notice multiple streams under the ‘Source’ and ‘Real-time analytics’ tabs.
SOURCE_SQL_STREAM_001 - raw source data. You will see this under the ‘Source’ tab.
RETAIL_KPI_ANOMALY_DETECTION_STREAM - streaming anomaly scores. You will see this under the ‘Real-time analytics’ tab.
DESTINATION_STREAM - raw source data JOINed with anomaly scores to store in Amazon S3 for historical analysis. You will see this under the ‘Real-time analytics’ tab.
Under ‘In-application streams’, click on the RETAIL_KPI_ANOMALY_DETECTION_STREAM and wait a few seconds for the data to start flowing.
Scroll to the right until you reach the ANOMALY_SCORE column.
Results Explained: You’ll notice floating point values. The algorithm considers records with higher ANOMALY_SCORE values to be more anomalous. |
---|
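Because only the relative size of the score matters here, one simple way to review results is to rank records by ANOMALY_SCORE and look at the top few. The sketch below assumes the scored rows are already available as an array of hashes; the method name and the sample rows are made up for illustration and are not output from this lab.

```ruby
# Return the `limit` records with the highest ANOMALY_SCORE,
# ordered from most to least anomalous.
def most_anomalous(records, limit: 3)
  records.max_by(limit) { |record| record.fetch('ANOMALY_SCORE') }
end

# Hypothetical sample rows, not real output.
rows = [
  { 'store_id' => 'store_36', 'retail_kpi_metric' => 87, 'ANOMALY_SCORE' => 0.8 },
  { 'store_id' => 'store_43', 'retail_kpi_metric' => 4,  'ANOMALY_SCORE' => 3.1 },
  { 'store_id' => 'store_12', 'retail_kpi_metric' => 85, 'ANOMALY_SCORE' => 1.2 },
  { 'store_id' => 'store_07', 'retail_kpi_metric' => 91, 'ANOMALY_SCORE' => 0.9 }
]

most_anomalous(rows, limit: 2).each do |row|
  puts "#{row['store_id']}: score=#{row['ANOMALY_SCORE']} kpi=#{row['retail_kpi_metric']}"
end
```

Ranking avoids having to pick a fixed score cutoff, which would depend on the data and on the algorithm's parameters.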
Look further to the right and you’ll see the ANOMALY_EXPLANATION column adjacent to it. Notice the values in this (JSON-formatted) column.
Results Explained: In the ANOMALY_EXPLANATION column, you’ll see retail_kpi_metric being called out with an associated ATTRIBUTION_SCORE. This is the algorithm’s way of indicating the extent to which this column contributed to the anomaly score. |
---|
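If you later read these explanations programmatically, the attribution scores can be pulled out of the JSON along the lines of the sketch below. It assumes each top-level key is a column name whose value is an object containing an ATTRIBUTION_SCORE that is either a number or a numeric string; any other fields in the object are ignored. The sample JSON and its values are hypothetical, and the method name is made up for this example.

```ruby
require 'json'

# Extract [column, attribution score] pairs from an ANOMALY_EXPLANATION
# JSON string, ordered from largest to smallest contribution.
def attribution_scores(explanation_json)
  JSON.parse(explanation_json)
      .map { |column, details| [column, Float(details.fetch('ATTRIBUTION_SCORE'))] }
      .sort_by { |_column, score| -score }
end

# Hypothetical explanation value, not real output.
sample = '{"retail_kpi_metric":{"ATTRIBUTION_SCORE":"1.25"},' \
         '"quantity":{"ATTRIBUTION_SCORE":"0.40"}}'

attribution_scores(sample).each do |column, score|
  puts "#{column}: #{score}"
end
```

Float() is used instead of to_f so that a malformed score raises an error rather than silently becoming 0.0.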
Now we’ll slightly modify the above streaming SQL and compare the results.
Copy the streaming SQL in retail_pos_analytics_multicolumn_attributions.sql into the same SQL editor as before. The SQL statements are also included below for convenience.
CREATE OR REPLACE STREAM "RETAIL_KPI_ANOMALY_DETECTION_STREAM" (
"store_id" varchar(8),
"workstation_id" varchar(8),
"operator_id" varchar(16),
"item_id" varchar(16),
"quantity" integer,
"regular_sales_unit_price" real,
"retail_price_modifier" real,
"retail_kpi_metric" integer,
"ANOMALY_SCORE" double,
"ANOMALY_EXPLANATION" varchar(512)
);
-- Compute an anomaly score for each record in the input stream
CREATE OR REPLACE PUMP "RETAIL_KPI_ANOMALY_DETECTION_STREAM_PUMP" AS
INSERT INTO "RETAIL_KPI_ANOMALY_DETECTION_STREAM"
SELECT STREAM "store_id",
"workstation_id",
"operator_id",
"item_id",
"quantity",
"regular_sales_unit_price",
"retail_price_modifier",
"retail_kpi_metric",
ANOMALY_SCORE,
ANOMALY_EXPLANATION
FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION (
CURSOR( SELECT STREAM "store_id",
"workstation_id",
"operator_id",
"item_id",
"quantity",
"regular_sales_unit_price",
"retail_price_modifier",
"retail_kpi_metric"
FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, false)
);
CREATE OR REPLACE STREAM "DESTINATION_STREAM" (
"COL_timestamp" timestamp,
"store_id" varchar(8),
"workstation_id" varchar(8),
"operator_id" varchar(16),
"item_id" varchar(16),
"quantity" integer,
"regular_sales_unit_price" real,
"retail_price_modifier" real,
"retail_kpi_metric" integer,
"ANOMALY_SCORE" double
-- "ANOMALY_EXPLANATION" varchar(512),
);
CREATE OR REPLACE PUMP "DESTINATION_STREAM_PUMP" AS
INSERT INTO "DESTINATION_STREAM"
SELECT STREAM "SOURCE_STREAM"."COL_timestamp",
"SOURCE_STREAM"."store_id",
"SOURCE_STREAM"."workstation_id",
"SOURCE_STREAM"."operator_id",
"SOURCE_STREAM"."item_id",
"SOURCE_STREAM"."quantity",
"SOURCE_STREAM"."regular_sales_unit_price",
"SOURCE_STREAM"."retail_price_modifier",
"SOURCE_STREAM"."retail_kpi_metric",
"ANOMALY_STREAM"."ANOMALY_SCORE"
-- "ANOMALY_STREAM"."ANOMALY_EXPLANATION"
FROM "SOURCE_SQL_STREAM_001" AS "SOURCE_STREAM"
JOIN "RETAIL_KPI_ANOMALY_DETECTION_STREAM" AS "ANOMALY_STREAM"
ON "SOURCE_STREAM"."store_id" = "ANOMALY_STREAM"."store_id"
AND "SOURCE_STREAM"."workstation_id" = "ANOMALY_STREAM"."workstation_id"
AND "SOURCE_STREAM"."operator_id" = "ANOMALY_STREAM"."operator_id"
AND "SOURCE_STREAM"."item_id" = "ANOMALY_STREAM"."item_id";
Click on ‘Save and run SQL’ and wait for the analytics application to update the stream.
Once the data starts flowing, click on the RETAIL_KPI_ANOMALY_DETECTION_STREAM and wait a few seconds for the data to start flowing again.
Now scroll again all the way to the right until you see the ANOMALY_SCORE and ANOMALY_EXPLANATION columns. Notice more values in the ANOMALY_EXPLANATION column than before?
Results Explained: In the ANOMALY_EXPLANATION column, in addition to retail_kpi_metric, you’ll also notice the quantity, retail_price_modifier, and regular_sales_unit_price metrics. The algorithm is now using all of these numeric values to determine the anomaly score of a record. In this way, you get to cherry-pick the metrics that you think are most relevant and want the algorithm to score. |
---|
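To recap, we created a Kinesis Data Stream, sent simulated PoS data to it, connected a Kinesis Data Analytics application to the stream, and used streaming SQL to compute anomaly scores and explanations on the fly.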
Before jumping into Lab 2, now would be a good time to jump back to Lab 3 and check up on Amazon Forecast predictor training, which should be done.
Switch over to Lab 3 and continue from Step E to generate forecasts.