-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
104 lines (83 loc) · 2.86 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
core_pubsub "github.com/Saumya40-codes/pubsub/core"
)
func main() {
fmt.Println("Stock Monitoring System")
// Creating a broker instance
broker := core_pubsub.GetorSetBrokerInstance()
// Defining topics and partitions
topics := map[string]int{
"stock-price-update": 3,
"stock-news-update": 2,
"trade-execution": 2,
"portfolio-update": 1,
}
// Creating topics
for topicName, partitions := range topics {
newTopic, err := broker.CreateNewTopic(topicName, partitions)
if err != nil {
fmt.Println("Error creating topic:", err)
continue
}
fmt.Println("Created topic:", newTopic.Name)
}
fmt.Println("\n")
// Create consumers and subscribe them to topics
consumerGroups := map[string][]string{
"stock-price-update": {"price-analyzer", "alert-generator"},
"stock-news-update": {"news-aggregator", "sentiment-analyzer"},
"trade-execution": {"trade-processor", "order-verifier"},
"portfolio-update": {"portfolio-manager"},
}
var groupId int = 0
for topicName, groups := range consumerGroups {
for _, name := range groups {
consumer := core_pubsub.CreateConsumer(name, strconv.Itoa(groupId))
err := consumer.Subscribe(consumer, topicName)
if err != nil {
fmt.Println("Error subscribing:", err)
continue
}
go consumer.Run()
fmt.Println("==============SUBSCRIBE=======================")
fmt.Printf("Consumer %s in Consumer Group %d subscribed to %s\n", name, groupId, topicName)
fmt.Println("=============================================")
}
groupId++
}
// Simulate some delay to allow potential race conditions
time.Sleep(time.Second * 4)
// Create producers and publish messages
producers := map[string]string{
"stock-price-update-producer": "stock-price-update",
"stock-news-update-producer": "stock-news-update",
"trade-execution-producer": "trade-execution",
"portfolio-update-producer": "portfolio-update",
}
for producerName, topicName := range producers {
producer := core_pubsub.CreateProducer(producerName)
go func(topicName string, producerName string) {
for {
// Create and publish a message
var partitionIndex int = rand.Intn(topics[topicName]) // randomly select a partition
messageContent := fmt.Sprintf("Message regarding %s from %s", topicName, producerName)
message := core_pubsub.CreateMessage(topicName, messageContent, partitionIndex)
err := producer.Publish(topicName, message)
if err != nil {
fmt.Println("Error publishing message:", err)
continue
}
fmt.Println("==============PUBLISH=======================\n", "Published message to", topicName, "\n============================================")
// Simulate a delay between messages
time.Sleep(time.Second * 4)
}
}(topicName, producerName)
}
// Prevent the main function from exiting immediately
select {}
}