Mclient.java 2.15 KB
package com.huaheng.framework.mqtt;

import org.apache.poi.ss.usermodel.DateUtil;
import org.eclipse.paho.client.mqttv3.*;

import java.io.UnsupportedEncodingException;

public class Mclient {
    private static String HOST = "tcp://47.100.176.190:1883";
    private static String CLIENTID = "server";
    private String name = "admin";
    private String passWord = "admin";
    public String topic;

    public MqttClient client;
    public MqttTopic cTopic;
    public MqttMessage message;

    public Mclient(String pd) throws MqttException {
        client = new MqttClient(HOST, CLIENTID);
        this.topic = pd + "onlineInform";
    }

    // 前面的配置跟订阅端差不多
    public void connect() throws MqttSecurityException, MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(name);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);
        message = new MqttMessage();
        message.setQos(2);
        //是否保留消息
        message.setRetained(false);
        client.setCallback(new CallBack());
        client.connect();
        cTopic = client.getTopic(topic);
    }

    //发布消息
    public void pushMessage(String Pdname)
            throws MqttPersistenceException, MqttException, UnsupportedEncodingException {
        //发布消息内容
        message.setPayload(("您关注的主播" + Pdname + "上线了!" ).getBytes("UTF-8"));
        MqttDeliveryToken token = cTopic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! " + token.isComplete());
    }

    //发布空消息 为了清除最后发布重复的消息
    public void pushNull() throws MqttPersistenceException, MqttException {
        message.setPayload("1".getBytes());
        MqttDeliveryToken token = cTopic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! " + token.isComplete());
    }

    public void close() throws MqttException {
        client.close();
    }
}