{"id":39,"date":"2016-06-27T15:09:49","date_gmt":"2016-06-27T23:09:49","guid":{"rendered":"http:\/\/iaastalk.com\/?p=39"},"modified":"2016-06-27T15:09:49","modified_gmt":"2016-06-27T23:09:49","slug":"havana-message-reorder-fix","status":"publish","type":"post","link":"http:\/\/iaastalk.com\/?p=39","title":{"rendered":"Havana message reorder fix"},"content":{"rendered":"<p>A few people are probably still running Havana, which suffers from a critical messaging issue: message reordering.<\/p>\n<p>Nova RPC sends a result back as multiple messages, which may travel over different connections and reach the node running the master queue owner through different routes in the RabbitMQ cluster.<\/p>\n<p>As a result, RPC results can arrive corrupted or incomplete.<\/p>\n<p>You will see at least two messages sent out for a single RPC result, each using a connection grabbed from the pool on the fly.<\/p>\n<p>nova\/openstack\/common\/rpc\/amqp.py<\/p>\n<pre>\r\n    def _process_data(self, ctxt, version, method, namespace, args):\r\n        ctxt.update_store()\r\n\r\n        try:\r\n            rval = self.proxy.dispatch(ctxt, version, method, namespace,\r\n                                       **args)\r\n            # Check if the result was a generator\r\n            if inspect.isgenerator(rval):\r\n<span style=\"color: #ff0000;\">                for x in rval:<\/span>\r\n<span style=\"color: #ff0000;\">                    ctxt.reply(x, None, connection_pool=self.connection_pool)<\/span>\r\n            else:\r\n<span style=\"color: #ff0000;\">                ctxt.reply(rval, None, connection_pool=self.connection_pool)<\/span>\r\n            # This final None tells multicall that it is done.\r\n<span style=\"color: #ff0000;\">            ctxt.reply(ending=True, connection_pool=self.connection_pool)<\/span>\r\n\r\nclass RpcContext(rpc_common.CommonRpcContext):\r\n\r\n    def reply(self, reply=None, failure=None, ending=False,\r\n              connection_pool=None, log_failure=True):\r\n        if self.msg_id:\r\n            <span style=\"color: #ff0000;\">msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,<\/span>\r\n                      <span style=\"color: #ff0000;\">reply, failure, ending, log_failure)<\/span>\r\n            if ending:\r\n                self.msg_id = None\r\n\r\ndef msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,\r\n              failure=None, ending=False, log_failure=True):\r\n    ....\r\n    with ConnectionContext(conf, connection_pool) as <strong><span style=\"color: #ff0000;\">conn<\/span><\/strong>:\r\n    ....\r\n        if reply_q:\r\n            msg['_msg_id'] = msg_id\r\n            <strong><span style=\"color: #ff0000;\">conn<\/span><\/strong>.direct_send(reply_q, rpc_common.serialize_msg(msg))\r\n        
else:\r\n            <span style=\"color: #ff0000;\"><strong>conn<\/strong><\/span>.direct_send(msg_id, rpc_common.serialize_msg(msg))<\/pre>\n<p>A quick fix is to force all messages for the same reply to go through the same connection.<\/p>\n<pre>class RpcContext(rpc_common.CommonRpcContext):\r\n    def reply2(self, reply=None, connection_pool=None):\r\n        if self.msg_id:\r\n            msg_reply2(self.conf, self.msg_id, self.reply_q,\r\n                       connection_pool, reply)\r\n            self.msg_id = None\r\n\r\n\r\ndef msg_reply2(conf, msg_id, reply_q, connection_pool, reply=None):\r\n    def reply_msg(content, ending, conn):\r\n        msg = {'result': content, 'failure': None}\r\n        if ending:\r\n            msg['ending'] = True\r\n        _add_unique_id(msg)\r\n        if reply_q:\r\n            msg['_msg_id'] = msg_id\r\n            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))\r\n        else:\r\n            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))\r\n\r\n    # All replies for this call go out over the single connection held here.\r\n    with ConnectionContext(conf, connection_pool) as conn:\r\n        # Check if the result was a generator\r\n        if inspect.isgenerator(reply):\r\n            for x in reply:\r\n                reply_msg(x, False, conn)\r\n        else:\r\n            reply_msg(reply, False, conn)\r\n        reply_msg(None, True, conn)\r\n\r\n\r\nclass ProxyCallback(_ThreadPoolWithWait):\r\n\r\n    def _process_data(self, ctxt, version, method, namespace, args):\r\n        \"\"\"Process a message in a new thread.\r\n\r\n        If the proxy object we have has a dispatch method\r\n        (see rpc.dispatcher.RpcDispatcher), pass it the version,\r\n        method, and args and let it dispatch as appropriate.  
If not, use\r\n        the old behavior of magically calling the specified method on the\r\n        proxy we have here.\r\n        \"\"\"\r\n        ctxt.update_store()\r\n        try:\r\n            rval = self.proxy.dispatch(ctxt, version, method, namespace,\r\n                                       **args)\r\n            ctxt.reply2(rval, self.connection_pool)\r\n        except rpc_common.ClientException as e:\r\n            LOG.debug(_('Expected exception during message handling (%s)') %\r\n                      e._exc_info[1])\r\n            ctxt.reply(None, e._exc_info,\r\n                       connection_pool=self.connection_pool,\r\n                       log_failure=False)\r\n        except Exception:\r\n            # sys.exc_info() is deleted by LOG.exception().\r\n            exc_info = sys.exc_info()\r\n            LOG.error(_('Exception during message handling'),\r\n                      exc_info=exc_info)\r\n            ctxt.reply(None, exc_info, connection_pool=self.connection_pool)\r\n<\/pre>\n<p>A more robust fix would reconstruct the result on the receiving side from a sequence number and the total number of messages; such a fix would also need to handle timeouts and similar edge cases.<\/p>\n<p>Kilo has already fixed this issue by returning the result in a single message.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>A few people are probably still running Havana, which suffers from a critical messaging issue: message reordering. Nova RPC sends a result back as multiple messages, which may travel over different connections and reach the node running the master queue owner through different routes in the RabbitMQ cluster. As a result, RPC results can arrive corrupted or incomplete. 
You will see at [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[3],"tags":[],"_links":{"self":[{"href":"http:\/\/iaastalk.com\/index.php?rest_route=\/wp\/v2\/posts\/39"}],"collection":[{"href":"http:\/\/iaastalk.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/iaastalk.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/iaastalk.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/iaastalk.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=39"}],"version-history":[{"count":1,"href":"http:\/\/iaastalk.com\/index.php?rest_route=\/wp\/v2\/posts\/39\/revisions"}],"predecessor-version":[{"id":40,"href":"http:\/\/iaastalk.com\/index.php?rest_route=\/wp\/v2\/posts\/39\/revisions\/40"}],"wp:attachment":[{"href":"http:\/\/iaastalk.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=39"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/iaastalk.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=39"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/iaastalk.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=39"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}