Hamlet_Stream_Calculator_Each.py
import re
from collections import Counter

from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer

def remove_punctuation(text):
    """Strip every character that is not a word character or whitespace."""
    return re.sub(r'[^\w\s]', '', text)
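# For example, remove_punctuation("To be, or not to be!") returns
# "To be or not to be".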

def count_words(text):
    """Count the occurrences of each whitespace-separated word in the text."""
    words = text.split()
    return dict(Counter(words))
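# For example, count_words("to be or not to be") returns
# {'to': 2, 'be': 2, 'or': 1, 'not': 1}.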

def read_from_kafka():
    # Create a Flink streaming execution environment.
    env = StreamExecutionEnvironment.get_execution_environment()

    # Add the Flink SQL Kafka connector jar to the classpath so the
    # Kafka source can be loaded at runtime.
    env.add_jars("file:///home/hadoop/Desktop/PyFlink-Tutorial/flink-sql-connector-kafka-3.1-SNAPSHOT.jar")

    # Print a message to indicate that data reading from Kafka has started.
    print("start reading data from kafka")

    # Create a Kafka consumer that deserializes each record as a UTF-8 string.
    kafka_consumer = FlinkKafkaConsumer(
        topics='hamlet',  # the topic to consume messages from
        deserialization_schema=SimpleStringSchema('UTF-8'),
        properties={'bootstrap.servers': 'localhost:9092',  # Kafka broker address
                    'group.id': 'my-group'}                  # consumer group ID
    )
    # Start reading messages from the earliest offset.
    kafka_consumer.set_start_from_earliest()

    # Build the pipeline: Kafka source -> strip punctuation -> count words.
    stream_original_text = env.add_source(kafka_consumer)
    stream_remove_punctuation = stream_original_text.map(remove_punctuation)
    stream_count_words = stream_remove_punctuation.map(count_words)

    # Print the word counts (and the stream's type, for debugging) to the console.
    stream_count_words.print()
    print('\n\n', type(stream_count_words), '\n\n')

    # Submit the job to the Flink runtime.
    env.execute()

if __name__ == '__main__':
    read_from_kafka()
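
# A minimal way to exercise this job locally (an assumption, not part of the
# original script): with a Kafka broker on localhost:9092 and Kafka's CLI tools
# on the PATH, create the topic and feed it some lines of text:
#
#   kafka-topics.sh --create --topic hamlet --bootstrap-server localhost:9092
#   kafka-console-producer.sh --topic hamlet --bootstrap-server localhost:9092
#   > To be, or not to be, that is the question.
#
# Each produced line is then printed by the job as a dict of word counts, e.g.
# {'To': 1, 'be': 2, 'or': 1, 'not': 1, 'to': 1, 'that': 1, 'is': 1, 'the': 1, 'question': 1}.
# Note: FlinkKafkaConsumer is Flink's legacy consumer API; newer PyFlink
# releases favor KafkaSource with env.from_source().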