Skip to content

Latest commit

 

History

History

Amazon Redshift Streaming Ingestion from Amazon Managed Streaming for Apache Kafka (MSK) CDK Python project!

redshift_streaming_from_msk

This is an Amazon Redshift Streaming Ingestion from MSK project for CDK development with Python.

The cdk.json file tells the CDK Toolkit how to execute your app.

This project is set up like a standard Python project. The initialization process also creates a virtualenv within this project, stored under the .venv directory. To create the virtualenv it assumes that there is a python3 (or python for Windows) executable in your path with access to the venv package. If for any reason the automatic creation of the virtualenv fails, you can create the virtualenv manually.

To manually create a virtualenv on MacOS and Linux:

$ python3 -m venv .venv

After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.

$ source .venv/bin/activate

If you are a Windows platform, you would activate the virtualenv like this:

% .venv\Scripts\activate.bat

Once the virtualenv is activated, you can install the required dependencies.

$ pip install -r requirements.txt

At this point you can now synthesize the CloudFormation template for this code.

(.venv) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
(.venv) $ export CDK_DEFAULT_REGION=$(aws configure get region)
(.venv) $ cdk synth \
              -c aws_secret_name='your_redshift_secret_name'

ℹ️ Before you deploy this project, you should create an AWS Secret for your Redshift Serverless Admin user. You can create an AWS Secret like this:

$ aws secretsmanager create-secret \
    --name "your_redshift_secret_name" \
    --description "(Optional) description of the secret" \
    --secret-string '{"admin_username": "admin", "admin_user_password": "password_of_at_last_8_characters"}'

Use cdk deploy command to create the stack shown above.

(.venv) $ cdk deploy \
              -c aws_secret_name='your_redshift_secret_name'

To add additional dependencies, for example other CDK libraries, just add them to your setup.py file and rerun the pip install -r requirements.txt command.

Clean Up

Delete the CloudFormation stack by running the below command.

(.venv) $ cdk destroy --force \
              -c aws_secret_name=your_redshift_secret_name

Run Test

Amazon MSK setup

After MSK is succesfully created, you can now create topic, and produce on the topic in MSK as the following example.

  1. Get cluster information

    $ export MSK_CLUSTER_ARN=$(aws kafka list-clusters-v2 | jq -r '.ClusterInfoList[] | select(.ClusterName == "your-msk-cluster-name") | .ClusterArn')
    $ aws kafka describe-cluster-v2 --cluster-arn $MSK_CLUSTER_ARN
    {
     "ClusterInfo": {
      "ClusterType": "PROVISIONED",
      "ClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/{msk-cluster-name}/b5555d61-dbd3-4dab-b9d4-67921490c072-3",
      "ClusterName": "{msk-cluster-name}",
      "CreationTime": "2022-12-20T12:26:29.328000+00:00",
      "CurrentVersion": "K3AEGXETSR30VB",
      "State": "ACTIVE",
      "Tags": {},
      "Provisioned": {
       "BrokerNodeGroupInfo": {
        "BrokerAZDistribution": "DEFAULT",
        "ClientSubnets": [
         "subnet-036e818e577297ddc",
         "subnet-0113628395a293b98",
         "subnet-090240f6a94a4b5aa"
        ],
        "InstanceType": "kafka.m5.large",
        "SecurityGroups": [
         "sg-05c2c5594681e3910",
         "sg-066ae826faa6a4bb6"
        ],
        "StorageInfo": {
         "EbsStorageInfo": {
          "VolumeSize": 100
         }
        },
        "ConnectivityInfo": {
         "PublicAccess": {
          "Type": "DISABLED"
         }
        }
       },
       "CurrentBrokerSoftwareInfo": {
        "KafkaVersion": "2.8.1"
       },
       "EncryptionInfo": {
        "EncryptionAtRest": {
         "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/cfd8f515-305d-4e6b-9c53-90869065347e"
        },
        "EncryptionInTransit": {
         "ClientBroker": "TLS_PLAINTEXT",
         "InCluster": true
        }
       },
       "EnhancedMonitoring": "PER_TOPIC_PER_BROKER",
       "OpenMonitoring": {
        "Prometheus": {
         "JmxExporter": {
          "EnabledInBroker": false
         },
         "NodeExporter": {
          "EnabledInBroker": false
         }
        }
       },
       "NumberOfBrokerNodes": 3,
       "ZookeeperConnectString": "z-1.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:2181,z-3.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:2181,z-2.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:2181",
       "ZookeeperConnectStringTls": "z-1.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:2182,z-3.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:2182,z-2.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:2182",
       "StorageMode": "LOCAL"
      }
     }
    }
    
  2. Get booststrap brokers

    $ aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN
    {
     "BootstrapBrokerString": "b-2.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:9092,b-1.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:9092,b-3.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:9092",
     "BootstrapBrokerStringTls": "b-2.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:9094,b-1.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:9094,b-3.{msk-cluster-name}.0o776j.c3.kafka.us-east-1.amazonaws.com:9094"
    }
    
  3. Connect the MSK client EC2 Host.

    You can connect to an EC2 instance using the EC2 Instance Connect CLI.
    Install ec2instanceconnectcli python package and Use the mssh command with the instance ID as follows.

    $ sudo pip install ec2instanceconnectcli
    $ mssh ec2-user@i-001234a4bf70dec41EXAMPLE
    
  4. Create an Apache Kafka topic After connect your EC2 Host, you use the client machine to create a topic on the cluster. Run the following command to create a topic called ev_stream_data.

    [ec2-user@ip-172-31-0-180 ~]$ export PATH=$HOME/opt/kafka/bin:$PATH
    [ec2-user@ip-172-31-0-180 ~]$ export BS={BootstrapBrokerString}
    [ec2-user@ip-172-31-0-180 ~]$ kafka-topics.sh --create --bootstrap-server $BS --topic ev_stream_data --partitions 3 --replication-factor 2
    
  5. Produce and consume data

    (1) To produce messages

    Run the following command to generate messages into the topic on the cluster.

    [ec2-user@ip-172-31-0-180 ~]$ python3 gen_fake_kafka_data.py --bootstrap-servers $BS --topic ev_stream_data
    

    (2) To consume messages

    Keep the connection to the client machine open, and then open a second, separate connection to that machine in a new window.

    [ec2-user@ip-172-31-0-180 ~]$ kafka-console-consumer.sh --bootstrap-server $BS --topic ev_stream_data --from-beginning
    

    You start seeing the messages you entered earlier when you used the console producer command. Enter more messages in the producer window, and watch them appear in the consumer window.

Amazon Redshift setup

These steps show you how to configure the materialized view to ingest data.

  1. Connect to the Redshift query editor v2

    redshift-query-editor-v2-connection

  2. Create an external schema to map the data from MSK to a Redshift object.

    CREATE EXTERNAL SCHEMA evdata
    FROM MSK
    IAM_ROLE 'arn:aws:iam::{AWS-ACCOUNT-ID}:role/RedshiftStreamingRole'
    AUTHENTICATION none
    CLUSTER_ARN 'arn:aws:kafka:{region}:{AWS-ACCOUNT-ID}:cluster/{cluser-name}/b5555d61-dbd3-4dab-b9d4-67921490c072-3';
    

    For information about how to configure the IAM role, see Getting started with streaming ingestion from Amazon Managed Streaming for Apache Kafka.

  3. Create a materialized view to consume the stream data.

    Note that MSK cluster names are case-sensitive and can contain both uppercase and lowercase letters. To use case-sensitive identifiers, you can set the configuration setting enable_case_sensitive_identifier to true at either the session or cluster level.

    -- To create and use case sensitive identifiers
    SET enable_case_sensitive_identifier TO true;
    
    -- To check if enable_case_sensitive_identifier is turned on
    SHOW enable_case_sensitive_identifier;
    

    The following example defines a materialized view with JSON source data.
    Create the materialized view so it’s distributed on the UUID value from the stream and is sorted by the refresh_time value. The refresh_time is the start time of the materialized view refresh that loaded the record. The materialized view is set to auto refresh and will be refreshed as data keeps arriving in the stream.

    CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS
      SELECT refresh_time,
        kafka_timestamp_type,
        kafka_timestamp,
        kafka_key,
        kafka_partition,
        kafka_offset,
        kafka_headers,
        json_extract_path_text(from_varbyte(kafka_value,'utf-8'),'_id',true)::character(36) as ID,
        json_extract_path_text(from_varbyte(kafka_value,'utf-8'),'clusterID',true)::varchar(30) as clusterID,
        json_extract_path_text(from_varbyte(kafka_value,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime,
        json_extract_path_text(from_varbyte(kafka_value,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered,
        json_extract_path_text(from_varbyte(kafka_value,'utf-8'),'stationID',true)::INTEGER as stationID,
        json_extract_path_text(from_varbyte(kafka_value,'utf-8'),'spaceID',true)::varchar(100) as spaceID,
        json_extract_path_text(from_varbyte(kafka_value,'utf-8'),'timezone',true)::varchar(30)as timezone,
        json_extract_path_text(from_varbyte(kafka_value,'utf-8'),'userID',true)::varchar(30) as userID
      FROM evdata.ev_stream_data
      WHERE LENGTH(kafka_value) < 65355 AND CAN_JSON_PARSE(kafka_value);
    

    The code above filters records larger than 65355 bytes. This is because json_extract_path_text is limited to varchar data type. The Materialized view should be defined so that there aren’t any type conversion errors.

  4. Refreshing materialized views for streaming ingestion

    The materialized view is auto-refreshed as long as there is new data on the MSK stream. You can also disable auto-refresh and run a manual refresh or schedule a manual refresh using the Redshift Console UI.
    To update the data in a materialized view, you can use the REFRESH MATERIALIZED VIEW statement at any time.

    REFRESH MATERIALIZED VIEW ev_station_data_extract;
    

Query the stream

  1. Query data in the materialized view.
    SELECT *
    FROM ev_station_data_extract;
    
  2. Query the refreshed materialized view to get usage statistics.
    SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') as connectiontime
       ,SUM(kWhDelivered) AS Energy_Consumed
       ,count(distinct userID) AS #Users
    FROM ev_station_data_extract
    GROUP BY to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS')
    ORDER BY 1 DESC;
    

Useful commands

  • cdk ls list all stacks in the app
  • cdk synth emits the synthesized CloudFormation template
  • cdk deploy deploy this stack to your default AWS account/region
  • cdk diff compare deployed stack with current state
  • cdk docs open CDK documentation

Enjoy!

References