From d56892c9d74bb67624050f54811a2a0cc12f49e5 Mon Sep 17 00:00:00 2001 From: Michael Vierling Date: Fri, 21 Jul 2023 14:40:11 -0700 Subject: [PATCH 1/8] create mobile job --- .../mobile-rewards-share-delta-lake-sink.yaml | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100755 manifests/poc-data-cluster/prod/helium/mobile-rewards-share-delta-lake-sink.yaml diff --git a/manifests/poc-data-cluster/prod/helium/mobile-rewards-share-delta-lake-sink.yaml b/manifests/poc-data-cluster/prod/helium/mobile-rewards-share-delta-lake-sink.yaml new file mode 100755 index 00000000..39ed7f8b --- /dev/null +++ b/manifests/poc-data-cluster/prod/helium/mobile-rewards-share-delta-lake-sink.yaml @@ -0,0 +1,66 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: mobile-rewards-share-delta-lake-sink + namespace: helium +spec: + concurrencyPolicy: Forbid + schedule: "10 */4 * * *" + jobTemplate: + spec: + backoffLimit: 10 + template: + spec: + serviceAccountName: s3-data-lake-bucket-access + tolerations: # Schedule executor pods on spot instance group + - key: dedicated + operator: Equal + value: spark + effect: NoSchedule + nodeSelector: + nodegroup-type: spot + containers: + - name: mobile-rewards-delta-lake-sink + image: public.ecr.aws/k0m1p4t7/protobuf-delta-lake-sink:0.0.10 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 1000m + memory: 6900Mi + limits: + memory: 6900Mi + env: + - name: AWS_S3_ALLOW_UNSAFE_RENAME + value: "true" + args: + - --source-bucket + - foundation-poc-data-requester-pays + - --source-region + - us-west-2 + - --file-prefix + - foundation-iot-verified-rewards/mobile_reward_share + - --source-proto-name + - "mobile_reward_share" + - --source-proto-base-url + - https://raw.githubusercontent.com/helium/proto/master/src + - --source-protos + - data_rate.proto + - --source-protos + - service/packet_verifier.proto + - --source-protos + - service/poc_lora.proto + - --source-protos + - region.proto + - --target-bucket + - 
foundation-data-lake-requester-pays + - --target-table + - bronze/mobile_reward_share + - --target-region + - us-west-2 + - --partition-timestamp-column + - start_period + - --partition-timestamp-date-divisor + - "86400" + - --batch-size + - "500000000" # Targeting 500mb parquet files, per databricks recs on large tables + restartPolicy: OnFailure \ No newline at end of file From f7d58b68c6b2cc1a32e18ca7bcaee6fbfe46f320 Mon Sep 17 00:00:00 2001 From: Michael Vierling Date: Fri, 21 Jul 2023 14:46:22 -0700 Subject: [PATCH 2/8] use poc_mobile.proto --- .../prod/helium/mobile-rewards-share-delta-lake-sink.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manifests/poc-data-cluster/prod/helium/mobile-rewards-share-delta-lake-sink.yaml b/manifests/poc-data-cluster/prod/helium/mobile-rewards-share-delta-lake-sink.yaml index 39ed7f8b..0e2843f3 100755 --- a/manifests/poc-data-cluster/prod/helium/mobile-rewards-share-delta-lake-sink.yaml +++ b/manifests/poc-data-cluster/prod/helium/mobile-rewards-share-delta-lake-sink.yaml @@ -48,7 +48,7 @@ spec: - --source-protos - service/packet_verifier.proto - --source-protos - - service/poc_lora.proto + - service/poc_mobile.proto - --source-protos - region.proto - --target-bucket From 9def84e1ac2afaa8ff1a68aa0ea6af8101027b39 Mon Sep 17 00:00:00 2001 From: Michael Vierling Date: Fri, 21 Jul 2023 14:55:10 -0700 Subject: [PATCH 3/8] mobile silver job --- .../prod/spark/mobile-rewards-silver.yaml | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 manifests/poc-data-cluster/prod/spark/mobile-rewards-silver.yaml diff --git a/manifests/poc-data-cluster/prod/spark/mobile-rewards-silver.yaml b/manifests/poc-data-cluster/prod/spark/mobile-rewards-silver.yaml new file mode 100644 index 00000000..c114851d --- /dev/null +++ b/manifests/poc-data-cluster/prod/spark/mobile-rewards-silver.yaml @@ -0,0 +1,87 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: mobile-rewards-silver-query + namespace: 
spark +data: + query.sql: | + SELECT + date, + start_period, + end_period, + b58encodeChecked(gateway_reward.hotspot_key) as gateway, + gateway_reward.dc_transfer_reward as dc_transfer_amount_mobile + FROM mobile_reward_share +--- +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: mobile-rewards-silver + namespace: spark +spec: + type: Scala + mode: cluster + image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" + imagePullPolicy: Always + mainClass: Main + mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" + sparkVersion: "3.4.0" + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 10 + sparkConf: + spark.databricks.delta.autoCompact.enabled: "true" + hadoopConf: + fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider + volumes: + - name: "tmp" + hostPath: + path: "/tmp" + type: Directory + - name: config-vol + configMap: + name: mobile-rewards-silver-query + items: + - key: query.sql + path: query.sql + driver: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + memory: "512m" + nodeSelector: + node.kubernetes.io/instance-type: m5.large + envVars: + TABLE_MOBILE_REWARD_SHARE: s3a://foundation-data-lake-requester-pays/bronze/mobile_reward_share + PARTITION_BY: "date" + CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/mobile-reward-share + OUTPUT: s3a://foundation-data-lake-requester-pays/silver/mobile-reward-share + QUERY_PATH: /app/query.sql + labels: + version: 3.4.0 + volumeMounts: + - name: "test-volume" + mountPath: "/tmp" + - name: config-vol + mountPath: /app + executor: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + instances: 2 + memory: "10G" + tolerations: # Schedule executor pods on spot instance group + - key: dedicated + operator: Equal + value: spark + effect: 
NoSchedule + nodeSelector: + nodegroup-type: spot + labels: + version: 3.4.0 + volumeMounts: + - name: "tmp" + mountPath: "/tmp" \ No newline at end of file From 577a1fff7ab8b0e706e9ea5ecad5d96c273ea997 Mon Sep 17 00:00:00 2001 From: Michael Vierling Date: Sat, 22 Jul 2023 00:17:50 -0700 Subject: [PATCH 4/8] netid-counts, oui-counts and region-counts --- .../prod/spark/iot-netid-counts-silver.yaml | 83 +++++++++++++++++++ .../prod/spark/iot-oui-counts-silver.yaml | 83 +++++++++++++++++++ .../prod/spark/iot-region-counts-silver.yaml | 83 +++++++++++++++++++ 3 files changed, 249 insertions(+) create mode 100644 manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml create mode 100644 manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml create mode 100644 manifests/poc-data-cluster/prod/spark/iot-region-counts-silver.yaml diff --git a/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml new file mode 100644 index 00000000..912e3624 --- /dev/null +++ b/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml @@ -0,0 +1,83 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: iot-netid-counts-silver-query + namespace: spark +data: + query.sql: | + SELECT + date, net_id, count(net_id) AS count + FROM TABLE_IOT_PACKETS GROUP BY net_d ORDER BY neti_d DESC +--- +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: iot-netid-counts-silver + namespace: spark +spec: + type: Scala + mode: cluster + image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" + imagePullPolicy: Always + mainClass: Main + mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" + sparkVersion: "3.4.0" + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 10 + sparkConf: + 
spark.databricks.delta.autoCompact.enabled: "true" + hadoopConf: + fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider + volumes: + - name: "tmp" + hostPath: + path: "/tmp" + type: Directory + - name: config-vol + configMap: + name: iot-netid-counts-silver-query + items: + - key: query.sql + path: query.sql + driver: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + memory: "512m" + nodeSelector: + node.kubernetes.io/instance-type: m5.large + envVars: + TABLE_IOT_PACKETS: s3a://foundation-data-lake-requester-pays/silver/iot-packets + PARTITION_BY: "date" + CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-netid-counts + OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-netid-counts + QUERY_PATH: /app/query.sql + labels: + version: 3.4.0 + volumeMounts: + - name: "test-volume" + mountPath: "/tmp" + - name: config-vol + mountPath: /app + executor: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + instances: 3 + memory: "10G" + tolerations: # Schedule executor pods on spot instance group + - key: dedicated + operator: Equal + value: spark + effect: NoSchedule + nodeSelector: + nodegroup-type: spot + labels: + version: 3.4.0 + volumeMounts: + - name: "tmp" + mountPath: "/tmp" \ No newline at end of file diff --git a/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml new file mode 100644 index 00000000..42ef03a3 --- /dev/null +++ b/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml @@ -0,0 +1,83 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: iot-oui-counts-silver-query + namespace: spark +data: + query.sql: | + SELECT + date, oui, count(oui) AS count + FROM TABLE_IOT_PACKETS GROUP BY oui ORDER BY oui DESC +--- +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: iot-oui-counts-silver + namespace: spark +spec: + type: 
Scala + mode: cluster + image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" + imagePullPolicy: Always + mainClass: Main + mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" + sparkVersion: "3.4.0" + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 10 + sparkConf: + spark.databricks.delta.autoCompact.enabled: "true" + hadoopConf: + fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider + volumes: + - name: "tmp" + hostPath: + path: "/tmp" + type: Directory + - name: config-vol + configMap: + name: iot-oui-counts-silver-query + items: + - key: query.sql + path: query.sql + driver: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + memory: "512m" + nodeSelector: + node.kubernetes.io/instance-type: m5.large + envVars: + TABLE_IOT_PACKETS: s3a://foundation-data-lake-requester-pays/silver/iot-packets + PARTITION_BY: "date" + CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-oui-counts + OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-oui-counts + QUERY_PATH: /app/query.sql + labels: + version: 3.4.0 + volumeMounts: + - name: "test-volume" + mountPath: "/tmp" + - name: config-vol + mountPath: /app + executor: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + instances: 3 + memory: "10G" + tolerations: # Schedule executor pods on spot instance group + - key: dedicated + operator: Equal + value: spark + effect: NoSchedule + nodeSelector: + nodegroup-type: spot + labels: + version: 3.4.0 + volumeMounts: + - name: "tmp" + mountPath: "/tmp" \ No newline at end of file diff --git a/manifests/poc-data-cluster/prod/spark/iot-region-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-region-counts-silver.yaml new file mode 100644 index 00000000..9b64b41b --- /dev/null +++ 
b/manifests/poc-data-cluster/prod/spark/iot-region-counts-silver.yaml @@ -0,0 +1,83 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: iot-region-counts-silver-query + namespace: spark +data: + query.sql: | + SELECT + date, region, count(region) AS count + FROM TABLE_IOT_PACKETS GROUP BY region ORDER BY region DESC +--- +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: iot-region-counts-silver + namespace: spark +spec: + type: Scala + mode: cluster + image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" + imagePullPolicy: Always + mainClass: Main + mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" + sparkVersion: "3.4.0" + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 10 + sparkConf: + spark.databricks.delta.autoCompact.enabled: "true" + hadoopConf: + fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider + volumes: + - name: "tmp" + hostPath: + path: "/tmp" + type: Directory + - name: config-vol + configMap: + name: iot-region-counts-silver-query + items: + - key: query.sql + path: query.sql + driver: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + memory: "512m" + nodeSelector: + node.kubernetes.io/instance-type: m5.large + envVars: + TABLE_IOT_PACKETS: s3a://foundation-data-lake-requester-pays/silver/iot-packets + PARTITION_BY: "date" + CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-region-counts + OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-region-counts + QUERY_PATH: /app/query.sql + labels: + version: 3.4.0 + volumeMounts: + - name: "test-volume" + mountPath: "/tmp" + - name: config-vol + mountPath: /app + executor: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + instances: 3 + memory: "10G" + tolerations: # Schedule executor pods on spot 
instance group + - key: dedicated + operator: Equal + value: spark + effect: NoSchedule + nodeSelector: + nodegroup-type: spot + labels: + version: 3.4.0 + volumeMounts: + - name: "tmp" + mountPath: "/tmp" \ No newline at end of file From 9d6da05d86878715ca91c61e5271206caf465e58 Mon Sep 17 00:00:00 2001 From: Michael Vierling Date: Sat, 22 Jul 2023 19:35:56 -0700 Subject: [PATCH 5/8] add new jobs --- .../spark/iot-data-reward-totals-silver.yaml | 83 +++++++++++++++++++ .../prod/spark/iot-packet-counts-silver.yaml | 83 +++++++++++++++++++ .../prod/spark/iot-payload-totals-silver.yaml | 83 +++++++++++++++++++ 3 files changed, 249 insertions(+) create mode 100644 manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml create mode 100644 manifests/poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml create mode 100644 manifests/poc-data-cluster/prod/spark/iot-payload-totals-silver.yaml diff --git a/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml new file mode 100644 index 00000000..8a425f6f --- /dev/null +++ b/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml @@ -0,0 +1,83 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: iot-data-reward-totals-silver-query + namespace: spark +data: + query.sql: | + SELECT + date, sum(dc_transfer_amount_iot) AS data-iot-total + FROM TABLE_IOT_REWARDS +--- +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: iot-data-reward-totals-silver + namespace: spark +spec: + type: Scala + mode: cluster + image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" + imagePullPolicy: Always + mainClass: Main + mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" + sparkVersion: "3.4.0" + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + 
onSubmissionFailureRetryInterval: 10 + sparkConf: + spark.databricks.delta.autoCompact.enabled: "true" + hadoopConf: + fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider + volumes: + - name: "tmp" + hostPath: + path: "/tmp" + type: Directory + - name: config-vol + configMap: + name: iot-data-reward-totals-silver-query + items: + - key: query.sql + path: query.sql + driver: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + memory: "512m" + nodeSelector: + node.kubernetes.io/instance-type: m5.large + envVars: + TABLE_IOT_REWARDS: s3a://foundation-data-lake-requester-pays/silver/iot-reward-share + PARTITION_BY: "date" + CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-data-reward-totals + OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-data-reward-totals + QUERY_PATH: /app/query.sql + labels: + version: 3.4.0 + volumeMounts: + - name: "test-volume" + mountPath: "/tmp" + - name: config-vol + mountPath: /app + executor: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + instances: 3 + memory: "10G" + tolerations: # Schedule executor pods on spot instance group + - key: dedicated + operator: Equal + value: spark + effect: NoSchedule + nodeSelector: + nodegroup-type: spot + labels: + version: 3.4.0 + volumeMounts: + - name: "tmp" + mountPath: "/tmp" \ No newline at end of file diff --git a/manifests/poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml new file mode 100644 index 00000000..b1512ad1 --- /dev/null +++ b/manifests/poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml @@ -0,0 +1,83 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: iot-packet-counts-silver-query + namespace: spark +data: + query.sql: | + SELECT + date, count(payload_hash) AS count + FROM TABLE_IOT_PACKETS +--- +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: 
iot-packet-counts-silver + namespace: spark +spec: + type: Scala + mode: cluster + image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" + imagePullPolicy: Always + mainClass: Main + mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" + sparkVersion: "3.4.0" + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 10 + sparkConf: + spark.databricks.delta.autoCompact.enabled: "true" + hadoopConf: + fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider + volumes: + - name: "tmp" + hostPath: + path: "/tmp" + type: Directory + - name: config-vol + configMap: + name: iot-packet-counts-silver-query + items: + - key: query.sql + path: query.sql + driver: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + memory: "512m" + nodeSelector: + node.kubernetes.io/instance-type: m5.large + envVars: + TABLE_IOT_PACKETS: s3a://foundation-data-lake-requester-pays/silver/iot-packets + PARTITION_BY: "date" + CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-packet-counts + OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-packet-counts + QUERY_PATH: /app/query.sql + labels: + version: 3.4.0 + volumeMounts: + - name: "test-volume" + mountPath: "/tmp" + - name: config-vol + mountPath: /app + executor: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + instances: 3 + memory: "10G" + tolerations: # Schedule executor pods on spot instance group + - key: dedicated + operator: Equal + value: spark + effect: NoSchedule + nodeSelector: + nodegroup-type: spot + labels: + version: 3.4.0 + volumeMounts: + - name: "tmp" + mountPath: "/tmp" \ No newline at end of file diff --git a/manifests/poc-data-cluster/prod/spark/iot-payload-totals-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-payload-totals-silver.yaml new file mode 100644 
index 00000000..43eb7743 --- /dev/null +++ b/manifests/poc-data-cluster/prod/spark/iot-payload-totals-silver.yaml @@ -0,0 +1,83 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: iot-payload-totals-silver-query + namespace: spark +data: + query.sql: | + SELECT + date, sum(payload_size) AS payload_total + FROM TABLE_IOT_PACKETS +--- +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: iot-payload-totals-silver + namespace: spark +spec: + type: Scala + mode: cluster + image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" + imagePullPolicy: Always + mainClass: Main + mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" + sparkVersion: "3.4.0" + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 10 + sparkConf: + spark.databricks.delta.autoCompact.enabled: "true" + hadoopConf: + fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider + volumes: + - name: "tmp" + hostPath: + path: "/tmp" + type: Directory + - name: config-vol + configMap: + name: iot-payload-totals-silver-query + items: + - key: query.sql + path: query.sql + driver: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + memory: "512m" + nodeSelector: + node.kubernetes.io/instance-type: m5.large + envVars: + TABLE_IOT_PACKETS: s3a://foundation-data-lake-requester-pays/silver/iot-packets + PARTITION_BY: "date" + CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-payload-totals + OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-payload-totals + QUERY_PATH: /app/query.sql + labels: + version: 3.4.0 + volumeMounts: + - name: "test-volume" + mountPath: "/tmp" + - name: config-vol + mountPath: /app + executor: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + instances: 3 + memory: "10G" + tolerations: # Schedule 
executor pods on spot instance group + - key: dedicated + operator: Equal + value: spark + effect: NoSchedule + nodeSelector: + nodegroup-type: spot + labels: + version: 3.4.0 + volumeMounts: + - name: "tmp" + mountPath: "/tmp" \ No newline at end of file From 5018a1adc6ddb7b7663327c690e84a28b4d95fae Mon Sep 17 00:00:00 2001 From: Michael Vierling Date: Sun, 23 Jul 2023 20:05:35 -0700 Subject: [PATCH 6/8] oui is daily operator --- .../prod/spark/iot-netid-counts-silver.yaml | 2 +- .../prod/spark/iot-oui-counts-silver.yaml | 142 +++++++++--------- 2 files changed, 76 insertions(+), 68 deletions(-) diff --git a/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml index 912e3624..ee07749f 100644 --- a/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml @@ -7,7 +7,7 @@ data: query.sql: | SELECT date, net_id, count(net_id) AS count - FROM TABLE_IOT_PACKETS GROUP BY net_d ORDER BY neti_d DESC + FROM TABLE_IOT_PACKETS GROUP BY net_id ORDER BY net_id DESC --- apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication diff --git a/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml index 42ef03a3..30fde404 100644 --- a/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml @@ -7,77 +7,85 @@ data: query.sql: | SELECT date, oui, count(oui) AS count - FROM TABLE_IOT_PACKETS GROUP BY oui ORDER BY oui DESC + FROM TABLE_IOT_PACKETS + GROUP BY oui + ORDER BY oui DESC + WHERE date_add(current_date(), -1); --- apiVersion: "sparkoperator.k8s.io/v1beta2" -kind: SparkApplication +kind: ScheduledSparkApplication metadata: name: iot-oui-counts-silver namespace: spark spec: - type: Scala - mode: cluster - image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" - 
imagePullPolicy: Always - mainClass: Main - mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" - sparkVersion: "3.4.0" - restartPolicy: - type: OnFailure - onFailureRetries: 3 - onFailureRetryInterval: 10 - onSubmissionFailureRetries: 3 - onSubmissionFailureRetryInterval: 10 - sparkConf: - spark.databricks.delta.autoCompact.enabled: "true" - hadoopConf: - fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider - volumes: - - name: "tmp" - hostPath: - path: "/tmp" - type: Directory - - name: config-vol - configMap: - name: iot-oui-counts-silver-query - items: - - key: query.sql - path: query.sql - driver: - serviceAccount: spark-data-lake-access - cores: 1 - coreLimit: "1200m" - memory: "512m" - nodeSelector: - node.kubernetes.io/instance-type: m5.large - envVars: - TABLE_IOT_PACKETS: s3a://foundation-data-lake-requester-pays/silver/iot-packets - PARTITION_BY: "date" - CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-oui-counts - OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-oui-counts - QUERY_PATH: /app/query.sql - labels: - version: 3.4.0 - volumeMounts: - - name: "test-volume" - mountPath: "/tmp" - - name: config-vol - mountPath: /app - executor: - serviceAccount: spark-data-lake-access - cores: 1 - coreLimit: "1200m" - instances: 3 - memory: "10G" - tolerations: # Schedule executor pods on spot instance group - - key: dedicated - operator: Equal - value: spark - effect: NoSchedule - nodeSelector: - nodegroup-type: spot - labels: - version: 3.4.0 - volumeMounts: + schedule: "@daily" + concurrencyPolicy: Allow + successfulRunHistoryLimit: 1 + failedRunHistoryLimit: 3 + template: + type: Scala + mode: cluster + image: "public.ecr.aws/k0m1p4t7/spark:v3.4.0-aws" + imagePullPolicy: Always + mainClass: Main + mainApplicationFile: "s3a://foundation-data-lake-requester-pays/jars/spark-streaming-sql-assembly-1.0.jar" + sparkVersion: "3.4.0" + 
restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 10 + sparkConf: + spark.databricks.delta.autoCompact.enabled: "true" + hadoopConf: + fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider + volumes: - name: "tmp" - mountPath: "/tmp" \ No newline at end of file + hostPath: + path: "/tmp" + type: Directory + - name: config-vol + configMap: + name: iot-oui-counts-silver-query + items: + - key: query.sql + path: query.sql + driver: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + memory: "512m" + nodeSelector: + node.kubernetes.io/instance-type: m5.large + envVars: + TABLE_IOT_PACKETS: s3a://foundation-data-lake-requester-pays/silver/iot-packets + PARTITION_BY: "date" + CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-oui-counts + OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-oui-counts + QUERY_PATH: /app/query.sql + labels: + version: 3.4.0 + volumeMounts: + - name: "test-volume" + mountPath: "/tmp" + - name: config-vol + mountPath: /app + executor: + serviceAccount: spark-data-lake-access + cores: 1 + coreLimit: "1200m" + instances: 3 + memory: "10G" + tolerations: # Schedule executor pods on spot instance group + - key: dedicated + operator: Equal + value: spark + effect: NoSchedule + nodeSelector: + nodegroup-type: spot + labels: + version: 3.4.0 + volumeMounts: + - name: "tmp" + mountPath: "/tmp" \ No newline at end of file From 946b830d4f87c83584cd0a82be5c1ce1af6b38bd Mon Sep 17 00:00:00 2001 From: Michael Vierling Date: Mon, 24 Jul 2023 11:15:44 -0700 Subject: [PATCH 7/8] refine table name --- .../prod/spark/iot-data-reward-totals-silver.yaml | 4 ++-- .../poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml | 4 +++- .../poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml | 2 +- .../poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml | 2 +- 
.../prod/spark/iot-payload-totals-silver.yaml | 2 +- .../poc-data-cluster/prod/spark/iot-region-counts-silver.yaml | 4 +++- 6 files changed, 11 insertions(+), 7 deletions(-) diff --git a/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml index 8a425f6f..9ad93179 100644 --- a/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml @@ -7,7 +7,7 @@ data: query.sql: | SELECT date, sum(dc_transfer_amount_iot) AS data-iot-total - FROM TABLE_IOT_REWARDS + FROM iot_reward_share --- apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication @@ -51,7 +51,7 @@ spec: nodeSelector: node.kubernetes.io/instance-type: m5.large envVars: - TABLE_IOT_REWARDS: s3a://foundation-data-lake-requester-pays/silver/iot-reward-share + TABLE_IOT_REWARD_SHARE: s3a://foundation-data-lake-requester-pays/silver/iot-reward-share PARTITION_BY: "date" CHECKPOINT: s3a://foundation-data-lake-requester-pays/checkpoints/iot-data-reward-totals OUTPUT: s3a://foundation-data-lake-requester-pays/silver/iot-data-reward-totals diff --git a/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml index ee07749f..2849bd62 100644 --- a/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-netid-counts-silver.yaml @@ -7,7 +7,9 @@ data: query.sql: | SELECT date, net_id, count(net_id) AS count - FROM TABLE_IOT_PACKETS GROUP BY net_id ORDER BY net_id DESC + FROM iot_packets + GROUP BY net_id + ORDER BY net_id DESC --- apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication diff --git a/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml index 30fde404..ddc81d53 100644 --- 
a/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-oui-counts-silver.yaml @@ -7,7 +7,7 @@ data: query.sql: | SELECT date, oui, count(oui) AS count - FROM TABLE_IOT_PACKETS + FROM iot_packets GROUP BY oui ORDER BY oui DESC WHERE date_add(current_date(), -1); diff --git a/manifests/poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml index b1512ad1..6ebe2d20 100644 --- a/manifests/poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-packet-counts-silver.yaml @@ -7,7 +7,7 @@ data: query.sql: | SELECT date, count(payload_hash) AS count - FROM TABLE_IOT_PACKETS + FROM iot_packets --- apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication diff --git a/manifests/poc-data-cluster/prod/spark/iot-payload-totals-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-payload-totals-silver.yaml index 43eb7743..5119d67e 100644 --- a/manifests/poc-data-cluster/prod/spark/iot-payload-totals-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-payload-totals-silver.yaml @@ -7,7 +7,7 @@ data: query.sql: | SELECT date, sum(payload_size) AS payload_total - FROM TABLE_IOT_PACKETS + FROM iot_packets --- apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication diff --git a/manifests/poc-data-cluster/prod/spark/iot-region-counts-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-region-counts-silver.yaml index 9b64b41b..efac80a3 100644 --- a/manifests/poc-data-cluster/prod/spark/iot-region-counts-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-region-counts-silver.yaml @@ -7,7 +7,9 @@ data: query.sql: | SELECT date, region, count(region) AS count - FROM TABLE_IOT_PACKETS GROUP BY region ORDER BY region DESC + FROM iot_packets + GROUP BY region + ORDER BY region DESC --- apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication From 
c0899caeba51d2509ab2ef2d0cc535ff05b1ce90 Mon Sep 17 00:00:00 2001 From: Michael Vierling Date: Thu, 27 Jul 2023 10:45:17 -0700 Subject: [PATCH 8/8] prefer underscores in sql column names --- .../prod/spark/iot-data-reward-totals-silver.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml b/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml index 9ad93179..97ef20db 100644 --- a/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml +++ b/manifests/poc-data-cluster/prod/spark/iot-data-reward-totals-silver.yaml @@ -6,7 +6,7 @@ metadata: data: query.sql: | SELECT - date, sum(dc_transfer_amount_iot) AS data-iot-total + date, sum(dc_transfer_amount_iot) AS data_iot_total FROM iot_reward_share --- apiVersion: "sparkoperator.k8s.io/v1beta2"