First preparations for moving webserver to Server process.
This commit is contained in:
parent
60068771c7
commit
76fa0059ea
3 changed files with 48 additions and 27 deletions
|
|
@ -31,10 +31,6 @@ from django.contrib.contenttypes.models import ContentType
|
||||||
from src.utils.utils import to_str, uses_database
|
from src.utils.utils import to_str, uses_database
|
||||||
from src.utils import logger
|
from src.utils import logger
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = ("to_pickle", "from_pickle", "do_pickle", "do_unpickle")
|
__all__ = ("to_pickle", "from_pickle", "do_pickle", "do_unpickle")
|
||||||
|
|
||||||
PICKLE_PROTOCOL = 2
|
PICKLE_PROTOCOL = 2
|
||||||
|
|
@ -47,13 +43,21 @@ _FROM_MODEL_MAP = None
|
||||||
_TO_MODEL_MAP = None
|
_TO_MODEL_MAP = None
|
||||||
_TO_TYPECLASS = lambda o: hasattr(o, 'typeclass') and o.typeclass or o
|
_TO_TYPECLASS = lambda o: hasattr(o, 'typeclass') and o.typeclass or o
|
||||||
_IS_PACKED_DBOBJ = lambda o: type(o) == tuple and len(o) == 4 and o[0] == '__packed_dbobj__'
|
_IS_PACKED_DBOBJ = lambda o: type(o) == tuple and len(o) == 4 and o[0] == '__packed_dbobj__'
|
||||||
_TO_DATESTRING = lambda o: _GA(o, "db_date_created").strftime("%Y:%m:%d-%H:%M:%S:%f")
|
if uses_database("mysql") and ServerConfig.objects.get_mysql_db_version() < '5.6.4':
|
||||||
if uses_database("mysql"):
|
|
||||||
from src.server.models import ServerConfig
|
|
||||||
mysql_version = ServerConfig.objects.get_mysql_db_version()
|
|
||||||
if mysql_version < '5.6.4':
|
|
||||||
# mysql <5.6.4 don't support millisecond precision
|
# mysql <5.6.4 don't support millisecond precision
|
||||||
_TO_DATESTRING = lambda o: _GA(o, "db_date_created").strftime("%Y:%m:%d-%H:%M:%S:000000")
|
_DATESTRING = "%Y:%m:%d-%H:%M:%S:000000"
|
||||||
|
else:
|
||||||
|
_DATESTRING = "%Y:%m:%d-%H:%M:%S:%f"
|
||||||
|
|
||||||
|
def _TO_DATESTRING(obj):
|
||||||
|
"this will only be called with valid database objects. Returns datestring on correct form."
|
||||||
|
try:
|
||||||
|
return _GA(obj, "db_date_created").strftime(_DATESTRING)
|
||||||
|
except AttributeError:
|
||||||
|
# this can happen if object is not yet saved - no datestring is then set
|
||||||
|
obj.save()
|
||||||
|
return _GA(obj, "db_date_created").strftime(_DATESTRING)
|
||||||
|
|
||||||
|
|
||||||
def _init_globals():
|
def _init_globals():
|
||||||
"Lazy importing to avoid circular import issues"
|
"Lazy importing to avoid circular import issues"
|
||||||
|
|
|
||||||
|
|
@ -7,10 +7,12 @@ leave caching unexpectedly (no use if WeakRefs).
|
||||||
Also adds cache_size() for monitoring the size of the cache.
|
Also adds cache_size() for monitoring the size of the cache.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os
|
import os, threading
|
||||||
|
from twisted.internet import reactor
|
||||||
|
from twisted.internet.reactor import callFromThread
|
||||||
|
from twisted.internet.threads import blockingCallFromThread
|
||||||
from django.db.models.base import Model, ModelBase
|
from django.db.models.base import Model, ModelBase
|
||||||
from django.db.models.signals import post_save, pre_delete, \
|
from django.db.models.signals import post_save, pre_delete, post_syncdb
|
||||||
post_syncdb
|
|
||||||
|
|
||||||
from manager import SharedMemoryManager
|
from manager import SharedMemoryManager
|
||||||
|
|
||||||
|
|
@ -37,11 +39,19 @@ def _get_pids():
|
||||||
if server_pid and portal_pid:
|
if server_pid and portal_pid:
|
||||||
return int(server_pid), int(portal_pid)
|
return int(server_pid), int(portal_pid)
|
||||||
return None, None
|
return None, None
|
||||||
_SELF_PID = os.getpid()
|
|
||||||
_SERVER_PID = None
|
|
||||||
_PORTAL_PID = None
|
|
||||||
_IS_SUBPROCESS = False
|
|
||||||
|
|
||||||
|
# get info about the current process and thread
|
||||||
|
|
||||||
|
_SELF_PID = os.getpid()
|
||||||
|
_SERVER_PID, _PORTAL_PID = _get_pids()
|
||||||
|
_IS_SUBPROCESS = (_SERVER_PID and _PORTAL_PID) and not _SELF_PID in (_SERVER_PID, _PORTAL_PID)
|
||||||
|
_IS_MAIN_THREAD = threading.currentThread().getName() == "MainThread"
|
||||||
|
|
||||||
|
#_SERVER_PID = None
|
||||||
|
#_PORTAL_PID = None
|
||||||
|
# #global _SERVER_PID, _PORTAL_PID, _IS_SUBPROCESS, _SELF_PID
|
||||||
|
# if not _SERVER_PID and not _PORTAL_PID:
|
||||||
|
# _IS_SUBPROCESS = (_SERVER_PID and _PORTAL_PID) and not _SELF_PID in (_SERVER_PID, _PORTAL_PID)
|
||||||
|
|
||||||
class SharedMemoryModelBase(ModelBase):
|
class SharedMemoryModelBase(ModelBase):
|
||||||
# CL: upstream had a __new__ method that skipped ModelBase's __new__ if
|
# CL: upstream had a __new__ method that skipped ModelBase's __new__ if
|
||||||
|
|
@ -158,15 +168,22 @@ class SharedMemoryModel(Model):
|
||||||
flush_instance_cache = classmethod(flush_instance_cache)
|
flush_instance_cache = classmethod(flush_instance_cache)
|
||||||
|
|
||||||
def save(cls, *args, **kwargs):
|
def save(cls, *args, **kwargs):
|
||||||
"overload spot for saving"
|
"save tracking process/thread issues"
|
||||||
global _SERVER_PID, _PORTAL_PID, _IS_SUBPROCESS, _SELF_PID
|
|
||||||
if not _SERVER_PID and not _PORTAL_PID:
|
|
||||||
_SERVER_PID, _PORTAL_PID = _get_pids()
|
|
||||||
_IS_SUBPROCESS = (_SERVER_PID and _PORTAL_PID) and (_SERVER_PID != _SELF_PID) and (_PORTAL_PID != _SELF_PID)
|
|
||||||
if _IS_SUBPROCESS:
|
if _IS_SUBPROCESS:
|
||||||
#print "storing in PROC_MODIFIED_OBJS:", cls.db_key, cls.id
|
# we keep a store of objects modified in subprocesses so
|
||||||
|
# we know to update their caches in the central process
|
||||||
PROC_MODIFIED_OBJS.append(cls)
|
PROC_MODIFIED_OBJS.append(cls)
|
||||||
|
|
||||||
|
if _IS_MAIN_THREAD:
|
||||||
|
# in main thread - normal operation
|
||||||
super(SharedMemoryModel, cls).save(*args, **kwargs)
|
super(SharedMemoryModel, cls).save(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
# in another thread; make sure to save in reactor thread
|
||||||
|
def _save_callback(cls, *args, **kwargs):
|
||||||
|
super(SharedMemoryModel, cls).save(*args, **kwargs)
|
||||||
|
blockingCallFromThread(reactor, _save_callback, cls, *args, **kwargs)
|
||||||
|
#callFromThread(_save_callback, cls, *args, **kwargs)
|
||||||
|
|
||||||
# Use a signal so we make sure to catch cascades.
|
# Use a signal so we make sure to catch cascades.
|
||||||
def flush_cache(**kwargs):
|
def flush_cache(**kwargs):
|
||||||
|
|
|
||||||
|
|
@ -467,9 +467,9 @@ def delay(delay=2, retval=None, callback=None):
|
||||||
"""
|
"""
|
||||||
Delay the return of a value.
|
Delay the return of a value.
|
||||||
Inputs:
|
Inputs:
|
||||||
to_return (any) - this will be returned by this function after a delay
|
|
||||||
delay (int) - the delay in seconds
|
delay (int) - the delay in seconds
|
||||||
callback (func(r)) - if given, this will be called with the to_return after delay seconds
|
retval (any) - this will be returned by this function after a delay
|
||||||
|
callback (func(retval)) - if given, this will be called with retval after delay seconds
|
||||||
Returns:
|
Returns:
|
||||||
deferred that will fire with to_return after delay seconds
|
deferred that will fire with to_return after delay seconds
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue