Last week I described what Kinesis Data Analytics is and promised we'd have some fun with processing streams the pro way. It's time to turn that promise into action!
Before we do anything useful, we need some material to work with. In other words, we need a DATA STREAM. Or more streams.
Obviously, we don't want to spend much time setting it up (we have more amusing stuff coming our way), but we're picky. The stream has to be easy to deploy (IaC!), operate (managed service), and use (good API!). Is there anything else? Yes! Fast (low latency), performant (high throughput), and scalable (preferably serverless). Phew, I thought it was gonna be hard ;)
The good news is that there's an easy option available at hand. It's called Kinesis Data Streams, and we can have it up and running in no time.
First things first
Let's start with some prerequisites:
- you need an active AWS account (here's how to do it)
- I highly recommend installing the AWS CLI tool (v2) - it's not required, but it helps a lot (here's how you can get it)
- I'll be using AWS CDK as an Infrastructure-as-Code tool. You can learn more about installing it here.
Some hints and tips (before we get it rolling for real):
- DON'T (pretty please ...) use your root user for anything we're gonna do here. Create a separate IAM user and assign it only the privileges (policies) it needs (yes, you will need access keys for programmatic API calls)
- If you have more than one AWS account (which is fine, an account is just a container for cloud resources), create a separate AWS CLI profile for each of them
- CDK requires Node, just like plenty of other apps you may be using - becoming friends with nvm may save you a lot of grey hair ... (because of version clashes, direct and indirect dependencies, and such) Just sayin' ...
OK, stellar. I assume you're all set (with AWS CLI v2 and CDK ready to rumble). There's one more crucial decision to make: which AWS region would you like to make your hunting ground? The easiest way is to pick one from this list:
aws ec2 describe-regions
In my case, I decided to go with 'eu-central-1'. You can set your default region by using aws configure, or specify it explicitly each time with a switch like this: --region=eu-central-1 (I'll sometimes skip it in the examples below for the sake of brevity).
It's time for a readiness check:
aws kinesis list-streams --profile={profile_name} --region={region_name}
You should get an empty JSON result. That makes sense, as we haven't created any streams yet. Let's fix that.
CDK project
To use the CDK, you need to bootstrap it first. This creates a few resources (no worries, they're entirely covered by the free tier) that the CDK uses, e.g., to store the synthesized stack templates. It's just one command, and you only need to run it once per account and region:
cdk bootstrap --profile={profile_name}
Now it's time to create the actual CDK project in a language of your choice (as long as that choice is Python, Java, JavaScript/TypeScript, or C# ... AFAIK Go is in tech preview as well). You can find a description of creating a new project here. Important! I will be using Python, so things like dependency management (pyenv, pip, etc.) and the general syntax will be specific to that language!
I assume that by now you have a CDK app and an empty stack. That stack will contain the definition of our Kinesis Data Stream. Just make sure that you link the stack with your app as its scope, like this (in your `app.py`):
from aws_cdk import core as cdk
from xyz_infra.xyz_infra_kinesis_stack import XyzInfraKinesisStack

app = cdk.App()
xyz_kinesis_stack = XyzInfraKinesisStack(app, "xyz-infra-kinesis")
app.synth()
Brilliant! Now your CDK stack should be visible from the command line:
cdk ls
cdk diff xyz-infra-kinesis --profile={profile_name}
We're ready to actually fill in the details of the stack:
from aws_cdk import (
    aws_kinesis as kinesis,
    core as cdk
)


class XyzInfraKinesisStack(cdk.Stack):

    def __init__(self,
                 scope: cdk.Construct,
                 construct_id: str,
                 **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # a single-shard stream keeping events for the minimum of 24 hours
        kinesis_data_stream = kinesis.Stream(
            self, "xyz-kinesis-data-stream",
            stream_name="xyz-kinesis-data-stream",
            shard_count=1,
            retention_period=cdk.Duration.hours(24)
        )
And ... that's about it. We're just creating a new stream named "xyz-kinesis-data-stream", starting with a single shard (the "virtual" unit of stream scaling) and a 24-hour retention period for events (which is the minimum value).
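One optional extra: if you'd like the stream's ARN handy without digging through the console later, you could also expose it as a stack output. This is a hypothetical addition on my part (nothing below depends on it) - a few lines at the end of __init__:

        # optional: expose the stream ARN as a CloudFormation stack output
        cdk.CfnOutput(
            self, "xyz-kinesis-data-stream-arn",
            value=kinesis_data_stream.stream_arn
        )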
Does it work?
Great, let's try to deploy it. First of all, we should do a dry run to see what's about to be deployed:
cdk diff --profile={profile_name} xyz-infra-kinesis
You should get just one resource - the data stream itself. Let's have it rolled out!
cdk deploy --profile={profile_name} xyz-infra-kinesis
As a result, you should get the ARN of the freshly deployed stack (in the console output). The easiest way to inspect it is ... the AWS Management Console:
- Go to https://console.aws.amazon.com
- Log in using your non-root user credentials (and, hopefully, with MFA set up)
- Navigate to the CloudFormation service console (conceptually, CDK stacks are just CloudFormation stacks)
- Find the stack named 'xyz-infra-kinesis' and click its name
- Verify that the ARN (identifier) is the same and the status is 'CREATE_COMPLETE'
Well, as you can see, using the AWS Management Console requires some clicking, so why don't we use the AWS CLI instead?
aws cloudformation list-stacks --profile={profile_name} --region={region_name}
aws cloudformation list-stack-resources --profile={profile_name} --region={region_name} --stack-name=xyz-infra-kinesis
Awesome. You should see not only the stack but also the data stream in it.
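Alternatively, if you'd like a sneak peek at boto3 (the AWS SDK for Python, which we'll use properly in a moment), here's a small sketch that checks whether the stream itself is up and ACTIVE - it assumes the same {profile_name} and {region_name} placeholders as the CLI calls above:

import boto3

session = boto3.Session(profile_name="{profile_name}", region_name="{region_name}")
kinesis = session.client("kinesis")

# expect StreamStatus to be "ACTIVE" once the deployment has finished
summary = kinesis.describe_stream_summary(StreamName="xyz-kinesis-data-stream")
print(summary["StreamDescriptionSummary"]["StreamStatus"])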
But how do we make sure it really works (accepts messages)? Fortunately, it's super easy. Let's use a Python script for that (boto3 has to be installed as a dependency, of course):
import json

import boto3
from botocore.config import Config

config = Config(
    region_name="{region_name}"
)

session = boto3.Session(profile_name="{profile_name}")
kinesis = session.client("kinesis", config=config)

event = {
    # event's fields here
}

# serialize the event to a JSON string before putting it on the stream
payload = json.dumps(event)

response = kinesis.put_record(
    StreamName="xyz-kinesis-data-stream",
    Data=payload,
    PartitionKey="{partition_key}"
)

sequence_number = response["SequenceNumber"]
print(sequence_number)
It is really THAT simple, but there's one new element that we haven't covered yet - the partition key required when we put anything into a stream. Let's clarify what it is and how it works:
- First of all, as you can see, we're sending events as JSON, but there's no mechanism to enforce any schema. Yup, it's schema-on-read.
- Partitioning is not tied to any particular property of the event. You pass the value yourself, and it's supposed to be a Unicode string of up to 256 characters. Kinesis hashes it to decide which shard gets the record; in our case we have just one shard, so it doesn't really matter which key we pick.
The sequence number you receive is unique per partition key within its shard. It's generally monotonic (increases over time), but don't treat it as a counter.
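To see partition keys doing something more meaningful, imagine a stream with several shards and events coming from multiple devices. The sketch below (hypothetical field names, same placeholders as before) sends a small batch with put_records, using each device's id as the partition key - Kinesis hashes that key (MD5) to pick a shard, so all events with the same key land in the same shard:

import json

import boto3

session = boto3.Session(profile_name="{profile_name}", region_name="{region_name}")
kinesis = session.client("kinesis")

# hypothetical events - one per device, each using its device id as the partition key
records = [
    {
        "Data": json.dumps({"device_id": device_id, "reading": 42}).encode("utf-8"),
        "PartitionKey": device_id
    }
    for device_id in ("sensor-1", "sensor-2", "sensor-3")
]

response = kinesis.put_records(
    StreamName="xyz-kinesis-data-stream",
    Records=records
)

# with a single shard everything lands in the same place anyway,
# but the response also tells us whether any record failed to get in
print("failed records:", response["FailedRecordCount"])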
OK, I assume you've got your sequence number, so you were able to successfully put an event in the stream — a small win, but still a win!
Let's stop here, for now. Next week we'll start consuming the events from the stream and making some sense out of them.
Stay tuned!
P.S. To avoid costs, don't forget to ditch your stack by using the following command:
cdk destroy --profile={profile_name} xyz-infra-kinesis
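And if you want to double-check that the stream really is gone (and won't generate any costs), a quick boto3 probe does the trick - a small sketch, same placeholders as before:

import boto3
from botocore.exceptions import ClientError

session = boto3.Session(profile_name="{profile_name}", region_name="{region_name}")
kinesis = session.client("kinesis")

try:
    kinesis.describe_stream_summary(StreamName="xyz-kinesis-data-stream")
    print("the stream is still there - did the destroy really finish?")
except ClientError as error:
    # a ResourceNotFoundException means the stream has been removed for good
    print(error.response["Error"]["Code"])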