Implemented a working batch-variation. Now for testing ...
This commit is contained in:
parent
e63922ff19
commit
4e4461b7fd
1 changed files with 21 additions and 14 deletions
|
|
@ -145,7 +145,7 @@ class MsgPortal2Server(amp.Command):
|
||||||
Message portal -> server
|
Message portal -> server
|
||||||
"""
|
"""
|
||||||
key = "MsgPortal2Server"
|
key = "MsgPortal2Server"
|
||||||
arguments = [('hashid', amp.Integer()),
|
arguments = [('hashid', amp.String()),
|
||||||
('data', amp.String()),
|
('data', amp.String()),
|
||||||
('ipart', amp.Integer()),
|
('ipart', amp.Integer()),
|
||||||
('nparts', amp.Integer())]
|
('nparts', amp.Integer())]
|
||||||
|
|
@ -158,7 +158,7 @@ class MsgServer2Portal(amp.Command):
|
||||||
Message server -> portal
|
Message server -> portal
|
||||||
"""
|
"""
|
||||||
key = "MsgServer2Portal"
|
key = "MsgServer2Portal"
|
||||||
arguments = [('hashid', amp.Integer()),
|
arguments = [('hashid', amp.String()),
|
||||||
('data', amp.String()),
|
('data', amp.String()),
|
||||||
('ipart', amp.Integer()),
|
('ipart', amp.Integer()),
|
||||||
('nparts', amp.Integer())]
|
('nparts', amp.Integer())]
|
||||||
|
|
@ -175,7 +175,7 @@ class ServerAdmin(amp.Command):
|
||||||
session connects or resyncs
|
session connects or resyncs
|
||||||
"""
|
"""
|
||||||
key = "ServerAdmin"
|
key = "ServerAdmin"
|
||||||
arguments = [('hashid', amp.Integer()),
|
arguments = [('hashid', amp.String()),
|
||||||
('data', amp.String()),
|
('data', amp.String()),
|
||||||
('ipart', amp.Integer()),
|
('ipart', amp.Integer()),
|
||||||
('nparts', amp.Integer())]
|
('nparts', amp.Integer())]
|
||||||
|
|
@ -191,7 +191,7 @@ class PortalAdmin(amp.Command):
|
||||||
operations on the portal.
|
operations on the portal.
|
||||||
"""
|
"""
|
||||||
key = "PortalAdmin"
|
key = "PortalAdmin"
|
||||||
arguments = [('hashid', amp.Integer()),
|
arguments = [('hashid', amp.String()),
|
||||||
('data', amp.String()),
|
('data', amp.String()),
|
||||||
('ipart', amp.Integer()),
|
('ipart', amp.Integer()),
|
||||||
('nparts', amp.Integer())]
|
('nparts', amp.Integer())]
|
||||||
|
|
@ -238,6 +238,17 @@ class AMPProtocol(amp.AMP):
|
||||||
|
|
||||||
# helper methods
|
# helper methods
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Initialize protocol with some things that need to be
|
||||||
|
in place already before connecting both on portal and server.
|
||||||
|
"""
|
||||||
|
self.min_batch_step = 1.0 / BATCH_RATE
|
||||||
|
self.lastsend = time()
|
||||||
|
self.task = task.LoopingCall(self.batch_send, MsgPortal2Server, None)
|
||||||
|
self.task.start(BATCH_TIMEOUT)
|
||||||
|
|
||||||
|
|
||||||
def connectionMade(self):
|
def connectionMade(self):
|
||||||
"""
|
"""
|
||||||
This is called when a connection is established
|
This is called when a connection is established
|
||||||
|
|
@ -255,11 +266,6 @@ class AMPProtocol(amp.AMP):
|
||||||
self.factory.portal.sessions.at_server_connection()
|
self.factory.portal.sessions.at_server_connection()
|
||||||
if hasattr(self.factory, "server_restart_mode"):
|
if hasattr(self.factory, "server_restart_mode"):
|
||||||
del self.factory.server_restart_mode
|
del self.factory.server_restart_mode
|
||||||
# should be set both on portal and server
|
|
||||||
self.min_batch_step = 1.0 / BATCH_RATE
|
|
||||||
self.lastsend = time()
|
|
||||||
self.task = task.LoopingCall(self.batch_send, MsgPortal2Server, None)
|
|
||||||
self.task.start(BATCH_TIMEOUT)
|
|
||||||
|
|
||||||
# Error handling
|
# Error handling
|
||||||
|
|
||||||
|
|
@ -274,15 +280,16 @@ class AMPProtocol(amp.AMP):
|
||||||
This will batch data together to send fewer, large batches.
|
This will batch data together to send fewer, large batches.
|
||||||
"""
|
"""
|
||||||
global _SENDBATCH
|
global _SENDBATCH
|
||||||
|
cmdkey = command.key
|
||||||
if sessid is not None:
|
if sessid is not None:
|
||||||
_SENDBATCH.append((sessid, kwargs))
|
_SENDBATCH[cmdkey].append((sessid, kwargs))
|
||||||
if not _SENDBATCH:
|
if not _SENDBATCH:
|
||||||
return
|
return
|
||||||
now = time()
|
now = time()
|
||||||
|
|
||||||
if now - self.lastsend > self.min_batch_step:
|
if now - self.lastsend > self.min_batch_step:
|
||||||
batch = dumps(_SENDBATCH)
|
batch = dumps(_SENDBATCH[cmdkey])
|
||||||
_SENDBATCH = []
|
_SENDBATCH[cmdkey] = []
|
||||||
to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)]
|
to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)]
|
||||||
nparts = len(to_send)
|
nparts = len(to_send)
|
||||||
# tag this batch
|
# tag this batch
|
||||||
|
|
@ -339,7 +346,7 @@ class AMPProtocol(amp.AMP):
|
||||||
data comes in multiple chunks; if so (nparts>1) we buffer the data
|
data comes in multiple chunks; if so (nparts>1) we buffer the data
|
||||||
and wait for the remaining parts to arrive before continuing.
|
and wait for the remaining parts to arrive before continuing.
|
||||||
"""
|
"""
|
||||||
batch = self.batch_recv(MsgPortal2Server, hashid, data, ipart, nparts)
|
batch = self.batch_recv(hashid, data, ipart, nparts)
|
||||||
for (sessid, kwargs) in batch:
|
for (sessid, kwargs) in batch:
|
||||||
#print "msg portal -> server (server side):", sessid, msg, loads(ret["data"])
|
#print "msg portal -> server (server side):", sessid, msg, loads(ret["data"])
|
||||||
self.factory.server.sessions.data_in(sessid,
|
self.factory.server.sessions.data_in(sessid,
|
||||||
|
|
@ -487,7 +494,7 @@ class AMPProtocol(amp.AMP):
|
||||||
"""
|
"""
|
||||||
Access method called by the server side.
|
Access method called by the server side.
|
||||||
"""
|
"""
|
||||||
return self.batch_send(PortalAdmin, sessid, operation=operation, data=dumps(data))
|
return self.batch_send(PortalAdmin, sessid, operation=operation, data=data)
|
||||||
|
|
||||||
# Extra functions
|
# Extra functions
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue