Added fixes to make sure the tickerhandler loop doesn't delete from its own subscription dict while inside the loop - even if that deletion happens inside a callback.

This commit is contained in:
Griatch 2016-05-26 18:56:38 +02:00
parent 343f2eed3f
commit a74c921686

View file

@ -98,7 +98,9 @@ class Ticker(object):
kwargs is used here to identify which hook method to call. kwargs is used here to identify which hook method to call.
""" """
to_remove = [] self._to_add = []
self._to_remove = []
self._is_ticking = True
for store_key, (args, kwargs) in self.subscriptions.iteritems(): for store_key, (args, kwargs) in self.subscriptions.iteritems():
callback = yield kwargs.pop("_callback", "at_tick") callback = yield kwargs.pop("_callback", "at_tick")
obj = yield kwargs.pop("_obj", None) obj = yield kwargs.pop("_obj", None)
@ -110,22 +112,28 @@ class Ticker(object):
# try object method # try object method
if not obj or not obj.pk: if not obj or not obj.pk:
# object was deleted between calls # object was deleted between calls
to_remove.append(store_key) self._to_remove.append(store_key)
continue continue
else: else:
yield _GA(obj, callback)(*args, **kwargs) yield _GA(obj, callback)(*args, **kwargs)
except ObjectDoesNotExist: except ObjectDoesNotExist:
log_trace("Removing ticker.") log_trace("Removing ticker.")
to_remove.append(store_key) self._to_remove.append(store_key)
except Exception: except Exception:
log_trace() log_trace()
finally: finally:
# make sure to re-store # make sure to re-store
kwargs["_callback"] = callback kwargs["_callback"] = callback
kwargs["_obj"] = obj kwargs["_obj"] = obj
# cleanup # cleanup - we do this here to avoid changing the subscription dict while it loops
for store_key in to_remove: self._is_ticking = False
for store_key in self._to_remove:
self.remove(store_key) self.remove(store_key)
for store_key, (args, kwargs) in self._to_add:
self.add(store_key, *args, **kwargs)
self._to_remove = []
self._to_add = []
def __init__(self, interval): def __init__(self, interval):
""" """
@ -137,6 +145,9 @@ class Ticker(object):
""" """
self.interval = interval self.interval = interval
self.subscriptions = {} self.subscriptions = {}
self._is_ticking = False
self._to_remove = []
self._to_add = []
# set up a twisted asynchronous repeat call # set up a twisted asynchronous repeat call
self.task = ExtendedLoopingCall(self._callback) self.task = ExtendedLoopingCall(self._callback)
@ -169,9 +180,14 @@ class Ticker(object):
`interval`. `interval`.
""" """
start_delay = kwargs.pop("_start_delay", None) if self._is_ticking:
self.subscriptions[store_key] = (args, kwargs) # protects the subscription dict from
self.validate(start_delay=start_delay) # updating while it is looping
self._to_start.append((store_key, (args, kwargs)))
else:
start_delay = kwargs.pop("_start_delay", None)
self.subscriptions[store_key] = (args, kwargs)
self.validate(start_delay=start_delay)
def remove(self, store_key): def remove(self, store_key):
""" """
@ -181,8 +197,13 @@ class Ticker(object):
store_key (str): Unique store key. store_key (str): Unique store key.
""" """
self.subscriptions.pop(store_key, False) if self._is_ticking:
self.validate() # this protects the subscription dict from
# updating while it is looping
self._to_remove.append(store_key)
else:
self.subscriptions.pop(store_key, False)
self.validate()
def stop(self): def stop(self):
""" """