Inspect PCAP Files Using AWS Lambda (Part 2)

In part 1 we developed a working proof of concept that showed how to extract some basic information from a PCAP file downloaded from S3. Now we need to go back and make the project more complete: we'll add unit tests, improve exception handling, eliminate duplicate API calls, and more.

Our tasks:

  1. write unit tests and configure CI/CD
  2. add proper exception handling
  3. look up each OUI once
  4. put an upper limit on the size of the PCAP being downloaded from S3
  5. automate the creation of Lambda .ZIP archive

Configuring CI/CD will be covered in a separate article.

Let's get our development environment set up. Switching to the part1 branch is optional; master has the final code.

$ git clone https://github.com/mkerins/inspect-pcap-aws-lambda.git
$ cd inspect-pcap-aws-lambda
$ virtualenv --python=python2.7 venv
$ git checkout part1 (optional)
$ source venv/bin/activate
$ pip install -r requirements.txt

Let's get started writing our tests. Our function is expecting two positional arguments: event which will tell us where the PCAP is and context which is only applicable when running in the context of AWS Lambda (see what I did there?). When testing locally we can set context=None but we'll need a way to mock up an event. Our first tests will simply check to make sure an event was received and is a dict. Put the following into the tests.py file:

import unittest  
import boto3  
from inspect_pcap import handler


class TestInspectPcap(unittest.TestCase):

    def test_no_event(self):
        handler(event=None, context=None)

    def test_bad_event(self):
        handler(event='filename.pcap', context=None)


if __name__ == '__main__':  
    unittest.main()

Run the tests while measuring code coverage:

$ coverage run --source=inspect_pcap tests.py  && coverage report -m

You'll get some TypeError exceptions, which is to be expected. Let's handle this situation properly by changing our code:

...
def handler(event, context):  
    # Check that an event of type dict was received
    if event is None or type(event) is not dict:
        raise TypeError('No event received or event is not a dict')
    # Log the event
    print('Received event: {}'.format(json.dumps(event)))
...

Modify the test code:

...
class TestInspectPcap(unittest.TestCase):

    def test_no_event(self):
        with self.assertRaises(TypeError):
            handler(event=None, context=None)

    def test_bad_event(self):
        with self.assertRaises(TypeError):
            handler(event='filename.pcap', context=None)
...

And re-run the tests:

$ coverage run --source=inspect_pcap tests.py  && coverage report -m
..
----------------------------------------------------------------------
Ran 2 tests in 0.000s

OK  
Name              Stmts   Miss  Cover   Missing  
-----------------------------------------------
inspect_pcap.py      44     33    25%   15-65, 68  

In reality, these two tests are unnecessary because when the function is deployed in Lambda it can't be invoked without an event being passed to it. We should focus on testing what our code is actually supposed to do, and that's analyzing PCAP files downloaded from S3. Our test methods will check that our function works properly in these situations:

  • test_good_pcap - Valid PCAP
  • test_nonexistant_pcap - PCAP doesn't exist
  • test_not_pcap - file that isn't actually a PCAP
  • test_invalid_mac_addresses - PCAP that has packets without valid MAC addresses but is otherwise valid
  • test_pcap_too_big - PCAP file that is > 1 MB

In tests.py I created an event template that matches what S3 sends when a new object is PUT. For each test case we can just modify one or more fields rather than duplicating the whole dict. An important point: the S3 details live in a dict inside a one-item list inside a one-key dict, so the path to the information we need always starts at event['Records'][0]['s3']. From there we can get to the object and bucket data (see the short snippet after the template). The event that S3 dispatches looks like this:

{
    "Records": [{
        "eventVersion": "2.0",
        "eventTime": "1970-01-01T00:00:00.000Z",
        "requestParameters": {
            "sourceIPAddress": "127.0.0.1"
        },
        "s3": {
            "configurationId": "testConfigRule",
            "object": {
                "eTag": "0123456789abcdef0123456789abcdef",
                "sequencer": "0A1B2C3D4E5F678901",
                "key": "",
                "size": 0
            },
            "bucket": {
                "arn": "arn:aws:s3:::mybucket",
                "name": "uploaded-pcaps",
                "ownerIdentity": {
                    "principalId": "EXAMPLE"
                }
            },
            "s3SchemaVersion": "1.0"
        },
        "responseElements": {
            "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
            "x-amz-request-id": "EXAMPLE123456789"
        },
        "awsRegion": "us-east-1",
        "eventName": "ObjectCreated:Put",
        "userIdentity": {
            "principalId": "EXAMPLE"
        },
        "eventSource": "aws:s3"
    }]
}
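
For reference, digging the bucket and object details out of that structure looks like this (the variable names are just for illustration):

bucket_name = event['Records'][0]['s3']['bucket']['name']    # 'uploaded-pcaps'
object_key = event['Records'][0]['s3']['object']['key']      # the PCAP's key within the bucket
object_size = event['Records'][0]['s3']['object']['size']    # size in bytes, handy for the upper limit check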

For the test_good_pcap, test_not_pcap and test_invalid_mac_addresses test methods I uploaded files that meet those requirements. The test_nonexistant_pcap method doesn't expect to find a PCAP at all, and for test_pcap_too_big we can simply change a value in the event dict to make the object look larger than the limit; a rough sketch of that case follows below. The final code is in tests.py on the master branch.
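
To make that concrete, here is a sketch of how the oversize case could be implemented and tested. The MAX_PCAP_SIZE constant, the EVENT_TEMPLATE name and the ValueError are my assumptions for illustration; the actual code is on the master branch.

# In inspect_pcap.py, checked before anything is downloaded from S3 (sketch only):
MAX_PCAP_SIZE = 1024 * 1024  # 1 MB upper limit from task 4

def check_size(event):
    # S3 reports the object size in the event, so we can enforce the limit without downloading
    size = event['Records'][0]['s3']['object']['size']
    if size > MAX_PCAP_SIZE:
        raise ValueError('Object is {} bytes, limit is {} bytes'.format(size, MAX_PCAP_SIZE))

# In tests.py, added to the TestInspectPcap class:
import copy

def test_pcap_too_big(self):
    # Copy the template and change only the fields this case cares about
    event = copy.deepcopy(EVENT_TEMPLATE)
    event['Records'][0]['s3']['object']['key'] = 'huge.pcap'
    event['Records'][0]['s3']['object']['size'] = 5 * 1024 * 1024  # pretend a 5 MB upload
    with self.assertRaises(ValueError):
        handler(event=event, context=None)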

To improve performance and reduce the number of external API calls, we'll look up each OUI only once. We can do that by keeping track of the OUIs we've already queried with our old friend the dict. First, let's get a baseline of how many queries are made for a PCAP we already have available:

$ python -m unittest tests.TestInspectPcap.test_invalid_mac_addresses

This should output the list of MAC addresses and their associated manufacturers. For the PCAP I'm using there were 48 queries made to the external API. Let's modify our code to avoid making unnecessary API calls:

...
# Keep track of OUIs that have already been looked up
known_ouis = {}

# Iterate over the set() of MAC addresses
for mac in mac_addresses:  
    # Get the first 24 bits (aka the OUI) of the mac address
    oui = mac[0:8]
    # Check if we've already looked up this OUI
    if oui in known_ouis:
        print('{} -> {}*'.format(mac, known_ouis[oui]))
        continue
    # Attempt to look up the manufacturer
    try:
        resp = urllib2.urlopen('http://api.macvendors.com/{}'.format(mac))
        if resp.getcode() == 200:
            vendor_str = resp.readline()
            # Add this to our dict of known OUIs
            known_ouis[oui] = vendor_str
            print('{} -> {}'.format(mac, vendor_str))
    # Handle not found queries
    except urllib2.HTTPError:
        # Add the 'Unknown' OUI
        known_ouis[oui] = 'Unknown'
        print('{} -> {}'.format(mac, known_ouis[oui]))

Pretty straightforward: if the OUI is already a key in known_ouis, don't look it up again. Re-running the test method resulted in 29 queries, a reduction of about 40%.

Our last bit of work is coming up with a better way to prepare the .ZIP file for upload to AWS Lambda. In part 1 we did something like this:

$ cd venv/lib/python2.7/site-packages/
$ zip -x "*.pyc" -r ../../../../inspect_pcap.zip scapy
$ cd ../../../../
$ zip -x "*.pyc" -r inspect_pcap.zip inspect_pcap.py

We knew to include scapy because we manually compared the output of pip freeze before and after installing scapy. A better approach is to separate our production requirements from our development requirements. AWS Lambda has boto3 installed by default, so it doesn't need to be included in the production requirements file; to develop and test locally, though, we do need it in our dev-requirements.txt:

appdirs==1.4.3  
boto3==1.4.4  
botocore==1.5.47  
coverage==4.4.1  
docutils==0.13.1  
futures==3.1.1  
jmespath==0.9.2  
packaging==16.8  
pyparsing==2.2.0  
python-dateutil==2.6.0  
s3transfer==0.1.10  
six==1.10.0  

Our production requirements.txt file will include just one library:

scapy==2.3.3  
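
For local development both sets of requirements get installed into the virtualenv; pip accepts multiple -r flags, so one command covers it:

$ pip install -r requirements.txt -r dev-requirements.txt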

We know that the libraries listed in dev-requirements.txt will already be available to our function, so we only need to package what's in requirements.txt. The following script automates building the archive:

#!/usr/bin/env python
from __future__ import print_function
import argparse
import os

parser = argparse.ArgumentParser(description='Prepares .ZIP archive for upload to AWS Lambda')

parser.add_argument('-r', action='store', dest='req_file',
    help='requirements.txt file', required=True)
parser.add_argument('-f', action='store', dest='func_file',
    help='function file', required=True)

args = parser.parse_args()
req_file = args.req_file
func_file = args.func_file
# Site-packages of the active virtualenv; the script assumes the venv is activated
py_path = os.environ['VIRTUAL_ENV'] + '/lib/python2.7/site-packages/'
base_path = os.environ['PWD']
zip_filename = os.path.splitext(func_file)[0] + '.zip'
zip_file = base_path + '/' + zip_filename

print('Opening {}'.format(req_file))

modules = []

with open(req_file) as f:
    for line in f:
        # Drop the pinned version ('scapy==2.3.3' -> 'scapy') and skip blank lines
        module_name = line.strip().split('==')[0]
        if not module_name:
            continue
        modules.append(module_name)
        print('Adding {} to the list of modules'.format(module_name))

# Zip each module's directory out of site-packages, excluding compiled .pyc files
os.chdir(py_path)

for module in modules:
    print('Adding {} to {}'.format(module, zip_file))
    os.system('zip -q -x "*.pyc" -r {} {}'.format(zip_file, module))

# Add the Lambda function file itself at the root of the archive
os.chdir(base_path)
print('Adding function file {} to {}'.format(func_file, zip_file))
os.system('zip -q -u {} {}'.format(zip_file, func_file))

Save this as prepare_package.py and run:

$ python prepare_package.py -r requirements.txt -f inspect_pcap.py
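
If you would rather deploy from the command line than through the console, something along these lines should work; the function name inspect_pcap is my assumption, so use whatever you named your Lambda function:

$ aws lambda update-function-code --function-name inspect_pcap --zip-file fileb://inspect_pcap.zip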

The .ZIP archive is now packaged and ready for AWS Lambda. Thanks for following along, and please leave any comments or questions below.