From f41b028d94429038108c298685660af4bf21c925 Mon Sep 17 00:00:00 2001 From: Griatch Date: Sun, 20 Sep 2015 20:01:27 +0200 Subject: [PATCH] Compressed AMP commands and now handles multi-batch sending as part of a single AMP command Argument object. --- evennia/server/amp.py | 213 +++++++++++++++++++++--------------------- 1 file changed, 105 insertions(+), 108 deletions(-) diff --git a/evennia/server/amp.py b/evennia/server/amp.py index f5b20c8ce..76f03aca4 100644 --- a/evennia/server/amp.py +++ b/evennia/server/amp.py @@ -17,9 +17,11 @@ Server - (AMP server) Handles all mud operations. The server holds its own list """ # imports needed on both server and portal side -import os +import os, sys from time import time from collections import defaultdict +from itertools import count +from cStringIO import StringIO try: import cPickle as pickle except ImportError: @@ -41,7 +43,7 @@ SSHUTD = chr(7) # server shutdown SSYNC = chr(8) # server session sync SCONN = chr(9) # server creating new connection (for irc/imc2 bots etc) PCONNSYNC = chr(10) # portal post-syncing a session -AMP_MAXLEN = 65535 # max allowed data length in AMP protocol (cannot be changed) +AMP_MAXLEN = amp.MAX_VALUE_LENGTH # max allowed data length in AMP protocol (cannot be changed) BATCH_RATE = 250 # max commands/sec before switching to batch-sending BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds @@ -50,6 +52,14 @@ BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds _SENDBATCH = defaultdict(list) _MSGBUFFER = defaultdict(list) +import zlib +#_ZLIB_FLUSH = zlib.Z_SYNC_FLUSH +#_ZLIB_COMP = zlib.compressobj(9) +#_ZLIB_DECOMP = zlib.decompressobj() +#_ZLIB_COMPRESS = lambda data: _ZLIB_COMP.compress(data) + _ZLIB_COMP.flush(_ZLIB_FLUSH) +#_ZLIB_DECOMPRESS = lambda data: _ZLIB_DECOMP.decompress(data) + + def get_restart_mode(restart_file): """ Parse the server/portal restart status @@ -189,18 +199,62 @@ class AmpClientFactory(protocol.ReconnectingClientFactory): # AMP Communication Command types +class Compressed(amp.String): + """ + This is an Argument that both handles too-long sends as well as + uses zlib for compression across the wire. Much of this is + borrowed from ~glyph/+junk/amphacks/mediumbox. + + """ + + def fromBox(self, name, strings, objects, proto): + """ + Converts from box representation to python. + """ + value = StringIO() + value.write(strings.get(name)) + for counter in count(2): + # count from 2 upwards + chunk = strings.get("%s.%d" % (name, counter)) + if chunk is None: + break + value.write(chunk) + objects[name] = value.getvalue() + + def toBox(self, name, strings, objects, proto): + """ + Convert from data to box. + """ + value = StringIO(objects[name]) + strings[name] = value.read(AMP_MAXLEN) + for counter in count(2): + chunk = value.read(AMP_MAXLEN) + if not chunk: + break + strings["%s.%d" % (name, counter)] = chunk + + def toString(self, inObject): + """ + Convert to send on the wire. + """ + return zlib.compress(inObject, 9) + + def fromString(self, inString): + """ + Convert from the wire to Python. + """ + return zlib.decompress(inString) + + class MsgPortal2Server(amp.Command): """ Message Portal -> Server """ key = "MsgPortal2Server" - arguments = [('hashid', amp.String()), - ('data', amp.String()), - ('ipart', amp.Integer()), - ('nparts', amp.Integer())] + arguments = [('data', Compressed())] errors = [(Exception, 'EXCEPTION')] - response = [] + response = [('timing', amp.String())] class MsgServer2Portal(amp.Command): @@ -209,12 +263,10 @@ class MsgServer2Portal(amp.Command): """ key = "MsgServer2Portal" - arguments = [('hashid', amp.String()), - ('data', amp.String()), - ('ipart', amp.Integer()), - ('nparts', amp.Integer())] + arguments = [('data', Compressed())] errors = [(Exception, 'EXCEPTION')] - response = [] + #response = [] + response = [('timing', amp.String())] class ServerAdmin(amp.Command): @@ -226,12 +278,10 @@ class ServerAdmin(amp.Command): """ key = "ServerAdmin" - arguments = [('hashid', amp.String()), - ('data', amp.String()), - ('ipart', amp.Integer()), - ('nparts', amp.Integer())] + arguments = [('data', Compressed())] errors = [(Exception, 'EXCEPTION')] - response = [] + #response = [] + response = [('timing', amp.String())] class PortalAdmin(amp.Command): @@ -243,12 +293,10 @@ class PortalAdmin(amp.Command): """ key = "PortalAdmin" - arguments = [('hashid', amp.String()), - ('data', amp.String()), - ('ipart', amp.Integer()), - ('nparts', amp.Integer())] + arguments = [('data', Compressed())] errors = [(Exception, 'EXCEPTION')] - response = [] + #response = [()] + response = [('timing', amp.String())] class FunctionCall(amp.Command): @@ -338,6 +386,8 @@ class AMPProtocol(amp.AMP): e.trap(Exception) print "AMP Error for %(info)s: %(e)s" % {'info': info, 'e': e.getErrorMessage()} + def callback(self, ret, info): + print "AMP return timing (%s): %f" % (info, time() - float(ret["timing"])) def batch_send(self, command, sessid, **kwargs): """ @@ -355,16 +405,6 @@ class AMPProtocol(amp.AMP): as batch parts get sent (or fails). """ - now = time() - batch = dumps([(sessid, kwargs)]) - hashid = "%s-%s" % (id(batch), now) - deferreds = [self.callRemote(command, - hashid=hashid, - data=batch, - ipart=0, - nparts=1).addErrback(self.errback, command.key)] - return deferreds - global _SENDBATCH @@ -399,71 +439,40 @@ class AMPProtocol(amp.AMP): self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True) if self.send_mode or force_direct: + sendsize = 0 + sendcount = 0 for command, cmdlist in _SENDBATCH.items(): batch = dumps(cmdlist) # batch is a list of (sessid,kwargs) tuples. # We pack the data in a string-form pickle. + sendsize += sys.getsizeof(batch) + sendcount += 1 del _SENDBATCH[command] - # split in parts small enough to fit in AMP MAXLEN - to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)] - nparts = len(to_send) - # tag this batch - hashid = "%s-%s" % (id(batch), now) - if nparts == 1: - deferreds = [self.callRemote(command, - hashid=hashid, - data=batch, - ipart=0, - nparts=1).addErrback(self.errback, command.key)] - else: - #print "sending in %s parts." % nparts - deferreds = [] - for ipart, part in enumerate(to_send): - deferred = self.callRemote(command, - hashid=hashid, - data=part, - ipart=ipart, - nparts=nparts) - deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, nparts)) - deferreds.append(deferred) - return deferreds + deferred = self.callRemote(command, + data=batch).addErrback(self.errback, command.key + " " + repr(cmdlist)) + print " sent %i batches: %fkB." % (sendcount, sendsize / 1000.0) + return deferred - def batch_recv(self, hashid, data, ipart, nparts): + def batch_recv(self, data): """ This will receive and unpack data sent as a batch. This both handles too-long data as well as batch-sending very fast- arriving commands. Args: - hashid (str): Unique hash id representing this batch in - the cache buffer. data (str): Data coming over the wire. - ipart (int): Index of this part of the batch (ipart/nparts) - nparts (int): Total number of parts in this batch. - Returns: data (str or list): The received data. """ - global _MSGBUFFER - if nparts == 1: - # most common case - return loads(data) - else: - if ipart < nparts-1: - # not yet complete - _MSGBUFFER[hashid].append(data) - return [] - else: - # all parts in place - deserialize it - return loads("".join(_MSGBUFFER.pop(hashid)) + data) - + # most common case + return loads(data) # Message definition + helper methods to call/create each message type # Portal -> Server Msg - def amp_msg_portal2server(self, hashid, data, ipart, nparts): + def amp_msg_portal2server(self, data): """ Relays message to server. This method is executed on the Server. @@ -474,13 +483,10 @@ class AMPProtocol(amp.AMP): before continuing. Args: - hashid (str): Unique hash identifying this data batch. data (str): Data to send (often a part of a batch) - ipart (int): Index of this part of the batch. - nparts (int): Total number of batches. """ - batch = self.batch_recv(hashid, data, ipart, nparts) + batch = self.batch_recv(data) for (sessid, kwargs) in batch: #print "msg portal -> server (server side):", sessid, msg, loads(ret["data"]) from evennia.server.profiling.timetrace import timetrace @@ -488,7 +494,7 @@ class AMPProtocol(amp.AMP): self.factory.server.sessions.data_in(sessid, text=kwargs["msg"], data=kwargs["data"]) - return {} + return {"timing":"%f" % time()} MsgPortal2Server.responder(amp_msg_portal2server) def call_remote_MsgPortal2Server(self, sessid, msg, data=""): @@ -509,11 +515,11 @@ class AMPProtocol(amp.AMP): msg = timetrace(msg, "AMP.call_remote_MsgPortal2Server") return self.batch_send(MsgPortal2Server, sessid, msg=msg if msg is not None else "", - data=data, force_send=True) + data=data) # Server -> Portal message - def amp_msg_server2portal(self, hashid, data, ipart, nparts): + def amp_msg_server2portal(self, data): """ Relays message to Portal. This method is executed on the Portal. @@ -523,13 +529,9 @@ class AMPProtocol(amp.AMP): before continuing. Args: - hashid (str): Unique hash identifying this data batch. data (str): Data to send (often a part of a batch) - ipart (int): Index of this part of the batch. - nparts (int): Total number of batches. - """ - batch = self.batch_recv(hashid, data, ipart, nparts) + batch = self.batch_recv(data) for (sessid, kwargs) in batch: #print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"]) from evennia.server.profiling.timetrace import timetrace @@ -537,10 +539,11 @@ class AMPProtocol(amp.AMP): self.factory.portal.sessions.data_out(sessid, text=kwargs["msg"], data=kwargs["data"]) - return {} + #return {} + return {"timing":"%f" % time()} MsgServer2Portal.responder(amp_msg_server2portal) - def amp_batch_server2portal(self, hashid, data, ipart, nparts): + def amp_batch_server2portal(self, data): """ Relays batch data to Portal. This method is executed on the Portal. @@ -550,13 +553,10 @@ class AMPProtocol(amp.AMP): before continuing. Args: - hashid (str): Unique hash identifying this data batch. data (str): Data to send (often a part of a batch) - ipart (int): Index of this part of the batch. - nparts (int): Total number of batches. """ - batch = self.batch_recv(hashid, data, ipart, nparts) + batch = self.batch_recv(data) if batch is not None: for (sessid, kwargs) in batch: from evennia.server.profiling.timetrace import timetrace @@ -564,16 +564,17 @@ class AMPProtocol(amp.AMP): self.factory.portal.sessions.data_out(sessid, text=kwargs["msg"], **kwargs["data"]) - return {} + #return {} + return {"timing":"%f" % time()} MsgServer2Portal.responder(amp_batch_server2portal) - def call_remote_MsgServer2Portal(self, sessid, msg, data=""): + def call_remote_MsgServer2Portal(self, sessid, msg="", data=""): """ Send Message - access method called by the Server and executed on the Server. Args: sessid (int): Unique Session id. - msg (str): Message to send over the wire. + msg (str, optional): Message to send over the wire. data (str, optional): Extra data. """ @@ -583,7 +584,7 @@ class AMPProtocol(amp.AMP): return self.batch_send(MsgServer2Portal, sessid, msg=msg, data=data) # Server administration from the Portal side - def amp_server_admin(self, hashid, data, ipart, nparts): + def amp_server_admin(self, data): """ This allows the portal to perform admin operations on the server. This is executed on the Server. @@ -594,14 +595,11 @@ class AMPProtocol(amp.AMP): before continuing. Args: - hashid (str): Unique hash identifying this data batch. data (str): Data to send (often a part of a batch) - ipart (int): Index of this part of the batch. - nparts (int): Total number of batches. """ #print "serveradmin (server side):", hashid, ipart, nparts - batch = self.batch_recv(hashid, data, ipart, nparts) + batch = self.batch_recv(data) for (sessid, kwargs) in batch: operation = kwargs["operation"] @@ -630,7 +628,8 @@ class AMPProtocol(amp.AMP): server_sessionhandler.portal_sessions_sync(data) else: raise Exception("operation %(op)s not recognized." % {'op': operation}) - return {} + #return {} + return {"timing":"%f" % time()} ServerAdmin.responder(amp_server_admin) def call_remote_ServerAdmin(self, sessid, operation="", data=""): @@ -652,7 +651,7 @@ class AMPProtocol(amp.AMP): # Portal administraton from the Server side - def amp_portal_admin(self, hashid, data, ipart, nparts): + def amp_portal_admin(self, data): """ This allows the server to perform admin operations on the portal. This is executed on the Portal. @@ -663,14 +662,11 @@ class AMPProtocol(amp.AMP): before continuing. Args: - hashid (str): Unique hash identifying this data batch. data (str): Data to send (often a part of a batch) - ipart (int): Index of this part of the batch. - nparts (int): Total number of batches. """ #print "portaladmin (portal side):", sessid, ord(operation), data - batch = self.batch_recv(hashid, data, ipart, nparts) + batch = self.batch_recv(data) for (sessid, kwargs) in batch: operation = kwargs["operation"] data = kwargs["data"] @@ -704,7 +700,8 @@ class AMPProtocol(amp.AMP): else: raise Exception("operation %(op)s not recognized." % {'op': operation}) - return {} + #return {} + return {"timing":"%f" % time()} PortalAdmin.responder(amp_portal_admin) def call_remote_PortalAdmin(self, sessid, operation="", data=""):