Add special handling for scripts when flushed from cache to avoid duplicate ExtendedLoopingCalls.
This commit is contained in:
parent
e31b9f0d27
commit
b4283a642b
1 changed files with 44 additions and 2 deletions
|
|
@ -18,6 +18,17 @@ from future.utils import with_metaclass
|
||||||
__all__ = ["DefaultScript", "DoNothing", "Store"]
|
__all__ = ["DefaultScript", "DoNothing", "Store"]
|
||||||
|
|
||||||
|
|
||||||
|
FLUSHING_INSTANCES = False # whether we're in the process of flushing scripts from the cache
|
||||||
|
SCRIPT_FLUSH_TIMERS = {} # stores timers for scripts that are currently being flushed
|
||||||
|
|
||||||
|
|
||||||
|
def restart_scripts_after_flush():
|
||||||
|
"""After instances are flushed, validate scripts so they're not dead for a long period of time"""
|
||||||
|
global FLUSHING_INSTANCES
|
||||||
|
ScriptDB.objects.validate()
|
||||||
|
FLUSHING_INSTANCES = False
|
||||||
|
|
||||||
|
|
||||||
class ExtendedLoopingCall(LoopingCall):
|
class ExtendedLoopingCall(LoopingCall):
|
||||||
"""
|
"""
|
||||||
LoopingCall that can start at a delay different
|
LoopingCall that can start at a delay different
|
||||||
|
|
@ -278,6 +289,27 @@ class DefaultScript(ScriptBase):
|
||||||
return max(0, self.db_repeats - task.callcount)
|
return max(0, self.db_repeats - task.callcount)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def at_idmapper_flush(self):
|
||||||
|
"""If we're flushing this object, make sure the LoopingCall is gone too"""
|
||||||
|
ret = super(DefaultScript, self).at_idmapper_flush()
|
||||||
|
if ret:
|
||||||
|
try:
|
||||||
|
from twisted.internet import reactor
|
||||||
|
global FLUSHING_INSTANCES
|
||||||
|
# store the current timers for the _task and stop it to avoid duplicates after cache flush
|
||||||
|
paused_time = self.ndb._task.next_call_time()
|
||||||
|
callcount = self.ndb._task.callcount
|
||||||
|
self._stop_task()
|
||||||
|
SCRIPT_FLUSH_TIMERS[self.id] = (paused_time, callcount)
|
||||||
|
# here we ensure that the restart call only happens once, not once per script
|
||||||
|
if not FLUSHING_INSTANCES:
|
||||||
|
FLUSHING_INSTANCES = True
|
||||||
|
reactor.callLater(2, restart_scripts_after_flush)
|
||||||
|
except Exception:
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return ret
|
||||||
|
|
||||||
def start(self, force_restart=False):
|
def start(self, force_restart=False):
|
||||||
"""
|
"""
|
||||||
Called every time the script is started (for persistent
|
Called every time the script is started (for persistent
|
||||||
|
|
@ -294,9 +326,19 @@ class DefaultScript(ScriptBase):
|
||||||
started or not. Used in counting.
|
started or not. Used in counting.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self.is_active and not force_restart:
|
if self.is_active and not force_restart:
|
||||||
# script already runs and should not be restarted.
|
# The script is already running, but make sure we have a _task if this is after a cache flush
|
||||||
|
if not self.ndb._task:
|
||||||
|
self.ndb._task = ExtendedLoopingCall(self._step_task)
|
||||||
|
try:
|
||||||
|
start_delay, callcount = SCRIPT_FLUSH_TIMERS[self.id]
|
||||||
|
del SCRIPT_FLUSH_TIMERS[self.id]
|
||||||
|
now = False
|
||||||
|
except (KeyError, ValueError, TypeError):
|
||||||
|
now = not self.db_start_delay
|
||||||
|
start_delay = None
|
||||||
|
callcount = 0
|
||||||
|
self.ndb._task.start(self.db_interval, now=now, start_delay=start_delay, count_start=callcount)
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
obj = self.obj
|
obj = self.obj
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue