mqtt协议的消费者代码
基于spring 配置注解注入
public class Consumer
{
//初始化
@Bean(name = "receive",initMethod = "init")
public CallBack receive()
{
return new CallBack();
}
}
public class CallBack
{
protected static Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String SERVER_URL = Global.getConfig("mqttUrl");
public static final String clientid = Global
.getConfig("mqtt");
private MqttClient client;
private MqttConnectOptions options;
private String userName = "admin";
private String passWord = "admin";
public void init()
{
new Thread(() -> {
log.info("接收端收初始化了!!");
try
{
client = new MqttClient(SERVER_URL, clientid,
new MemoryPersistence());
options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(20);
options.setKeepAliveInterval(20);
//设置回调函数
client.setCallback(new MqttCallback()
{
@Override public void connectionLost(Throwable throwable)
{
log.debug("无法连至mq服务器!");
}
@Override public void messageArrived(String s,
MqttMessage mqttMessage) throws Exception
{
log.info("接收消息主题:{}", s);
log.info("接收消息Qos:{}", mqttMessage.getQos());
log.info("接收消息内容:{}",
new String(mqttMessage.getPayload()));
new SystemInfoSocketHandler().sendMessageToAllUsers(
new String(mqttMessage.getPayload()));
}
@Override public void deliveryComplete(
IMqttDeliveryToken iMqttDeliveryToken)
{
}
});
//MqttTopic topic = client.getTopic(TOPIC);
client.connect(options);
//客户端设置好发送超时时间,防止无限阻塞
client.setTimeToWait(5000);
int[] Qos = {2, 2, 2, 2, 2, 2, 2, 2};
String[] topic1 = {Global.MQ_topic1, Global.MQ_topic2,
Global.MQ_topic3, Global.MQ_topic4, Global.MQ_topic5,
Global.MQ_topic6, Global.MQ_topic7,
Global.MQ_topic8};
client.subscribe(topic1, Qos);
}
catch (Exception e)
{
log.debug("接收端发生异常:{}", e);
}
}).start();
}
}