Removed artificial delay in AMP mechanism, added alternate streaming. Resolves #805.

This commit is contained in:
Griatch 2015-09-17 00:45:55 +02:00
parent 8234d9c0d0
commit 0b35279bc2

View file

@ -25,7 +25,7 @@ try:
except ImportError: except ImportError:
import pickle import pickle
from twisted.protocols import amp from twisted.protocols import amp
from twisted.internet import protocol, task from twisted.internet import protocol, reactor
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
from evennia.utils.utils import to_str, variable_from_module from evennia.utils.utils import to_str, variable_from_module
@ -43,8 +43,8 @@ SCONN = chr(9) # server creating new connection (for irc/imc2 bots etc)
PCONNSYNC = chr(10) # portal post-syncing a session PCONNSYNC = chr(10) # portal post-syncing a session
AMP_MAXLEN = 65535 # max allowed data length in AMP protocol (cannot be changed) AMP_MAXLEN = 65535 # max allowed data length in AMP protocol (cannot be changed)
BATCH_RATE = 500 # max commands/sec before switching to batch-sending BATCH_RATE = 250 # max commands/sec before switching to batch-sending
BATCH_TIMEOUT = 1.0 # how often to poll to empty batch queue, in seconds BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds
# buffers # buffers
_SENDBATCH = defaultdict(list) _SENDBATCH = defaultdict(list)
@ -298,11 +298,10 @@ class AMPProtocol(amp.AMP):
already before connecting both on portal and server. already before connecting both on portal and server.
""" """
self.min_batch_step = 1.0 / BATCH_RATE self.send_batch_counter = 0
self.lastsend = time() self.send_reset_time = time()
self.task = task.LoopingCall(self.batch_send, None, None) self.send_mode = True
self.task.start(BATCH_TIMEOUT) self.send_task = None
def connectionMade(self): def connectionMade(self):
""" """
@ -356,27 +355,43 @@ class AMPProtocol(amp.AMP):
as batch parts get sent (or fails). as batch parts get sent (or fails).
""" """
#print "batch_send 1:", command, sessid
global _SENDBATCH global _SENDBATCH
if command is None:
# called by the automatic cleanup mechanism
commands = [cmd for cmd in (MsgPortal2Server, MsgServer2Portal, ServerAdmin, PortalAdmin)
if _SENDBATCH.get(cmd, False)]
if not commands:
return
else:
# called to send right away
commands = [command]
_SENDBATCH[command].append((sessid, kwargs))
if command:
# always put AMP command in cache
_SENDBATCH[command].append((sessid, kwargs))
self.send_batch_counter += 1
force_direct = kwargs.pop("force_direct", False) force_direct = kwargs.pop("force_direct", False)
now = time() now = time()
#print "batch_send 2:", now, self.lastsend, self.min_batch_step, now-self.lastsend > self.min_batch_step
if force_direct or now - self.lastsend > self.min_batch_step: if force_direct:
for command in commands: # check the current command rate to determine if we
batch = dumps(_SENDBATCH[command]) # can return send mode or not. We add 1 to counter
_SENDBATCH[command] = [] # to avoid cases when it happens to be 0.
self.send_mode = (((self.send_batch_counter + 1) /
(now - self.send_reset_time)) <= (BATCH_RATE*BATCH_TIMEOUT))
self.send_batch_counter = 0
self.send_reset_time = now
if not (self.send_mode and self.send_task):
self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True)
else:
self.send_task = None
elif self.send_mode and self.send_batch_counter > BATCH_RATE:
# we have reached the batch count. How long this took
# defines if we should halt sending or not.
self.send_mode = now - self.send_reset_time >= 1.0
#print "BATCH_RATE:", BATCH_RATE / (now - self.send_reset_time)
self.send_batch_counter = 0
self.send_reset_time = now
if not (self.send_mode and self.send_task):
force_direct = True # make sure to empty cache
self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True)
if self.send_mode or force_direct:
for command, cmdlist in _SENDBATCH.items():
batch = dumps(cmdlist) # batch is a list of (sessid,kwargs) tuples.
# We pack the data in a string-form pickle.
del _SENDBATCH[command]
# split in parts small enough to fit in AMP MAXLEN # split in parts small enough to fit in AMP MAXLEN
to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)] to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)]
nparts = len(to_send) nparts = len(to_send)
@ -389,6 +404,7 @@ class AMPProtocol(amp.AMP):
ipart=0, ipart=0,
nparts=1).addErrback(self.errback, command.key)] nparts=1).addErrback(self.errback, command.key)]
else: else:
#print "sending in %s parts." % nparts
deferreds = [] deferreds = []
for ipart, part in enumerate(to_send): for ipart, part in enumerate(to_send):
deferred = self.callRemote(command, deferred = self.callRemote(command,
@ -398,7 +414,6 @@ class AMPProtocol(amp.AMP):
nparts=nparts) nparts=nparts)
deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, nparts)) deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, nparts))
deferreds.append(deferred) deferreds.append(deferred)
self.lastsend = time() # don't use now here, keep it as up-to-date as possible
return deferreds return deferreds