Added cache-resyncing to ProcPool. This makes sure to update all affected object caches

whenever the subprocess returns (this is potentially not good enough for long-running scripts,
will have to ponder that one). Made ProcPool work with MySQL (where it works much better). Tested and fixed many
small bugs.
This commit is contained in:
Griatch 2012-09-03 01:11:14 +02:00
parent 275d00d4db
commit ffcf4b3c2f
7 changed files with 121 additions and 14 deletions

View file

@ -0,0 +1,2 @@
# experimental central dictionary for models in subprocesses to report they have been changed.
PROC_MODIFIED_OBJS = []

View file

@ -278,7 +278,7 @@ class CmdBatchCommands(MuxCommand):
caller.msg(" {GBatchfile '%s' applied." % python_path) caller.msg(" {GBatchfile '%s' applied." % python_path)
purge_processor(caller) purge_processor(caller)
def errback(e): def errback(e):
caller.msg(" {RError from processor: 'e'") caller.msg(" {RError from processor: '%s'" % e)
purge_processor(caller) purge_processor(caller)
utils.run_async(_PROCPOOL_BATCHCMD_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback) utils.run_async(_PROCPOOL_BATCHCMD_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback)
else: else:
@ -372,7 +372,7 @@ class CmdBatchCode(MuxCommand):
caller.msg(" {GBatchfile '%s' applied." % python_path) caller.msg(" {GBatchfile '%s' applied." % python_path)
purge_processor(caller) purge_processor(caller)
def errback(e): def errback(e):
caller.msg(" {RError from processor: 'e'") caller.msg(" {RError from processor: '%s'" % e)
purge_processor(caller) purge_processor(caller)
utils.run_async(_PROCPOOL_BATCHCODE_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback) utils.run_async(_PROCPOOL_BATCHCODE_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback)
else: else:

View file

@ -41,6 +41,10 @@ _GA = object.__getattribute__
_SA = object.__setattr__ _SA = object.__setattr__
_DA = object.__delattr__ _DA = object.__delattr__
def clean_content_cache(obj):
"Clean obj's content cache"
_SA(obj, "_contents_cache", None)
#------------------------------------------------------------ #------------------------------------------------------------
# #
# ObjAttribute # ObjAttribute

View file

@ -17,6 +17,7 @@ It can be customized via settings.PROCPOOL_*
from twisted.protocols import amp from twisted.protocols import amp
from src.utils.ampoule.child import AMPChild from src.utils.ampoule.child import AMPChild
from src.utils.utils import to_pickle, from_pickle from src.utils.utils import to_pickle, from_pickle
from src import PROC_MODIFIED_OBJS
# handle global setups # handle global setups
_LOGGER = None _LOGGER = None
@ -43,11 +44,11 @@ class ExecuteCode(amp.Command):
arguments = [('source', amp.String()), arguments = [('source', amp.String()),
('environment', amp.String())] ('environment', amp.String())]
errors = [(Exception, 'EXCEPTION')] errors = [(Exception, 'EXCEPTION')]
response = [('response', amp.String())] response = [('response', amp.String()),
('recached', amp.String())]
# Evennia multiprocess child process template # Evennia multiprocess child process template
class ProcPoolChild(AMPChild): class ProcPoolChild(AMPChild):
""" """
This is describing what happens on the subprocess side. This is describing what happens on the subprocess side.
@ -58,7 +59,6 @@ class ProcPoolChild(AMPChild):
executecode - a remote code execution environment executecode - a remote code execution environment
""" """
def executecode(self, source, environment): def executecode(self, source, environment):
""" """
Remote code execution Remote code execution
@ -82,8 +82,9 @@ class ProcPoolChild(AMPChild):
a list being return. The return value is pickled a list being return. The return value is pickled
and thus allows for returning any pickleable data. and thus allows for returning any pickleable data.
""" """
import ev, utils import ev, utils
class Ret(object): class Ret(object):
"Helper class for holding returns from exec" "Helper class for holding returns from exec"
@ -114,14 +115,24 @@ class ProcPoolChild(AMPChild):
from src.utils.logger import logger as _LOGGER from src.utils.logger import logger as _LOGGER
_LOGGER.log_trace("Could not find remote object") _LOGGER.log_trace("Could not find remote object")
available_vars.update(environment) available_vars.update(environment)
# try to execute with eval first
try: try:
ret = eval(source, {}, available_vars) ret = eval(source, {}, available_vars)
if ret != None: ret = to_pickle(ret, emptypickle=False) or ""
return {'response':to_pickle(ret, emptypickle=False) or ""}
except Exception: except Exception:
# use exec instead # use exec instead
exec source in available_vars exec source in available_vars
ret = _return.get_returns()
return {'response': _return.get_returns()} # get the list of affected objects to recache
objs = list(set(PROC_MODIFIED_OBJS))
# we need to include the locations too, to update their content caches
objs = objs + list(set([o.location for o in objs if hasattr(o, "location") and o.location]))
#print "objs:", objs
#print "to_pickle", to_pickle(objs, emptypickle=False, do_pickle=False)
to_recache = to_pickle(objs, emptypickle=False) or ""
# empty the list without loosing memory reference
PROC_MODIFIED_OBJS[:] = []
return {'response': ret,
'recached': to_recache}
ExecuteCode.responder(executecode) ExecuteCode.responder(executecode)

View file

@ -71,13 +71,16 @@ def _set_cache(obj, name, val):
_SA(obj, "db_%s" % name, val) _SA(obj, "db_%s" % name, val)
_GA(obj, "save")() _GA(obj, "save")()
_SA(obj, "_cached_db_%s" % name, val) _SA(obj, "_cached_db_%s" % name, val)
def _del_cache(obj, name): def _del_cache(obj, name):
"On-model cache deleter" "On-model cache deleter"
try: try:
_DA(obj, "_cached_db_%s" % name) _DA(obj, "_cached_db_%s" % name)
except AttributeError: except AttributeError:
pass pass
def _clean_cache(obj):
"On-model cache resetter"
[_DA(obj, cname) for cname in obj.__dict__.keys() if cname.startswith("_cached_db_")]
# this cache holds the attributes loaded on objects, one dictionary # this cache holds the attributes loaded on objects, one dictionary
# of attributes per object. # of attributes per object.

View file

@ -7,12 +7,41 @@ leave caching unexpectedly (no use if WeakRefs).
Also adds cache_size() for monitoring the size of the cache. Also adds cache_size() for monitoring the size of the cache.
""" """
import os
from django.db.models.base import Model, ModelBase from django.db.models.base import Model, ModelBase
from django.db.models.signals import post_save, pre_delete, \ from django.db.models.signals import post_save, pre_delete, \
post_syncdb post_syncdb
from manager import SharedMemoryManager from manager import SharedMemoryManager
# determine if our current pid is different from the server PID (i.e.
# if we are in a subprocess or not)
from src import PROC_MODIFIED_OBJS
def _get_pids():
"""
Get the PID (Process ID) by trying to access
an PID file.
"""
from django.conf import settings
server_pidfile = os.path.join(settings.GAME_DIR, 'server.pid')
portal_pidfile = os.path.join(settings.GAME_DIR, 'portal.pid')
server_pid, portal_pid = None, None
if os.path.exists(server_pidfile):
f = open(server_pidfile, 'r')
server_pid = f.read()
f.close()
if os.path.exists(portal_pidfile):
f = open(portal_pidfile, 'r')
portal_pid = f.read()
f.close()
if server_pid and portal_pid:
return int(server_pid), int(portal_pid)
return None, None
_SELF_PID = os.getpid()
_SERVER_PID = None
_PORTAL_PID = None
_IS_SUBPROCESS = False
class SharedMemoryModelBase(ModelBase): class SharedMemoryModelBase(ModelBase):
# CL: upstream had a __new__ method that skipped ModelBase's __new__ if # CL: upstream had a __new__ method that skipped ModelBase's __new__ if
@ -130,9 +159,15 @@ class SharedMemoryModel(Model):
def save(cls, *args, **kwargs): def save(cls, *args, **kwargs):
"overload spot for saving" "overload spot for saving"
global _SERVER_PID, _PORTAL_PID, _IS_SUBPROCESS, _SELF_PID
if not _SERVER_PID and not _PORTAL_PID:
_SERVER_PID, _PORTAL_PID = _get_pids()
_IS_SUBPROCESS = (_SERVER_PID and _PORTAL_PID) and (_SERVER_PID != _SELF_PID) and (_PORTAL_PID != _SELF_PID)
if _IS_SUBPROCESS:
#print "storing in PROC_MODIFIED_OBJS:", cls.db_key, cls.id
PROC_MODIFIED_OBJS.append(cls)
super(SharedMemoryModel, cls).save(*args, **kwargs) super(SharedMemoryModel, cls).save(*args, **kwargs)
# Use a signal so we make sure to catch cascades. # Use a signal so we make sure to catch cascades.
def flush_cache(**kwargs): def flush_cache(**kwargs):
for model in SharedMemoryModel.__subclasses__(): for model in SharedMemoryModel.__subclasses__():

View file

@ -21,6 +21,11 @@ except ImportError:
ENCODINGS = settings.ENCODINGS ENCODINGS = settings.ENCODINGS
_LOGGER = None _LOGGER = None
_GA = object.__getattribute__
_SA = object.__setattr__
_DA = object.__delattr__
def is_iter(iterable): def is_iter(iterable):
""" """
@ -490,6 +495,8 @@ def to_pickle(data, do_pickle=True, emptypickle=True):
return tuple(iter_db2id(val) for val in item) return tuple(iter_db2id(val) for val in item)
elif dtype == dict: elif dtype == dict:
return dict((key, iter_db2id(val)) for key, val in item.items()) return dict((key, iter_db2id(val)) for key, val in item.items())
elif hasattr(item, '__iter__'):
return [iter_db2id(val) for val in item]
else: else:
item = _TO_DBOBJ(item) item = _TO_DBOBJ(item)
natural_key = _FROM_MODEL_MAP[hasattr(item, "id") and hasattr(item, '__class__') and item.__class__.__name__.lower()] natural_key = _FROM_MODEL_MAP[hasattr(item, "id") and hasattr(item, '__class__') and item.__class__.__name__.lower()]
@ -501,10 +508,11 @@ def to_pickle(data, do_pickle=True, emptypickle=True):
if do_pickle and not (not emptypickle and not data and data != False): if do_pickle and not (not emptypickle and not data and data != False):
return _DUMPS(data) return _DUMPS(data)
return data return data
_TO_MODEL_MAP = None _TO_MODEL_MAP = None
_IS_PACKED_DBOBJ = lambda o: type(o)== tuple and len(o)==3 and o[0]=='__packed_dbobj__' _IS_PACKED_DBOBJ = lambda o: type(o)== tuple and len(o)==3 and o[0]=='__packed_dbobj__'
_TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o _TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o
from django.db import transaction
@transaction.autocommit
def from_pickle(data, do_pickle=True): def from_pickle(data, do_pickle=True):
""" """
Converts back from a data stream prepared with to_pickle. This will Converts back from a data stream prepared with to_pickle. This will
@ -529,17 +537,57 @@ def from_pickle(data, do_pickle=True):
if dtype in (basestring, int, float): if dtype in (basestring, int, float):
return item return item
elif _IS_PACKED_DBOBJ(item): # this is a tuple and must be done before tuple-check elif _IS_PACKED_DBOBJ(item): # this is a tuple and must be done before tuple-check
return _TO_TYPECLASS(_TO_MODEL_MAP[item[1]].objects.get(id=item[2])) #print item[1], item[2]
if item[2]: #TODO Not sure why this could ever be None, but it can
return _TO_TYPECLASS(_TO_MODEL_MAP[item[1]].objects.get(id=item[2]))
return None
elif dtype == tuple: elif dtype == tuple:
return tuple(iter_id2db(val) for val in item) return tuple(iter_id2db(val) for val in item)
elif dtype == dict: elif dtype == dict:
return dict((key, iter_id2db(val)) for key, val in item.items()) return dict((key, iter_id2db(val)) for key, val in item.items())
elif hasattr(item, '__iter__'):
return [iter_id2db(val) for val in item]
return item return item
if do_pickle: if do_pickle:
data = _LOADS(data) data = _LOADS(data)
# we have to make sure the database is in a safe state
# (this is relevant for multiprocess operation)
transaction.commit()
# do recursive conversion # do recursive conversion
return iter_id2db(data) return iter_id2db(data)
_TYPECLASSMODELS = None
_OBJECTMODELS = None
def clean_object_caches(obj):
"""
Clean all object caches on the given object
"""
global _TYPECLASSMODELS, _OBJECTMODELS
if not _TYPECLASSMODELS:
from src.typeclasses import models as _TYPECLASSMODELS
if not _OBJECTMODELS:
from src.objects import models as _OBJECTMODELS
#print "recaching:", obj
if not obj:
return
obj = hasattr(obj, "dbobj") and obj.dbobj or obj
# contents cache
try:
_SA(obj, "_contents_cache", None)
except AttributeError:
pass
# on-object property cache
[_DA(obj, cname) for cname in obj.__dict__.keys() if cname.startswith("_cached_db_")]
try:
hashid = _GA(obj, "hashid")
hasid = obj.hashid
_TYPECLASSMODELS._ATTRIBUTE_CACHE[hashid] = {}
except AttributeError:
pass
_PPOOL = None _PPOOL = None
_PCMD = None _PCMD = None
_DUMPS = None _DUMPS = None
@ -625,6 +673,9 @@ def run_async(to_execute, *args, **kwargs):
def convert_return(f): def convert_return(f):
def func(ret, *args, **kwargs): def func(ret, *args, **kwargs):
rval = ret["response"] and from_pickle(ret["response"]) rval = ret["response"] and from_pickle(ret["response"])
reca = ret["recached"] and from_pickle(ret["recached"])
# recache all indicated objects
[clean_object_caches(obj) for obj in reca]
if f: return f(rval, *args, **kwargs) if f: return f(rval, *args, **kwargs)
else: return rval else: return rval
return func return func
@ -632,6 +683,7 @@ def run_async(to_execute, *args, **kwargs):
def func(err, *args, **kwargs): def func(err, *args, **kwargs):
err.trap(Exception) err.trap(Exception)
err = err.getErrorMessage() err = err.getErrorMessage()
print err
if f: if f:
return f(err, *args, **kwargs) return f(err, *args, **kwargs)
else: else: