Skip to content

Latest commit

 

History

History
 
 

ingestion-pipeline

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

ingestion-pipeline

Streaming pipeline demo.

Prerequisites

  1. Google PubSub Emulator
    gcloud components install pubsub-emulator

Run

Local Run

Start Google PubSub Emulator

export PROJECT_ID=my-project-id
export PIPELINE_NAME=ingestion

gcloud beta emulators pubsub start --project=${PROJECT_ID} --host-port=localhost:8085

Run Job

publish sample data into PubSub Emulator for testing

# make sure @Ignore is commented out in ingestion's `PubSubProducerTest.kt` (otherwise the test is skipped)
gradle :apps:ingestion-pipeline:test --tests "micro.apps.pipeline.PubSubProducerTest.generateTestData"

run subscription job

export PROJECT_ID=my-project-id
export PIPELINE_NAME=ingestion
GCS_BUCKET=ingestion-bucket
export PUBSUB_EMULATOR_HOST=http://localhost:8085

gradle :apps:ingestion-pipeline:run --args="--runner=DirectRunner \
--project=${PROJECT_ID} \
--jobName=${PIPELINE_NAME} \
--gcsBucket=${GCS_BUCKET} \
--pubsubRootUrl=${PUBSUB_EMULATOR_HOST}"

# run non-stream mode 
gradle :apps:ingestion-pipeline:run --args="--runner=DirectRunner \
--project=${PROJECT_ID} \
--jobName=${PIPELINE_NAME} \
--gcsBucket=${GCS_BUCKET} \
--pubsubRootUrl=${PUBSUB_EMULATOR_HOST} \
--stream=false \
--gcpTempLocation=./build/temp \
--inputPath=./src/main/resources/data/*.avro \
--outputSuccessPath=./build/output/success \
--outputFailurePath=./build/output/failure"

# or via jar [gradle :apps:ingestion-pipeline:build -x test]
java -jar ./apps/ingestion-pipeline/build/libs/ingestion-pipeline-0.1.2-SNAPSHOT-all.jar --runner=DirectRunner \
--project=${PROJECT_ID} \
--jobName=${PIPELINE_NAME} \
--gcsBucket=${GCS_BUCKET} \
--pubsubRootUrl=${PUBSUB_EMULATOR_HOST}

Cloud Run

PROJECT_ID=<my-project-id>
PIPELINE_NAME=streaming
GCS_BUCKET=<my-gcs-bucket>
export GOOGLE_APPLICATION_CREDENTIALS=<full-path-to-your-json>

gradle :apps:ingestion-pipeline:run --args="--runner=DataflowRunner \
--project=${PROJECT_ID} \
--jobName=${PIPELINE_NAME} \
--gcsBucket=${GCS_BUCKET} \
--inputSubscription=projects/${PROJECT_ID}/subscriptions/${PIPELINE_NAME}-input \
--outputSuccessTopic=projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-output-success \
--outputFailureTopic=projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-output-failure \
--gcpTempLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/temp/ \
--stagingLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/staging"

# Or with fatJar
java -jar ./apps/ingestion-pipeline/build/libs/ingestion-pipeline-0.1.2-SNAPSHOT-all.jar --runner=DataflowRunner \
--project=${PROJECT_ID} \
--jobName=${PIPELINE_NAME} \
--gcsBucket=${GCS_BUCKET} \
--inputSubscription=projects/${PROJECT_ID}/subscriptions/${PIPELINE_NAME}-input \
--outputSuccessTopic=projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-output-success \
--outputFailureTopic=projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-output-failure \
--gcpTempLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/temp/ \
--stagingLocation=gs://${PROJECT_ID}/dataflow/${PIPELINE_NAME}/staging/

Creating Template

gradle :apps:ingestion-pipeline:run --args="--runner=DataflowRunner --project=$PROJECT_ID --gcpTempLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/temp/ --stagingLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/staging/ --templateLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/template/${PIPELINE_NAME}"

Creating a template from a VM

java -jar ./apps/ingestion-pipeline/build/libs/ingestion-pipeline-0.1.6-SNAPSHOT-all.jar --runner=DataflowRunner \
    --project=$PROJECT_ID \
    --gcpTempLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/temp/ \
    --stagingLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/staging/ \
    --templateLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/template/${PIPELINE_NAME}

Running template

Create Job

gcloud dataflow jobs run wordcount \
    --gcs-location gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/template/${PIPELINE_NAME} \
    --parameters inputFile=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/input/shakespeare.txt,output=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_NAME}/output/output.txt

Test

gradle :apps:ingestion-pipeline:test

Build

# clean
gradle :apps:ingestion-pipeline:clean
# make fatJar
gradle :apps:ingestion-pipeline:build -x test

Using PubSub Emulator

You can create topics and subscriptions for the emulator via its REST API.

Setup Env

export PROJECT_ID=my-project-id
export PIPELINE_NAME=ingestion
export PUBSUB_EMULATOR_HOST=http://localhost:8085

Create

# Create the topics and subscriptions every time you restart the PubSub emulator.
# If you run the `generateTestData` test, it will also create the topics below.
curl -X PUT ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-input
curl -X PUT ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-output-success
curl -X PUT ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-output-failure

# Create a subscription to input topic
curl -X PUT ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/subscriptions/${PIPELINE_NAME}-input \
-H "Content-Type: application/json" \
-d '{
"topic": "'"projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-input"'"
}' 

# Create a subscription to output success topic
curl -X PUT ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/subscriptions/${PIPELINE_NAME}-output-success \
-H "Content-Type: application/json" \
-d '{
"topic": "'"projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-output-success"'"
}' 

# Create a subscription to output failure topic
curl -X PUT ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/subscriptions/${PIPELINE_NAME}-output-failure \
-H "Content-Type: application/json" \
-d '{
"topic": "'"projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-output-failure"'"
}' 

Verify

# List Topics (optional)
curl -X GET ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/topics
# List Subscriptions (optional)
curl -X GET ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/subscriptions
 
# publishing a message to input topic
curl -d '{"messages": [{"data": "c3Vwc3VwCg=="}]}' -H "Content-Type: application/json" -X POST ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/topics/${PIPELINE_NAME}-input:publish
 
# Read messages from success topic
curl -d '{"returnImmediately":true, "maxMessages":1}' -H "Content-Type: application/json" -X POST ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/subscriptions/${PIPELINE_NAME}-output-success:pull

# Read messages from failure topic
curl -d '{"returnImmediately":true, "maxMessages":1}' -H "Content-Type: application/json" -X POST ${PUBSUB_EMULATOR_HOST}/v1/projects/${PROJECT_ID}/subscriptions/${PIPELINE_NAME}-output-failure:pull

TODO