From 0b35279bc233b0a530a7edc0b7234b249563a560 Mon Sep 17 00:00:00 2001 From: Griatch Date: Thu, 17 Sep 2015 00:45:55 +0200 Subject: [PATCH] Removed artificial delay in AMP mechanism, added alternate streaming. Resolves #805. --- evennia/server/amp.py | 65 ++++++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/evennia/server/amp.py b/evennia/server/amp.py index 9743d408d..2840b3036 100644 --- a/evennia/server/amp.py +++ b/evennia/server/amp.py @@ -25,7 +25,7 @@ try: except ImportError: import pickle from twisted.protocols import amp -from twisted.internet import protocol, task +from twisted.internet import protocol, reactor from twisted.internet.defer import Deferred from evennia.utils.utils import to_str, variable_from_module @@ -43,8 +43,8 @@ SCONN = chr(9) # server creating new connection (for irc/imc2 bots etc) PCONNSYNC = chr(10) # portal post-syncing a session AMP_MAXLEN = 65535 # max allowed data length in AMP protocol (cannot be changed) -BATCH_RATE = 500 # max commands/sec before switching to batch-sending -BATCH_TIMEOUT = 1.0 # how often to poll to empty batch queue, in seconds +BATCH_RATE = 250 # max commands/sec before switching to batch-sending +BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds # buffers _SENDBATCH = defaultdict(list) @@ -298,11 +298,10 @@ class AMPProtocol(amp.AMP): already before connecting both on portal and server. """ - self.min_batch_step = 1.0 / BATCH_RATE - self.lastsend = time() - self.task = task.LoopingCall(self.batch_send, None, None) - self.task.start(BATCH_TIMEOUT) - + self.send_batch_counter = 0 + self.send_reset_time = time() + self.send_mode = True + self.send_task = None def connectionMade(self): """ @@ -356,27 +355,43 @@ class AMPProtocol(amp.AMP): as batch parts get sent (or fails). """ - #print "batch_send 1:", command, sessid global _SENDBATCH - if command is None: - # called by the automatic cleanup mechanism - commands = [cmd for cmd in (MsgPortal2Server, MsgServer2Portal, ServerAdmin, PortalAdmin) - if _SENDBATCH.get(cmd, False)] - if not commands: - return - else: - # called to send right away - commands = [command] - _SENDBATCH[command].append((sessid, kwargs)) + if command: + # always put AMP command in cache + _SENDBATCH[command].append((sessid, kwargs)) + self.send_batch_counter += 1 force_direct = kwargs.pop("force_direct", False) now = time() - #print "batch_send 2:", now, self.lastsend, self.min_batch_step, now-self.lastsend > self.min_batch_step - if force_direct or now - self.lastsend > self.min_batch_step: - for command in commands: - batch = dumps(_SENDBATCH[command]) - _SENDBATCH[command] = [] + if force_direct: + # check the current command rate to determine if we + # can return send mode or not. We add 1 to counter + # to avoid cases when it happens to be 0. + self.send_mode = (((self.send_batch_counter + 1) / + (now - self.send_reset_time)) <= (BATCH_RATE*BATCH_TIMEOUT)) + self.send_batch_counter = 0 + self.send_reset_time = now + if not (self.send_mode and self.send_task): + self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True) + else: + self.send_task = None + elif self.send_mode and self.send_batch_counter > BATCH_RATE: + # we have reached the batch count. How long this took + # defines if we should halt sending or not. + self.send_mode = now - self.send_reset_time >= 1.0 + #print "BATCH_RATE:", BATCH_RATE / (now - self.send_reset_time) + self.send_batch_counter = 0 + self.send_reset_time = now + if not (self.send_mode and self.send_task): + force_direct = True # make sure to empty cache + self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True) + + if self.send_mode or force_direct: + for command, cmdlist in _SENDBATCH.items(): + batch = dumps(cmdlist) # batch is a list of (sessid,kwargs) tuples. + # We pack the data in a string-form pickle. + del _SENDBATCH[command] # split in parts small enough to fit in AMP MAXLEN to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)] nparts = len(to_send) @@ -389,6 +404,7 @@ class AMPProtocol(amp.AMP): ipart=0, nparts=1).addErrback(self.errback, command.key)] else: + #print "sending in %s parts." % nparts deferreds = [] for ipart, part in enumerate(to_send): deferred = self.callRemote(command, @@ -398,7 +414,6 @@ class AMPProtocol(amp.AMP): nparts=nparts) deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, nparts)) deferreds.append(deferred) - self.lastsend = time() # don't use now here, keep it as up-to-date as possible return deferreds