Fixed error in amp that caused reload mechanism to act up.
This commit is contained in:
parent
7a3152713c
commit
1bb886de03
1 changed files with 53 additions and 34 deletions
|
|
@ -245,7 +245,7 @@ class AMPProtocol(amp.AMP):
|
||||||
"""
|
"""
|
||||||
self.min_batch_step = 1.0 / BATCH_RATE
|
self.min_batch_step = 1.0 / BATCH_RATE
|
||||||
self.lastsend = time()
|
self.lastsend = time()
|
||||||
self.task = task.LoopingCall(self.batch_send, MsgPortal2Server, None)
|
self.task = task.LoopingCall(self.batch_send, None, None)
|
||||||
self.task.start(BATCH_TIMEOUT)
|
self.task.start(BATCH_TIMEOUT)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -278,40 +278,54 @@ class AMPProtocol(amp.AMP):
|
||||||
def batch_send(self, command, sessid, **kwargs):
|
def batch_send(self, command, sessid, **kwargs):
|
||||||
"""
|
"""
|
||||||
This will batch data together to send fewer, large batches.
|
This will batch data together to send fewer, large batches.
|
||||||
"""
|
|
||||||
global _SENDBATCH
|
|
||||||
cmdkey = command.key
|
|
||||||
if sessid is not None:
|
|
||||||
_SENDBATCH[cmdkey].append((sessid, kwargs))
|
|
||||||
if not _SENDBATCH:
|
|
||||||
return
|
|
||||||
now = time()
|
|
||||||
|
|
||||||
if now - self.lastsend > self.min_batch_step:
|
Kwargs:
|
||||||
batch = dumps(_SENDBATCH[cmdkey])
|
force_direct: send direct
|
||||||
_SENDBATCH[cmdkey] = []
|
"""
|
||||||
to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)]
|
#print "batch_send 1:", command, sessid
|
||||||
nparts = len(to_send)
|
global _SENDBATCH
|
||||||
# tag this batch
|
if command is None:
|
||||||
hashid = "%s-%s" % (id(batch), now)
|
# called by the automatic cleanup mechanism
|
||||||
if nparts == 1:
|
commands = [cmd for cmd in (MsgPortal2Server, MsgServer2Portal, ServerAdmin, PortalAdmin)
|
||||||
deferreds = self.callRemote(command,
|
if _SENDBATCH.get(cmd, False)]
|
||||||
hashid=hashid,
|
if not commands:
|
||||||
data=batch,
|
return
|
||||||
ipart=0,
|
else:
|
||||||
nparts=1).addErrback(self.errback, command.key)
|
# called to send right away
|
||||||
else:
|
commands = [command]
|
||||||
deferreds = []
|
_SENDBATCH[command].append((sessid, kwargs))
|
||||||
for ipart, part in enumerate(to_send):
|
|
||||||
deferred = self.callRemote(command,
|
force_direct = kwargs.pop("force_direct", False)
|
||||||
hashid=hashid,
|
now = time()
|
||||||
data=part,
|
#print "batch_send 2:", now, self.lastsend, self.min_batch_step, now-self.lastsend > self.min_batch_step
|
||||||
ipart=ipart,
|
|
||||||
nparts=nparts)
|
if force_direct or now - self.lastsend > self.min_batch_step:
|
||||||
deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, part))
|
for command in commands:
|
||||||
deferreds.append(deferred)
|
batch = dumps(_SENDBATCH[command])
|
||||||
self.lastsend = time() # don't use now here, keep it as up-to-date as possible
|
_SENDBATCH[command] = []
|
||||||
return deferreds
|
# split in parts small enough to fit in AMP MAXLEN
|
||||||
|
to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)]
|
||||||
|
nparts = len(to_send)
|
||||||
|
# tag this batch
|
||||||
|
hashid = "%s-%s" % (id(batch), now)
|
||||||
|
if nparts == 1:
|
||||||
|
deferreds = [self.callRemote(command,
|
||||||
|
hashid=hashid,
|
||||||
|
data=batch,
|
||||||
|
ipart=0,
|
||||||
|
nparts=1).addErrback(self.errback, command.key)]
|
||||||
|
else:
|
||||||
|
deferreds = []
|
||||||
|
for ipart, part in enumerate(to_send):
|
||||||
|
deferred = self.callRemote(command,
|
||||||
|
hashid=hashid,
|
||||||
|
data=part,
|
||||||
|
ipart=ipart,
|
||||||
|
nparts=nparts)
|
||||||
|
deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, part))
|
||||||
|
deferreds.append(deferred)
|
||||||
|
self.lastsend = time() # don't use now here, keep it as up-to-date as possible
|
||||||
|
return deferreds
|
||||||
|
|
||||||
|
|
||||||
def batch_recv(self, hashid, data, ipart, nparts):
|
def batch_recv(self, hashid, data, ipart, nparts):
|
||||||
|
|
@ -406,6 +420,7 @@ class AMPProtocol(amp.AMP):
|
||||||
operations on the server. This is executed on the Server.
|
operations on the server. This is executed on the Server.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
#print "serveradmin (server side):", hashid, ipart, nparts
|
||||||
batch = self.batch_recv(hashid, data, ipart, nparts)
|
batch = self.batch_recv(hashid, data, ipart, nparts)
|
||||||
|
|
||||||
for (sessid, kwargs) in batch:
|
for (sessid, kwargs) in batch:
|
||||||
|
|
@ -443,6 +458,8 @@ class AMPProtocol(amp.AMP):
|
||||||
Access method called by the Portal and Executed on the Portal.
|
Access method called by the Portal and Executed on the Portal.
|
||||||
"""
|
"""
|
||||||
#print "serveradmin (portal side):", sessid, ord(operation), data
|
#print "serveradmin (portal side):", sessid, ord(operation), data
|
||||||
|
if hasattr(self.factory, "server_restart_mode"):
|
||||||
|
return self.batch_send(ServerAdmin, sessid, force_direct=True, operation=operation, data=data)
|
||||||
return self.batch_send(ServerAdmin, sessid, operation=operation, data=data)
|
return self.batch_send(ServerAdmin, sessid, operation=operation, data=data)
|
||||||
|
|
||||||
# Portal administraton from the Server side
|
# Portal administraton from the Server side
|
||||||
|
|
@ -494,6 +511,8 @@ class AMPProtocol(amp.AMP):
|
||||||
"""
|
"""
|
||||||
Access method called by the server side.
|
Access method called by the server side.
|
||||||
"""
|
"""
|
||||||
|
if operation == SSYNC:
|
||||||
|
return self.batch_send(PortalAdmin, sessid, force_direct=True, operation=operation, data=data)
|
||||||
return self.batch_send(PortalAdmin, sessid, operation=operation, data=data)
|
return self.batch_send(PortalAdmin, sessid, operation=operation, data=data)
|
||||||
|
|
||||||
# Extra functions
|
# Extra functions
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue