Added msg- and data buffering to AMP protocol. This handles the rare cases when the AMP limit of 65535bytes/message becomes an issue (such as when viewing long lists or @py output. Test with @py self.msg("-"*65536). Resolves Issue 294.
This commit is contained in:
parent
32b069c5fc
commit
86c5553208
1 changed files with 89 additions and 10 deletions
|
|
@ -16,6 +16,8 @@ Server - (AMP server) Handles all mud operations. The server holds its own list
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# imports needed on both server and portal side
|
# imports needed on both server and portal side
|
||||||
|
import os
|
||||||
|
from collections import defaultdict
|
||||||
try:
|
try:
|
||||||
import cPickle as pickle
|
import cPickle as pickle
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
|
@ -46,6 +48,8 @@ SDISCONNALL = chr(6) # server session disconnect all
|
||||||
SSHUTD = chr(7) # server shutdown
|
SSHUTD = chr(7) # server shutdown
|
||||||
SSYNC = chr(8) # server session sync
|
SSYNC = chr(8) # server session sync
|
||||||
|
|
||||||
|
MAXLEN = 65535 # max allowed data length in AMP protocol
|
||||||
|
|
||||||
def get_restart_mode(restart_file):
|
def get_restart_mode(restart_file):
|
||||||
"""
|
"""
|
||||||
Parse the server/portal restart status
|
Parse the server/portal restart status
|
||||||
|
|
@ -142,6 +146,8 @@ class MsgPortal2Server(amp.Command):
|
||||||
"""
|
"""
|
||||||
arguments = [('sessid', amp.Integer()),
|
arguments = [('sessid', amp.Integer()),
|
||||||
('msg', amp.String()),
|
('msg', amp.String()),
|
||||||
|
('ipart', amp.Integer()),
|
||||||
|
('nparts', amp.Integer()),
|
||||||
('data', amp.String())]
|
('data', amp.String())]
|
||||||
errors = [(Exception, 'EXCEPTION')]
|
errors = [(Exception, 'EXCEPTION')]
|
||||||
response = []
|
response = []
|
||||||
|
|
@ -152,6 +158,8 @@ class MsgServer2Portal(amp.Command):
|
||||||
"""
|
"""
|
||||||
arguments = [('sessid', amp.Integer()),
|
arguments = [('sessid', amp.Integer()),
|
||||||
('msg', amp.String()),
|
('msg', amp.String()),
|
||||||
|
('ipart', amp.Integer()),
|
||||||
|
('nparts', amp.Integer()),
|
||||||
('data', amp.String())]
|
('data', amp.String())]
|
||||||
errors = [(Exception, 'EXCEPTION')]
|
errors = [(Exception, 'EXCEPTION')]
|
||||||
response = []
|
response = []
|
||||||
|
|
@ -221,6 +229,10 @@ class FunctionCall(amp.Command):
|
||||||
dumps = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
|
dumps = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
|
||||||
loads = lambda data: pickle.loads(to_str(data))
|
loads = lambda data: pickle.loads(to_str(data))
|
||||||
|
|
||||||
|
# multipart message store
|
||||||
|
|
||||||
|
MSGBUFFER = defaultdict(list)
|
||||||
|
|
||||||
#------------------------------------------------------------
|
#------------------------------------------------------------
|
||||||
# Core AMP protocol for communication Server <-> Portal
|
# Core AMP protocol for communication Server <-> Portal
|
||||||
#------------------------------------------------------------
|
#------------------------------------------------------------
|
||||||
|
|
@ -262,16 +274,58 @@ class AMPProtocol(amp.AMP):
|
||||||
e.trap(Exception)
|
e.trap(Exception)
|
||||||
print "AMP Error for %(info)s: %(e)s" % {'info': info, 'e': e.getErrorMessage()}
|
print "AMP Error for %(info)s: %(e)s" % {'info': info, 'e': e.getErrorMessage()}
|
||||||
|
|
||||||
|
def send_split_msg(self, sessid, msg, data, command):
|
||||||
|
"""
|
||||||
|
This helper method splits the sending of a msg into multiple parts
|
||||||
|
with a maxlength of MAXLEN. This is to avoid repetition in the two
|
||||||
|
msg-sending commands. When calling this, the maximum length has
|
||||||
|
already been exceeded.
|
||||||
|
Inputs:
|
||||||
|
msg - string
|
||||||
|
data - data dictionary
|
||||||
|
command - one of MsgPortal2Server or MsgServer2Portal commands
|
||||||
|
"""
|
||||||
|
# split the strings into acceptable chunks
|
||||||
|
datastr = dumps(data)
|
||||||
|
nmsg, ndata = len(msg), len(datastr)
|
||||||
|
if nmsg > MAXLEN or ndata > MAXLEN:
|
||||||
|
msglist = [msg[i:i+MAXLEN] for i in range(0, len(msg), MAXLEN)]
|
||||||
|
datalist = [datastr[i:i+MAXLEN] for i in range(0, len(datastr), MAXLEN)]
|
||||||
|
nmsglist, ndatalist = len(msglist), len(datalist)
|
||||||
|
if ndatalist < nmsglist:
|
||||||
|
datalist.extend("" for i in range(nmsglist-ndatalist))
|
||||||
|
if nmsglist < ndatalist:
|
||||||
|
msglist.extend("" for i in range(ndatalist-nmsglist))
|
||||||
|
# we have split the msg/data into right-size chunks. Now we send it in sequence
|
||||||
|
return [self.callRemote(command,
|
||||||
|
sessid=sessid,
|
||||||
|
msg=to_str(msg),
|
||||||
|
ipart=icall,
|
||||||
|
nparts=nmsglist,
|
||||||
|
data=dumps(data)).addErrback(self.errback, "OOBServer2Portal")
|
||||||
|
for icall, (msg, data) in enumerate(zip(msglist, datalist))]
|
||||||
|
|
||||||
# Message definition + helper methods to call/create each message type
|
# Message definition + helper methods to call/create each message type
|
||||||
|
|
||||||
# Portal -> Server Msg
|
# Portal -> Server Msg
|
||||||
|
|
||||||
def amp_msg_portal2server(self, sessid, msg, data):
|
def amp_msg_portal2server(self, sessid, msg, ipart, nparts, data):
|
||||||
"""
|
"""
|
||||||
Relays message to server. This method is executed on the Server.
|
Relays message to server. This method is executed on the Server.
|
||||||
"""
|
"""
|
||||||
#print "msg portal -> server (server side):", sessid, msg
|
#print "msg portal -> server (server side):", sessid, msg
|
||||||
|
global MSGBUFFER
|
||||||
|
if nparts > 1:
|
||||||
|
# a multipart message
|
||||||
|
if len(MSGBUFFER[sessid]) != nparts:
|
||||||
|
# we don't have all parts yet. Wait.
|
||||||
|
return {}
|
||||||
|
else:
|
||||||
|
# we have all parts. Put it all together in the right order.
|
||||||
|
msg = "".join(t[1] for t in sorted(MSGBUFFER[sessid], key=lambda o:o[0]))
|
||||||
|
data = "".join(t[2] for t in sorted(MSGBUFFER[sessid], key=lambda o:o[0]))
|
||||||
|
del MSGBUFFER[sessid]
|
||||||
|
# call session hook with the data
|
||||||
self.factory.server.sessions.data_in(sessid, msg, loads(data))
|
self.factory.server.sessions.data_in(sessid, msg, loads(data))
|
||||||
return {}
|
return {}
|
||||||
MsgPortal2Server.responder(amp_msg_portal2server)
|
MsgPortal2Server.responder(amp_msg_portal2server)
|
||||||
|
|
@ -281,18 +335,37 @@ class AMPProtocol(amp.AMP):
|
||||||
Access method called by the Portal and executed on the Portal.
|
Access method called by the Portal and executed on the Portal.
|
||||||
"""
|
"""
|
||||||
#print "msg portal->server (portal side):", sessid, msg
|
#print "msg portal->server (portal side):", sessid, msg
|
||||||
|
try:
|
||||||
return self.callRemote(MsgPortal2Server,
|
return self.callRemote(MsgPortal2Server,
|
||||||
sessid=sessid,
|
sessid=sessid,
|
||||||
msg=msg,
|
msg=msg,
|
||||||
|
ipart=0,
|
||||||
|
nparts=1,
|
||||||
data=dumps(data)).addErrback(self.errback, "MsgPortal2Server")
|
data=dumps(data)).addErrback(self.errback, "MsgPortal2Server")
|
||||||
|
except amp.TooLong:
|
||||||
|
# the msg (or data) was too long for AMP to send. We need to send in blocks.
|
||||||
|
return self.send_split_msg(sessid, msg, data, MsgPortal2Server)
|
||||||
|
|
||||||
# Server -> Portal message
|
# Server -> Portal message
|
||||||
|
|
||||||
def amp_msg_server2portal(self, sessid, msg, data):
|
def amp_msg_server2portal(self, sessid, msg, ipart, nparts, data):
|
||||||
"""
|
"""
|
||||||
Relays message to Portal. This method is executed on the Portal.
|
Relays message to Portal. This method is executed on the Portal.
|
||||||
"""
|
"""
|
||||||
#print "msg server->portal (portal side):", sessid, msg
|
#print "msg server->portal (portal side):", sessid, msg
|
||||||
|
global MSGBUFFER
|
||||||
|
if nparts > 1:
|
||||||
|
# a multipart message
|
||||||
|
MSGBUFFER[sessid].append((ipart, msg, data))
|
||||||
|
if len(MSGBUFFER[sessid]) != nparts:
|
||||||
|
# we don't have all parts yet. Wait.
|
||||||
|
return {}
|
||||||
|
else:
|
||||||
|
# we have all parts. Put it all together in the right order.
|
||||||
|
msg = "".join(t[1] for t in sorted(MSGBUFFER[sessid], key=lambda o:o[0]))
|
||||||
|
data = "".join(t[2] for t in sorted(MSGBUFFER[sessid], key=lambda o:o[0]))
|
||||||
|
del MSGBUFFER[sessid]
|
||||||
|
# call session hook with the data
|
||||||
self.factory.portal.sessions.data_out(sessid, msg, loads(data))
|
self.factory.portal.sessions.data_out(sessid, msg, loads(data))
|
||||||
return {}
|
return {}
|
||||||
MsgServer2Portal.responder(amp_msg_server2portal)
|
MsgServer2Portal.responder(amp_msg_server2portal)
|
||||||
|
|
@ -302,10 +375,16 @@ class AMPProtocol(amp.AMP):
|
||||||
Access method called by the Server and executed on the Server.
|
Access method called by the Server and executed on the Server.
|
||||||
"""
|
"""
|
||||||
#print "msg server->portal (server side):", sessid, msg, data
|
#print "msg server->portal (server side):", sessid, msg, data
|
||||||
|
try:
|
||||||
return self.callRemote(MsgServer2Portal,
|
return self.callRemote(MsgServer2Portal,
|
||||||
sessid=sessid,
|
sessid=sessid,
|
||||||
msg=to_str(msg),
|
msg=to_str(msg),
|
||||||
|
ipart=0,
|
||||||
|
nparts=1,
|
||||||
data=dumps(data)).addErrback(self.errback, "OOBServer2Portal")
|
data=dumps(data)).addErrback(self.errback, "OOBServer2Portal")
|
||||||
|
except amp.TooLong:
|
||||||
|
# the msg (or data) was too long for AMP to send. We need to send in blocks.
|
||||||
|
return self.send_split_msg(sessid, msg, data, MsgServer2Portal)
|
||||||
|
|
||||||
# OOB Portal -> Server
|
# OOB Portal -> Server
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue