From 86c555320891de1f093c2b89e0500b12dbc6af66 Mon Sep 17 00:00:00 2001 From: Griatch Date: Sat, 20 Oct 2012 15:40:34 +0200 Subject: [PATCH] Added msg- and data buffering to AMP protocol. This handles the rare cases when the AMP limit of 65535bytes/message becomes an issue (such as when viewing long lists or @py output. Test with @py self.msg("-"*65536). Resolves Issue 294. --- src/server/amp.py | 99 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 89 insertions(+), 10 deletions(-) diff --git a/src/server/amp.py b/src/server/amp.py index bcd994ea8..74218de07 100644 --- a/src/server/amp.py +++ b/src/server/amp.py @@ -16,6 +16,8 @@ Server - (AMP server) Handles all mud operations. The server holds its own list """ # imports needed on both server and portal side +import os +from collections import defaultdict try: import cPickle as pickle except ImportError: @@ -46,6 +48,8 @@ SDISCONNALL = chr(6) # server session disconnect all SSHUTD = chr(7) # server shutdown SSYNC = chr(8) # server session sync +MAXLEN = 65535 # max allowed data length in AMP protocol + def get_restart_mode(restart_file): """ Parse the server/portal restart status @@ -142,6 +146,8 @@ class MsgPortal2Server(amp.Command): """ arguments = [('sessid', amp.Integer()), ('msg', amp.String()), + ('ipart', amp.Integer()), + ('nparts', amp.Integer()), ('data', amp.String())] errors = [(Exception, 'EXCEPTION')] response = [] @@ -152,6 +158,8 @@ class MsgServer2Portal(amp.Command): """ arguments = [('sessid', amp.Integer()), ('msg', amp.String()), + ('ipart', amp.Integer()), + ('nparts', amp.Integer()), ('data', amp.String())] errors = [(Exception, 'EXCEPTION')] response = [] @@ -221,6 +229,10 @@ class FunctionCall(amp.Command): dumps = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) loads = lambda data: pickle.loads(to_str(data)) +# multipart message store + +MSGBUFFER = defaultdict(list) + #------------------------------------------------------------ # Core AMP protocol for communication Server <-> Portal #------------------------------------------------------------ @@ -262,16 +274,58 @@ class AMPProtocol(amp.AMP): e.trap(Exception) print "AMP Error for %(info)s: %(e)s" % {'info': info, 'e': e.getErrorMessage()} + def send_split_msg(self, sessid, msg, data, command): + """ + This helper method splits the sending of a msg into multiple parts + with a maxlength of MAXLEN. This is to avoid repetition in the two + msg-sending commands. When calling this, the maximum length has + already been exceeded. + Inputs: + msg - string + data - data dictionary + command - one of MsgPortal2Server or MsgServer2Portal commands + """ + # split the strings into acceptable chunks + datastr = dumps(data) + nmsg, ndata = len(msg), len(datastr) + if nmsg > MAXLEN or ndata > MAXLEN: + msglist = [msg[i:i+MAXLEN] for i in range(0, len(msg), MAXLEN)] + datalist = [datastr[i:i+MAXLEN] for i in range(0, len(datastr), MAXLEN)] + nmsglist, ndatalist = len(msglist), len(datalist) + if ndatalist < nmsglist: + datalist.extend("" for i in range(nmsglist-ndatalist)) + if nmsglist < ndatalist: + msglist.extend("" for i in range(ndatalist-nmsglist)) + # we have split the msg/data into right-size chunks. Now we send it in sequence + return [self.callRemote(command, + sessid=sessid, + msg=to_str(msg), + ipart=icall, + nparts=nmsglist, + data=dumps(data)).addErrback(self.errback, "OOBServer2Portal") + for icall, (msg, data) in enumerate(zip(msglist, datalist))] # Message definition + helper methods to call/create each message type # Portal -> Server Msg - def amp_msg_portal2server(self, sessid, msg, data): + def amp_msg_portal2server(self, sessid, msg, ipart, nparts, data): """ Relays message to server. This method is executed on the Server. """ #print "msg portal -> server (server side):", sessid, msg + global MSGBUFFER + if nparts > 1: + # a multipart message + if len(MSGBUFFER[sessid]) != nparts: + # we don't have all parts yet. Wait. + return {} + else: + # we have all parts. Put it all together in the right order. + msg = "".join(t[1] for t in sorted(MSGBUFFER[sessid], key=lambda o:o[0])) + data = "".join(t[2] for t in sorted(MSGBUFFER[sessid], key=lambda o:o[0])) + del MSGBUFFER[sessid] + # call session hook with the data self.factory.server.sessions.data_in(sessid, msg, loads(data)) return {} MsgPortal2Server.responder(amp_msg_portal2server) @@ -281,18 +335,37 @@ class AMPProtocol(amp.AMP): Access method called by the Portal and executed on the Portal. """ #print "msg portal->server (portal side):", sessid, msg - return self.callRemote(MsgPortal2Server, - sessid=sessid, - msg=msg, - data=dumps(data)).addErrback(self.errback, "MsgPortal2Server") + try: + return self.callRemote(MsgPortal2Server, + sessid=sessid, + msg=msg, + ipart=0, + nparts=1, + data=dumps(data)).addErrback(self.errback, "MsgPortal2Server") + except amp.TooLong: + # the msg (or data) was too long for AMP to send. We need to send in blocks. + return self.send_split_msg(sessid, msg, data, MsgPortal2Server) # Server -> Portal message - def amp_msg_server2portal(self, sessid, msg, data): + def amp_msg_server2portal(self, sessid, msg, ipart, nparts, data): """ Relays message to Portal. This method is executed on the Portal. """ #print "msg server->portal (portal side):", sessid, msg + global MSGBUFFER + if nparts > 1: + # a multipart message + MSGBUFFER[sessid].append((ipart, msg, data)) + if len(MSGBUFFER[sessid]) != nparts: + # we don't have all parts yet. Wait. + return {} + else: + # we have all parts. Put it all together in the right order. + msg = "".join(t[1] for t in sorted(MSGBUFFER[sessid], key=lambda o:o[0])) + data = "".join(t[2] for t in sorted(MSGBUFFER[sessid], key=lambda o:o[0])) + del MSGBUFFER[sessid] + # call session hook with the data self.factory.portal.sessions.data_out(sessid, msg, loads(data)) return {} MsgServer2Portal.responder(amp_msg_server2portal) @@ -302,10 +375,16 @@ class AMPProtocol(amp.AMP): Access method called by the Server and executed on the Server. """ #print "msg server->portal (server side):", sessid, msg, data - return self.callRemote(MsgServer2Portal, - sessid=sessid, - msg=to_str(msg), - data=dumps(data)).addErrback(self.errback, "OOBServer2Portal") + try: + return self.callRemote(MsgServer2Portal, + sessid=sessid, + msg=to_str(msg), + ipart=0, + nparts=1, + data=dumps(data)).addErrback(self.errback, "OOBServer2Portal") + except amp.TooLong: + # the msg (or data) was too long for AMP to send. We need to send in blocks. + return self.send_split_msg(sessid, msg, data, MsgServer2Portal) # OOB Portal -> Server