Compressed AMP commands and now handles multi-batch sending as part of a single AMP command Argument object.

This commit is contained in:
Griatch 2015-09-20 20:01:27 +02:00
parent 018250e4e5
commit f41b028d94

View file

@ -17,9 +17,11 @@ Server - (AMP server) Handles all mud operations. The server holds its own list
""" """
# imports needed on both server and portal side # imports needed on both server and portal side
import os import os, sys
from time import time from time import time
from collections import defaultdict from collections import defaultdict
from itertools import count
from cStringIO import StringIO
try: try:
import cPickle as pickle import cPickle as pickle
except ImportError: except ImportError:
@ -41,7 +43,7 @@ SSHUTD = chr(7) # server shutdown
SSYNC = chr(8) # server session sync SSYNC = chr(8) # server session sync
SCONN = chr(9) # server creating new connection (for irc/imc2 bots etc) SCONN = chr(9) # server creating new connection (for irc/imc2 bots etc)
PCONNSYNC = chr(10) # portal post-syncing a session PCONNSYNC = chr(10) # portal post-syncing a session
AMP_MAXLEN = 65535 # max allowed data length in AMP protocol (cannot be changed) AMP_MAXLEN = amp.MAX_VALUE_LENGTH # max allowed data length in AMP protocol (cannot be changed)
BATCH_RATE = 250 # max commands/sec before switching to batch-sending BATCH_RATE = 250 # max commands/sec before switching to batch-sending
BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds
@ -50,6 +52,14 @@ BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds
_SENDBATCH = defaultdict(list) _SENDBATCH = defaultdict(list)
_MSGBUFFER = defaultdict(list) _MSGBUFFER = defaultdict(list)
import zlib
#_ZLIB_FLUSH = zlib.Z_SYNC_FLUSH
#_ZLIB_COMP = zlib.compressobj(9)
#_ZLIB_DECOMP = zlib.decompressobj()
#_ZLIB_COMPRESS = lambda data: _ZLIB_COMP.compress(data) + _ZLIB_COMP.flush(_ZLIB_FLUSH)
#_ZLIB_DECOMPRESS = lambda data: _ZLIB_DECOMP.decompress(data)
def get_restart_mode(restart_file): def get_restart_mode(restart_file):
""" """
Parse the server/portal restart status Parse the server/portal restart status
@ -189,18 +199,62 @@ class AmpClientFactory(protocol.ReconnectingClientFactory):
# AMP Communication Command types # AMP Communication Command types
class Compressed(amp.String):
"""
This is an Argument that both handles too-long sends as well as
uses zlib for compression across the wire. Much of this is
borrowed from ~glyph/+junk/amphacks/mediumbox.
"""
def fromBox(self, name, strings, objects, proto):
"""
Converts from box representation to python.
"""
value = StringIO()
value.write(strings.get(name))
for counter in count(2):
# count from 2 upwards
chunk = strings.get("%s.%d" % (name, counter))
if chunk is None:
break
value.write(chunk)
objects[name] = value.getvalue()
def toBox(self, name, strings, objects, proto):
"""
Convert from data to box.
"""
value = StringIO(objects[name])
strings[name] = value.read(AMP_MAXLEN)
for counter in count(2):
chunk = value.read(AMP_MAXLEN)
if not chunk:
break
strings["%s.%d" % (name, counter)] = chunk
def toString(self, inObject):
"""
Convert to send on the wire.
"""
return zlib.compress(inObject, 9)
def fromString(self, inString):
"""
Convert from the wire to Python.
"""
return zlib.decompress(inString)
class MsgPortal2Server(amp.Command): class MsgPortal2Server(amp.Command):
""" """
Message Portal -> Server Message Portal -> Server
""" """
key = "MsgPortal2Server" key = "MsgPortal2Server"
arguments = [('hashid', amp.String()), arguments = [('data', Compressed())]
('data', amp.String()),
('ipart', amp.Integer()),
('nparts', amp.Integer())]
errors = [(Exception, 'EXCEPTION')] errors = [(Exception, 'EXCEPTION')]
response = [] response = [('timing', amp.String())]
class MsgServer2Portal(amp.Command): class MsgServer2Portal(amp.Command):
@ -209,12 +263,10 @@ class MsgServer2Portal(amp.Command):
""" """
key = "MsgServer2Portal" key = "MsgServer2Portal"
arguments = [('hashid', amp.String()), arguments = [('data', Compressed())]
('data', amp.String()),
('ipart', amp.Integer()),
('nparts', amp.Integer())]
errors = [(Exception, 'EXCEPTION')] errors = [(Exception, 'EXCEPTION')]
response = [] #response = []
response = [('timing', amp.String())]
class ServerAdmin(amp.Command): class ServerAdmin(amp.Command):
@ -226,12 +278,10 @@ class ServerAdmin(amp.Command):
""" """
key = "ServerAdmin" key = "ServerAdmin"
arguments = [('hashid', amp.String()), arguments = [('data', Compressed())]
('data', amp.String()),
('ipart', amp.Integer()),
('nparts', amp.Integer())]
errors = [(Exception, 'EXCEPTION')] errors = [(Exception, 'EXCEPTION')]
response = [] #response = []
response = [('timing', amp.String())]
class PortalAdmin(amp.Command): class PortalAdmin(amp.Command):
@ -243,12 +293,10 @@ class PortalAdmin(amp.Command):
""" """
key = "PortalAdmin" key = "PortalAdmin"
arguments = [('hashid', amp.String()), arguments = [('data', Compressed())]
('data', amp.String()),
('ipart', amp.Integer()),
('nparts', amp.Integer())]
errors = [(Exception, 'EXCEPTION')] errors = [(Exception, 'EXCEPTION')]
response = [] #response = [()]
response = [('timing', amp.String())]
class FunctionCall(amp.Command): class FunctionCall(amp.Command):
@ -338,6 +386,8 @@ class AMPProtocol(amp.AMP):
e.trap(Exception) e.trap(Exception)
print "AMP Error for %(info)s: %(e)s" % {'info': info, print "AMP Error for %(info)s: %(e)s" % {'info': info,
'e': e.getErrorMessage()} 'e': e.getErrorMessage()}
def callback(self, ret, info):
print "AMP return timing (%s): %f" % (info, time() - float(ret["timing"]))
def batch_send(self, command, sessid, **kwargs): def batch_send(self, command, sessid, **kwargs):
""" """
@ -355,16 +405,6 @@ class AMPProtocol(amp.AMP):
as batch parts get sent (or fails). as batch parts get sent (or fails).
""" """
now = time()
batch = dumps([(sessid, kwargs)])
hashid = "%s-%s" % (id(batch), now)
deferreds = [self.callRemote(command,
hashid=hashid,
data=batch,
ipart=0,
nparts=1).addErrback(self.errback, command.key)]
return deferreds
global _SENDBATCH global _SENDBATCH
@ -399,71 +439,40 @@ class AMPProtocol(amp.AMP):
self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True) self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True)
if self.send_mode or force_direct: if self.send_mode or force_direct:
sendsize = 0
sendcount = 0
for command, cmdlist in _SENDBATCH.items(): for command, cmdlist in _SENDBATCH.items():
batch = dumps(cmdlist) # batch is a list of (sessid,kwargs) tuples. batch = dumps(cmdlist) # batch is a list of (sessid,kwargs) tuples.
# We pack the data in a string-form pickle. # We pack the data in a string-form pickle.
sendsize += sys.getsizeof(batch)
sendcount += 1
del _SENDBATCH[command] del _SENDBATCH[command]
# split in parts small enough to fit in AMP MAXLEN
to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)]
nparts = len(to_send)
# tag this batch
hashid = "%s-%s" % (id(batch), now)
if nparts == 1:
deferreds = [self.callRemote(command,
hashid=hashid,
data=batch,
ipart=0,
nparts=1).addErrback(self.errback, command.key)]
else:
#print "sending in %s parts." % nparts
deferreds = []
for ipart, part in enumerate(to_send):
deferred = self.callRemote(command, deferred = self.callRemote(command,
hashid=hashid, data=batch).addErrback(self.errback, command.key + " " + repr(cmdlist))
data=part, print " sent %i batches: %fkB." % (sendcount, sendsize / 1000.0)
ipart=ipart, return deferred
nparts=nparts)
deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, nparts))
deferreds.append(deferred)
return deferreds
def batch_recv(self, hashid, data, ipart, nparts): def batch_recv(self, data):
""" """
This will receive and unpack data sent as a batch. This both This will receive and unpack data sent as a batch. This both
handles too-long data as well as batch-sending very fast- handles too-long data as well as batch-sending very fast-
arriving commands. arriving commands.
Args: Args:
hashid (str): Unique hash id representing this batch in
the cache buffer.
data (str): Data coming over the wire. data (str): Data coming over the wire.
ipart (int): Index of this part of the batch (ipart/nparts)
nparts (int): Total number of parts in this batch.
Returns: Returns:
data (str or list): The received data. data (str or list): The received data.
""" """
global _MSGBUFFER
if nparts == 1:
# most common case # most common case
return loads(data) return loads(data)
else:
if ipart < nparts-1:
# not yet complete
_MSGBUFFER[hashid].append(data)
return []
else:
# all parts in place - deserialize it
return loads("".join(_MSGBUFFER.pop(hashid)) + data)
# Message definition + helper methods to call/create each message type # Message definition + helper methods to call/create each message type
# Portal -> Server Msg # Portal -> Server Msg
def amp_msg_portal2server(self, hashid, data, ipart, nparts): def amp_msg_portal2server(self, data):
""" """
Relays message to server. This method is executed on the Relays message to server. This method is executed on the
Server. Server.
@ -474,13 +483,10 @@ class AMPProtocol(amp.AMP):
before continuing. before continuing.
Args: Args:
hashid (str): Unique hash identifying this data batch.
data (str): Data to send (often a part of a batch) data (str): Data to send (often a part of a batch)
ipart (int): Index of this part of the batch.
nparts (int): Total number of batches.
""" """
batch = self.batch_recv(hashid, data, ipart, nparts) batch = self.batch_recv(data)
for (sessid, kwargs) in batch: for (sessid, kwargs) in batch:
#print "msg portal -> server (server side):", sessid, msg, loads(ret["data"]) #print "msg portal -> server (server side):", sessid, msg, loads(ret["data"])
from evennia.server.profiling.timetrace import timetrace from evennia.server.profiling.timetrace import timetrace
@ -488,7 +494,7 @@ class AMPProtocol(amp.AMP):
self.factory.server.sessions.data_in(sessid, self.factory.server.sessions.data_in(sessid,
text=kwargs["msg"], text=kwargs["msg"],
data=kwargs["data"]) data=kwargs["data"])
return {} return {"timing":"%f" % time()}
MsgPortal2Server.responder(amp_msg_portal2server) MsgPortal2Server.responder(amp_msg_portal2server)
def call_remote_MsgPortal2Server(self, sessid, msg, data=""): def call_remote_MsgPortal2Server(self, sessid, msg, data=""):
@ -509,11 +515,11 @@ class AMPProtocol(amp.AMP):
msg = timetrace(msg, "AMP.call_remote_MsgPortal2Server") msg = timetrace(msg, "AMP.call_remote_MsgPortal2Server")
return self.batch_send(MsgPortal2Server, sessid, return self.batch_send(MsgPortal2Server, sessid,
msg=msg if msg is not None else "", msg=msg if msg is not None else "",
data=data, force_send=True) data=data)
# Server -> Portal message # Server -> Portal message
def amp_msg_server2portal(self, hashid, data, ipart, nparts): def amp_msg_server2portal(self, data):
""" """
Relays message to Portal. This method is executed on the Portal. Relays message to Portal. This method is executed on the Portal.
@ -523,13 +529,9 @@ class AMPProtocol(amp.AMP):
before continuing. before continuing.
Args: Args:
hashid (str): Unique hash identifying this data batch.
data (str): Data to send (often a part of a batch) data (str): Data to send (often a part of a batch)
ipart (int): Index of this part of the batch.
nparts (int): Total number of batches.
""" """
batch = self.batch_recv(hashid, data, ipart, nparts) batch = self.batch_recv(data)
for (sessid, kwargs) in batch: for (sessid, kwargs) in batch:
#print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"]) #print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"])
from evennia.server.profiling.timetrace import timetrace from evennia.server.profiling.timetrace import timetrace
@ -537,10 +539,11 @@ class AMPProtocol(amp.AMP):
self.factory.portal.sessions.data_out(sessid, self.factory.portal.sessions.data_out(sessid,
text=kwargs["msg"], text=kwargs["msg"],
data=kwargs["data"]) data=kwargs["data"])
return {} #return {}
return {"timing":"%f" % time()}
MsgServer2Portal.responder(amp_msg_server2portal) MsgServer2Portal.responder(amp_msg_server2portal)
def amp_batch_server2portal(self, hashid, data, ipart, nparts): def amp_batch_server2portal(self, data):
""" """
Relays batch data to Portal. This method is executed on the Portal. Relays batch data to Portal. This method is executed on the Portal.
@ -550,13 +553,10 @@ class AMPProtocol(amp.AMP):
before continuing. before continuing.
Args: Args:
hashid (str): Unique hash identifying this data batch.
data (str): Data to send (often a part of a batch) data (str): Data to send (often a part of a batch)
ipart (int): Index of this part of the batch.
nparts (int): Total number of batches.
""" """
batch = self.batch_recv(hashid, data, ipart, nparts) batch = self.batch_recv(data)
if batch is not None: if batch is not None:
for (sessid, kwargs) in batch: for (sessid, kwargs) in batch:
from evennia.server.profiling.timetrace import timetrace from evennia.server.profiling.timetrace import timetrace
@ -564,16 +564,17 @@ class AMPProtocol(amp.AMP):
self.factory.portal.sessions.data_out(sessid, self.factory.portal.sessions.data_out(sessid,
text=kwargs["msg"], text=kwargs["msg"],
**kwargs["data"]) **kwargs["data"])
return {} #return {}
return {"timing":"%f" % time()}
MsgServer2Portal.responder(amp_batch_server2portal) MsgServer2Portal.responder(amp_batch_server2portal)
def call_remote_MsgServer2Portal(self, sessid, msg, data=""): def call_remote_MsgServer2Portal(self, sessid, msg="", data=""):
""" """
Send Message - access method called by the Server and executed on the Server. Send Message - access method called by the Server and executed on the Server.
Args: Args:
sessid (int): Unique Session id. sessid (int): Unique Session id.
msg (str): Message to send over the wire. msg (str, optional): Message to send over the wire.
data (str, optional): Extra data. data (str, optional): Extra data.
""" """
@ -583,7 +584,7 @@ class AMPProtocol(amp.AMP):
return self.batch_send(MsgServer2Portal, sessid, msg=msg, data=data) return self.batch_send(MsgServer2Portal, sessid, msg=msg, data=data)
# Server administration from the Portal side # Server administration from the Portal side
def amp_server_admin(self, hashid, data, ipart, nparts): def amp_server_admin(self, data):
""" """
This allows the portal to perform admin This allows the portal to perform admin
operations on the server. This is executed on the Server. operations on the server. This is executed on the Server.
@ -594,14 +595,11 @@ class AMPProtocol(amp.AMP):
before continuing. before continuing.
Args: Args:
hashid (str): Unique hash identifying this data batch.
data (str): Data to send (often a part of a batch) data (str): Data to send (often a part of a batch)
ipart (int): Index of this part of the batch.
nparts (int): Total number of batches.
""" """
#print "serveradmin (server side):", hashid, ipart, nparts #print "serveradmin (server side):", hashid, ipart, nparts
batch = self.batch_recv(hashid, data, ipart, nparts) batch = self.batch_recv(data)
for (sessid, kwargs) in batch: for (sessid, kwargs) in batch:
operation = kwargs["operation"] operation = kwargs["operation"]
@ -630,7 +628,8 @@ class AMPProtocol(amp.AMP):
server_sessionhandler.portal_sessions_sync(data) server_sessionhandler.portal_sessions_sync(data)
else: else:
raise Exception("operation %(op)s not recognized." % {'op': operation}) raise Exception("operation %(op)s not recognized." % {'op': operation})
return {} #return {}
return {"timing":"%f" % time()}
ServerAdmin.responder(amp_server_admin) ServerAdmin.responder(amp_server_admin)
def call_remote_ServerAdmin(self, sessid, operation="", data=""): def call_remote_ServerAdmin(self, sessid, operation="", data=""):
@ -652,7 +651,7 @@ class AMPProtocol(amp.AMP):
# Portal administraton from the Server side # Portal administraton from the Server side
def amp_portal_admin(self, hashid, data, ipart, nparts): def amp_portal_admin(self, data):
""" """
This allows the server to perform admin This allows the server to perform admin
operations on the portal. This is executed on the Portal. operations on the portal. This is executed on the Portal.
@ -663,14 +662,11 @@ class AMPProtocol(amp.AMP):
before continuing. before continuing.
Args: Args:
hashid (str): Unique hash identifying this data batch.
data (str): Data to send (often a part of a batch) data (str): Data to send (often a part of a batch)
ipart (int): Index of this part of the batch.
nparts (int): Total number of batches.
""" """
#print "portaladmin (portal side):", sessid, ord(operation), data #print "portaladmin (portal side):", sessid, ord(operation), data
batch = self.batch_recv(hashid, data, ipart, nparts) batch = self.batch_recv(data)
for (sessid, kwargs) in batch: for (sessid, kwargs) in batch:
operation = kwargs["operation"] operation = kwargs["operation"]
data = kwargs["data"] data = kwargs["data"]
@ -704,7 +700,8 @@ class AMPProtocol(amp.AMP):
else: else:
raise Exception("operation %(op)s not recognized." % {'op': operation}) raise Exception("operation %(op)s not recognized." % {'op': operation})
return {} #return {}
return {"timing":"%f" % time()}
PortalAdmin.responder(amp_portal_admin) PortalAdmin.responder(amp_portal_admin)
def call_remote_PortalAdmin(self, sessid, operation="", data=""): def call_remote_PortalAdmin(self, sessid, operation="", data=""):