diff --git a/src/settings_default.py b/src/settings_default.py index 8f4d79eef..51b011ade 100644 --- a/src/settings_default.py +++ b/src/settings_default.py @@ -152,6 +152,7 @@ DATABASE_PASSWORD = '' DATABASE_HOST = '' DATABASE_PORT = '' +DATABASE_NONBLOCKING_SAVE = True ################################################### # Evennia in-game parsers diff --git a/src/utils/idmapper/base.py b/src/utils/idmapper/base.py index f9b3cd6c2..ee0db1032 100755 --- a/src/utils/idmapper/base.py +++ b/src/utils/idmapper/base.py @@ -1,10 +1,66 @@ +""" +Idmapper base functionality. Most of this is unchanged from the idmapper distribution. + +Extended for Evennia: + +- made object cache a dictionary rather than a WeakValueDictionary. The latter does not work + well for long-time persistence in memory and caused very hard-to-track bugs in Evennia's + typeclass system (which depends on cached memory adresses not changing or going out of scope). +- Added optional asynchronous save operation for use with Twisted. + +""" + from weakref import WeakValueDictionary, ref from django.db.models.base import Model, ModelBase from manager import SharedMemoryManager -TCACHE = {} # test cache, for debugging /Griatch +# +# Evennia extension: Asynchronous save functionality. This +# is used by SharedMemoryBase.save(). +# + +from twisted.internet.defer import DeferredQueue +from twisted.internet.task import cooperate +from twisted.internet.threads import deferToThread, blockingCallFromThread +from django.conf import settings +from src.utils import logger +from twisted.internet import reactor + +ASYNC_DB_SAVE = settings.DATABASE_NONBLOCKING_SAVE + +# This special form of queue has a get() function that returns deferreds. +# Add save function tuples to this in order to add to save queue. +ASYNC_QUEUE = DeferredQueue(backlog=1) + +def async_callback(funcdef): + "This callback is run with the item returned from queue - a tuple (func, args, kwargs)" + #d = deferToThread(funcdef[0], *funcdef[1], **funcdef[2]) + d = reactor.callFromThread(funcdef[0], *funcdef[1], **funcdef[2]) + #d = reactor.blockingCallFromThread(funcdef[0], *funcdef[1], **funcdef[2]) + return d +def async_errback(failure): + "errback" + logger.log_errmsg(str(failure)) +def async_worker(queue): + """ + This will eternally yield items from queue as soon as they are + available (since DeferredQueue.get() only returns when there is + actually something in the queue). The queue automatically inserts + the stored value into the callback async_exec. + """ + while True: + yield queue.get().addCallbacks(async_callback, async_errback) + +# the cooperate handler will iterate over async_worker every time its +# returned deferred fires. +cooperate(async_worker(ASYNC_QUEUE)) + +# in models's save(*args, **kwargs): +# ASYNC_QUEUE.put((super(SharedMemoryModel, self).save, args, kwargs)) + +# ------------------------------------------------------- class SharedMemoryModelBase(ModelBase): #def __new__(cls, name, bases, attrs): @@ -120,13 +176,16 @@ class SharedMemoryModel(Model): """ cls._flush_cached_by_key(instance._get_pk_val()) #key = "%s-%s" % (cls, instance.pk) - #del TCACHE[key] #print "uncached: %s (%s: %s) (total cached: %s)" % (instance, cls.__name__, len(cls.__instance_cache__), len(TCACHE)) flush_cached_instance = classmethod(flush_cached_instance) def save(self, *args, **kwargs): - super(SharedMemoryModel, self).save(*args, **kwargs) + if ASYNC_DB_SAVE: + logger.log_infomsg("Adding to queue %s" % self) + ASYNC_QUEUE.put((super(SharedMemoryModel, self).save, args, kwargs)) + else: + super(SharedMemoryModel, self).save(*args, **kwargs) self.__class__.cache_instance(self) # TODO: This needs moved to the prepare stage (I believe?)