-
Notifications
You must be signed in to change notification settings - Fork 829
Open
Description
Expected behavior
After reconnect of the same client with clean_session = false and subscribing to the same topic with QoS = 1 all missed messages should appear
Actual behavior
Exception is thrown while trying to resend unacked message:
2019-03-08 14:19:37.194 ERROR 31746 --- [ntLoopGroup-5-5] io.moquette.broker.NewNettyMQTTHandler : Unexpected exception while processing MQTT message. Closing Netty channel. CId=client123
java.lang.ClassCastException: class io.moquette.broker.SessionRegistry$PubRelMarker cannot be cast to class io.moquette.broker.SessionRegistry$PublishedMessage (io.moquette.broker.SessionRegistry$PubRelMarker and io.moquette.broker.SessionRegistry$PublishedMessage are in unnamed module of loader 'app')
at io.moquette.broker.Session.resendInflightNotAcked(Session.java:293)
at io.moquette.broker.MQTTConnection.resendNotAckedPublishes(MQTTConnection.java:481)
at io.moquette.broker.NewNettyMQTTHandler.userEventTriggered(NewNettyMQTTHandler.java:106)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.moquette.broker.MoquetteIdleTimeoutHandler.userEventTriggered(MoquetteIdleTimeoutHandler.java:48)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at io.moquette.broker.InflightResender.resendNotAcked(InflightResender.java:160)
at io.moquette.broker.InflightResender.access$100(InflightResender.java:32)
at io.moquette.broker.InflightResender$WriterIdleTimeoutTask.run(InflightResender.java:58)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
2019-03-08 14:19:37.194 ERROR 31746 --- [ntLoopGroup-5-5] io.moquette.broker.NewNettyMQTTHandler : Unexpected exception while processing MQTT message. Closing Netty channel. CId=client123
java.nio.channels.ClosedChannelException: null
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
Steps to reproduce
- Run moquette broker
- Publish messages every second to any topic, e.g. "/test-topic"
- Run subscriber with client = "client123", clean_session = false and subscribe to topic "/test-topic" with QoS = 1
- Kill subscriber
- Rerun subscriber
- Retry 4.-5. until you get an exception
Minimal yet complete reproducer code (or URL to code) or complete log file
Client using python 3.7
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("/test-topic", 1)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client(client_id="client123", clean_session=False)
client.username_pw_set("user123", "session_id")
client.on_connect = on_connect
client.on_message = on_message
print("connecting")
client.connect("localhost", 1883, 60)
print("connected")
client.loop_forever()
Moquette MQTT version
moquette-broker-0.12.1.jar
JVM version (e.g. java -version
)
java -version
openjdk version "11.0.1" 2018-10-16
OpenJDK Runtime Environment 18.9 (build 11.0.1+13)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.1+13, mixed mode)
OS version (e.g. uname -a
)
uname -a
Darwin Admins-MacBook-Pro.local 18.2.0 Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64