Probably a few people are still running Havana. A critical messaging related issue is message reordering.
Nova RPC sending back results in multiple messages which may go through different connections and reach a node running master queue owner through different routes in Rabbitmq cluster.
RPC results may become corrupted and incomplete.
You will see at least two messages will be sent out for a single RPC result. Each message was sent using a connection grabbed from pool on the fly.
nova/openstack/common/rpc/amqp.py
def _process_data(self, ctxt, version, method, namespace, args): ctxt.update_store() try: rval = self.proxy.dispatch(ctxt, version, method, namespace, **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: ctxt.reply(x, None, connection_pool=self.connection_pool) else: ctxt.reply(rval, None, connection_pool=self.connection_pool) # This final None tells multicall that it is done. ctxt.reply(ending=True, connection_pool=self.connection_pool) class RpcContext(rpc_common.CommonRpcContext): def reply(self, reply=None, failure=None, ending=False, connection_pool=None, log_failure=True): if self.msg_id: msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool, reply, failure, ending, log_failure) if ending: self.msg_id = None def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, failure=None, ending=False, log_failure=True): .... with ConnectionContext(conf, connection_pool) as conn: .... if reply_q: msg['_msg_id'] = msg_id conn.direct_send(reply_q, rpc_common.serialize_msg(msg)) else: conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
A quick fix is to force all messages for the same reply going through same connection.
class RpcContext(rpc_common.CommonRpcContext): def reply2(self, reply=None, connection_pool=None): if self.msg_id: msg_reply2(self.conf, self.msg_id, self.reply_q, connection_pool, reply) self.msg_id = None def msg_reply2(conf, msg_id, reply_q, connection_pool, reply=None): def reply_msg(content, ending, conn): msg = {'result': content, 'failure': None} if ending: msg['ending'] = True _add_unique_id(msg) if reply_q: msg['_msg_id'] = msg_id conn.direct_send(reply_q, rpc_common.serialize_msg(msg)) else: conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) with ConnectionContext(conf, connection_pool) as conn: # Check if the result was a generator if inspect.isgenerator(reply): for x in reply: reply_msg(x, False, conn) else: reply_msg(reply, False, conn) reply_msg(None, True, conn) class ProxyCallback(_ThreadPoolWithWait): def _process_data(self, ctxt, version, method, namespace, args): """Process a message in a new thread. If the proxy object we have has a dispatch method (see rpc.dispatcher.RpcDispatcher), pass it the version, method, and args and let it dispatch as appropriate. If not, use the old behavior of magically calling the specified method on the proxy we have here. """ ctxt.update_store() try: rval = self.proxy.dispatch(ctxt, version, method, namespace, **args) ctxt.reply2(rval, self.connection_pool) except rpc_common.ClientException as e: LOG.debug(_('Expected exception during message handling (%s)') % e._exc_info[1]) ctxt.reply(None, e._exc_info, connection_pool=self.connection_pool, log_failure=False) except Exception: # sys.exc_info() is deleted by LOG.exception(). exc_info = sys.exc_info() LOG.error(_('Exception during message handling'), exc_info=exc_info) ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
A more robust fix should be reconstruct results based on sequence number and total number of messages. then fix alwasy need to handle timeout etc.
Kilo has already fixed this issue by returning result in one message.