AWS Lambda Serverless Reporting Actuals to DataRobot MLOps

February 2, 2021
by Linda Haviland · 8 min read

This post was originally part of the DataRobot Community. Visit now to browse discussions and ask questions about DataRobot, the AI Platform, data science, and more.

This article describes a simple, automated serverless way to report actuals back to DataRobot once results become available for predicted items. Python 3.7 is used for the executable.

Architecture


The process works as follows:

  1. CSV file(s) arrive at AWS S3 containing results to report back to DataRobot. These could be files created from any process; examples above include a database writing out results to S3 or another process that simply sends over a CSV file.
  2. Upon arrival in the monitored directory, a serverless compute AWS Lambda function is triggered.
  3. The related deployment in DataRobot is specified in the S3 bucket path name to the CSV file, so the Lambda will be able to work generically for any deployment.
  4. The Lambda will parse out the deployment, read through the CSV file, and report results back to DataRobot for processing.  The results can then be explored from various angles within the platform.

Create (or use an existing) S3 Bucket to Receive Actual Results

Actual CSV prediction results will be written to a monitored area of an AWS S3 bucket.  If one does not exist, create the new area to receive the results. Files are expected to be copied into this bucket from external sources such as servers, programs, or databases. To create a bucket, navigate to the S3 service within the AWS console and select the Create bucket button. Provide a name (like “datarobot-actualbucket”) and region for the bucket, then click Create bucket. Change the defaults if required for organizational policies.
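If you prefer to script this step, a minimal boto3 sketch is shown below. The bucket name matches the example above; the region is an assumption you should replace with your own.

import boto3

# Assumed region; for us-east-1, omit CreateBucketConfiguration entirely
region = "us-east-2"

s3 = boto3.client("s3", region_name=region)
s3.create_bucket(
    Bucket="datarobot-actualbucket",
    CreateBucketConfiguration={"LocationConstraint": region},
)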


Create an IAM Role for the Lambda

The article Model Monitoring with Serverless MLOps Agents walks through creating an IAM security role for a Lambda. Follow that same process, but name the role lambda_upload_actuals_role; the function defined in this article will use it.

Two policies must be attached to this role:

  1. The AWS-managed policy, AWSLambdaBasicExecutionRole.
  2. An inline policy used for accessing and managing the S3 objects/files associated with this Lambda, scoped to the specific S3 bucket to monitor. It can be specified as shown below:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::datarobot-actualbucket",
                "arn:aws:s3:::datarobot-actualbucket/*"
            ]
        }
    ]
}
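For reference, the same inline policy can be attached from code. The boto3 sketch below uses the policy document above as a Python dict; the policy name is a hypothetical label of your choosing.

import json
import boto3

iam = boto3.client("iam")

# The inline policy shown above, expressed as a Python dict
s3_inline_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "VisualEditor0",
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:ListBucket", "s3:DeleteObject"],
        "Resource": [
            "arn:aws:s3:::datarobot-actualbucket",
            "arn:aws:s3:::datarobot-actualbucket/*",
        ],
    }],
}

# Attach the inline policy to the Lambda's execution role
iam.put_role_policy(
    RoleName="lambda_upload_actuals_role",
    PolicyName="upload_actuals_s3_access",  # hypothetical policy name
    PolicyDocument=json.dumps(s3_inline_policy),
)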

Create the Lambda

Navigate to the AWS Lambda service in the GUI console and, from the dashboard, click Create function. Provide a name, such as lambda_upload_actuals. In the Runtime environment section, choose Python 3.7. Lastly, expand the execution role section, select Use an existing role, and choose the lambda_upload_actuals_role created above.
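The function can also be created programmatically. The sketch below assumes the handler code has been zipped locally as lambda_upload_actuals.zip with a lambda_function.lambda_handler entry point, and uses a placeholder account ID in the role ARN.

import boto3

lambda_client = boto3.client("lambda")

# Assumes the ZIP contains lambda_function.py defining lambda_handler;
# replace the account ID in the role ARN with your own.
with open("lambda_upload_actuals.zip", "rb") as f:
    lambda_client.create_function(
        FunctionName="lambda_upload_actuals",
        Runtime="python3.7",
        Role="arn:aws:iam::123456789012:role/lambda_upload_actuals_role",
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": f.read()},
    )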

Add the Lambda Trigger

This Lambda will run any time a CSV file lands in the path it is monitoring. From the Designer screen, choose +Add trigger, and select S3 from the dropdown list. For the bucket, choose the one specified in the IAM role policy you created above. Optionally, specify a prefix if the bucket is used for other purposes. For example, use the value upload_actuals/ as a prefix if you want to only monitor objects that land in s3://datarobot-actualbucket/upload_actuals/. (Note that the data for this example would be expected to arrive similar to s3://datarobot-actualbucket/upload_actuals/2f5e3433_DEPLOYMENT_ID_123/actuals.csv.) Click Add to save the trigger.
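The console sets up the invoke permission for you when you add the trigger. If you script the trigger instead, both pieces look roughly like the sketch below; the statement ID is an arbitrary label.

import boto3

lambda_client = boto3.client("lambda")
s3 = boto3.client("s3")

function_arn = lambda_client.get_function(
    FunctionName="lambda_upload_actuals")["Configuration"]["FunctionArn"]

# Allow S3 to invoke the function (the console does this automatically)
lambda_client.add_permission(
    FunctionName="lambda_upload_actuals",
    StatementId="s3-invoke-upload-actuals",  # arbitrary label
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn="arn:aws:s3:::datarobot-actualbucket",
)

# Fire the Lambda for any new CSV object under the upload_actuals/ prefix
s3.put_bucket_notification_configuration(
    Bucket="datarobot-actualbucket",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": function_arn,
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {"Key": {"FilterRules": [
                {"Name": "prefix", "Value": "upload_actuals/"},
                {"Name": "suffix", "Value": ".csv"},
            ]}},
        }]
    },
)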

Create and Add a Lambda Layer

Lambda layers let you build Lambda code on top of shared libraries and keep those libraries separate from the delivery package. Using layers is not required, but it simplifies bringing in the necessary packages and maintaining the code. This code requires the requests and pandas libraries, which are not part of the base Amazon Linux image that Lambda runs in, so they must be added via a layer. One way to build the layer is with a Python virtual environment; in this example, the environment used to execute the commands below is an Amazon Linux EC2 instance. (See the instructions to install Python 3 on Amazon Linux here.)

Creating a ZIP file for a layer can then be done as follows:

python3 -m venv ~/my_app/env
source ~/my_app/env/bin/activate
pip install requests
pip install pandas
deactivate

Per the Amazon documentation, the libraries must sit under a python (or python/lib/python3.7/site-packages) directory inside the ZIP, which Lambda expands under /opt at runtime.

cd ~/my_app/env
mkdir -p python/lib/python3.7/site-packages
cp -r lib/python3.7/site-packages/* python/lib/python3.7/site-packages/.
zip -r9 ~/layer.zip python

Copy the layer.zip file to a location on S3; this is required if the Lambda layer is > 10MB.

aws s3 cp layer.zip s3://datarobot-bucket/layers/layer.zip

Navigate to the Lambda service > Layers > Create Layer. Provide a name and the link to the file in S3; note that this will be the Object URL of the uploaded ZIP. Setting compatible runtimes is recommended but not required; it makes the layer easier to find in a dropdown menu when attaching it to a Lambda. Click Create to save the layer, and note the ARN it is assigned.


Navigate back to the Lambda and click Layers (below the Lambda title). Add a layer and provide the ARN from the previous step.
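If you would rather publish and attach the layer from code, a boto3 sketch under the same bucket and key assumptions looks like this; the layer name is hypothetical.

import boto3

lambda_client = boto3.client("lambda")

# Publish the layer from the ZIP copied to S3 above
layer = lambda_client.publish_layer_version(
    LayerName="requests-pandas-py37",  # hypothetical layer name
    Content={"S3Bucket": "datarobot-bucket", "S3Key": "layers/layer.zip"},
    CompatibleRuntimes=["python3.7"],
)

# Attach the new layer version to the function
lambda_client.update_function_configuration(
    FunctionName="lambda_upload_actuals",
    Layers=[layer["LayerVersionArn"]],
)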

Define the Lambda Code

import boto3
import os
import os.path
import urllib.parse
import pandas as pd
import requests
import json

# 10,000 rows is the maximum payload allowed per actuals upload request
REPORT_ROWS = int(os.environ['REPORT_ROWS'])
DR_API_TOKEN = os.environ['DR_API_TOKEN']
DR_INSTANCE = os.environ['DR_INSTANCE']
    
s3 = boto3.resource('s3')

def report_rows(list_to_report, url, total):
    
    print('reporting ' + str(len(list_to_report)) + ' records!')
    df = pd.DataFrame(list_to_report)
    
    # this must be provided as a string
    df['associationId'] = df['associationId'].apply(str)
    
    report_json = json.dumps({'data': df.to_dict('records')})
    
    response = requests.post(url, data=report_json, headers={'Authorization':'Bearer ' + DR_API_TOKEN, 'Content-Type':'application/json'})
    print('response status code: ' + str(response.status_code))
    
    if response.status_code == 202:
        print('success!  reported ' + str(total) + ' total records!')
    else:
        print('error reporting!')
        print('response content: ' + str(response.content))

def lambda_handler(event, context):

    # get the object that triggered lambda
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    filenm = os.path.basename(key)
    fulldir = os.path.dirname(key)
    deployment = os.path.basename(fulldir)

    print('bucket is ' + bucket)
    print('key is ' + key)
    print('filenm is ' + filenm)
    print('fulldir is ' + fulldir)
    print('deployment is ' + deployment)
    
    url = DR_INSTANCE + '/api/v2/deployments/' + deployment + '/actuals/fromJSON/'
    
    session = boto3.session.Session()
    client = session.client('s3')
    
    i = -1
    total = 0
    
    rows_list = []
    
    # stream the CSV from S3 line by line
    for lines in client.get_object(Bucket=bucket, Key=key)["Body"].iter_lines():

        # if the header, make sure the case-sensitive required fields are present
        if i == -1:
            header = lines.decode('utf-8').split(',')
            col1 = header[0]
            col2 = header[1]

            expectedHeaders = ['associationId', 'actualValue']
            if col1 not in expectedHeaders or col2 not in expectedHeaders:
                print('ERROR: data must be csv with 2 columns, headers case sensitive: associationId and actualValue')
                # leave the file in place so the error can be investigated
                return
            else:
                i = 0

        else:
            # map each data row to the header columns
            input_dict = {}
            input_row = lines.decode('utf-8').split(',')
            input_dict.update({col1: input_row[0]})
            input_dict.update({col2: input_row[1]})
            rows_list.append(input_dict)
            i = i + 1
            total = total + 1

            # a full batch is buffered; submit it and start the next one
            if i == REPORT_ROWS:
                report_rows(rows_list, url, total)
                rows_list = []
                i = 0

    # submit any remaining buffered rows
    if i > 0:
        report_rows(rows_list, url, total)

    # delete the processed input upon successful completion
    s3.Object(bucket, key).delete()
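To exercise the handler outside of AWS (for example, from a local Python session with AWS credentials available and the three environment variables described below set), you can pass it a hand-built event that mimics an S3 notification. The key below is illustrative; the object must actually exist in the bucket for the handler to read it.

# A hand-built event mimicking the S3 notification that triggers the Lambda
sample_event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "datarobot-actualbucket"},
            "object": {"key": "upload_actuals/5aa1a4e24eaaa003b4caa4/actuals.csv"},
        }
    }]
}

lambda_handler(sample_event, None)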

Set Lambda Environment Variables

Three variables need to be set for the Lambda.

  • DR_API_TOKEN is the API token of the account with access to the deployment, which will be used for submitting the actuals to the DataRobot environment. It is advised to use a service account for this configuration, rather than a personal user account.
  • DR_INSTANCE is the application server of the DataRobot instance that is being used.
  • REPORT_ROWS is the number of actuals records to upload in a payload; 10000 is the maximum.
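The same variables can also be set programmatically. The sketch below uses placeholder values; substitute your own service account token and application server URL.

import boto3

# Placeholder values; replace the token and DataRobot app server with your own
boto3.client("lambda").update_function_configuration(
    FunctionName="lambda_upload_actuals",
    Environment={
        "Variables": {
            "DR_API_TOKEN": "<service-account-api-token>",
            "DR_INSTANCE": "https://app.datarobot.com",
            "REPORT_ROWS": "10000",
        }
    },
)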

Set Lambda Resource Settings

Edit the Basic settings to configure the Lambda's resources. Buffering input rows and submitting payloads uses relatively little compute and memory, so 512 MB should be more than sufficient (Lambda allocates CPU in proportion to memory). The Timeout is the maximum time a single invocation may run before AWS terminates it, so it must cover reading the entire file and waiting for DataRobot's response to each submitted payload. The default of 3 seconds is too short, especially with maximum-size 10,000-record payloads; the tested configuration used 30 seconds, and large files may require more.
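These settings can be applied with the same API call used above for the environment variables; they are shown separately here for clarity.

import boto3

boto3.client("lambda").update_function_configuration(
    FunctionName="lambda_upload_actuals",
    MemorySize=512,  # MB; CPU is allocated in proportion to memory
    Timeout=30,      # seconds; the value used in testing
)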

Run the Lambda

The Lambda expects a CSV file with a header row and two case-sensitive columns: associationId and actualValue. Sample file contents are shown below for a Titanic passenger scoring model.

associationId,actualValue
892,1
893,0
894,0
895,1
896,1

An AWS CLI command to leverage the S3 service and copy a local file to the monitored directory is as follows:

aws s3 cp actuals.csv s3://datarobot-actualbucket/upload_actuals/deploy/5aa1a4e24eaaa003b4caa4/actuals.csv

Note that the deployment ID is included as the final directory in the path; this is the DataRobot deployment with which the actuals will be associated. Files from a process or database export can be written directly to S3 in the same way.
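A process that already runs Python can write the file directly with boto3 instead of the CLI. This sketch uses the same bucket, prefix, and deployment ID as the example above.

import boto3

deployment_id = "5aa1a4e24eaaa003b4caa4"  # the DataRobot deployment receiving the actuals
boto3.client("s3").upload_file(
    "actuals.csv",
    "datarobot-actualbucket",
    "upload_actuals/" + deployment_id + "/actuals.csv",
)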

Additionally, keep in mind that the maximum run time for a Lambda is 15 minutes; in our testing, this was comfortably sufficient for 1 million records. For production usage, you may want to split actuals across more files with fewer records each, and you may want to report actuals for multiple deployments simultaneously. It may also be prudent to disable API rate limiting for the API token/service account reporting these values.

It’s up to you to flesh out any additional error handling, such as sending an email, posting a message to a queue, moving the S3 file, or writing custom code to fit your environment. As written, the Lambda simply deletes the input file upon fully successful processing and writes errors to the log in the event of failure.

Integrate the Lambda Into an Actuals Reporting Pipeline

At this point, the Lambda is complete and ready to report any actuals data fed to it (i.e., whenever the defined S3 location receives a file in the expected format). Set up a process to deliver actual results as they arrive, then monitor and manage the model with DataRobot MLOps to understand how it’s performing for your use case.


About the author
Linda Haviland
Community Manager