Probably a few people are still running Havana. A critical messaging-related issue there is message reordering.
Nova RPC sends a result back in multiple messages, which may travel over different connections and reach the node owning the master queue through different routes in a RabbitMQ cluster.
As a result, RPC results can arrive corrupted or incomplete.
At least two messages are sent out for a single RPC result, and each message is sent over a connection grabbed from the pool on the fly.
nova/openstack/common/rpc/amqp.py
def _process_data(self, ctxt, version, method, namespace, args):
    ctxt.update_store()
    try:
        rval = self.proxy.dispatch(ctxt, version, method, namespace,
                                   **args)
        # Check if the result was a generator
        if inspect.isgenerator(rval):
            for x in rval:
                ctxt.reply(x, None, connection_pool=self.connection_pool)
        else:
            ctxt.reply(rval, None, connection_pool=self.connection_pool)
        # This final None tells multicall that it is done.
        ctxt.reply(ending=True, connection_pool=self.connection_pool)

class RpcContext(rpc_common.CommonRpcContext):
    def reply(self, reply=None, failure=None, ending=False,
              connection_pool=None, log_failure=True):
        if self.msg_id:
            msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
                      reply, failure, ending, log_failure)
            if ending:
                self.msg_id = None

def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
              failure=None, ending=False, log_failure=True):
    ....
    with ConnectionContext(conf, connection_pool) as conn:
        ....
        if reply_q:
            msg['_msg_id'] = msg_id
            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
        else:
            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
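To see why this corrupts results, note that the caller consumes the reply stream until it sees the 'ending' marker. The snippet below is a minimal, hypothetical consumer loop (not the actual oslo waiter code) showing how an 'ending' message that overtakes a data message silently truncates the result.

def collect_replies(messages):
    # Assemble an RPC result from reply messages in arrival order.
    result = []
    for msg in messages:
        if msg.get('ending'):
            # The 'ending' marker is treated as end-of-stream.
            break
        result.append(msg['result'])
    return result

# In-order delivery yields the full result.
print(collect_replies([{'result': 'a'}, {'result': 'b'}, {'ending': True}]))
# ['a', 'b']

# If the 'ending' message overtakes a data message (different connection,
# different route through the cluster), part of the result is lost.
print(collect_replies([{'result': 'a'}, {'ending': True}, {'result': 'b'}]))
# ['a']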
A quick fix is to force all messages belonging to the same reply to go through the same connection.
class RpcContext(rpc_common.CommonRpcContext):
    def reply2(self, reply=None, connection_pool=None):
        if self.msg_id:
            msg_reply2(self.conf,
                       self.msg_id,
                       self.reply_q,
                       connection_pool,
                       reply)
            self.msg_id = None

def msg_reply2(conf, msg_id, reply_q,
               connection_pool, reply=None):
    def reply_msg(content, ending, conn):
        msg = {'result': content, 'failure': None}
        if ending:
            msg['ending'] = True
        _add_unique_id(msg)
        if reply_q:
            msg['_msg_id'] = msg_id
            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
        else:
            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))

    with ConnectionContext(conf, connection_pool) as conn:
        # Check if the result was a generator
        if inspect.isgenerator(reply):
            for x in reply:
                reply_msg(x, False, conn)
        else:
            reply_msg(reply, False, conn)
        reply_msg(None, True, conn)
class ProxyCallback(_ThreadPoolWithWait):
    def _process_data(self, ctxt, version, method, namespace, args):
        """Process a message in a new thread.

        If the proxy object we have has a dispatch method
        (see rpc.dispatcher.RpcDispatcher), pass it the version,
        method, and args and let it dispatch as appropriate. If not, use
        the old behavior of magically calling the specified method on the
        proxy we have here.
        """
        ctxt.update_store()
        try:
            rval = self.proxy.dispatch(ctxt, version, method, namespace,
                                       **args)
            ctxt.reply2(rval, self.connection_pool)
        except rpc_common.ClientException as e:
            LOG.debug(_('Expected exception during message handling (%s)') %
                      e._exc_info[1])
            ctxt.reply(None, e._exc_info,
                       connection_pool=self.connection_pool,
                       log_failure=False)
        except Exception:
            # sys.exc_info() is deleted by LOG.exception().
            exc_info = sys.exc_info()
            LOG.error(_('Exception during message handling'),
                      exc_info=exc_info)
            ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
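With reply2, every fragment of a reply, including the final 'ending' marker, is published through the single connection held by one ConnectionContext, so the broker receives the fragments in publish order and the 'ending' message can no longer overtake the data messages.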
A more robust fix would reconstruct results based on a sequence number and the total number of messages; such a fix would also need to handle timeouts and other failure cases.
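As an illustration of that approach, the sketch below reassembles a reply from fragments tagged with a hypothetical '_seq' index and '_total' count (these fields are not part of the original protocol); a real implementation would also bound the wait with a timeout.

def reassemble(fragments):
    # Reorder fragments by their hypothetical '_seq' field and return the
    # result list once all of them are present, otherwise None.
    total = None
    parts = {}
    for frag in fragments:
        total = frag['_total']
        parts[frag['_seq']] = frag['result']
    if total is None or len(parts) < total:
        # Fragments missing: a real fix would keep waiting until a timeout
        # expires and then raise, instead of returning None.
        return None
    return [parts[i] for i in range(total)]

# Out-of-order arrival still reassembles correctly.
frags = [{'_seq': 1, '_total': 3, 'result': 'b'},
         {'_seq': 0, '_total': 3, 'result': 'a'},
         {'_seq': 2, '_total': 3, 'result': 'c'}]
print(reassemble(frags))  # ['a', 'b', 'c']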
Kilo has already fixed this issue by returning the result in a single message.
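Conceptually, the single-message approach collapses the whole result, including a generator, into one payload before sending, so there is nothing left to reorder. A rough sketch of the idea (not oslo.messaging's actual code):

import inspect

def build_single_reply(rval):
    # Materialize generators and send everything, plus the ending flag,
    # in one message.
    if inspect.isgenerator(rval):
        rval = list(rval)
    return {'result': rval, 'failure': None, 'ending': True}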