mqtt协议消费者

322

mqtt协议的消费者代码

基于 Spring 的 @Bean 注解配置注入:
/**
 * Spring bean definitions for the MQTT consumer.
 */
public class Consumer
{
	/**
	 * Creates the MQTT receiver bean.
	 *
	 * <p>The bean is registered under the name {@code receive} and declares
	 * {@code init} as its initialization method.
	 *
	 * @return a new {@code CallBack} instance
	 */
	@Bean(initMethod = "init", name = "receive")
	public CallBack receive()
	{
		CallBack receiver = new CallBack();
		return receiver;
	}
}

/**
 * MQTT consumer: connects to the broker configured under {@code mqttUrl},
 * subscribes to the eight {@code Global.MQ_topic*} topics at QoS 2 and hands
 * every received payload to
 * {@code SystemInfoSocketHandler.sendMessageToAllUsers}.
 *
 * <p>{@link #init()} starts the connection on a background thread so the
 * caller is not blocked by broker I/O; {@link #close()} releases the
 * connection again.
 */
public class CallBack
{
	// Log under this class's own category (the original used Consumer.class).
	protected static Logger log = LoggerFactory.getLogger(CallBack.class);

	public static final String SERVER_URL = Global.getConfig("mqttUrl");
	public static final String clientid = Global
			.getConfig("mqtt");

	/** QoS level requested for every subscribed topic. */
	private static final int SUBSCRIBE_QOS = 2;

	// Written by the init thread and read by close(), hence volatile.
	private volatile MqttClient client;
	private MqttConnectOptions options;
	// NOTE(review): credentials are hard-coded; consider reading them from
	// configuration the same way the URL and client id are.
	private String userName = "admin";
	private String passWord = "admin";

	/**
	 * Starts the MQTT connection and subscription on a separate thread and
	 * returns immediately. Failures are logged, not thrown.
	 */
	public void init()
	{
		new Thread(this::connectAndSubscribe, "mqtt-consumer-init").start();
	}

	/**
	 * Disconnects from the broker (if connected) and releases the client.
	 * Safe to call when {@link #init()} never ran or failed; errors are
	 * logged, not thrown.
	 *
	 * <p>NOTE(review): a public no-arg {@code close()} is what Spring's default
	 * {@code @Bean} destroy-method inference looks for, so this is expected to
	 * run on context shutdown - confirm for the Spring version in use.
	 */
	public void close()
	{
		MqttClient mqtt = client;
		if (mqtt == null)
		{
			return;
		}
		try
		{
			if (mqtt.isConnected())
			{
				mqtt.disconnect();
			}
			mqtt.close();
		}
		catch (Exception e)
		{
			log.warn("关闭mqtt客户端时发生异常", e);
		}
	}

	/** Creates the client, connects and subscribes; runs on the init thread. */
	private void connectAndSubscribe()
	{
		log.info("接收端收初始化了!!");
		try
		{
			MqttClient mqtt = new MqttClient(SERVER_URL, clientid,
					new MemoryPersistence());
			// Publish the reference early so close() can release it even if
			// a later step fails.
			client = mqtt;
			options = buildOptions();
			mqtt.setCallback(newCallback());
			mqtt.connect(options);
			// Bound blocking client operations so they cannot wait forever.
			mqtt.setTimeToWait(5000);

			String[] topics = {Global.MQ_topic1, Global.MQ_topic2,
					Global.MQ_topic3, Global.MQ_topic4, Global.MQ_topic5,
					Global.MQ_topic6, Global.MQ_topic7,
					Global.MQ_topic8};
			// Derive the QoS array from the topic list so the two can never
			// get out of step.
			int[] qos = new int[topics.length];
			java.util.Arrays.fill(qos, SUBSCRIBE_QOS);
			mqtt.subscribe(topics, qos);
		}
		catch (Exception e)
		{
			// ERROR, not DEBUG: a consumer that failed to start must be
			// visible at normal log levels.
			log.error("接收端发生异常", e);
		}
	}

	/** Builds the connect options: persistent session, credentials, timeouts. */
	private MqttConnectOptions buildOptions()
	{
		MqttConnectOptions connectOptions = new MqttConnectOptions();
		connectOptions.setCleanSession(false);
		connectOptions.setUserName(userName);
		connectOptions.setPassword(passWord.toCharArray());
		connectOptions.setConnectionTimeout(20);
		connectOptions.setKeepAliveInterval(20);
		return connectOptions;
	}

	/** Builds the callback that logs each message and forwards its payload. */
	private MqttCallback newCallback()
	{
		return new MqttCallback()
		{
			@Override public void connectionLost(Throwable throwable)
			{
				// Keep the cause so the reason for the drop can be diagnosed.
				log.warn("无法连至mq服务器!", throwable);
			}

			@Override public void messageArrived(String s,
					MqttMessage mqttMessage) throws Exception
			{
				// Decode once and reuse for logging and forwarding.
				// NOTE(review): decoded with the platform default charset, as
				// before - confirm the publisher's encoding and make it
				// explicit.
				String payload = new String(mqttMessage.getPayload());
				log.info("接收消息主题:{}", s);
				log.info("接收消息Qos:{}", mqttMessage.getQos());
				log.info("接收消息内容:{}", payload);
				try
				{
					new SystemInfoSocketHandler().sendMessageToAllUsers(
							payload);
				}
				catch (Exception e)
				{
					// Per the Paho MqttCallback contract an exception leaving
					// messageArrived shuts the client down; nothing here
					// reconnects, so log the failure and keep consuming.
					log.error("处理mqtt消息时发生异常, 主题:{}", s, e);
				}
			}

			@Override public void deliveryComplete(
					IMqttDeliveryToken iMqttDeliveryToken)
			{
				// This client only consumes; nothing to do on delivery.
			}
		};
	}
}