Rework AMP data packet format and batch-handling. Resolves #1635

This commit is contained in:
Griatch 2018-07-23 21:09:06 +02:00
parent 20d1ab0f3d
commit c8dae28cdf
2 changed files with 49 additions and 26 deletions

View file

@ -44,9 +44,10 @@ SSHUTD = chr(17) # server shutdown
PSTATUS = chr(18) # ping server or portal status PSTATUS = chr(18) # ping server or portal status
SRESET = chr(19) # server shutdown in reset mode SRESET = chr(19) # server shutdown in reset mode
NUL = b'\0'
NULNUL = '\0\0'
AMP_MAXLEN = amp.MAX_VALUE_LENGTH # max allowed data length in AMP protocol (cannot be changed) AMP_MAXLEN = amp.MAX_VALUE_LENGTH # max allowed data length in AMP protocol (cannot be changed)
BATCH_RATE = 250 # max commands/sec before switching to batch-sending
BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds
# buffers # buffers
_SENDBATCH = defaultdict(list) _SENDBATCH = defaultdict(list)
@ -61,11 +62,15 @@ _HTTP_WARNING = """
HTTP/1.1 200 OK HTTP/1.1 200 OK
Content-Type: text/html Content-Type: text/html
<html><body> <html>
This is Evennia's interal AMP port. It handles communication <body>
between Evennia's different processes.<h3><p>This port should NOT be This is Evennia's internal AMP port. It handles communication
publicly visible.</p></h3> between Evennia's different processes.
</body></html>""".strip() <p>
<h3>This port should NOT be publicly visible.</h3>
</p>
</body>
</html>""".strip()
# Helper functions for pickling. # Helper functions for pickling.
@ -107,43 +112,45 @@ class Compressed(amp.String):
def fromBox(self, name, strings, objects, proto): def fromBox(self, name, strings, objects, proto):
""" """
Converts from box representation to python. We Converts from box string representation to python. We read back too-long batched data and
group very long data into batches. put it back together here.
""" """
value = StringIO() value = StringIO()
value.write(strings.get(name)) value.write(self.fromStringProto(strings.get(name), proto))
for counter in count(2): for counter in count(2):
# count from 2 upwards # count from 2 upwards
chunk = strings.get("%s.%d" % (name, counter)) chunk = strings.get("%s.%d" % (name, counter))
if chunk is None: if chunk is None:
break break
value.write(chunk) value.write(self.fromStringProto(chunk, proto))
objects[name] = value.getvalue() objects[name] = value.getvalue()
def toBox(self, name, strings, objects, proto): def toBox(self, name, strings, objects, proto):
""" """
Convert from data to box. We handled too-long Convert from python object to string box representation.
batched data and put it together here. we break up too-long data snippets into multiple batches here.
""" """
value = StringIO(objects[name]) value = StringIO(objects[name])
strings[name] = value.read(AMP_MAXLEN) strings[name] = self.toStringProto(value.read(AMP_MAXLEN), proto)
for counter in count(2): for counter in count(2):
chunk = value.read(AMP_MAXLEN) chunk = value.read(AMP_MAXLEN)
if not chunk: if not chunk:
break break
strings["%s.%d" % (name, counter)] = chunk strings["%s.%d" % (name, counter)] = self.toStringProto(chunk, proto)
def toString(self, inObject): def toString(self, inObject):
""" """
Convert to send on the wire, with compression. Convert to send as a string on the wire, with compression.
""" """
return zlib.compress(inObject, 9) return zlib.compress(super(Compressed, self).toString(inObject), 9)
def fromString(self, inString): def fromString(self, inString):
""" """
Convert (decompress) from the wire to Python. Convert (decompress) from the string-representation on the wire to Python.
""" """
return zlib.decompress(inString) return super(Compressed, self).fromString(zlib.decompress(inString))
class MsgLauncher2Portal(amp.Command): class MsgLauncher2Portal(amp.Command):
@ -261,16 +268,29 @@ class AMPMultiConnectionProtocol(amp.AMP):
self.send_reset_time = time.time() self.send_reset_time = time.time()
self.send_mode = True self.send_mode = True
self.send_task = None self.send_task = None
self.multibatches = 0
def dataReceived(self, data): def dataReceived(self, data):
""" """
Handle non-AMP messages, such as HTTP communication. Handle non-AMP messages, such as HTTP communication.
""" """
if data[0] != b'\0': if data[0] == NUL:
# an AMP communication
if data[-2:] != NULNUL:
# an incomplete AMP box means more batches are forthcoming.
self.multibatches += 1
super(AMPMultiConnectionProtocol, self).dataReceived(data)
elif self.multibatches:
# invalid AMP, but we have a pending multi-batch that is not yet complete
if data[-2:] == NULNUL:
# end of existing multibatch
self.multibatches = max(0, self.multibatches - 1)
super(AMPMultiConnectionProtocol, self).dataReceived(data)
else:
# not an AMP communication, return warning
self.transport.write(_HTTP_WARNING) self.transport.write(_HTTP_WARNING)
self.transport.loseConnection() self.transport.loseConnection()
else: print("HTML received: %s" % data)
super(AMPMultiConnectionProtocol, self).dataReceived(data)
def makeConnection(self, transport): def makeConnection(self, transport):
""" """

View file

@ -356,10 +356,13 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
packed_data (str): Pickled data (sessid, kwargs) coming over the wire. packed_data (str): Pickled data (sessid, kwargs) coming over the wire.
""" """
sessid, kwargs = self.data_in(packed_data) try:
session = self.factory.portal.sessions.get(sessid, None) sessid, kwargs = self.data_in(packed_data)
if session: session = self.factory.portal.sessions.get(sessid, None)
self.factory.portal.sessions.data_out(session, **kwargs) if session:
self.factory.portal.sessions.data_out(session, **kwargs)
except Exception:
logger.log_trace("packed_data len {}".format(len(packed_data)))
return {} return {}
@amp.AdminServer2Portal.responder @amp.AdminServer2Portal.responder