package com.huaheng.framework.mqtt;

import org.apache.poi.ss.usermodel.DateUtil;
import org.eclipse.paho.client.mqttv3.*;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * Publishing-side MQTT client that sends "streamer is online" notifications.
 *
 * <p>Typical usage: construct with a topic prefix, call {@link #connect()},
 * publish with {@link #pushMessage(String)} / {@link #pushNull()}, then
 * {@link #close()}.
 *
 * <p>NOTE(review): the broker address, client id and credentials are hard-coded
 * below; consider moving them to external configuration.
 */
public class Mclient {

    /** Broker URI the client connects to. */
    private static final String HOST = "tcp://47.100.176.190:1883";

    /** Client identifier presented to the broker. */
    private static final String CLIENTID = "server";

    /** User name sent in the connect options. */
    private final String name = "admin";

    /** Password sent in the connect options. */
    private final String passWord = "admin";

    /** Full topic name: the constructor argument followed by {@code "onlineInform"}. */
    public String topic;

    /** Underlying Paho client; created in the constructor. */
    public MqttClient client;

    /** Topic handle used for publishing; assigned by {@link #connect()}. */
    public MqttTopic cTopic;

    /** Reusable message whose payload is replaced on every publish; assigned by {@link #connect()}. */
    public MqttMessage message;

    /**
     * Creates the underlying client (without connecting) and derives the topic name.
     *
     * @param pd prefix of the topic; {@code "onlineInform"} is appended to it
     * @throws MqttException if the Paho client cannot be created
     */
    public Mclient(String pd) throws MqttException {
        client = new MqttClient(HOST, CLIENTID);
        this.topic = pd + "onlineInform";
    }

    /**
     * Connects to the broker and prepares the reusable message and topic handle.
     *
     * <p>The connection setup mirrors the one used on the subscriber side.
     *
     * @throws MqttSecurityException if the broker rejects the connection for security reasons
     * @throws MqttException         if the connection fails for any other reason
     */
    public void connect() throws MqttSecurityException, MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(name);
        options.setPassword(passWord.toCharArray());
        // Both values are in seconds per the Paho MqttConnectOptions API.
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);

        message = new MqttMessage();
        message.setQos(2);
        // Whether the broker should retain the message.
        message.setRetained(false);

        client.setCallback(new CallBack());
        // Pass the options: the no-arg connect() would ignore the credentials,
        // session flag and timeouts configured above.
        client.connect(options);
        cTopic = client.getTopic(topic);
    }

    /**
     * Publishes the "streamer is online" notification and blocks until delivery completes.
     *
     * @param Pdname name inserted into the notification text
     * @throws MqttPersistenceException if the message cannot be persisted
     * @throws MqttException            if publishing fails
     * @throws UnsupportedEncodingException never thrown by this implementation; retained in the
     *                                      signature so existing callers keep compiling
     * @throws IllegalStateException    if {@link #connect()} has not been called
     */
    public void pushMessage(String Pdname)
            throws MqttPersistenceException, MqttException, UnsupportedEncodingException {
        // Message content.
        String text = "您关注的主播" + Pdname + "上线了!";
        publishAndWait(text.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Publishes a placeholder message, intended to clear the last repeated message.
     *
     * <p>NOTE(review): the original comment describes this as an "empty" message, but the
     * payload actually sent is the single character {@code "1"}; confirm what subscribers expect.
     *
     * @throws MqttPersistenceException if the message cannot be persisted
     * @throws MqttException            if publishing fails
     * @throws IllegalStateException    if {@link #connect()} has not been called
     */
    public void pushNull() throws MqttPersistenceException, MqttException {
        publishAndWait("1".getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Disconnects (if still connected) and releases the client's resources.
     *
     * @throws MqttException if disconnecting or closing fails
     */
    public void close() throws MqttException {
        // Paho refuses to close a client that is still connected, so disconnect first.
        if (client.isConnected()) {
            client.disconnect();
        }
        client.close();
    }

    /** Sets the payload on the shared message, publishes it and waits for delivery. */
    private void publishAndWait(byte[] payload) throws MqttException {
        if (message == null || cTopic == null) {
            throw new IllegalStateException("connect() must be called before publishing");
        }
        message.setPayload(payload);
        MqttDeliveryToken token = cTopic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! " + token.isComplete());
    }
}