mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理
本文讲解moquette对UNSUBSCRIBE和DISCONNECT的处理
先说UNSUBSCRIBE,代码比较简单
public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) { List<String> topics = msg.payload().topics(); String clientID = NettyUtils.clientID(channel); LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics); ClientSession clientSession = m_sessionsStore.sessionForClient(clientID); for (String t : topics) { Topic topic = new Topic(t); boolean validTopic = topic.isValid(); if (!validTopic) { // close the connection, not valid topicFilter is a protocol violation channel.close(); LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic); return; } if(LOG.isDebugEnabled()){ LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic); } subscriptions.removeSubscription(topic, clientID); clientSession.unsubscribeFrom(topic); String username = NettyUtils.userName(channel); m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username); } // ack the client int messageID = msg.variableHeader().messageId(); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0); MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID)); LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID); channel.writeAndFlush(ackMessage);}
主要分为以下几步
1.从目录树下,移除该client的订阅,这个移除过程有点复杂,后面单独一篇专门讲解topic树
2.清除ClientSession里面的订阅,包括Set<Subscription> subscriptions,同时还得移除ISubscriptionsStore里面的Map<Topic, Subscription> subscriptions
3.唤醒拦截器
4.返回UNSUBACK ,这里注意UNSUBACK 是没有payload的。
再说DISCONNECT的处理
public void processDisconnect(Channel channel) throws InterruptedException { final String clientID = NettyUtils.clientID(channel); LOG.info("Processing DISCONNECT message. CId={}", clientID); channel.flush(); final ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID); if (existingDescriptor == null) { // another client with same ID removed the descriptor, we must exit channel.close(); return; } if (existingDescriptor.doesNotUseChannel(channel)) { // another client saved it's descriptor, exit LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID); existingDescriptor.abort(); return; } if (!removeSubscriptions(existingDescriptor, clientID)) { LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID); existingDescriptor.abort(); return; } if (!dropStoredMessages(existingDescriptor, clientID)) { LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID); existingDescriptor.abort(); return; } if (!cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) { LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID); existingDescriptor.abort(); return; } if (!existingDescriptor.close()) { LOG.info("The connection has been closed. CId={}", clientID); return; } boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor); if (!stillPresent) { // another descriptor was inserted LOG.warn("Another descriptor has been inserted. CId={}", clientID); return; } LOG.info("The DISCONNECT message has been processed. CId={}", clientID);}
1.检查连接描述符是否还存在,如果不存在,说明之前已经有客户端删除了它,直接关闭通道
2.判断这个client的连接描述符是不是,是不是还是当前使用这个通道的client?作者要先防止这种情况呢?先卖个关子,后面的第6条会说明
3.清除订阅请求,这里面好像只清楚了不要求保存会话信息的clientsession里面的ISessionsStore里面的Map<Topic, Subscription> subscriptions,而并没有清除ClientSession里面的Set<Subscription> subscriptions和topic树里面的订阅,这能够解释https://blog.51cto.com/13579730/2073914 这篇文章结尾讨论的问题了,只有Map<Topic, Subscription> subscriptions的订阅才是最准确的。
4.丢弃存储的消息,这里面也只是会丢弃不要去保存会话信息的消息
5.清除遗愿消息,对于遗愿消息,这里稍微啰嗦一点,遗愿消息是在初次连接的存储到ProtocolProcessor的ConcurrentMap<String, WillMessage> m_willStore这里面的,那么什么时候发送给订阅者呢?看下面
io.moquette.server.netty.NettyMQTTHandler#channelInactive @Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception { String clientID = NettyUtils.clientID(ctx.channel()); if (clientID != null && !clientID.isEmpty()) { LOG.info("Notifying connection lost event. MqttClientId = {}.", clientID); m_processor.processConnectionLost(clientID, ctx.channel()); } ctx.close();} 说明是当netty检测到通道不活跃的时候通知ProtocolProcessor处理ConnectionLost事件的。 public void processConnectionLost(String clientID, Channel channel) { LOG.info("Processing connection lost event. CId={}", clientID); ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true); connectionDescriptors.removeConnection(oldConnDescr);//移除连接描述符 // publish the Will message (if any) for the clientID if (m_willStore.containsKey(clientID)) { WillMessage will = m_willStore.get(clientID); forwardPublishWill(will, clientID);//发布遗愿消息 m_willStore.remove(clientID);//移除遗愿消息存储 } String username = NettyUtils.userName(channel); m_interceptor.notifyClientConnectionLost(clientID, username);//唤醒拦截器} 在以下这种情况下会发布遗愿消息 遗嘱消息发布的条件,包括但不限于: 服务端检测到了一个I/O错误或者网络故障。 客户端在保持连接(Keep Alive)的时间内未能通讯。 客户端没有先发送DISCONNECT报文直接关闭了网络连接。 由于协议错误服务端关闭了网络连接。 另外说明一下,遗愿消息是可以设置消息等级的,而且可以被设置成retain消息
6.连接描述符集合里面清除该通道对应的连接描述符,这里有一点很容易误解,强调一下
boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor); if (!stillPresent) { // another descriptor was inserted LOG.warn("Another descriptor has been inserted. CId={}", clientID); return; } 作者调用的是ConcurrentMap里面的boolean remove(Object key, Object value);这个方法要求key存在,且value 与预期的一样才会删除,也就说,是有可能存在的,key一样而value不一样的情况的,什么时候会出现?答案是client在两个设备上先后登陆,这个时候由于是存在一个map里面的所以后面的登陆所创建的连接描述符会覆盖前面的一个。当然这里面,也可以在覆盖之前强制断开之前那个连接,但是moquette并没有这么做,具体看源码io.moquette.server.ConnectionDescriptorStore#addConnection
也就说说moquette是允许存在一个账号多设备登陆的。将入client先后在A,B两个设备上建立连接,B连接会覆盖A连接,这个时候A连接虽然还在,但其实是永远也收不到消息的,因为发送消息的时候,会以ConnectionDescriptorStore里面存储的为准,具体看源码
io.moquette.server.ConnectionDescriptorStore#sendMessage,也就是说A连接会无谓的占用broker的资源,个人觉得这样并不好,也非常没有必要,大家可以自行改进。
现在大家就能够理解上面的第2步了,因为这个就是为双登陆的情况下,被覆盖的那个连接准备的。
moquette-broker还要处理以下的报文,包括1.PINGREQ,心跳报文 2.PUBACK,当broker向client发送qos1消息的时候,client需要回复PUBACK消息,消息存储在io.moquette.spi.ClientSession.OutboundFlightZone outboundFlightZone里面(底层使用map存储的),消息是io.moquette.spi.impl.MessagesPublisher#publish3Subscribers(io.moquette.spi.IMessagesStore.StoredMessage, io.moquette.spi.impl.subscriptions.Topic)这里被存储进去的,这是一个临时的存储,存储完之后消息会被删除掉3.PUBREC 这个是当broker向client发送qos2消息之后,client需要向broker作的第一个返回报文,这里面有个动作是将消息从inboundFlightMessages转移到secondPhaseStore和outboundFlightMessages,具体看这里io.moquette.persistence.memory.MemorySessionStore#moveInFlightToSecondPhaseAckWaiting4.PUBCOMP 当broker收到这个报文的时候会负责从内存里面删除飞行窗口的消息,具体怎么删除的详见下篇,moquette拦截器5.PUBREL。当client向broker发送qos2消息的时候,broker会回复PUBREC,告诉client已经记录下来了,client收到PUBREC之后会发送PUBREL,告诉broker,我知道你已经记录了消息,既然你记录了,那这边就释放消息了(确保只要broker才会该消息,避免client重发),当broker收到PUBREL报文的时候,就知道client那边已经把该消息释放了,然后消息的主导权到了他这边,他开始发送消息。当消息发送完成了,会向client发送PUBCOMP报文。关于qos2消息的介绍可以看一下这里https://github.com/mcxiaoke/mqtt/blob/master/mqtt/04-OperationalBehavior.md 的4.3条
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。