天津网站开发技术,西安哪家做网站公司好,网站功能需求用什么做,关键词优化公司排名榜文章目录 1.介绍2.发布和订阅3.MQTT 数据包结构4.Demo5.EMQX 1.介绍 
什么是MQTT协议 MQTT#xff08;消息队列遥测传输协议#xff09;#xff0c;是一种基于发布/订阅#xff08;publish/subscribe#xff09;模式的“轻量级”通讯协议#xff0c;该协议构建于TCP/IP协… 文章目录 1.介绍2.发布和订阅3.MQTT 数据包结构4.Demo5.EMQX  1.介绍 
什么是MQTT协议 MQTT消息队列遥测传输协议是一种基于发布/订阅publish/subscribe模式的“轻量级”通讯协议该协议构建于TCP/IP协议上。 
MQTT最大优点在于用极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务。 
作为一种低开销、低带宽占用的即时通讯协议使其在物联网、小型设备、移动应用等方面有较广泛的应用。 
特点 MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。 
MQTT协议是轻量、简单、开放和易于实现的这些特点使它适用范围非常广泛。在很多情况下包括受限的环境中如机器与机器M2M通信和物联网IoT。 
MQTT 与 HTTP 一样MQTT 运行在传输控制协议/互联网协议 (TCP/IP) 堆栈之上。 
2.发布和订阅 
MQTT使用的发布/订阅消息模式它提供了一对多的消息分发机制从而实现与应用程序的解耦。 
这是一种消息传递模式消息不是直接从发送器发送到接收器即点对点而是由MQTT server或称为 MQTT Broker分发的。 MQTT 服务器是发布-订阅架构的核心。 
它可以非常简单地在Raspberry Pi或NAS等单板计算机上实现当然也可以在大型机或 Internet 服务器上实现。 
服务器分发消息因此必须是发布者但绝不是订阅者 
客户端可以发布消息发送方、订阅消息接收方或两者兼而有之。 
客户端也称为节点是一种智能设备如微控制器或具有 TCP/IP 堆栈和实现 MQTT 协议的软件的计算机。 
消息在允许过滤的主题下发布。主题是分层划分的 UTF-8 字符串。不同的主题级别用斜杠/作为分隔符号。 
EG:   QoSQuality of Service levels 服务质量是 MQTT 的一个重要特性。当我们使用 TCP/IP 时连接已经在一定程度上受到保护。但是在无线网络中中断和干扰很频繁MQTT 在这里帮助避免信息丢失及其服务质量水平。这些级别在发布时使用。如果客户端发布到 MQTT 服务器则客户端将是发送者MQTT 服务器将是接收者。当MQTT服务器向客户端发布消息时服务器是发送者客户端是接收者。 
QoS 0 : 这一级别会发生消息丢失或重复消息发布依赖于底层TCP/IP网络。即1 QoS 1 : 承诺消息将至少传送一次给订阅者。 QoS 2 : 我们保证消息仅传送到目的地一次。为此带有唯一消息 ID 的消息会存储两次首先来自发送者然后是接收者。QoS 级别 2 在网络中具有最高的开销因为在发送方和接收方之间需要两个流。  
3.MQTT 数据包结构 固定头Fixed header存在于所有MQTT数据包中表示数据包类型及数据包的分组类标识  可变头Variable header存在于部分MQTT数据包中数据包类型决定了可变头是否存在及其具体内容  消息体Payload存在于部分MQTT数据包中表示客户端收到的具体内容  
整体MQTT的消息格式如下图所示:  
MQTT固定头 固定头存在于所有MQTT数据包中其结构如下  固定头的消息格式 
消息类型 / message type 位置 byte 1, bits 7-4 4位的无符号值类型如下 标识位 / DUP/RET 位置 byte 1, bits 3-0。 
在不使用标识位的消息类型中标识位被作为保留位。如果收到无效的标志时接收端必须关闭网络连接 DUP发布消息的副本。用来在保证消息的可靠传输如果设置为 1则在下面的变长中增加MessageId并且需要回复确认以保证消息传输完成但不能用于检测消息重复发送。 
QoS发布消息的服务质量即保证消息传递的次数 RETAIN发布保留标识表示服务器要保留这次推送的信息如果有新的订阅者出现就把这消息推送给它如果设有那么推送至当前订阅者后释放。 
剩余长度 
位置byte 1 
固定头的第二字节用来保存变长头部和消息体的总大小的但不是直接保存的。这一字节是可以扩展其保存机制前7位用于保存长度后一部用做标识。当最后一位为 1时表示长度不足需要使用二个字节继续保存。例如计算出后面的大小为0 
MQTT可变头 / Variable header 
MQTT数据包中包含一个可变头它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同较常的应用是做为包的标识  很多类型数据包中都包括一个2字节的数据包标识字段这些类型的包有 
PUBLISH (QoS  0)、PUBACK、PUBREC、PUBREL、PUBCOMP、 
SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK 
Payload消息体 Payload消息体是MQTT数据包的第三部分CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息 有消息体 CONNECT消息体内容主要是客户端的ClientID、订阅的Topic、Message以及用户名和密码  SUBSCRIBE消息体内容是一系列的要订阅的主题以及QoS。  SUBACK消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。  UNSUBSCRIBE消息体内容是要订阅的主题。  
4.Demo 
DEMO1 public static void main(String[] args) {String broker  tcp://172.168.1.122:2314;String clientId  JavaSample;//Use the memory persistenceMemoryPersistence persistence  new MemoryPersistence();try {MqttClient sampleClient  new MqttClient(broker, clientId, persistence);MqttConnectOptions connOpts  new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println(broker:  broker);sampleClient.connect(connOpts);System.out.println(Connected);String topic  demo/topics;System.out.println(Subscribe to topic:  topic);sampleClient.subscribe(topic);//订阅主题sampleClient.setCallback(new MqttCallback() {public void messageArrived(String topic, MqttMessage message) throws Exception {String theMsg  MessageFormat.format({0} is arrived for topic {1}., new String(message.getPayload()), topic);System.out.println(theMsg);}public void deliveryComplete(IMqttDeliveryToken token) {}public void connectionLost(Throwable throwable) {}});String content  Message from MqttPublishSample;int qos  2;System.out.println(Publishing message:  content);MqttMessage message  new MqttMessage(content.getBytes());message.setQos(qos);sampleClient.publish(topic, message);//发布消息System.out.println(Message published);} catch (MqttException me) {System.out.println(reason  me.getReasonCode());System.out.println(msg  me.getMessage());System.out.println(loc  me.getLocalizedMessage());System.out.println(cause  me.getCause());System.out.println(excep  me);me.printStackTrace();}}5.EMQX 
本地搭建EMQX服务 1、下载emqx压缩文件wget https://www.emqx.com/zh/downloads/broker/5.0.3/emqx-5.0.3-el7-amd64.tar.gz 2、解压文件mkdir -p emqx  tar -zxvf emqx-5.0.3-el7-amd64.tar.gz -C emqx 3、启动服务./emqx/bin/emqx start 整合使用 配置 dependencygroupIdorg.eclipse.paho/groupIdartifactIdorg.eclipse.paho.client.mqttv3/artifactIdversion1.2.2/version/dependency!-- MQTT --dependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-mqtt/artifactId/dependency# Mqtt配置
mqtt:#我是在本地搭建了emqx服务192.168.2.31就是我本地emqx服务的地址。也可用公共测试不需要搭建emqx服务将ip改为broker.emqx.io即可。serverURIs: tcp://192.168.111.5:1883username: #可不填写password:  #可不填写qos: 2 #等级 有 0 1 2 三种clientId: mqttxxxtopic: testTopic  #订阅的主题多个时可以使用逗号分开 如topic1topic2topicenabled: true  #是否打开mqtt服务keepalive: 100 #心跳时间  不需要动timeout: 100 # 超时时间秒 不需要动Configuration
public class MqttConfig {Autowiredprivate MqttPushClient mqttPushClient;Value(${mqtt.username:{null}})private String username;Value(${mqtt.password:{null}})private String password;Value(${mqtt.serverURIs:{null}})private String hostUrl;Value(${mqtt.clientId:{null}})private String clientId;Value(${mqtt.topic:{null}})private String defaultTopic;Value(${mqtt.qos:{null}})private int qos;Value(${mqtt.enabled:{null}})private boolean enabled;Value(${mqtt.keepalive:{null}})private int keepalive;Value(${mqtt.timeout:{null}})private int timeout;//订阅主体Beanpublic MqttPushClient getMqttPushClient() {if(enabled  true){String mqtt_topic[]  defaultTopic.split(,);mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接for(int i0; imqtt_topic.length; i){mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题}}return mqttPushClient;}//发送消息到对应主题Beanpublic MqttPushClient pushMessage() {mqttPushClient.publish(0,true,wsy,呵呵哈哈哈 你好呀);return mqttPushClient;}}Component
public class MqttPushClient {private static final Logger logger  LoggerFactory.getLogger(MqttPushClient.class);Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client  client;}/*** 客户端连接** param host      ip端口* param clientID  客户端Id* param username  用户名* param password  密码* param timeout   超时时间* param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client  new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options  new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(pushCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** param qos         连接方式* param retained    是否保留* param topic       主题* param pushMessage 消息体*/public boolean publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message  new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic  MqttPushClient.getClient().getTopic(topic);if (null  mTopic) {logger.error(topic not exist);}MqttDeliveryToken token;try {token  mTopic.publish(message);token.waitForCompletion();return true;} catch (MqttPersistenceException e) {e.printStackTrace();return false;} catch (MqttException e) {e.printStackTrace();return false;}}/*** 订阅某个主题** param topic 主题* param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info(开始订阅主题  topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}}Component
public class PushCallback implements MqttCallback {private static final Logger logger  LoggerFactory.getLogger(MqttPushClient.class);Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;Overridepublic void connectionLost(Throwable throwable) {// 连接丢失后一般在这里面进行重连logger.info(连接断开可以做重连);if (client  null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info(接收消息主题 :   topic);logger.info(接收消息Qos :   mqttMessage.getQos());logger.info(接收消息内容 :   new String(mqttMessage.getPayload()));_topic  topic;_qos  mqttMessage.getQos();_msg  new String(mqttMessage.getPayload());}Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info(deliveryComplete---------  iMqttDeliveryToken.isComplete());}//别的Controller层会调用这个方法来  获取  接收到的硬件数据public String receive() {JSONObject jsonObject  new JSONObject();jsonObject.put(topic, _topic);jsonObject.put(qos, _qos);jsonObject.put(msg, _msg);return jsonObject.toString();}
}注意 clientId不能重复若存在重复的 clientId 连接会导致争抢而连接不上。就是存在两个客户端 clientId 相同两个在打架一直争同一连接导致一直重连重复发布消息 
发布   订阅: 参考资料 
https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html 文章转载自: http://www.morning.clqpj.cn.gov.cn.clqpj.cn http://www.morning.hqykb.cn.gov.cn.hqykb.cn http://www.morning.wxfgg.cn.gov.cn.wxfgg.cn http://www.morning.sltfk.cn.gov.cn.sltfk.cn http://www.morning.glswq.cn.gov.cn.glswq.cn http://www.morning.jlnlr.cn.gov.cn.jlnlr.cn http://www.morning.hdrsr.cn.gov.cn.hdrsr.cn http://www.morning.dfdhx.cn.gov.cn.dfdhx.cn http://www.morning.qsmmq.cn.gov.cn.qsmmq.cn http://www.morning.yrjkp.cn.gov.cn.yrjkp.cn http://www.morning.tkxyx.cn.gov.cn.tkxyx.cn http://www.morning.rkwlg.cn.gov.cn.rkwlg.cn http://www.morning.pangucheng.cn.gov.cn.pangucheng.cn http://www.morning.lmmyl.cn.gov.cn.lmmyl.cn http://www.morning.ishoufeipin.cn.gov.cn.ishoufeipin.cn http://www.morning.wmlby.cn.gov.cn.wmlby.cn http://www.morning.gmgyt.cn.gov.cn.gmgyt.cn http://www.morning.cryb.cn.gov.cn.cryb.cn http://www.morning.yrjhr.cn.gov.cn.yrjhr.cn http://www.morning.kgfsz.cn.gov.cn.kgfsz.cn http://www.morning.yfmlj.cn.gov.cn.yfmlj.cn http://www.morning.pgzgy.cn.gov.cn.pgzgy.cn http://www.morning.dmzzt.cn.gov.cn.dmzzt.cn http://www.morning.krdb.cn.gov.cn.krdb.cn http://www.morning.dmwck.cn.gov.cn.dmwck.cn http://www.morning.bnjnp.cn.gov.cn.bnjnp.cn http://www.morning.mbrbk.cn.gov.cn.mbrbk.cn http://www.morning.c7630.cn.gov.cn.c7630.cn http://www.morning.mpxbl.cn.gov.cn.mpxbl.cn http://www.morning.pghgq.cn.gov.cn.pghgq.cn http://www.morning.cbczs.cn.gov.cn.cbczs.cn http://www.morning.ygth.cn.gov.cn.ygth.cn http://www.morning.zlhcw.cn.gov.cn.zlhcw.cn http://www.morning.tkrpt.cn.gov.cn.tkrpt.cn http://www.morning.bchfp.cn.gov.cn.bchfp.cn http://www.morning.kncrc.cn.gov.cn.kncrc.cn http://www.morning.sfyqs.cn.gov.cn.sfyqs.cn http://www.morning.elbae.cn.gov.cn.elbae.cn http://www.morning.fgrkc.cn.gov.cn.fgrkc.cn http://www.morning.ghkgl.cn.gov.cn.ghkgl.cn http://www.morning.qsy41.cn.gov.cn.qsy41.cn http://www.morning.hmqjj.cn.gov.cn.hmqjj.cn http://www.morning.bfhrj.cn.gov.cn.bfhrj.cn http://www.morning.wnywk.cn.gov.cn.wnywk.cn http://www.morning.jmspy.cn.gov.cn.jmspy.cn http://www.morning.ryzgp.cn.gov.cn.ryzgp.cn http://www.morning.tongweishi.cn.gov.cn.tongweishi.cn http://www.morning.hmwjk.cn.gov.cn.hmwjk.cn http://www.morning.qhmhz.cn.gov.cn.qhmhz.cn http://www.morning.xbmwm.cn.gov.cn.xbmwm.cn http://www.morning.ccsdx.cn.gov.cn.ccsdx.cn http://www.morning.tpyjr.cn.gov.cn.tpyjr.cn http://www.morning.lhsdf.cn.gov.cn.lhsdf.cn http://www.morning.rwpjq.cn.gov.cn.rwpjq.cn http://www.morning.sbyhj.cn.gov.cn.sbyhj.cn http://www.morning.aowuu.com.gov.cn.aowuu.com http://www.morning.lsjgh.cn.gov.cn.lsjgh.cn http://www.morning.zqzhd.cn.gov.cn.zqzhd.cn http://www.morning.mghgl.cn.gov.cn.mghgl.cn http://www.morning.jybj.cn.gov.cn.jybj.cn http://www.morning.bdzps.cn.gov.cn.bdzps.cn http://www.morning.bxqtq.cn.gov.cn.bxqtq.cn http://www.morning.zstry.cn.gov.cn.zstry.cn http://www.morning.c7491.cn.gov.cn.c7491.cn http://www.morning.mztyh.cn.gov.cn.mztyh.cn http://www.morning.pqqzd.cn.gov.cn.pqqzd.cn http://www.morning.bljcb.cn.gov.cn.bljcb.cn http://www.morning.rhsr.cn.gov.cn.rhsr.cn http://www.morning.rpkl.cn.gov.cn.rpkl.cn http://www.morning.pjfmq.cn.gov.cn.pjfmq.cn http://www.morning.qsswb.cn.gov.cn.qsswb.cn http://www.morning.w58hje.cn.gov.cn.w58hje.cn http://www.morning.rdkt.cn.gov.cn.rdkt.cn http://www.morning.gkxyy.cn.gov.cn.gkxyy.cn http://www.morning.wbhzr.cn.gov.cn.wbhzr.cn http://www.morning.qydgk.cn.gov.cn.qydgk.cn http://www.morning.nlygm.cn.gov.cn.nlygm.cn http://www.morning.wrdpj.cn.gov.cn.wrdpj.cn http://www.morning.zhiheliuxue.com.gov.cn.zhiheliuxue.com http://www.morning.nicetj.com.gov.cn.nicetj.com