Added first version of TickerHandler, for managing subscription style tickers in a centralized way.
This commit is contained in:
parent
1b0d229eeb
commit
205960948d
2 changed files with 242 additions and 0 deletions
239
src/scripts/tickerhandler.py
Normal file
239
src/scripts/tickerhandler.py
Normal file
|
|
@ -0,0 +1,239 @@
|
||||||
|
"""
|
||||||
|
Tickerhandler
|
||||||
|
|
||||||
|
This implements an efficient Ticker which uses a subscription
|
||||||
|
model to 'tick' subscribed objects at regular intervals. All
|
||||||
|
that is required is that the subscribing objects has a
|
||||||
|
method "at_tick".
|
||||||
|
"""
|
||||||
|
from twisted.internet.task import LoopingCall
|
||||||
|
from src.server.models import ServerConfig
|
||||||
|
from src.utils.logger import log_trace
|
||||||
|
from src.utils.utils import to_str
|
||||||
|
from src.utils.dbserialize import dbserialize, dbunserialize, pack_dbobj, unpack_dbobj
|
||||||
|
|
||||||
|
_GA = object.__getattribute__
|
||||||
|
_SA = object.__setattr__
|
||||||
|
|
||||||
|
|
||||||
|
class _Ticker(object):
|
||||||
|
"""
|
||||||
|
Represents a repeatedly running task that calls
|
||||||
|
hooks repeatedly.
|
||||||
|
"""
|
||||||
|
def __init__(self, interval, hook_key="at_tick"):
|
||||||
|
"""
|
||||||
|
Set up the ticker
|
||||||
|
"""
|
||||||
|
def callback(self):
|
||||||
|
"This should be fed _Task as argument"
|
||||||
|
hook_key = self.hook_key
|
||||||
|
for key, obj in self.subscriptions.items():
|
||||||
|
try:
|
||||||
|
_GA(obj, hook_key)()
|
||||||
|
except Exception:
|
||||||
|
log_trace()
|
||||||
|
|
||||||
|
self.interval = interval
|
||||||
|
self.hook_key = hook_key
|
||||||
|
self.subscriptions = {}
|
||||||
|
self.task = LoopingCall(callback, self)
|
||||||
|
|
||||||
|
def validate(self):
|
||||||
|
"""
|
||||||
|
Start/stop the task depending on how many
|
||||||
|
subscribers we have using it.
|
||||||
|
"""
|
||||||
|
subs = self.subscriptions
|
||||||
|
if None in subs.values():
|
||||||
|
# clean out objects that may have been deleted
|
||||||
|
subs = dict((store_key, obj) for store_key, obj in subs if obj)
|
||||||
|
self.subscriptions = subs
|
||||||
|
if self.task.running:
|
||||||
|
if not subs:
|
||||||
|
self.task.stop()
|
||||||
|
elif subs:
|
||||||
|
self.task.start(self.interval, now=False)
|
||||||
|
|
||||||
|
def add(self, store_key, obj):
|
||||||
|
"""
|
||||||
|
Sign up a subscriber to this ticker
|
||||||
|
"""
|
||||||
|
self.subscriptions[store_key] = obj
|
||||||
|
self.validate()
|
||||||
|
|
||||||
|
def remove(self, store_key):
|
||||||
|
"""
|
||||||
|
Unsubscribe object from this ticker
|
||||||
|
"""
|
||||||
|
self.subscriptions.pop(store_key, False)
|
||||||
|
self.validate()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""
|
||||||
|
Kill the Task, regardless of subscriptions
|
||||||
|
"""
|
||||||
|
self.subscriptions = {}
|
||||||
|
self.validate()
|
||||||
|
|
||||||
|
class _TickerPool(object):
|
||||||
|
"""
|
||||||
|
This maintains a pool of Twisted LoopingCall tasks
|
||||||
|
for calling subscribed objects at given times.
|
||||||
|
"""
|
||||||
|
def __init__(self, hook_key="at_tick"):
|
||||||
|
"Initialize the pool"
|
||||||
|
self.tickers = {}
|
||||||
|
|
||||||
|
def add(self, store_key, obj, interval, hook_key="at_tick"):
|
||||||
|
"""
|
||||||
|
Add new ticker subscriber
|
||||||
|
"""
|
||||||
|
if interval not in self.tickers:
|
||||||
|
self.tickers[interval] = _Ticker(interval, hook_key=hook_key)
|
||||||
|
self.tickers[interval].add(store_key, obj)
|
||||||
|
|
||||||
|
def remove(self, store_key, interval):
|
||||||
|
"""
|
||||||
|
Remove subscription from pool
|
||||||
|
"""
|
||||||
|
if interval in self.tickers:
|
||||||
|
self.tickers[interval].remove(store_key)
|
||||||
|
|
||||||
|
def stop(self, interval=None):
|
||||||
|
"""
|
||||||
|
Stop all scripts in pool. This is done at server reload since
|
||||||
|
restoring the pool will automatically re-populate the pool.
|
||||||
|
If interval is given, only stop tickers with that interval.
|
||||||
|
"""
|
||||||
|
if interval and interval in self.tickers:
|
||||||
|
self.tickers[interval].stop()
|
||||||
|
else:
|
||||||
|
for ticker in self.tickers.values():
|
||||||
|
ticker.stop()
|
||||||
|
|
||||||
|
|
||||||
|
class TickerHandler(object):
|
||||||
|
"""
|
||||||
|
The Tickerhandler maintains a pool of tasks for subscribing
|
||||||
|
objects to various tick rates. The pool maintains creation
|
||||||
|
instructions and and re-applies them at a server restart.
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
"""
|
||||||
|
Initialize handler
|
||||||
|
"""
|
||||||
|
self.ticker_storage = {}
|
||||||
|
self.ticker_pool = _TickerPool()
|
||||||
|
|
||||||
|
def _store_key(self, obj, interval, hook_key):
|
||||||
|
"""
|
||||||
|
Tries to create a store_key for the object.
|
||||||
|
Returns a tuple (isdb, store_key) where isdb
|
||||||
|
is a boolean True if obj was a database object,
|
||||||
|
False otherwise.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
obj = obj.typeclass
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
dbobj = None
|
||||||
|
try:
|
||||||
|
dbobj = obj.dbobj
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
isdb = True
|
||||||
|
if dbobj:
|
||||||
|
# create a store_key using the database representation
|
||||||
|
objkey = pack_dbobj(dbobj)
|
||||||
|
else:
|
||||||
|
# non-db object, look for a property "key" on it, otherwise
|
||||||
|
# use its memory location.
|
||||||
|
try:
|
||||||
|
objkey = _GA(obj, "key")
|
||||||
|
except AttributeError:
|
||||||
|
objkey = id(obj)
|
||||||
|
isdb = False
|
||||||
|
# return sidb and store_key
|
||||||
|
return isdb, (objkey, interval, hook_key)
|
||||||
|
|
||||||
|
def save(self):
|
||||||
|
"""
|
||||||
|
Save ticker_storage as a serialized string into a temporary
|
||||||
|
ServerConf field. This is called by server when it shuts down
|
||||||
|
"""
|
||||||
|
#print "save:", self.ticker_storage
|
||||||
|
if self.ticker_storage:
|
||||||
|
ServerConfig.objects.conf(key="ticker_storage",
|
||||||
|
value=dbserialize(self.ticker_storage))
|
||||||
|
else:
|
||||||
|
ServerConfig.objects.conf(key="ticker_storage", delete=True)
|
||||||
|
|
||||||
|
def restore(self):
|
||||||
|
"""
|
||||||
|
Restore ticker_storage from database and re-initialize the handler from storage. This is triggered by the server at restart.
|
||||||
|
"""
|
||||||
|
# load stored command instructions and use them to re-initialize handler
|
||||||
|
ticker_storage = ServerConfig.objects.conf(key="ticker_storage")
|
||||||
|
if ticker_storage:
|
||||||
|
self.ticker_storage = dbunserialize(ticker_storage)
|
||||||
|
#print "restore:", self.ticker_storage
|
||||||
|
for (obj, interval, hook_key) in self.ticker_storage.values():
|
||||||
|
obj = unpack_dbobj(obj)
|
||||||
|
_, store_key = self._store_key(obj, interval, hook_key)
|
||||||
|
self.ticker_pool.add(store_key, obj, interval, hook_key)
|
||||||
|
|
||||||
|
def add(self, obj, interval, hook_key="at_tick"):
|
||||||
|
"""
|
||||||
|
Add object to tickerhandler. The object must have an at_tick
|
||||||
|
method. This will be called every interval seconds until the
|
||||||
|
object is unsubscribed from the ticker.
|
||||||
|
"""
|
||||||
|
isdb, store_key = self._store_key(obj, interval, hook_key)
|
||||||
|
if isdb:
|
||||||
|
self.ticker_storage[store_key] = store_key
|
||||||
|
self.save()
|
||||||
|
self.ticker_pool.add(store_key, obj, interval, hook_key)
|
||||||
|
|
||||||
|
def remove(self, obj, interval, hook_key="at_tick"):
|
||||||
|
"""
|
||||||
|
Remove object from ticker with given interval.
|
||||||
|
"""
|
||||||
|
isdb, store_key = self._store_key(obj, interval, hook_key)
|
||||||
|
if isdb:
|
||||||
|
self.ticker_storage.pop(store_key, None)
|
||||||
|
self.save()
|
||||||
|
self.ticker_pool.remove(store_key, interval)
|
||||||
|
|
||||||
|
def clear(self, interval=None):
|
||||||
|
"""
|
||||||
|
Stop/remove all tickers from handler, or the ones
|
||||||
|
with a given interval. This is the only supported
|
||||||
|
way to kill tickers for non-db objects. If interval
|
||||||
|
is given, only stop tickers with this interval.
|
||||||
|
"""
|
||||||
|
self.ticker_pool.stop(interval)
|
||||||
|
if interval:
|
||||||
|
self.ticker_storage = dict((store_key, store_key) for store_key in self.ticker_storage if store_key[1] != interval)
|
||||||
|
else:
|
||||||
|
self.ticker_storage = {}
|
||||||
|
self.save()
|
||||||
|
|
||||||
|
def all(self, interval=None):
|
||||||
|
"""
|
||||||
|
Get the subsciptions for a given interval. If interval
|
||||||
|
is not given, return a dictionary with lists for every
|
||||||
|
interval in the tickerhandler.
|
||||||
|
"""
|
||||||
|
if interval is None:
|
||||||
|
# return dict of all, ordered by interval
|
||||||
|
return dict((interval, ticker.subscriptions.values())
|
||||||
|
for interval, ticker in self.ticker_pool.tickers.items())
|
||||||
|
else:
|
||||||
|
# get individual interval
|
||||||
|
ticker = self.ticker_pool.tickers.get(interval, None)
|
||||||
|
if ticker:
|
||||||
|
return ticker.subscriptions.values()
|
||||||
|
|
||||||
|
# main tickerhandler
|
||||||
|
TICKER_HANDLER = TickerHandler()
|
||||||
|
|
@ -237,6 +237,9 @@ class Evennia(object):
|
||||||
from src.server.oobhandler import OOB_HANDLER
|
from src.server.oobhandler import OOB_HANDLER
|
||||||
OOB_HANDLER.restore()
|
OOB_HANDLER.restore()
|
||||||
|
|
||||||
|
from src.scripts.tickerhandler import TICKER_HANDLER
|
||||||
|
TICKER_HANDLER.restore()
|
||||||
|
|
||||||
if SERVER_STARTSTOP_MODULE:
|
if SERVER_STARTSTOP_MODULE:
|
||||||
# call correct server hook based on start file value
|
# call correct server hook based on start file value
|
||||||
if mode in ('True', 'reload'):
|
if mode in ('True', 'reload'):
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue