Used
This commit is contained in:
parent
afc7fd758f
commit
029ee71ffa
2 changed files with 144 additions and 109 deletions
|
|
@ -37,7 +37,6 @@ class Ticker(object):
|
||||||
self.subscriptions = {}
|
self.subscriptions = {}
|
||||||
self.task = LoopingCall(callback, self)
|
self.task = LoopingCall(callback, self)
|
||||||
|
|
||||||
|
|
||||||
def validate(self):
|
def validate(self):
|
||||||
"""
|
"""
|
||||||
Start/stop the task depending on how many
|
Start/stop the task depending on how many
|
||||||
|
|
@ -80,6 +79,8 @@ class TickerPool(object):
|
||||||
This maintains a pool of Twisted LoopingCall tasks
|
This maintains a pool of Twisted LoopingCall tasks
|
||||||
for calling subscribed objects at given times.
|
for calling subscribed objects at given times.
|
||||||
"""
|
"""
|
||||||
|
ticker_class = Ticker
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
"Initialize the pool"
|
"Initialize the pool"
|
||||||
self.tickers = {}
|
self.tickers = {}
|
||||||
|
|
@ -89,7 +90,7 @@ class TickerPool(object):
|
||||||
Add new ticker subscriber
|
Add new ticker subscriber
|
||||||
"""
|
"""
|
||||||
if interval not in self.tickers:
|
if interval not in self.tickers:
|
||||||
self.tickers[interval] = Ticker(interval)
|
self.tickers[interval] = self.ticker_class(interval)
|
||||||
self.tickers[interval].add(store_key, obj, *args, **kwargs)
|
self.tickers[interval].add(store_key, obj, *args, **kwargs)
|
||||||
|
|
||||||
def remove(self, store_key, interval):
|
def remove(self, store_key, interval):
|
||||||
|
|
@ -118,13 +119,15 @@ class TickerHandler(object):
|
||||||
objects to various tick rates. The pool maintains creation
|
objects to various tick rates. The pool maintains creation
|
||||||
instructions and and re-applies them at a server restart.
|
instructions and and re-applies them at a server restart.
|
||||||
"""
|
"""
|
||||||
|
ticker_pool_class = TickerPool
|
||||||
|
|
||||||
def __init__(self, save_name="ticker_storage"):
|
def __init__(self, save_name="ticker_storage"):
|
||||||
"""
|
"""
|
||||||
Initialize handler
|
Initialize handler
|
||||||
"""
|
"""
|
||||||
self.ticker_storage = {}
|
self.ticker_storage = {}
|
||||||
self.save_name = save_name
|
self.save_name = save_name
|
||||||
self.ticker_pool = TickerPool()
|
self.ticker_pool = self.ticker_pool_class()
|
||||||
|
|
||||||
def _store_key(self, obj, interval):
|
def _store_key(self, obj, interval):
|
||||||
"""
|
"""
|
||||||
|
|
@ -177,7 +180,7 @@ class TickerHandler(object):
|
||||||
ticker_storage = ServerConfig.objects.conf(key=self.save_name)
|
ticker_storage = ServerConfig.objects.conf(key=self.save_name)
|
||||||
if ticker_storage:
|
if ticker_storage:
|
||||||
self.ticker_storage = dbunserialize(ticker_storage)
|
self.ticker_storage = dbunserialize(ticker_storage)
|
||||||
print "restore:", self.ticker_storage
|
#print "restore:", self.ticker_storage
|
||||||
for (obj, interval), (args, kwargs) in self.ticker_storage.items():
|
for (obj, interval), (args, kwargs) in self.ticker_storage.items():
|
||||||
obj = unpack_dbobj(obj)
|
obj = unpack_dbobj(obj)
|
||||||
_, store_key = self._store_key(obj, interval)
|
_, store_key = self._store_key(obj, interval)
|
||||||
|
|
|
||||||
|
|
@ -33,8 +33,9 @@ from inspect import isfunction
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from src.server.models import ServerConfig
|
from src.server.models import ServerConfig
|
||||||
from src.server.sessionhandler import SESSIONS
|
from src.server.sessionhandler import SESSIONS
|
||||||
from src.scripts.scripts import Script
|
#from src.scripts.scripts import Script
|
||||||
from src.utils.create import create_script
|
#from src.utils.create import create_script
|
||||||
|
from src.scripts import Ticker, TickerPool, TickerHandler
|
||||||
from src.utils.dbserialize import dbserialize, dbunserialize, pack_dbobj, unpack_dbobj
|
from src.utils.dbserialize import dbserialize, dbunserialize, pack_dbobj, unpack_dbobj
|
||||||
from src.utils import logger
|
from src.utils import logger
|
||||||
from src.utils.utils import all_from_module, make_iter
|
from src.utils.utils import all_from_module, make_iter
|
||||||
|
|
@ -125,83 +126,108 @@ class TrackerBase(object):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class _RepeaterScript(Script):
|
#class _RepeaterScript(Script):
|
||||||
"""
|
# """
|
||||||
Repeating and subscription-enabled script for triggering OOB
|
# Repeating and subscription-enabled script for triggering OOB
|
||||||
functions. Maintained in a _RepeaterPool.
|
# functions. Maintained in a _RepeaterPool.
|
||||||
"""
|
# """
|
||||||
def at_script_creation(self):
|
# def at_script_creation(self):
|
||||||
"Called when script is initialized"
|
# "Called when script is initialized"
|
||||||
self.key = "oob_func"
|
# self.key = "oob_func"
|
||||||
self.desc = "OOB functionality script"
|
# self.desc = "OOB functionality script"
|
||||||
self.persistent = False # oob scripts should always be non-persistent
|
# self.persistent = False # oob scripts should always be non-persistent
|
||||||
self.ndb.subscriptions = {}
|
# self.ndb.subscriptions = {}
|
||||||
|
#
|
||||||
|
# def at_repeat(self):
|
||||||
|
# """
|
||||||
|
# Calls subscriptions every self.interval seconds
|
||||||
|
# """
|
||||||
|
# for (func_key, sessid, interval, args, kwargs) in self.ndb.subscriptions.values():
|
||||||
|
# session = SESSIONS.session_from_sessid(sessid)
|
||||||
|
# OOB_HANDLER.execute_cmd(session, func_key, *args, **kwargs)
|
||||||
|
#
|
||||||
|
# def subscribe(self, store_key, sessid, func_key, interval, *args, **kwargs):
|
||||||
|
# """
|
||||||
|
# Sign up a subscriber to this oobfunction. Subscriber is
|
||||||
|
# a database object with a dbref.
|
||||||
|
# """
|
||||||
|
# self.ndb.subscriptions[store_key] = (func_key, sessid, interval, args, kwargs)
|
||||||
|
#
|
||||||
|
# def unsubscribe(self, store_key):
|
||||||
|
# """
|
||||||
|
# Unsubscribe from oobfunction. Returns True if removal was
|
||||||
|
# successful, False otherwise
|
||||||
|
# """
|
||||||
|
# self.ndb.subscriptions.pop(store_key, None)
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#class _RepeaterPool(object):
|
||||||
|
# """
|
||||||
|
# This maintains a pool of _RepeaterScript scripts, ordered one per
|
||||||
|
# interval. It will automatically cull itself once a given interval's
|
||||||
|
# script has no more subscriptions.
|
||||||
|
#
|
||||||
|
# This is used and accessed from oobhandler.repeat/unrepeat
|
||||||
|
# """
|
||||||
|
#
|
||||||
|
# def __init__(self):
|
||||||
|
# self.scripts = {}
|
||||||
|
#
|
||||||
|
# def add(self, store_key, sessid, func_key, interval, *args, **kwargs):
|
||||||
|
# """
|
||||||
|
# Add a new tracking
|
||||||
|
# """
|
||||||
|
# if interval not in self.scripts:
|
||||||
|
# # if no existing interval exists, create new script to fill the gap
|
||||||
|
# new_tracker = create_script(_RepeaterScript,
|
||||||
|
# key="oob_repeater_%is" % interval, interval=interval)
|
||||||
|
# self.scripts[interval] = new_tracker
|
||||||
|
# self.scripts[interval].subscribe(store_key, sessid, func_key,
|
||||||
|
# interval, *args, **kwargs)
|
||||||
|
#
|
||||||
|
# def remove(self, store_key, interval):
|
||||||
|
# """
|
||||||
|
# Remove tracking
|
||||||
|
# """
|
||||||
|
# if interval in self.scripts:
|
||||||
|
# self.scripts[interval].unsubscribe(store_key)
|
||||||
|
# if len(self.scripts[interval].ndb.subscriptions) == 0:
|
||||||
|
# # no more subscriptions for this interval. Clean out the script.
|
||||||
|
# self.scripts[interval].stop()
|
||||||
|
#
|
||||||
|
# def stop(self):
|
||||||
|
# """
|
||||||
|
# Stop all scripts in pool. This is done at server reload since
|
||||||
|
# restoring the pool will automatically re-populate the pool.
|
||||||
|
# """
|
||||||
|
# for script in self.scripts.values():
|
||||||
|
# script.stop()
|
||||||
|
|
||||||
def at_repeat(self):
|
from twisted.internet.task import LoopingCall
|
||||||
"""
|
|
||||||
Calls subscriptions every self.interval seconds
|
|
||||||
"""
|
|
||||||
for (func_key, sessid, interval, args, kwargs) in self.ndb.subscriptions.values():
|
|
||||||
session = SESSIONS.session_from_sessid(sessid)
|
|
||||||
OOB_HANDLER.execute_cmd(session, func_key, *args, **kwargs)
|
|
||||||
|
|
||||||
def subscribe(self, store_key, sessid, func_key, interval, *args, **kwargs):
|
class OOBTicker(Ticker):
|
||||||
"""
|
"""
|
||||||
Sign up a subscriber to this oobfunction. Subscriber is
|
Version of Ticker that calls OOB_FUNC rather than trying to call
|
||||||
a database object with a dbref.
|
a hook method.
|
||||||
"""
|
"""
|
||||||
self.ndb.subscriptions[store_key] = (func_key, sessid, interval, args, kwargs)
|
def __init__(self, interval):
|
||||||
|
def callback(self, oobhandler, sessions):
|
||||||
|
for key, (_, args, kwargs) in self.subscriptions.items():
|
||||||
|
session = sessions.session_from_sessid(kwargs.get("sessid"))
|
||||||
|
try:
|
||||||
|
oobhandler.execute_cmd(session, kwargs.get("func_key"), *args, **kwargs)
|
||||||
|
except Exception:
|
||||||
|
logger.log_trace()
|
||||||
|
|
||||||
def unsubscribe(self, store_key):
|
self.interval = interval
|
||||||
"""
|
self.subscriptions = {}
|
||||||
Unsubscribe from oobfunction. Returns True if removal was
|
self.task = LoopingCall(callback, self, OOB_HANDLER, SESSIONS)
|
||||||
successful, False otherwise
|
|
||||||
"""
|
|
||||||
self.ndb.subscriptions.pop(store_key, None)
|
|
||||||
|
|
||||||
|
class OOBTickerPool(TickerPool):
|
||||||
|
ticker_class = OOBTicker
|
||||||
|
|
||||||
class _RepeaterPool(object):
|
class OOBTickerHandler(TickerHandler):
|
||||||
"""
|
ticker_pool_class = OOBTickerPool
|
||||||
This maintains a pool of _RepeaterScript scripts, ordered one per
|
|
||||||
interval. It will automatically cull itself once a given interval's
|
|
||||||
script has no more subscriptions.
|
|
||||||
|
|
||||||
This is used and accessed from oobhandler.repeat/unrepeat
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.scripts = {}
|
|
||||||
|
|
||||||
def add(self, store_key, sessid, func_key, interval, *args, **kwargs):
|
|
||||||
"""
|
|
||||||
Add a new tracking
|
|
||||||
"""
|
|
||||||
if interval not in self.scripts:
|
|
||||||
# if no existing interval exists, create new script to fill the gap
|
|
||||||
new_tracker = create_script(_RepeaterScript,
|
|
||||||
key="oob_repeater_%is" % interval, interval=interval)
|
|
||||||
self.scripts[interval] = new_tracker
|
|
||||||
self.scripts[interval].subscribe(store_key, sessid, func_key,
|
|
||||||
interval, *args, **kwargs)
|
|
||||||
|
|
||||||
def remove(self, store_key, interval):
|
|
||||||
"""
|
|
||||||
Remove tracking
|
|
||||||
"""
|
|
||||||
if interval in self.scripts:
|
|
||||||
self.scripts[interval].unsubscribe(store_key)
|
|
||||||
if len(self.scripts[interval].ndb.subscriptions) == 0:
|
|
||||||
# no more subscriptions for this interval. Clean out the script.
|
|
||||||
self.scripts[interval].stop()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""
|
|
||||||
Stop all scripts in pool. This is done at server reload since
|
|
||||||
restoring the pool will automatically re-populate the pool.
|
|
||||||
"""
|
|
||||||
for script in self.scripts.values():
|
|
||||||
script.stop()
|
|
||||||
|
|
||||||
|
|
||||||
# Main OOB Handler
|
# Main OOB Handler
|
||||||
|
|
||||||
|
|
@ -217,8 +243,9 @@ class OOBHandler(object):
|
||||||
"""
|
"""
|
||||||
self.sessionhandler = SESSIONS
|
self.sessionhandler = SESSIONS
|
||||||
self.oob_tracker_storage = {}
|
self.oob_tracker_storage = {}
|
||||||
self.oob_repeat_storage = {}
|
#self.oob_repeat_storage = {}
|
||||||
self.oob_tracker_pool = _RepeaterPool()
|
#self.oob_tracker_pool = _RepeaterPool()
|
||||||
|
self.tickerhandler = OOBTickerHandler("oob_ticker_storage")
|
||||||
|
|
||||||
def save(self):
|
def save(self):
|
||||||
"""
|
"""
|
||||||
|
|
@ -229,11 +256,12 @@ class OOBHandler(object):
|
||||||
#print "saved tracker_storage:", self.oob_tracker_storage
|
#print "saved tracker_storage:", self.oob_tracker_storage
|
||||||
ServerConfig.objects.conf(key="oob_tracker_storage",
|
ServerConfig.objects.conf(key="oob_tracker_storage",
|
||||||
value=dbserialize(self.oob_tracker_storage))
|
value=dbserialize(self.oob_tracker_storage))
|
||||||
if self.oob_repeat_storage:
|
self.tickerhandler.save()
|
||||||
#print "saved repeat_storage:", self.oob_repeat_storage
|
#if self.oob_repeat_storage:
|
||||||
ServerConfig.objects.conf(key="oob_repeat_storage",
|
# #print "saved repeat_storage:", self.oob_repeat_storage
|
||||||
value=dbserialize(self.oob_repeat_storage))
|
# ServerConfig.objects.conf(key="oob_repeat_storage",
|
||||||
self.oob_tracker_pool.stop()
|
# value=dbserialize(self.oob_repeat_storage))
|
||||||
|
#self.oob_tracker_pool.stop()
|
||||||
|
|
||||||
def restore(self):
|
def restore(self):
|
||||||
"""
|
"""
|
||||||
|
|
@ -250,14 +278,16 @@ class OOBHandler(object):
|
||||||
# make sure to purce the storage
|
# make sure to purce the storage
|
||||||
ServerConfig.objects.conf(key="oob_tracker_storage", delete=True)
|
ServerConfig.objects.conf(key="oob_tracker_storage", delete=True)
|
||||||
|
|
||||||
repeat_storage = ServerConfig.objects.conf(key="oob_repeat_storage")
|
self.tickerhandler.restore()
|
||||||
if repeat_storage:
|
|
||||||
self.oob_repeat_storage = dbunserialize(repeat_storage)
|
#repeat_storage = ServerConfig.objects.conf(key="oob_repeat_storage")
|
||||||
#print "recovered from repeat_storage:", self.oob_repeat_storage
|
#if repeat_storage:
|
||||||
for (obj, sessid, func_key, interval, args, kwargs) in self.oob_repeat_storage.values():
|
# self.oob_repeat_storage = dbunserialize(repeat_storage)
|
||||||
self.repeat(unpack_dbobj(obj), sessid, func_key, interval, *args, **kwargs)
|
# #print "recovered from repeat_storage:", self.oob_repeat_storage
|
||||||
# make sure to purge the storage
|
# for (obj, sessid, func_key, interval, args, kwargs) in self.oob_repeat_storage.values():
|
||||||
ServerConfig.objects.conf(key="oob_repeat_storage", delete=True)
|
# self.repeat(unpack_dbobj(obj), sessid, func_key, interval, *args, **kwargs)
|
||||||
|
# # make sure to purge the storage
|
||||||
|
# ServerConfig.objects.conf(key="oob_repeat_storage", delete=True)
|
||||||
|
|
||||||
def track(self, obj, sessid, fieldname, trackerclass, *args, **kwargs):
|
def track(self, obj, sessid, fieldname, trackerclass, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
|
|
@ -362,27 +392,29 @@ class OOBHandler(object):
|
||||||
"""
|
"""
|
||||||
if not func_key in _OOB_FUNCS:
|
if not func_key in _OOB_FUNCS:
|
||||||
raise KeyError("%s is not a valid OOB function name.")
|
raise KeyError("%s is not a valid OOB function name.")
|
||||||
try:
|
#try:
|
||||||
obj = obj.dbobj
|
# obj = obj.dbobj
|
||||||
except AttributeError:
|
#except AttributeError:
|
||||||
pass
|
# pass
|
||||||
store_obj = pack_dbobj(obj)
|
self.tickerhandler.add(self, obj, interval, func_key=func_key, sessid=sessid, *args, **kwargs)
|
||||||
store_key = (store_obj, sessid, func_key, interval)
|
#store_obj = pack_dbobj(obj)
|
||||||
# prepare to store
|
#store_key = (store_obj, sessid, func_key, interval)
|
||||||
self.oob_repeat_storage[store_key] = (store_obj, sessid, func_key, interval, args, kwargs)
|
## prepare to store
|
||||||
self.oob_tracker_pool.add(store_key, sessid, func_key, interval, *args, **kwargs)
|
#self.oob_repeat_storage[store_key] = (store_obj, sessid, func_key, interval, args, kwargs)
|
||||||
|
#self.oob_tracker_pool.add(store_key, sessid, func_key, interval, *args, **kwargs)
|
||||||
|
|
||||||
def unrepeat(self, obj, sessid, func_key, interval=20):
|
def unrepeat(self, obj, sessid, func_key, interval=20):
|
||||||
"""
|
"""
|
||||||
Stop a repeating action
|
Stop a repeating action
|
||||||
"""
|
"""
|
||||||
try:
|
self.tickerhandler.remove(self, obj, interval)
|
||||||
obj = obj.dbobj
|
#try:
|
||||||
except AttributeError:
|
# obj = obj.dbobj
|
||||||
pass
|
#except AttributeError:
|
||||||
store_key = (pack_dbobj(obj), sessid, func_key, interval)
|
# pass
|
||||||
self.oob_tracker_pool.remove(store_key, interval)
|
#store_key = (pack_dbobj(obj), sessid, func_key, interval)
|
||||||
self.oob_repeat_storage.pop(store_key, None)
|
#self.oob_tracker_pool.remove(store_key, interval)
|
||||||
|
#self.oob_repeat_storage.pop(store_key, None)
|
||||||
|
|
||||||
def msg(self, sessid, funcname, *args, **kwargs):
|
def msg(self, sessid, funcname, *args, **kwargs):
|
||||||
"Shortcut to relay oob data back to portal. Used by oob functions."
|
"Shortcut to relay oob data back to portal. Used by oob functions."
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue