Rework launcher with persistent connection for better server status reporting. Still some errors.
This commit is contained in:
parent
3bec3a3512
commit
27afb3240d
4 changed files with 238 additions and 96 deletions
|
|
@ -106,6 +106,26 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
# back with the Server side
|
# back with the Server side
|
||||||
self.send_AdminServer2Portal(amp.DUMMYSESSION, operation=amp.PSYNC)
|
self.send_AdminServer2Portal(amp.DUMMYSESSION, operation=amp.PSYNC)
|
||||||
|
|
||||||
|
def data_to_portal(self, command, sessid, **kwargs):
|
||||||
|
"""
|
||||||
|
Send data across the wire to the Portal
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command (AMP Command): A protocol send command.
|
||||||
|
sessid (int): A unique Session id.
|
||||||
|
kwargs (any): Any data to pickle into the command.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
deferred (deferred or None): A deferred with an errback.
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
Data will be sent across the wire pickled as a tuple
|
||||||
|
(sessid, kwargs).
|
||||||
|
|
||||||
|
"""
|
||||||
|
return self.callRemote(command, packed_data=amp.dumps((sessid, kwargs))).addErrback(
|
||||||
|
self.errback, command.key)
|
||||||
|
|
||||||
def send_MsgServer2Portal(self, session, **kwargs):
|
def send_MsgServer2Portal(self, session, **kwargs):
|
||||||
"""
|
"""
|
||||||
Access method - executed on the Server for sending data
|
Access method - executed on the Server for sending data
|
||||||
|
|
@ -116,7 +136,7 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
kwargs (any, optiona): Extra data.
|
kwargs (any, optiona): Extra data.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.data_out(amp.MsgServer2Portal, session.sessid, **kwargs)
|
return self.data_to_portal(amp.MsgServer2Portal, session.sessid, **kwargs)
|
||||||
|
|
||||||
def send_AdminServer2Portal(self, session, operation="", **kwargs):
|
def send_AdminServer2Portal(self, session, operation="", **kwargs):
|
||||||
"""
|
"""
|
||||||
|
|
@ -131,7 +151,8 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
kwargs (dict, optional): Data going into the adminstrative.
|
kwargs (dict, optional): Data going into the adminstrative.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.data_out(amp.AdminServer2Portal, session.sessid, operation=operation, **kwargs)
|
return self.data_to_portal(amp.AdminServer2Portal, session.sessid,
|
||||||
|
operation=operation, **kwargs)
|
||||||
|
|
||||||
# receiving AMP data
|
# receiving AMP data
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ import shutil
|
||||||
import importlib
|
import importlib
|
||||||
from distutils.version import LooseVersion
|
from distutils.version import LooseVersion
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
from subprocess import Popen, check_output, call, CalledProcessError, STDOUT, PIPE
|
from subprocess import Popen, check_output, call, CalledProcessError, STDOUT
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import cPickle as pickle
|
import cPickle as pickle
|
||||||
|
|
@ -57,9 +57,6 @@ CURRENT_DIR = os.getcwd()
|
||||||
GAMEDIR = CURRENT_DIR
|
GAMEDIR = CURRENT_DIR
|
||||||
|
|
||||||
# Operational setup
|
# Operational setup
|
||||||
AMP_PORT = None
|
|
||||||
AMP_HOST = None
|
|
||||||
AMP_INTERFACE = None
|
|
||||||
|
|
||||||
SERVER_LOGFILE = None
|
SERVER_LOGFILE = None
|
||||||
PORTAL_LOGFILE = None
|
PORTAL_LOGFILE = None
|
||||||
|
|
@ -81,6 +78,9 @@ ENFORCED_SETTING = False
|
||||||
|
|
||||||
# communication constants
|
# communication constants
|
||||||
|
|
||||||
|
AMP_PORT = None
|
||||||
|
AMP_HOST = None
|
||||||
|
AMP_INTERFACE = None
|
||||||
AMP_CONNECTION = None
|
AMP_CONNECTION = None
|
||||||
|
|
||||||
SRELOAD = chr(14) # server reloading (have portal start a new server)
|
SRELOAD = chr(14) # server reloading (have portal start a new server)
|
||||||
|
|
@ -461,7 +461,35 @@ class MsgLauncher2Portal(amp.Command):
|
||||||
arguments = [('operation', amp.String()),
|
arguments = [('operation', amp.String()),
|
||||||
('arguments', amp.String())]
|
('arguments', amp.String())]
|
||||||
errors = {Exception: 'EXCEPTION'}
|
errors = {Exception: 'EXCEPTION'}
|
||||||
response = [('result', amp.String())]
|
response = []
|
||||||
|
|
||||||
|
|
||||||
|
class AMPLauncherProtocol(amp.AMP):
|
||||||
|
"""
|
||||||
|
Defines callbacks to the launcher
|
||||||
|
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
self.on_status = []
|
||||||
|
|
||||||
|
def wait_for_status(self, callback):
|
||||||
|
"""
|
||||||
|
Register a waiter for a status return.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.on_status.append(callback)
|
||||||
|
|
||||||
|
@MsgStatus.responder
|
||||||
|
def receive_status_from_portal(self, status):
|
||||||
|
"""
|
||||||
|
Get a status signal from portal - fire callbacks
|
||||||
|
|
||||||
|
"""
|
||||||
|
status = pickle.loads(status)
|
||||||
|
for callback in self.on_status:
|
||||||
|
callback(status)
|
||||||
|
self.on_status = []
|
||||||
|
return {"status": ""}
|
||||||
|
|
||||||
|
|
||||||
def send_instruction(operation, arguments, callback=None, errback=None):
|
def send_instruction(operation, arguments, callback=None, errback=None):
|
||||||
|
|
@ -475,39 +503,54 @@ def send_instruction(operation, arguments, callback=None, errback=None):
|
||||||
print(ERROR_AMP_UNCONFIGURED)
|
print(ERROR_AMP_UNCONFIGURED)
|
||||||
sys.exit()
|
sys.exit()
|
||||||
|
|
||||||
def _on_connect(prot):
|
def _timeout(*args):
|
||||||
"""
|
print("Client timed out.")
|
||||||
This fires with the protocol when connection is established. We
|
reactor.stop()
|
||||||
immediately send off the instruction then shut down.
|
|
||||||
|
|
||||||
"""
|
|
||||||
def _callback(result):
|
def _callback(result):
|
||||||
if callback:
|
if callback:
|
||||||
callback(result)
|
callback(result)
|
||||||
prot.transport.loseConnection()
|
# prot.transport.loseConnection()
|
||||||
|
|
||||||
def _errback(fail):
|
def _errback(fail):
|
||||||
if errback:
|
if errback:
|
||||||
errback(fail)
|
errback(fail)
|
||||||
prot.transport.loseConnection()
|
# prot.transport.loseConnection()
|
||||||
|
|
||||||
if operation == PSTATUS:
|
def _on_connect(prot):
|
||||||
prot.callRemote(MsgStatus, status="").addCallbacks(_callback, _errback)
|
"""
|
||||||
else:
|
This fires with the protocol when connection is established. We
|
||||||
prot.callRemote(
|
immediately send off the instruction
|
||||||
MsgLauncher2Portal,
|
|
||||||
operation=operation,
|
"""
|
||||||
arguments=pickle.dumps(arguments, pickle.HIGHEST_PROTOCOL)).addCallbacks(
|
global AMP_CONNECTION
|
||||||
_callback, _errback)
|
AMP_CONNECTION = prot
|
||||||
|
_send()
|
||||||
|
|
||||||
def _on_connect_fail(fail):
|
def _on_connect_fail(fail):
|
||||||
"This is called if portal is not reachable."
|
"This is called if portal is not reachable."
|
||||||
errback(fail)
|
errback(fail)
|
||||||
|
|
||||||
|
def _send():
|
||||||
|
if operation == PSTATUS:
|
||||||
|
return AMP_CONNECTION.callRemote(MsgStatus, status="").addCallbacks(_callback, _errback)
|
||||||
|
else:
|
||||||
|
return AMP_CONNECTION.callRemote(
|
||||||
|
MsgLauncher2Portal,
|
||||||
|
operation=operation,
|
||||||
|
arguments=pickle.dumps(arguments, pickle.HIGHEST_PROTOCOL)).addCallbacks(
|
||||||
|
_callback, _errback)
|
||||||
|
|
||||||
|
if AMP_CONNECTION:
|
||||||
|
# already connected - send right away
|
||||||
|
_send()
|
||||||
|
else:
|
||||||
|
# we must connect first, send once connected
|
||||||
point = endpoints.TCP4ClientEndpoint(reactor, AMP_HOST, AMP_PORT)
|
point = endpoints.TCP4ClientEndpoint(reactor, AMP_HOST, AMP_PORT)
|
||||||
deferred = endpoints.connectProtocol(point, amp.AMP())
|
deferred = endpoints.connectProtocol(point, AMPLauncherProtocol())
|
||||||
deferred.addCallbacks(_on_connect, _on_connect_fail)
|
deferred.addCallbacks(_on_connect, _on_connect_fail)
|
||||||
return deferred
|
if not reactor.running:
|
||||||
|
reactor.run()
|
||||||
|
|
||||||
|
|
||||||
def _parse_status(response):
|
def _parse_status(response):
|
||||||
|
|
@ -541,7 +584,6 @@ def _get_twistd_cmdline(pprofiler, sprofiler):
|
||||||
"--profiler=cprofiler",
|
"--profiler=cprofiler",
|
||||||
"--profile={}".format(SPROFILER_LOGFILE)])
|
"--profile={}".format(SPROFILER_LOGFILE)])
|
||||||
|
|
||||||
|
|
||||||
return portal_cmd, server_cmd
|
return portal_cmd, server_cmd
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -565,7 +607,16 @@ def query_status(repeat=False):
|
||||||
reactor.stop()
|
reactor.stop()
|
||||||
|
|
||||||
send_instruction(PSTATUS, None, _callback, _errback)
|
send_instruction(PSTATUS, None, _callback, _errback)
|
||||||
reactor.run()
|
|
||||||
|
|
||||||
|
def wait_for_status_reply(callback):
|
||||||
|
"""
|
||||||
|
Wait for an explicit STATUS signal to be sent back from Evennia.
|
||||||
|
"""
|
||||||
|
if AMP_CONNECTION:
|
||||||
|
AMP_CONNECTION.wait_for_status(callback)
|
||||||
|
else:
|
||||||
|
print("No Evennia connection established.")
|
||||||
|
|
||||||
|
|
||||||
def wait_for_status(portal_running=True, server_running=True, callback=None, errback=None,
|
def wait_for_status(portal_running=True, server_running=True, callback=None, errback=None,
|
||||||
|
|
@ -653,7 +704,8 @@ def start_evennia(pprofiler=False, sprofiler=False):
|
||||||
reactor.stop()
|
reactor.stop()
|
||||||
|
|
||||||
def _portal_started(*args):
|
def _portal_started(*args):
|
||||||
send_instruction(SSTART, server_cmd, _server_started)
|
wait_for_status_reply(_server_started)
|
||||||
|
send_instruction(SSTART, server_cmd)
|
||||||
|
|
||||||
def _portal_running(response):
|
def _portal_running(response):
|
||||||
prun, srun, ppid, spid = _parse_status(response)
|
prun, srun, ppid, spid = _parse_status(response)
|
||||||
|
|
@ -675,12 +727,14 @@ def start_evennia(pprofiler=False, sprofiler=False):
|
||||||
wait_for_status(True, None, _portal_started)
|
wait_for_status(True, None, _portal_started)
|
||||||
|
|
||||||
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
|
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
|
||||||
reactor.run()
|
|
||||||
|
|
||||||
|
|
||||||
def reload_evennia(sprofiler=False, reset=False):
|
def reload_evennia(sprofiler=False, reset=False):
|
||||||
"""
|
"""
|
||||||
This will instruct the Portal to reboot the Server component.
|
This will instruct the Portal to reboot the Server component. We
|
||||||
|
do this manually by telling the server to shutdown (in reload mode)
|
||||||
|
and wait for the portal to report back, at which point we start the
|
||||||
|
server again. This way we control the process exactly.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
_, server_cmd = _get_twistd_cmdline(False, sprofiler)
|
_, server_cmd = _get_twistd_cmdline(False, sprofiler)
|
||||||
|
|
@ -689,23 +743,24 @@ def reload_evennia(sprofiler=False, reset=False):
|
||||||
print("... Server re-started.")
|
print("... Server re-started.")
|
||||||
reactor.stop()
|
reactor.stop()
|
||||||
|
|
||||||
def _server_reloaded(*args):
|
def _server_reloaded(status):
|
||||||
print("... Server {}.".format("reset" if reset else "reloaded"))
|
print("{} ... Server {}.".format(status, "reset" if reset else "reloaded"))
|
||||||
reactor.stop()
|
reactor.stop()
|
||||||
|
|
||||||
def _server_not_running(*args):
|
def _server_stopped(status):
|
||||||
|
wait_for_status_reply(_server_reloaded)
|
||||||
send_instruction(SSTART, server_cmd)
|
send_instruction(SSTART, server_cmd)
|
||||||
wait_for_status(True, True, _server_reloaded)
|
|
||||||
|
|
||||||
def _portal_running(response):
|
def _portal_running(response):
|
||||||
_, srun, _, _ = _parse_status(response)
|
_, srun, _, _ = _parse_status(response)
|
||||||
if srun:
|
if srun:
|
||||||
print("Server {}...".format("resetting" if reset else "reloading"))
|
print("Server {}...".format("resetting" if reset else "reloading"))
|
||||||
send_instruction(SRESET if reset else SRELOAD, server_cmd)
|
wait_for_status_reply(_server_stopped)
|
||||||
wait_for_status(True, False, _server_not_running)
|
send_instruction(SRESET if reset else SRELOAD, {})
|
||||||
else:
|
else:
|
||||||
print("Server down. Re-starting ...")
|
print("Server down. Re-starting ...")
|
||||||
send_instruction(SSTART, server_cmd, _server_restarted)
|
wait_for_status_reply(_server_restarted)
|
||||||
|
send_instruction(SSTART, server_cmd)
|
||||||
|
|
||||||
def _portal_not_running(fail):
|
def _portal_not_running(fail):
|
||||||
print("Evennia not running. Starting from scratch ...")
|
print("Evennia not running. Starting from scratch ...")
|
||||||
|
|
@ -713,7 +768,6 @@ def reload_evennia(sprofiler=False, reset=False):
|
||||||
|
|
||||||
# get portal status
|
# get portal status
|
||||||
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
|
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
|
||||||
reactor.run()
|
|
||||||
|
|
||||||
|
|
||||||
def stop_evennia():
|
def stop_evennia():
|
||||||
|
|
@ -735,18 +789,17 @@ def stop_evennia():
|
||||||
if srun:
|
if srun:
|
||||||
print("Server stopping ...")
|
print("Server stopping ...")
|
||||||
send_instruction(SSHUTD, {})
|
send_instruction(SSHUTD, {})
|
||||||
wait_for_status(True, False, _server_stopped)
|
wait_for_status_reply(_server_stopped)
|
||||||
else:
|
else:
|
||||||
print("Server already stopped.\nStopping Portal ...")
|
print("Server already stopped.\nStopping Portal ...")
|
||||||
send_instruction(PSHUTD, {})
|
send_instruction(PSHUTD, {})
|
||||||
wait_for_status(False, False, _portal_stopped)
|
wait_for_status(False, None, _portal_stopped)
|
||||||
|
|
||||||
def _portal_not_running(fail):
|
def _portal_not_running(fail):
|
||||||
print("Evennia is not running.")
|
print("Evennia is not running.")
|
||||||
reactor.stop()
|
reactor.stop()
|
||||||
|
|
||||||
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
|
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
|
||||||
reactor.run()
|
|
||||||
|
|
||||||
|
|
||||||
def stop_server_only():
|
def stop_server_only():
|
||||||
|
|
@ -762,8 +815,8 @@ def stop_server_only():
|
||||||
_, srun, _, _ = _parse_status(response)
|
_, srun, _, _ = _parse_status(response)
|
||||||
if srun:
|
if srun:
|
||||||
print("Server stopping ...")
|
print("Server stopping ...")
|
||||||
|
wait_for_status_reply(_server_stopped)
|
||||||
send_instruction(SSHUTD, {})
|
send_instruction(SSHUTD, {})
|
||||||
wait_for_status(True, False, _server_stopped)
|
|
||||||
else:
|
else:
|
||||||
print("Server is not running.")
|
print("Server is not running.")
|
||||||
|
|
||||||
|
|
@ -771,7 +824,6 @@ def stop_server_only():
|
||||||
print("Evennia is not running.")
|
print("Evennia is not running.")
|
||||||
|
|
||||||
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
|
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
|
||||||
reactor.run()
|
|
||||||
|
|
||||||
|
|
||||||
def evennia_version():
|
def evennia_version():
|
||||||
|
|
|
||||||
|
|
@ -155,7 +155,7 @@ class MsgLauncher2Portal(amp.Command):
|
||||||
arguments = [('operation', amp.String()),
|
arguments = [('operation', amp.String()),
|
||||||
('arguments', amp.String())]
|
('arguments', amp.String())]
|
||||||
errors = {Exception: 'EXCEPTION'}
|
errors = {Exception: 'EXCEPTION'}
|
||||||
response = [('result', amp.String())]
|
response = []
|
||||||
|
|
||||||
|
|
||||||
class MsgPortal2Server(amp.Command):
|
class MsgPortal2Server(amp.Command):
|
||||||
|
|
@ -335,9 +335,9 @@ class AMPMultiConnectionProtocol(amp.AMP):
|
||||||
"""
|
"""
|
||||||
return loads(packed_data)
|
return loads(packed_data)
|
||||||
|
|
||||||
def data_out(self, command, sessid, **kwargs):
|
def broadcast(self, command, sessid, **kwargs):
|
||||||
"""
|
"""
|
||||||
Send data across the wire. Always use this to send.
|
Send data across the wire to all connections.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
command (AMP Command): A protocol send command.
|
command (AMP Command): A protocol send command.
|
||||||
|
|
@ -353,9 +353,9 @@ class AMPMultiConnectionProtocol(amp.AMP):
|
||||||
"""
|
"""
|
||||||
deferreds = []
|
deferreds = []
|
||||||
for protcl in self.factory.broadcasts:
|
for protcl in self.factory.broadcasts:
|
||||||
deferreds.append(protcl.callRemote(command,
|
deferreds.append(protcl.callRemote(command, **kwargs).addErrback(
|
||||||
packed_data=dumps((sessid, kwargs))).addErrback(
|
|
||||||
self.errback, command.key))
|
self.errback, command.key))
|
||||||
|
|
||||||
return DeferredList(deferreds)
|
return DeferredList(deferreds)
|
||||||
|
|
||||||
# generic function send/recvs
|
# generic function send/recvs
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,9 @@ class AMPServerFactory(protocol.ServerFactory):
|
||||||
self.protocol = AMPServerProtocol
|
self.protocol = AMPServerProtocol
|
||||||
self.broadcasts = []
|
self.broadcasts = []
|
||||||
self.server_connection = None
|
self.server_connection = None
|
||||||
|
self.launcher_connection = None
|
||||||
self.disconnect_callbacks = {}
|
self.disconnect_callbacks = {}
|
||||||
|
self.server_connect_callbacks = []
|
||||||
|
|
||||||
def buildProtocol(self, addr):
|
def buildProtocol(self, addr):
|
||||||
"""
|
"""
|
||||||
|
|
@ -72,12 +74,18 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
Protocol subclass for the AMP-server run by the Portal.
|
Protocol subclass for the AMP-server run by the Portal.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
"""
|
"""
|
||||||
Set up a simple callback mechanism to let the amp-server wait for a connection to close.
|
Set up a simple callback mechanism to let the amp-server wait for a connection to close.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
# wipe broadcast and data memory
|
||||||
|
super(AMPServerProtocol, self).connectionLost(reason)
|
||||||
|
if self.factory.server_connection == self:
|
||||||
|
self.factory.server_connection = None
|
||||||
|
if self.factory.launcher_connection == self:
|
||||||
|
self.factory.launcher_connection = None
|
||||||
|
|
||||||
callback, args, kwargs = self.factory.disconnect_callbacks.pop(self, (None, None, None))
|
callback, args, kwargs = self.factory.disconnect_callbacks.pop(self, (None, None, None))
|
||||||
if callback:
|
if callback:
|
||||||
try:
|
try:
|
||||||
|
|
@ -85,6 +93,45 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.log_trace()
|
logger.log_trace()
|
||||||
|
|
||||||
|
def get_status(self):
|
||||||
|
"""
|
||||||
|
Return status for the Evennia infrastructure.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
status (tuple): The portal/server status and pids
|
||||||
|
(portal_live, server_live, portal_PID, server_PID).
|
||||||
|
|
||||||
|
"""
|
||||||
|
server_connected = bool(self.factory.server_connection and
|
||||||
|
self.factory.server_connection.transport.connected)
|
||||||
|
server_pid = self.factory.portal.server_process_id
|
||||||
|
portal_pid = os.getpid()
|
||||||
|
return (True, server_connected, portal_pid, server_pid)
|
||||||
|
|
||||||
|
def data_to_server(self, command, sessid, **kwargs):
|
||||||
|
"""
|
||||||
|
Send data across the wire to the Server.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command (AMP Command): A protocol send command.
|
||||||
|
sessid (int): A unique Session id.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
deferred (deferred or None): A deferred with an errback.
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
Data will be sent across the wire pickled as a tuple
|
||||||
|
(sessid, kwargs).
|
||||||
|
|
||||||
|
"""
|
||||||
|
if self.factory.server_connection:
|
||||||
|
return self.factory.server_connection.callRemote(
|
||||||
|
command, packed_data=amp.dumps((sessid, kwargs))).addErrback(
|
||||||
|
self.errback, command.key)
|
||||||
|
else:
|
||||||
|
# if no server connection is available, broadcast
|
||||||
|
return self.broadcast(command, sessid, packed_data=amp.dumps((sessid, kwargs)))
|
||||||
|
|
||||||
def start_server(self, server_twistd_cmd):
|
def start_server(self, server_twistd_cmd):
|
||||||
"""
|
"""
|
||||||
(Re-)Launch the Evennia server.
|
(Re-)Launch the Evennia server.
|
||||||
|
|
@ -122,6 +169,17 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
"""
|
"""
|
||||||
self.factory.disconnect_callbacks[self] = (callback, args, kwargs)
|
self.factory.disconnect_callbacks[self] = (callback, args, kwargs)
|
||||||
|
|
||||||
|
def wait_for_server_connect(self, callback, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Add a callback for when the Server is sure to have connected.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
callback (callable): Will be called with *args, **kwargs
|
||||||
|
once the Server handshake with Portal is complete.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.factory.server_connect_callbacks.append((callback, args, kwargs))
|
||||||
|
|
||||||
def stop_server(self, mode='shutdown'):
|
def stop_server(self, mode='shutdown'):
|
||||||
"""
|
"""
|
||||||
Shut down server in one or more modes.
|
Shut down server in one or more modes.
|
||||||
|
|
@ -139,6 +197,17 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
|
|
||||||
# sending amp data
|
# sending amp data
|
||||||
|
|
||||||
|
def send_Status2Launcher(self):
|
||||||
|
"""
|
||||||
|
Send a status stanza to the launcher.
|
||||||
|
|
||||||
|
"""
|
||||||
|
if self.factory.launcher_connection:
|
||||||
|
self.factory.launcher_connection.callRemote(
|
||||||
|
amp.MsgStatus,
|
||||||
|
status=amp.dumps(self.get_status())).addErrback(
|
||||||
|
self.errback, amp.MsgStatus.key)
|
||||||
|
|
||||||
def send_MsgPortal2Server(self, session, **kwargs):
|
def send_MsgPortal2Server(self, session, **kwargs):
|
||||||
"""
|
"""
|
||||||
Access method called by the Portal and executed on the Portal.
|
Access method called by the Portal and executed on the Portal.
|
||||||
|
|
@ -151,7 +220,7 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
deferred (Deferred): Asynchronous return.
|
deferred (Deferred): Asynchronous return.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.data_out(amp.MsgPortal2Server, session.sessid, **kwargs)
|
return self.data_to_server(amp.MsgPortal2Server, session.sessid, **kwargs)
|
||||||
|
|
||||||
def send_AdminPortal2Server(self, session, operation="", **kwargs):
|
def send_AdminPortal2Server(self, session, operation="", **kwargs):
|
||||||
"""
|
"""
|
||||||
|
|
@ -166,7 +235,8 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
data (str or dict, optional): Data used in the administrative operation.
|
data (str or dict, optional): Data used in the administrative operation.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.data_out(amp.AdminPortal2Server, session.sessid, operation=operation, **kwargs)
|
return self.data_to_server(amp.AdminPortal2Server, session.sessid,
|
||||||
|
operation=operation, **kwargs)
|
||||||
|
|
||||||
# receive amp data
|
# receive amp data
|
||||||
|
|
||||||
|
|
@ -183,16 +253,7 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
(portal_running, server_running, portal_pid, server_pid).
|
(portal_running, server_running, portal_pid, server_pid).
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# check if the server is connected
|
return {"status": amp.dumps(self.get_status())}
|
||||||
server_connected = (self.factory.server_connection and
|
|
||||||
self.factory.server_connection.transport.connected)
|
|
||||||
server_pid = self.factory.portal.server_process_id
|
|
||||||
portal_pid = os.getpid()
|
|
||||||
|
|
||||||
if server_connected:
|
|
||||||
return {"status": amp.dumps((True, True, portal_pid, server_pid))}
|
|
||||||
else:
|
|
||||||
return {"status": amp.dumps((True, False, portal_pid, server_pid))}
|
|
||||||
|
|
||||||
@amp.MsgLauncher2Portal.responder
|
@amp.MsgLauncher2Portal.responder
|
||||||
@amp.catch_traceback
|
@amp.catch_traceback
|
||||||
|
|
@ -213,58 +274,55 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
launcher. It can obviously only accessed when the Portal is already up and running.
|
launcher. It can obviously only accessed when the Portal is already up and running.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def _retval(success, txt):
|
self.factory.launcher_connection = self
|
||||||
return {"result": amp.dumps((success, txt))}
|
|
||||||
|
|
||||||
server_connected = (self.factory.server_connection and
|
_, server_connected, _, _ = self.get_status()
|
||||||
self.factory.server_connection.transport.connected)
|
|
||||||
server_pid = self.factory.portal.server_process_id
|
|
||||||
|
|
||||||
logger.log_msg("AMP SERVER operation == %s received" % (ord(operation)))
|
logger.log_msg("AMP SERVER operation == %s received" % (ord(operation)))
|
||||||
logger.log_msg("AMP SERVER arguments: %s" % (amp.loads(arguments)))
|
logger.log_msg("AMP SERVER arguments: %s" % (amp.loads(arguments)))
|
||||||
|
|
||||||
if operation == amp.SSTART: # portal start #15
|
if operation == amp.SSTART: # portal start #15
|
||||||
# first, check if server is already running
|
# first, check if server is already running
|
||||||
if server_connected:
|
if not server_connected:
|
||||||
return _retval(False,
|
self.wait_for_server_connect(self.send_Status2Launcher)
|
||||||
"Server already running at PID={spid}".format(spid=server_pid))
|
self.start_server(amp.loads(arguments))
|
||||||
else:
|
|
||||||
spid = self.start_server(amp.loads(arguments))
|
|
||||||
return _retval(True, "Server started with PID {spid}.".format(spid=spid))
|
|
||||||
|
|
||||||
elif operation == amp.SRELOAD: # reload server #14
|
elif operation == amp.SRELOAD: # reload server #14
|
||||||
if server_connected:
|
if server_connected:
|
||||||
# don't restart until the server connection goes down
|
# We let the launcher restart us once they get the signal
|
||||||
|
self.factory.server_connection.wait_for_disconnect(
|
||||||
|
self.send_Status2Launcher)
|
||||||
self.stop_server(mode='reload')
|
self.stop_server(mode='reload')
|
||||||
else:
|
else:
|
||||||
spid = self.start_server(amp.loads(arguments))
|
self.wait_for_server_connect(self.send_Status2Launcher)
|
||||||
return _retval(True, "Server started with PID {spid}.".format(spid=spid))
|
self.start_server(amp.loads(arguments))
|
||||||
|
|
||||||
elif operation == amp.SRESET: # reload server #19
|
elif operation == amp.SRESET: # reload server #19
|
||||||
if server_connected:
|
if server_connected:
|
||||||
|
self.factory.server_connection.wait_for_disconnect(
|
||||||
|
self.send_Status2Launcher)
|
||||||
self.stop_server(mode='reset')
|
self.stop_server(mode='reset')
|
||||||
return _retval(True, "Server restarted with PID {spid}.".format(spid=spid))
|
|
||||||
else:
|
else:
|
||||||
spid = self.start_server(amp.loads(arguments))
|
self.wait_for_server_connect(self.send_Status2Launcher)
|
||||||
return _retval(True, "Server started with PID {spid}.".format(spid=spid))
|
self.start_server(amp.loads(arguments))
|
||||||
|
|
||||||
elif operation == amp.SSHUTD: # server-only shutdown #17
|
elif operation == amp.SSHUTD: # server-only shutdown #17
|
||||||
if server_connected:
|
if server_connected:
|
||||||
|
self.factory.server_connection.wait_for_disconnect(
|
||||||
|
self.send_Status2Launcher)
|
||||||
self.stop_server(mode='shutdown')
|
self.stop_server(mode='shutdown')
|
||||||
return _retval(True, "Server stopped.")
|
|
||||||
else:
|
|
||||||
return _retval(False, "Server not running")
|
|
||||||
|
|
||||||
elif operation == amp.PSHUTD: # portal + server shutdown #16
|
elif operation == amp.PSHUTD: # portal + server shutdown #16
|
||||||
if server_connected:
|
if server_connected:
|
||||||
self.stop_server(mode='shutdown')
|
self.factory.server_connection.wait_for_disconnect(
|
||||||
return _retval(True, "Server stopped.")
|
self.factory.portal.shutdown, restart=False)
|
||||||
|
else:
|
||||||
self.factory.portal.shutdown(restart=False)
|
self.factory.portal.shutdown(restart=False)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise Exception("operation %(op)s not recognized." % {'op': operation})
|
raise Exception("operation %(op)s not recognized." % {'op': operation})
|
||||||
# fallback
|
|
||||||
return {"result": ""}
|
return {}
|
||||||
|
|
||||||
@amp.MsgServer2Portal.responder
|
@amp.MsgServer2Portal.responder
|
||||||
@amp.catch_traceback
|
@amp.catch_traceback
|
||||||
|
|
@ -295,13 +353,12 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
packed_data (str): Data received, a pickled tuple (sessid, kwargs).
|
packed_data (str): Data received, a pickled tuple (sessid, kwargs).
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
self.factory.server_connection = self
|
||||||
|
|
||||||
sessid, kwargs = self.data_in(packed_data)
|
sessid, kwargs = self.data_in(packed_data)
|
||||||
operation = kwargs.pop("operation")
|
operation = kwargs.pop("operation")
|
||||||
portal_sessionhandler = self.factory.portal.sessions
|
portal_sessionhandler = self.factory.portal.sessions
|
||||||
|
|
||||||
# store this transport since we know it comes from the Server
|
|
||||||
self.factory.server_connection = self
|
|
||||||
|
|
||||||
if operation == amp.SLOGIN: # server_session_login
|
if operation == amp.SLOGIN: # server_session_login
|
||||||
# a session has authenticated; sync it.
|
# a session has authenticated; sync it.
|
||||||
session = portal_sessionhandler.get(sessid)
|
session = portal_sessionhandler.get(sessid)
|
||||||
|
|
@ -344,11 +401,23 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
|
||||||
sessiondata=sessdata)
|
sessiondata=sessdata)
|
||||||
self.factory.portal.sessions.at_server_connection()
|
self.factory.portal.sessions.at_server_connection()
|
||||||
|
|
||||||
|
print("Portal PSYNC: %s" % self.factory.server_connection)
|
||||||
|
if self.factory.server_connection:
|
||||||
|
# this is an indication the server has successfully connected, so
|
||||||
|
# we trigger any callbacks (usually to tell the launcher server is up)
|
||||||
|
for callback, args, kwargs in self.factory.server_connect_callbacks:
|
||||||
|
try:
|
||||||
|
callback(*args, **kwargs)
|
||||||
|
except Exception:
|
||||||
|
logger.log_trace()
|
||||||
|
self.factory.server_connect_callbacks = []
|
||||||
|
|
||||||
elif operation == amp.SSYNC: # server_session_sync
|
elif operation == amp.SSYNC: # server_session_sync
|
||||||
# server wants to save session data to the portal,
|
# server wants to save session data to the portal,
|
||||||
# maybe because it's about to shut down.
|
# maybe because it's about to shut down.
|
||||||
portal_sessionhandler.server_session_sync(kwargs.get("sessiondata"),
|
portal_sessionhandler.server_session_sync(kwargs.get("sessiondata"),
|
||||||
kwargs.get("clean", True))
|
kwargs.get("clean", True))
|
||||||
|
|
||||||
# set a flag in case we are about to shut down soon
|
# set a flag in case we are about to shut down soon
|
||||||
self.factory.server_restart_mode = True
|
self.factory.server_restart_mode = True
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue