Mclient.java
2.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.huaheng.framework.mqtt;
import org.apache.poi.ss.usermodel.DateUtil;
import org.eclipse.paho.client.mqttv3.*;
import java.io.UnsupportedEncodingException;
/**
 * Minimal MQTT publisher built on the Eclipse Paho {@link MqttClient}.
 *
 * <p>Each instance publishes to a single topic, formed by appending
 * {@code "onlineInform"} to the prefix given to the constructor. Typical usage:
 * construct, {@link #connect()}, {@link #pushMessage(String)} one or more times,
 * then {@link #close()}.
 *
 * <p>The publish methods reuse and mutate the shared {@link #message} instance,
 * and {@link #connect()} must be called before either of them (otherwise
 * {@code message} and {@code cTopic} are still {@code null}).
 */
public class Mclient {

    /** Broker URI every instance connects to. */
    private static final String HOST = "tcp://47.100.176.190:1883";

    /** MQTT client identifier presented to the broker. */
    private static final String CLIENTID = "server";

    // NOTE(review): credentials are hard-coded in source; consider moving them
    // to external configuration.
    private String name = "admin";
    private String passWord = "admin";

    /** Full topic name this client publishes to. */
    public String topic;

    /** Underlying Paho client. */
    public MqttClient client;

    /** Topic handle used for publishing; assigned by {@link #connect()}. */
    public MqttTopic cTopic;

    /** Reusable outgoing message; created and configured by {@link #connect()}. */
    public MqttMessage message;

    /**
     * Creates the underlying client and derives the publish topic.
     *
     * @param pd topic prefix; the publish topic becomes {@code pd + "onlineInform"}
     * @throws MqttException if the Paho client cannot be created
     */
    public Mclient(String pd) throws MqttException {
        client = new MqttClient(HOST, CLIENTID);
        this.topic = pd + "onlineInform";
    }

    /**
     * Connects to the broker and prepares the reusable message and topic handle.
     *
     * <p>The connection setup mirrors that of the subscriber side.
     *
     * @throws MqttSecurityException if the broker rejects the credentials
     * @throws MqttException         if the connection fails for any other reason
     */
    public void connect() throws MqttSecurityException, MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(name);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);

        message = new MqttMessage();
        message.setQos(2);
        // Whether the broker should retain the message.
        message.setRetained(false);

        // CallBack is declared elsewhere in the project.
        client.setCallback(new CallBack());
        // Pass the options explicitly: the no-argument connect() would ignore the
        // credentials, session flag and timeouts configured above.
        client.connect(options);
        cTopic = client.getTopic(topic);
    }

    /**
     * Publishes the "streamer is online" notification and blocks until delivery
     * has completed.
     *
     * @param Pdname display name inserted into the notification text
     * @throws MqttPersistenceException     if the message cannot be persisted
     * @throws MqttException                if publishing fails
     * @throws UnsupportedEncodingException if UTF-8 is not supported
     */
    public void pushMessage(String Pdname)
            throws MqttPersistenceException, MqttException, UnsupportedEncodingException {
        // Message content to publish.
        message.setPayload(("您关注的主播" + Pdname + "上线了!").getBytes("UTF-8"));
        MqttDeliveryToken token = cTopic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! " + token.isComplete());
    }

    /**
     * Publishes a placeholder message, intended to clear the last, repeatedly
     * delivered message.
     *
     * <p>NOTE(review): the payload is the single character {@code "1"}, not an
     * empty payload — confirm this is what subscribers expect.
     *
     * @throws MqttPersistenceException if the message cannot be persisted
     * @throws MqttException            if publishing fails
     */
    public void pushNull() throws MqttPersistenceException, MqttException {
        // Explicit charset so the bytes do not depend on the platform default.
        message.setPayload("1".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        MqttDeliveryToken token = cTopic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! " + token.isComplete());
    }

    /**
     * Disconnects from the broker if still connected, then releases the client's
     * resources. The instance cannot be reused afterwards.
     *
     * @throws MqttException if disconnecting or closing fails
     */
    public void close() throws MqttException {
        // Paho refuses to close a client that is still connected, so disconnect first.
        if (client.isConnected()) {
            client.disconnect();
        }
        client.close();
    }
}