This repository has been archived by the owner on Feb 8, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathProducerSample.java
94 lines (83 loc) · 3.34 KB
/
ProducerSample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.aliyun.openservices.log.producer.sample;
import java.util.Date;
import java.util.Random;
import java.util.Vector;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.producer.LogProducer;
import com.aliyun.openservices.log.producer.ProducerConfig;
import com.aliyun.openservices.log.producer.ProjectConfig;
/**
 * Sample program that pushes generated test logs through a {@link LogProducer}.
 *
 * <p>Two project configurations are registered on the producer (both read from
 * environment variables), a pool of single-item log groups is generated, and
 * {@link #THREADS_COUNT} sender threads each send one randomly chosen log group
 * {@link #SENDS_PER_THREAD} times to the project named by the {@code project1}
 * environment variable. Finally the producer is flushed and closed.
 *
 * <p>Environment variables read: {@code project1}, {@code endpoint1},
 * {@code project2}, {@code endpoint2}, {@code accessKeyId}, {@code accessKey}.
 * Missing variables are passed on as {@code null}; this sample does not validate them.
 */
public class ProducerSample {
    /** Fixed value passed to the producer and the callback alongside every send. */
    private static final String MOCK_IP = "192.168.0.25";

    /** Number of concurrent sender threads. */
    private static final int THREADS_COUNT = 1;

    /** Number of single-item log groups generated as test data. */
    private static final int LOG_GROUP_COUNT = 100000;

    /** Number of times each sender thread sends its chosen log group. */
    private static final int SENDS_PER_THREAD = 9;

    /** Upper bound, in milliseconds, on how long {@code main} waits for the senders. */
    private static final long SENDER_WAIT_MILLIS = 5 * 1000L;

    /** Log store every send is addressed to. */
    private static final String LOG_STORE = "store_1s";

    /** Topic attached to every send. */
    private static final String TOPIC = "topic1";

    /** Characters {@link #RandomString(int)} draws from. */
    private static final String ALPHABET =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

    /** Shared generator; {@link Random} instances are safe to use from several threads. */
    private static final Random RANDOM = new Random();

    /**
     * Builds the configuration for the first project from the environment
     * variables {@code project1}, {@code endpoint1}, {@code accessKeyId} and {@code accessKey}.
     */
    private static ProjectConfig buildProjectConfig1() {
        return buildProjectConfig("1");
    }

    /**
     * Builds the configuration for the second project from the environment
     * variables {@code project2}, {@code endpoint2}, {@code accessKeyId} and {@code accessKey}.
     */
    private static ProjectConfig buildProjectConfig2() {
        return buildProjectConfig("2");
    }

    /**
     * Builds a project configuration from environment variables.
     *
     * @param suffix appended to {@code project} and {@code endpoint} to form the
     *               variable names; the credentials variables are shared by all projects
     * @return the configuration; any unset variable is passed through as {@code null}
     */
    private static ProjectConfig buildProjectConfig(String suffix) {
        String projectName = System.getenv("project" + suffix);
        String endpoint = System.getenv("endpoint" + suffix);
        String accessKeyId = System.getenv("accessKeyId");
        String accessKey = System.getenv("accessKey");
        return new ProjectConfig(projectName, endpoint, accessKeyId, accessKey);
    }

    /**
     * Returns a random string of ASCII letters and digits.
     *
     * @param length number of characters to generate
     * @return a string of exactly {@code length} characters, or the empty string
     *         when {@code length} is zero or negative
     */
    public static String RandomString(int length) {
        if (length <= 0) {
            return "";
        }
        StringBuilder buf = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            // Bound comes from the alphabet itself so the two can never drift apart.
            buf.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return buf.toString();
    }

    /**
     * Generates the test data: {@code count} log groups holding one log item each.
     *
     * @param count number of log groups to create
     * @return the generated log groups
     */
    private static Vector<Vector<LogItem>> buildLogGroups(int count) {
        Vector<Vector<LogItem>> logGroups = new Vector<Vector<LogItem>>(count);
        for (int i = 0; i < count; ++i) {
            // The log item is constructed with the current time in seconds.
            LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000L));
            logItem.PushBack("level", "info " + System.currentTimeMillis());
            logItem.PushBack("message", "mmmmmdekdekjdefjekjfek" + RandomString(50));
            logItem.PushBack("method", "SenderToServer " + RandomString(10));

            Vector<LogItem> logGroup = new Vector<LogItem>();
            logGroup.add(logItem);
            logGroups.add(logGroup);
        }
        return logGroups;
    }

    /**
     * Starts the sender threads. Each thread picks one log group at random and
     * sends it {@link #SENDS_PER_THREAD} times, attaching a fresh
     * {@code CallbackSample} to every send.
     *
     * @param producer  producer used for sending
     * @param project   target project name
     * @param logGroups non-empty pool of log groups to pick from
     * @return the started threads, named "0", "1", ...
     */
    private static Thread[] startSenders(final LogProducer producer, final String project,
            final Vector<Vector<LogItem>> logGroups) {
        Thread[] senders = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; ++i) {
            senders[i] = new Thread(null, new Runnable() {
                @Override
                public void run() {
                    // Any log group, including the last one, may be selected.
                    Vector<LogItem> logGroup = logGroups.get(RANDOM.nextInt(logGroups.size()));
                    for (int sent = 0; sent < SENDS_PER_THREAD; ++sent) {
                        producer.send(project, LOG_STORE, TOPIC, MOCK_IP,
                                logGroup,
                                new CallbackSample(project, LOG_STORE,
                                        TOPIC, MOCK_IP, null, logGroup, producer));
                    }
                }
            }, String.valueOf(i));
            senders[i].start();
        }
        return senders;
    }

    /**
     * Waits for the sender threads to finish, for at most {@code timeoutMillis}
     * in total. Threads still running once the deadline has passed are interrupted.
     *
     * @param senders       threads to wait for
     * @param timeoutMillis total time budget in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void awaitSenders(Thread[] senders, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        for (Thread sender : senders) {
            long remaining = deadline - System.currentTimeMillis();
            // join(0) would wait forever, so only join while budget is left.
            if (remaining > 0) {
                sender.join(remaining);
            }
            if (sender.isAlive()) {
                sender.interrupt();
            }
        }
    }

    /**
     * Runs the sample: configures the producer, generates test logs, sends them
     * from concurrent threads, then flushes and closes the producer.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     *                              for the sender threads
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println(System.currentTimeMillis());
        // Use the default producer configuration.
        final LogProducer producer = new LogProducer(new ProducerConfig());
        try {
            // Register multiple project configurations.
            producer.setProjectConfig(buildProjectConfig1());
            producer.setProjectConfig(buildProjectConfig2());

            // Generate the set of logs used for the test.
            final Vector<Vector<LogItem>> logGroups = buildLogGroups(LOG_GROUP_COUNT);

            // The environment cannot change while the process runs, so read the target once.
            final String project = System.getenv("project1");

            // Call send concurrently.
            Thread[] senders = startSenders(producer, project, logGroups);

            // Wait for the sender threads to exit.
            awaitSenders(senders, SENDER_WAIT_MILLIS);

            // Explicitly flush logs that are still cached and have not been sent yet.
            producer.flush();
        } finally {
            // Shut down the background IO threads even if something above failed.
            producer.close();
        }
    }
}