Add missing CONNECTION_THROTTLE, cleanup
This commit is contained in:
commit
0fd36ac335
3 changed files with 195 additions and 71 deletions
|
|
@ -23,12 +23,14 @@ try:
|
|||
from django.utils import unittest
|
||||
except ImportError:
|
||||
import unittest
|
||||
|
||||
|
||||
from evennia.server.validators import EvenniaPasswordValidator
|
||||
from evennia.utils.test_resources import EvenniaTest
|
||||
|
||||
from django.test.runner import DiscoverRunner
|
||||
|
||||
from evennia.server.throttle import Throttle
|
||||
|
||||
from .deprecations import check_errors
|
||||
|
||||
|
||||
|
|
@ -65,31 +67,80 @@ class TestDeprecations(TestCase):
|
|||
"""
|
||||
Class for testing deprecations.check_errors.
|
||||
"""
|
||||
deprecated_settings = ("CMDSET_DEFAULT", "CMDSET_OOC", "BASE_COMM_TYPECLASS", "COMM_TYPECLASS_PATHS",
|
||||
"CHARACTER_DEFAULT_HOME", "OBJECT_TYPECLASS_PATHS", "SCRIPT_TYPECLASS_PATHS",
|
||||
"ACCOUNT_TYPECLASS_PATHS", "CHANNEL_TYPECLASS_PATHS", "SEARCH_MULTIMATCH_SEPARATOR",
|
||||
"TIME_SEC_PER_MIN", "TIME_MIN_PER_HOUR", "TIME_HOUR_PER_DAY", "TIME_DAY_PER_WEEK",
|
||||
"TIME_WEEK_PER_MONTH", "TIME_MONTH_PER_YEAR")
|
||||
deprecated_settings = (
|
||||
"CMDSET_DEFAULT", "CMDSET_OOC", "BASE_COMM_TYPECLASS", "COMM_TYPECLASS_PATHS",
|
||||
"CHARACTER_DEFAULT_HOME", "OBJECT_TYPECLASS_PATHS", "SCRIPT_TYPECLASS_PATHS",
|
||||
"ACCOUNT_TYPECLASS_PATHS", "CHANNEL_TYPECLASS_PATHS", "SEARCH_MULTIMATCH_SEPARATOR",
|
||||
"TIME_SEC_PER_MIN", "TIME_MIN_PER_HOUR", "TIME_HOUR_PER_DAY", "TIME_DAY_PER_WEEK",
|
||||
"TIME_WEEK_PER_MONTH", "TIME_MONTH_PER_YEAR")
|
||||
|
||||
def test_check_errors(self):
|
||||
"""
|
||||
All settings in deprecated_settings should raise a DeprecationWarning if they exist. WEBSERVER_PORTS
|
||||
raises an error if the iterable value passed does not have a tuple as its first element.
|
||||
All settings in deprecated_settings should raise a DeprecationWarning if they exist.
|
||||
WEBSERVER_PORTS raises an error if the iterable value passed does not have a tuple as its
|
||||
first element.
|
||||
"""
|
||||
for setting in self.deprecated_settings:
|
||||
self.assertRaises(DeprecationWarning, check_errors, MockSettings(setting))
|
||||
# test check for WEBSERVER_PORTS having correct value
|
||||
self.assertRaises(DeprecationWarning, check_errors, MockSettings("WEBSERVER_PORTS", value=["not a tuple"]))
|
||||
self.assertRaises(
|
||||
DeprecationWarning,
|
||||
check_errors, MockSettings("WEBSERVER_PORTS", value=["not a tuple"]))
|
||||
|
||||
|
||||
class ValidatorTest(EvenniaTest):
|
||||
|
||||
|
||||
def test_validator(self):
|
||||
# Validator returns None on success and ValidationError on failure.
|
||||
validator = EvenniaPasswordValidator()
|
||||
|
||||
|
||||
# This password should meet Evennia standards.
|
||||
self.assertFalse(validator.validate('testpassword', user=self.account))
|
||||
|
||||
|
||||
# This password contains illegal characters and should raise an Exception.
|
||||
from django.core.exceptions import ValidationError
|
||||
self.assertRaises(ValidationError, validator.validate, '(#)[#]<>', user=self.account)
|
||||
self.assertRaises(ValidationError, validator.validate, '(#)[#]<>', user=self.account)
|
||||
|
||||
|
||||
class ThrottleTest(EvenniaTest):
|
||||
"""
|
||||
Class for testing the connection/IP throttle.
|
||||
"""
|
||||
def test_throttle(self):
|
||||
ips = ('94.100.176.153', '45.56.148.77', '5.196.1.129')
|
||||
kwargs = {
|
||||
'limit': 5,
|
||||
'timeout': 15 * 60
|
||||
}
|
||||
|
||||
throttle = Throttle(**kwargs)
|
||||
|
||||
for ip in ips:
|
||||
# Throttle should not be engaged by default
|
||||
self.assertFalse(throttle.check(ip))
|
||||
|
||||
# Pretend to fail a bunch of events
|
||||
for x in range(50):
|
||||
obj = throttle.update(ip)
|
||||
self.assertFalse(obj)
|
||||
|
||||
# Next ones should be blocked
|
||||
self.assertTrue(throttle.check(ip))
|
||||
|
||||
for x in range(throttle.cache_size * 2):
|
||||
obj = throttle.update(ip)
|
||||
self.assertFalse(obj)
|
||||
|
||||
# Should still be blocked
|
||||
self.assertTrue(throttle.check(ip))
|
||||
|
||||
# Number of values should be limited by cache size
|
||||
self.assertEqual(throttle.cache_size, len(throttle.get(ip)))
|
||||
|
||||
cache = throttle.get()
|
||||
|
||||
# Make sure there are entries for each IP
|
||||
self.assertEqual(len(ips), len(cache.keys()))
|
||||
|
||||
# There should only be (cache_size * num_ips) total in the Throttle cache
|
||||
self.assertEqual(sum([len(cache[x]) for x in cache.keys()]), throttle.cache_size * len(ips))
|
||||
|
|
|
|||
101
evennia/server/throttle.py
Normal file
101
evennia/server/throttle.py
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
from collections import defaultdict, deque
|
||||
import time
|
||||
|
||||
class Throttle(object):
|
||||
"""
|
||||
Keeps a running count of failed actions per IP address.
|
||||
|
||||
Available methods indicate whether or not the number of failures exceeds a
|
||||
particular threshold.
|
||||
|
||||
This version of the throttle is usable by both the terminal server as well
|
||||
as the web server, imposes limits on memory consumption by using deques
|
||||
with length limits instead of open-ended lists, and removes sparse keys when
|
||||
no recent failures have been recorded.
|
||||
"""
|
||||
|
||||
error_msg = 'Too many failed attempts; you must wait a few minutes before trying again.'
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""
|
||||
Allows setting of throttle parameters.
|
||||
|
||||
Kwargs:
|
||||
limit (int): Max number of failures before imposing limiter
|
||||
timeout (int): number of timeout seconds after
|
||||
max number of tries has been reached.
|
||||
cache_size (int): Max number of attempts to record per IP within a
|
||||
rolling window; this is NOT the same as the limit after which
|
||||
the throttle is imposed!
|
||||
"""
|
||||
self.storage = defaultdict(deque)
|
||||
self.cache_size = self.limit = kwargs.get('limit', 5)
|
||||
self.timeout = kwargs.get('timeout', 5 * 60)
|
||||
|
||||
def get(self, ip=None):
|
||||
"""
|
||||
Convenience function that returns the storage table, or part of.
|
||||
|
||||
Args:
|
||||
ip (str, optional): IP address of requestor
|
||||
|
||||
Returns:
|
||||
storage (dict): When no IP is provided, returns a dict of all
|
||||
current IPs being tracked and the timestamps of their recent
|
||||
failures.
|
||||
timestamps (deque): When an IP is provided, returns a deque of
|
||||
timestamps of recent failures only for that IP.
|
||||
|
||||
"""
|
||||
if ip: return self.storage.get(ip, deque(maxlen=self.cache_size))
|
||||
else: return self.storage
|
||||
|
||||
def update(self, ip):
|
||||
"""
|
||||
Store the time of the latest failure/
|
||||
|
||||
Args:
|
||||
ip (str): IP address of requestor
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
"""
|
||||
# Enforce length limits
|
||||
if not self.storage[ip].maxlen:
|
||||
self.storage[ip] = deque(maxlen=self.cache_size)
|
||||
|
||||
self.storage[ip].append(time.time())
|
||||
|
||||
def check(self, ip):
|
||||
"""
|
||||
This will check the session's address against the
|
||||
storage dictionary to check they haven't spammed too many
|
||||
fails recently.
|
||||
|
||||
Args:
|
||||
ip (str): IP address of requestor
|
||||
|
||||
Returns:
|
||||
throttled (bool): True if throttling is active,
|
||||
False otherwise.
|
||||
|
||||
"""
|
||||
now = time.time()
|
||||
ip = str(ip)
|
||||
|
||||
# checking mode
|
||||
latest_fails = self.storage[ip]
|
||||
if latest_fails and len(latest_fails) >= self.limit:
|
||||
# too many fails recently
|
||||
if now - latest_fails[-1] < self.timeout:
|
||||
# too soon - timeout in play
|
||||
return True
|
||||
else:
|
||||
# timeout has passed. clear faillist
|
||||
del(self.storage[ip])
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue