-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.py
82 lines (63 loc) · 2.24 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os
import sys
import json
import warnings
from dotenv import load_dotenv
from confluent_kafka import Consumer, Producer
from logger import logging
sys.path.append("./audio")
from audio import convert
# Config
# Load environment variables from the file at "./env" (python-dotenv).
# NOTE(review): dotenv files are conventionally named ".env" — confirm that
# "./env" is the intended path; nothing visible in this file reads the values.
load_dotenv("./env")
# Silence every warning issued through the `warnings` module in this process.
warnings.filterwarnings("ignore")
class KafkaProducer:
    """Thin wrapper around a confluent-kafka ``Producer`` bound to one topic."""

    def __init__(self, bootstrap_servers: str, topic: str) -> None:
        """Create the underlying producer.

        Args:
            bootstrap_servers: Value used for the ``bootstrap.servers`` setting.
            topic: Topic that every message is published to.
        """
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = Producer({
            "bootstrap.servers": self.bootstrap_servers,
        })

    def produce_messages(self, message):
        """Publish *message* to ``self.topic`` as UTF-8 encoded JSON.

        Args:
            message: Any JSON-serializable object.

        Raises:
            TypeError: If *message* cannot be serialized by ``json.dumps``.
        """
        payload = json.dumps(message).encode("utf-8")
        self.producer.produce(self.topic, value=payload)
        # produce() only enqueues the message; flush() blocks until the queue
        # has been handed to the broker, so a result is not lost if the
        # process stops right after this call.
        self.producer.flush()
class KafkaConsumer:
    """Poll a Kafka topic for video jobs, process each one and publish the result.

    The result is sent through a module-level ``producer`` object, which this
    file creates in its ``__main__`` block.
    """

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str) -> None:
        """Create the underlying consumer and subscribe it to *topic*.

        Args:
            bootstrap_servers: Value used for the ``bootstrap.servers`` setting.
            group_id: Value used for the ``group.id`` setting.
            topic: Topic to subscribe to.
        """
        # Loop flag: set to False (e.g. from another thread) to stop
        # consume_messages() after the current iteration.
        self.consume = True
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.topic = topic
        self.consumer = Consumer({
            "bootstrap.servers": self.bootstrap_servers,
            "group.id": self.group_id,
            "auto.offset.reset": "earliest",
        })
        self.consumer.subscribe([self.topic])

    def consume_messages(self):
        """Poll and process messages until ``self.consume`` becomes False.

        Messages that carry a Kafka error, are not a JSON object, or lack a
        ``"video"`` value are logged and skipped. Exceptions raised while a
        job is being processed still propagate to the caller. The consumer is
        closed when the loop ends for any reason, so this method cannot be
        called a second time on the same instance.
        """
        try:
            while self.consume:
                record = self.consumer.poll(1.0)
                if record is None:
                    # Poll timed out without a message; check the flag again.
                    continue
                if record.error():
                    logging.error("Consumer error: %s", record.error())
                    continue
                data = self._decode(record)
                if data is None:
                    continue
                logging.info(data)
                self._process(data)
        finally:
            # Always release the consumer, including on an exception or Ctrl-C.
            self.consumer.close()

    @staticmethod
    def _decode(record):
        """Return the record's payload as a dict, or None if it is unusable."""
        try:
            data = json.loads(record.value())
        except (TypeError, ValueError) as exc:
            # TypeError: empty (None) payload; ValueError: invalid JSON/UTF-8.
            logging.error("Skipping undecodable message: %s", exc)
            return None
        if not isinstance(data, dict):
            logging.error("Skipping message that is not a JSON object: %r", data)
            return None
        return data

    def _process(self, data):
        """Run the transform steps for one job and publish the result."""
        video = data.get("video")
        email = data.get("email")
        if not video:
            logging.error("Skipping message without a 'video' value: %r", data)
            return
        # NOTE(review): convert.Transform is defined outside this file; going
        # by the names, it fetches the video, extracts audio and transcribes
        # it — confirm against the audio package.
        transformer = convert.Transform()
        transformer.create_folder()
        transformer.get_object(video)
        mp3_file = transformer.convert(video)
        text = transformer.transform(video)
        result = {
            "email": email,
            "video": video,
            "audio": mp3_file,
            "text": str(text)
        }
        # `producer` is looked up as a module-level global at call time.
        producer.produce_messages(result)
if __name__ == "__main__":
    # Broker address shared by the producer and the consumer.
    broker = "192.168.29.7:9092"

    # `producer` has to remain a module-level name: consume_messages() looks
    # it up as a global when it publishes a result.
    producer = KafkaProducer(broker, "Kafkatopic2")

    consumer = KafkaConsumer(
        bootstrap_servers=broker,
        group_id="group1",
        topic="Kafkatopic1",
    )
    consumer.consume_messages()