diff options
Diffstat (limited to 'tests/util')
-rw-r--r-- | tests/util/caches/test_descriptors.py | 68 | ||||
-rw-r--r-- | tests/util/test_dict_cache.py | 16 | ||||
-rw-r--r-- | tests/util/test_expiring_cache.py | 4 | ||||
-rw-r--r-- | tests/util/test_file_consumer.py | 177 | ||||
-rw-r--r-- | tests/util/test_limiter.py | 70 | ||||
-rw-r--r-- | tests/util/test_linearizer.py | 109 | ||||
-rw-r--r-- | tests/util/test_log_context.py | 96 | ||||
-rw-r--r-- | tests/util/test_logcontext.py | 179 | ||||
-rw-r--r-- | tests/util/test_logformatter.py (renamed from tests/util/test_clock.py) | 34 | ||||
-rw-r--r-- | tests/util/test_lrucache.py | 4 | ||||
-rw-r--r-- | tests/util/test_rwlock.py | 4 | ||||
-rw-r--r-- | tests/util/test_snapshot_cache.py | 5 | ||||
-rw-r--r-- | tests/util/test_stream_change_cache.py | 215 | ||||
-rw-r--r-- | tests/util/test_treecache.py | 4 | ||||
-rw-r--r-- | tests/util/test_wheel_timer.py | 8 |
15 files changed, 783 insertions, 210 deletions
diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 3f14ab503f..8176a7dabd 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,18 +14,71 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from functools import partial import mock + +from twisted.internet import defer, reactor + from synapse.api.errors import SynapseError -from synapse.util import async from synapse.util import logcontext -from twisted.internet import defer from synapse.util.caches import descriptors + from tests import unittest logger = logging.getLogger(__name__) +def run_on_reactor(): + d = defer.Deferred() + reactor.callLater(0, d.callback, 0) + return logcontext.make_deferred_yieldable(d) + + +class CacheTestCase(unittest.TestCase): + def test_invalidate_all(self): + cache = descriptors.Cache("testcache") + + callback_record = [False, False] + + def record_callback(idx): + callback_record[idx] = True + + # add a couple of pending entries + d1 = defer.Deferred() + cache.set("key1", d1, partial(record_callback, 0)) + + d2 = defer.Deferred() + cache.set("key2", d2, partial(record_callback, 1)) + + # lookup should return the deferreds + self.assertIs(cache.get("key1"), d1) + self.assertIs(cache.get("key2"), d2) + + # let one of the lookups complete + d2.callback("result2") + self.assertEqual(cache.get("key2"), "result2") + + # now do the invalidation + cache.invalidate_all() + + # lookup should return none + self.assertIsNone(cache.get("key1", None)) + self.assertIsNone(cache.get("key2", None)) + + # both callbacks should have been callbacked + self.assertTrue( + callback_record[0], "Invalidation callback for key1 not called", + ) + self.assertTrue( + callback_record[1], "Invalidation callback for key2 not called", + ) + + # letting the other lookup complete should do nothing + d1.callback("result1") + self.assertIsNone(cache.get("key1", None)) + + class DescriptorTestCase(unittest.TestCase): @defer.inlineCallbacks def test_cache(self): @@ -149,7 +203,8 @@ class DescriptorTestCase(unittest.TestCase): def fn(self, arg1): @defer.inlineCallbacks def inner_fn(): - yield async.run_on_reactor() + # we want this to behave like an asynchronous function + yield run_on_reactor() raise SynapseError(400, "blah") return inner_fn() @@ -159,7 +214,12 @@ class DescriptorTestCase(unittest.TestCase): with logcontext.LoggingContext() as c1: c1.name = "c1" try: - yield obj.fn(1) + d = obj.fn(1) + self.assertEqual( + logcontext.LoggingContext.current_context(), + logcontext.LoggingContext.sentinel, + ) + yield d self.fail("No exception thrown") except SynapseError: pass diff --git a/tests/util/test_dict_cache.py b/tests/util/test_dict_cache.py index bc92f85fa6..26f2fa5800 100644 --- a/tests/util/test_dict_cache.py +++ b/tests/util/test_dict_cache.py @@ -14,10 +14,10 @@ # limitations under the License. -from tests import unittest - from synapse.util.caches.dictionary_cache import DictionaryCache +from tests import unittest + class DictCacheTestCase(unittest.TestCase): @@ -32,7 +32,7 @@ class DictCacheTestCase(unittest.TestCase): seq = self.cache.sequence test_value = {"test": "test_simple_cache_hit_full"} - self.cache.update(seq, key, test_value, full=True) + self.cache.update(seq, key, test_value) c = self.cache.get(key) self.assertEqual(test_value, c.value) @@ -44,7 +44,7 @@ class DictCacheTestCase(unittest.TestCase): test_value = { "test": "test_simple_cache_hit_partial" } - self.cache.update(seq, key, test_value, full=True) + self.cache.update(seq, key, test_value) c = self.cache.get(key, ["test"]) self.assertEqual(test_value, c.value) @@ -56,7 +56,7 @@ class DictCacheTestCase(unittest.TestCase): test_value = { "test": "test_simple_cache_miss_partial" } - self.cache.update(seq, key, test_value, full=True) + self.cache.update(seq, key, test_value) c = self.cache.get(key, ["test2"]) self.assertEqual({}, c.value) @@ -70,7 +70,7 @@ class DictCacheTestCase(unittest.TestCase): "test2": "test_simple_cache_hit_miss_partial2", "test3": "test_simple_cache_hit_miss_partial3", } - self.cache.update(seq, key, test_value, full=True) + self.cache.update(seq, key, test_value) c = self.cache.get(key, ["test2"]) self.assertEqual({"test2": "test_simple_cache_hit_miss_partial2"}, c.value) @@ -82,13 +82,13 @@ class DictCacheTestCase(unittest.TestCase): test_value_1 = { "test": "test_simple_cache_hit_miss_partial", } - self.cache.update(seq, key, test_value_1, full=False) + self.cache.update(seq, key, test_value_1, fetched_keys=set("test")) seq = self.cache.sequence test_value_2 = { "test2": "test_simple_cache_hit_miss_partial2", } - self.cache.update(seq, key, test_value_2, full=False) + self.cache.update(seq, key, test_value_2, fetched_keys=set("test2")) c = self.cache.get(key) self.assertEqual( diff --git a/tests/util/test_expiring_cache.py b/tests/util/test_expiring_cache.py index 31d24adb8b..d12b5e838b 100644 --- a/tests/util/test_expiring_cache.py +++ b/tests/util/test_expiring_cache.py @@ -14,12 +14,12 @@ # limitations under the License. -from .. import unittest - from synapse.util.caches.expiringcache import ExpiringCache from tests.utils import MockClock +from .. import unittest + class ExpiringCacheTestCase(unittest.TestCase): diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py new file mode 100644 index 0000000000..7ce5f8c258 --- /dev/null +++ b/tests/util/test_file_consumer.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import threading + +from mock import NonCallableMock +from six import StringIO + +from twisted.internet import defer, reactor + +from synapse.util.file_consumer import BackgroundFileConsumer + +from tests import unittest + + +class FileConsumerTests(unittest.TestCase): + + @defer.inlineCallbacks + def test_pull_consumer(self): + string_file = StringIO() + consumer = BackgroundFileConsumer(string_file, reactor=reactor) + + try: + producer = DummyPullProducer() + + yield producer.register_with_consumer(consumer) + + yield producer.write_and_wait("Foo") + + self.assertEqual(string_file.getvalue(), "Foo") + + yield producer.write_and_wait("Bar") + + self.assertEqual(string_file.getvalue(), "FooBar") + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + @defer.inlineCallbacks + def test_push_consumer(self): + string_file = BlockingStringWrite() + consumer = BackgroundFileConsumer(string_file, reactor=reactor) + + try: + producer = NonCallableMock(spec_set=[]) + + consumer.registerProducer(producer, True) + + consumer.write("Foo") + yield string_file.wait_for_n_writes(1) + + self.assertEqual(string_file.buffer, "Foo") + + consumer.write("Bar") + yield string_file.wait_for_n_writes(2) + + self.assertEqual(string_file.buffer, "FooBar") + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + @defer.inlineCallbacks + def test_push_producer_feedback(self): + string_file = BlockingStringWrite() + consumer = BackgroundFileConsumer(string_file, reactor=reactor) + + try: + producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"]) + + resume_deferred = defer.Deferred() + producer.resumeProducing.side_effect = lambda: resume_deferred.callback(None) + + consumer.registerProducer(producer, True) + + number_writes = 0 + with string_file.write_lock: + for _ in range(consumer._PAUSE_ON_QUEUE_SIZE): + consumer.write("Foo") + number_writes += 1 + + producer.pauseProducing.assert_called_once() + + yield string_file.wait_for_n_writes(number_writes) + + yield resume_deferred + producer.resumeProducing.assert_called_once() + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + +class DummyPullProducer(object): + def __init__(self): + self.consumer = None + self.deferred = defer.Deferred() + + def resumeProducing(self): + d = self.deferred + self.deferred = defer.Deferred() + d.callback(None) + + def write_and_wait(self, bytes): + d = self.deferred + self.consumer.write(bytes) + return d + + def register_with_consumer(self, consumer): + d = self.deferred + self.consumer = consumer + self.consumer.registerProducer(self, False) + return d + + +class BlockingStringWrite(object): + def __init__(self): + self.buffer = "" + self.closed = False + self.write_lock = threading.Lock() + + self._notify_write_deferred = None + self._number_of_writes = 0 + + def write(self, bytes): + with self.write_lock: + self.buffer += bytes + self._number_of_writes += 1 + + reactor.callFromThread(self._notify_write) + + def close(self): + self.closed = True + + def _notify_write(self): + "Called by write to indicate a write happened" + with self.write_lock: + if not self._notify_write_deferred: + return + d = self._notify_write_deferred + self._notify_write_deferred = None + d.callback(None) + + @defer.inlineCallbacks + def wait_for_n_writes(self, n): + "Wait for n writes to have happened" + while True: + with self.write_lock: + if n <= self._number_of_writes: + return + + if not self._notify_write_deferred: + self._notify_write_deferred = defer.Deferred() + + d = self._notify_write_deferred + + yield d diff --git a/tests/util/test_limiter.py b/tests/util/test_limiter.py deleted file mode 100644 index 9c795d9fdb..0000000000 --- a/tests/util/test_limiter.py +++ /dev/null @@ -1,70 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from tests import unittest - -from twisted.internet import defer - -from synapse.util.async import Limiter - - -class LimiterTestCase(unittest.TestCase): - - @defer.inlineCallbacks - def test_limiter(self): - limiter = Limiter(3) - - key = object() - - d1 = limiter.queue(key) - cm1 = yield d1 - - d2 = limiter.queue(key) - cm2 = yield d2 - - d3 = limiter.queue(key) - cm3 = yield d3 - - d4 = limiter.queue(key) - self.assertFalse(d4.called) - - d5 = limiter.queue(key) - self.assertFalse(d5.called) - - with cm1: - self.assertFalse(d4.called) - self.assertFalse(d5.called) - - self.assertTrue(d4.called) - self.assertFalse(d5.called) - - with cm3: - self.assertFalse(d5.called) - - self.assertTrue(d5.called) - - with cm2: - pass - - with (yield d4): - pass - - with (yield d5): - pass - - d6 = limiter.queue(key) - with (yield d6): - pass diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index afcba482f9..4729bd5a0a 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,13 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six.moves import range -from tests import unittest - -from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError +from synapse.util import Clock, logcontext from synapse.util.async import Linearizer +from tests import unittest + class LinearizerTestCase(unittest.TestCase): @@ -38,7 +42,104 @@ class LinearizerTestCase(unittest.TestCase): with cm1: self.assertFalse(d2.called) + with (yield d2): + pass + + def test_lots_of_queued_things(self): + # we have one slow thing, and lots of fast things queued up behind it. + # it should *not* explode the stack. + linearizer = Linearizer() + + @defer.inlineCallbacks + def func(i, sleep=False): + with logcontext.LoggingContext("func(%s)" % i) as lc: + with (yield linearizer.queue("")): + self.assertEqual( + logcontext.LoggingContext.current_context(), lc) + if sleep: + yield Clock(reactor).sleep(0) + + self.assertEqual( + logcontext.LoggingContext.current_context(), lc) + + func(0, sleep=True) + for i in range(1, 100): + func(i) + + return func(1000) + + @defer.inlineCallbacks + def test_multiple_entries(self): + limiter = Linearizer(max_count=3) + + key = object() + + d1 = limiter.queue(key) + cm1 = yield d1 + + d2 = limiter.queue(key) + cm2 = yield d2 + + d3 = limiter.queue(key) + cm3 = yield d3 + + d4 = limiter.queue(key) + self.assertFalse(d4.called) + + d5 = limiter.queue(key) + self.assertFalse(d5.called) + + with cm1: + self.assertFalse(d4.called) + self.assertFalse(d5.called) + + cm4 = yield d4 + self.assertFalse(d5.called) + + with cm3: + self.assertFalse(d5.called) + + cm5 = yield d5 + + with cm2: + pass + + with cm4: + pass + + with cm5: + pass + + d6 = limiter.queue(key) + with (yield d6): + pass + + @defer.inlineCallbacks + def test_cancellation(self): + linearizer = Linearizer() + + key = object() + + d1 = linearizer.queue(key) + cm1 = yield d1 + + d2 = linearizer.queue(key) + self.assertFalse(d2.called) + + d3 = linearizer.queue(key) + self.assertFalse(d3.called) + + d2.cancel() + + with cm1: + pass + self.assertTrue(d2.called) + try: + yield d2 + self.fail("Expected d2 to raise CancelledError") + except CancelledError: + pass - with (yield d2): + with (yield d3): pass diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py deleted file mode 100644 index 9ffe209c4d..0000000000 --- a/tests/util/test_log_context.py +++ /dev/null @@ -1,96 +0,0 @@ -import twisted.python.failure -from twisted.internet import defer -from twisted.internet import reactor -from .. import unittest - -from synapse.util.async import sleep -from synapse.util import logcontext -from synapse.util.logcontext import LoggingContext - - -class LoggingContextTestCase(unittest.TestCase): - - def _check_test_key(self, value): - self.assertEquals( - LoggingContext.current_context().test_key, value - ) - - def test_with_context(self): - with LoggingContext() as context_one: - context_one.test_key = "test" - self._check_test_key("test") - - @defer.inlineCallbacks - def test_sleep(self): - @defer.inlineCallbacks - def competing_callback(): - with LoggingContext() as competing_context: - competing_context.test_key = "competing" - yield sleep(0) - self._check_test_key("competing") - - reactor.callLater(0, competing_callback) - - with LoggingContext() as context_one: - context_one.test_key = "one" - yield sleep(0) - self._check_test_key("one") - - def _test_preserve_fn(self, function): - sentinel_context = LoggingContext.current_context() - - callback_completed = [False] - - @defer.inlineCallbacks - def cb(): - context_one.test_key = "one" - yield function() - self._check_test_key("one") - - callback_completed[0] = True - - with LoggingContext() as context_one: - context_one.test_key = "one" - - # fire off function, but don't wait on it. - logcontext.preserve_fn(cb)() - - self._check_test_key("one") - - # now wait for the function under test to have run, and check that - # the logcontext is left in a sane state. - d2 = defer.Deferred() - - def check_logcontext(): - if not callback_completed[0]: - reactor.callLater(0.01, check_logcontext) - return - - # make sure that the context was reset before it got thrown back - # into the reactor - try: - self.assertIs(LoggingContext.current_context(), - sentinel_context) - d2.callback(None) - except BaseException: - d2.errback(twisted.python.failure.Failure()) - - reactor.callLater(0.01, check_logcontext) - - # test is done once d2 finishes - return d2 - - def test_preserve_fn_with_blocking_fn(self): - @defer.inlineCallbacks - def blocking_function(): - yield sleep(0) - - return self._test_preserve_fn(blocking_function) - - def test_preserve_fn_with_non_blocking_fn(self): - @defer.inlineCallbacks - def nonblocking_function(): - with logcontext.PreserveLoggingContext(): - yield defer.succeed(None) - - return self._test_preserve_fn(nonblocking_function) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py new file mode 100644 index 0000000000..c54001f7a4 --- /dev/null +++ b/tests/util/test_logcontext.py @@ -0,0 +1,179 @@ +import twisted.python.failure +from twisted.internet import defer, reactor + +from synapse.util import Clock, logcontext +from synapse.util.logcontext import LoggingContext + +from .. import unittest + + +class LoggingContextTestCase(unittest.TestCase): + + def _check_test_key(self, value): + self.assertEquals( + LoggingContext.current_context().request, value + ) + + def test_with_context(self): + with LoggingContext() as context_one: + context_one.request = "test" + self._check_test_key("test") + + @defer.inlineCallbacks + def test_sleep(self): + clock = Clock(reactor) + + @defer.inlineCallbacks + def competing_callback(): + with LoggingContext() as competing_context: + competing_context.request = "competing" + yield clock.sleep(0) + self._check_test_key("competing") + + reactor.callLater(0, competing_callback) + + with LoggingContext() as context_one: + context_one.request = "one" + yield clock.sleep(0) + self._check_test_key("one") + + def _test_run_in_background(self, function): + sentinel_context = LoggingContext.current_context() + + callback_completed = [False] + + def test(): + context_one.request = "one" + d = function() + + def cb(res): + self._check_test_key("one") + callback_completed[0] = True + return res + d.addCallback(cb) + + return d + + with LoggingContext() as context_one: + context_one.request = "one" + + # fire off function, but don't wait on it. + logcontext.run_in_background(test) + + self._check_test_key("one") + + # now wait for the function under test to have run, and check that + # the logcontext is left in a sane state. + d2 = defer.Deferred() + + def check_logcontext(): + if not callback_completed[0]: + reactor.callLater(0.01, check_logcontext) + return + + # make sure that the context was reset before it got thrown back + # into the reactor + try: + self.assertIs(LoggingContext.current_context(), + sentinel_context) + d2.callback(None) + except BaseException: + d2.errback(twisted.python.failure.Failure()) + + reactor.callLater(0.01, check_logcontext) + + # test is done once d2 finishes + return d2 + + def test_run_in_background_with_blocking_fn(self): + @defer.inlineCallbacks + def blocking_function(): + yield Clock(reactor).sleep(0) + + return self._test_run_in_background(blocking_function) + + def test_run_in_background_with_non_blocking_fn(self): + @defer.inlineCallbacks + def nonblocking_function(): + with logcontext.PreserveLoggingContext(): + yield defer.succeed(None) + + return self._test_run_in_background(nonblocking_function) + + def test_run_in_background_with_chained_deferred(self): + # a function which returns a deferred which looks like it has been + # called, but is actually paused + def testfunc(): + return logcontext.make_deferred_yieldable( + _chained_deferred_function() + ) + + return self._test_run_in_background(testfunc) + + @defer.inlineCallbacks + def test_make_deferred_yieldable(self): + # a function which retuns an incomplete deferred, but doesn't follow + # the synapse rules. + def blocking_function(): + d = defer.Deferred() + reactor.callLater(0, d.callback, None) + return d + + sentinel_context = LoggingContext.current_context() + + with LoggingContext() as context_one: + context_one.request = "one" + + d1 = logcontext.make_deferred_yieldable(blocking_function()) + # make sure that the context was reset by make_deferred_yieldable + self.assertIs(LoggingContext.current_context(), sentinel_context) + + yield d1 + + # now it should be restored + self._check_test_key("one") + + @defer.inlineCallbacks + def test_make_deferred_yieldable_with_chained_deferreds(self): + sentinel_context = LoggingContext.current_context() + + with LoggingContext() as context_one: + context_one.request = "one" + + d1 = logcontext.make_deferred_yieldable(_chained_deferred_function()) + # make sure that the context was reset by make_deferred_yieldable + self.assertIs(LoggingContext.current_context(), sentinel_context) + + yield d1 + + # now it should be restored + self._check_test_key("one") + + @defer.inlineCallbacks + def test_make_deferred_yieldable_on_non_deferred(self): + """Check that make_deferred_yieldable does the right thing when its + argument isn't actually a deferred""" + + with LoggingContext() as context_one: + context_one.request = "one" + + d1 = logcontext.make_deferred_yieldable("bum") + self._check_test_key("one") + + r = yield d1 + self.assertEqual(r, "bum") + self._check_test_key("one") + + +# a function which returns a deferred which has been "called", but +# which had a function which returned another incomplete deferred on +# its callback list, so won't yet call any other new callbacks. +def _chained_deferred_function(): + d = defer.succeed(None) + + def cb(res): + d2 = defer.Deferred() + reactor.callLater(0, d2.callback, res) + return d2 + d.addCallback(cb) + return d diff --git a/tests/util/test_clock.py b/tests/util/test_logformatter.py index 9672603579..297aebbfbe 100644 --- a/tests/util/test_clock.py +++ b/tests/util/test_logformatter.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,22 +12,28 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse import util -from twisted.internet import defer +import sys + +from synapse.util.logformatter import LogFormatter + from tests import unittest -class ClockTestCase(unittest.TestCase): - @defer.inlineCallbacks - def test_time_bound_deferred(self): - # just a deferred which never resolves - slow_deferred = defer.Deferred() +class TestException(Exception): + pass + - clock = util.Clock() - time_bound = clock.time_bound_deferred(slow_deferred, 0.001) +class LogFormatterTestCase(unittest.TestCase): + def test_formatter(self): + formatter = LogFormatter() try: - yield time_bound - self.fail("Expected timedout error, but got nothing") - except util.DeferredTimedOutError: - pass + raise TestException("testytest") + except TestException: + ei = sys.exc_info() + + output = formatter.formatException(ei) + + # check the output looks vaguely sane + self.assertIn("testytest", output) + self.assertIn("Capture point", output) diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py index dfb78cb8bd..9b36ef4482 100644 --- a/tests/util/test_lrucache.py +++ b/tests/util/test_lrucache.py @@ -14,12 +14,12 @@ # limitations under the License. -from .. import unittest +from mock import Mock from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache -from mock import Mock +from .. import unittest class LruCacheTestCase(unittest.TestCase): diff --git a/tests/util/test_rwlock.py b/tests/util/test_rwlock.py index 1d745ae1a7..24194e3b25 100644 --- a/tests/util/test_rwlock.py +++ b/tests/util/test_rwlock.py @@ -14,10 +14,10 @@ # limitations under the License. -from tests import unittest - from synapse.util.async import ReadWriteLock +from tests import unittest + class ReadWriteLockTestCase(unittest.TestCase): diff --git a/tests/util/test_snapshot_cache.py b/tests/util/test_snapshot_cache.py index d3a8630c2f..0f5b32fcc0 100644 --- a/tests/util/test_snapshot_cache.py +++ b/tests/util/test_snapshot_cache.py @@ -14,10 +14,11 @@ # limitations under the License. -from .. import unittest +from twisted.internet.defer import Deferred from synapse.util.caches.snapshot_cache import SnapshotCache -from twisted.internet.defer import Deferred + +from .. import unittest class SnapshotCacheTestCase(unittest.TestCase): diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py new file mode 100644 index 0000000000..65b0f2e6fb --- /dev/null +++ b/tests/util/test_stream_change_cache.py @@ -0,0 +1,215 @@ +from mock import patch + +from synapse.util.caches.stream_change_cache import StreamChangeCache + +from tests import unittest + + +class StreamChangeCacheTests(unittest.TestCase): + """ + Tests for StreamChangeCache. + """ + + def test_prefilled_cache(self): + """ + Providing a prefilled cache to StreamChangeCache will result in a cache + with the prefilled-cache entered in. + """ + cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2}) + self.assertTrue(cache.has_entity_changed("user@foo.com", 1)) + + def test_has_entity_changed(self): + """ + StreamChangeCache.entity_has_changed will mark entities as changed, and + has_entity_changed will observe the changed entities. + """ + cache = StreamChangeCache("#test", 3) + + cache.entity_has_changed("user@foo.com", 6) + cache.entity_has_changed("bar@baz.net", 7) + + # If it's been changed after that stream position, return True + self.assertTrue(cache.has_entity_changed("user@foo.com", 4)) + self.assertTrue(cache.has_entity_changed("bar@baz.net", 4)) + + # If it's been changed at that stream position, return False + self.assertFalse(cache.has_entity_changed("user@foo.com", 6)) + + # If there's no changes after that stream position, return False + self.assertFalse(cache.has_entity_changed("user@foo.com", 7)) + + # If the entity does not exist, return False. + self.assertFalse(cache.has_entity_changed("not@here.website", 7)) + + # If we request before the stream cache's earliest known position, + # return True, whether it's a known entity or not. + self.assertTrue(cache.has_entity_changed("user@foo.com", 0)) + self.assertTrue(cache.has_entity_changed("not@here.website", 0)) + + @patch("synapse.util.caches.CACHE_SIZE_FACTOR", 1.0) + def test_has_entity_changed_pops_off_start(self): + """ + StreamChangeCache.entity_has_changed will respect the max size and + purge the oldest items upon reaching that max size. + """ + cache = StreamChangeCache("#test", 1, max_size=2) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + # The cache is at the max size, 2 + self.assertEqual(len(cache._cache), 2) + + # The oldest item has been popped off + self.assertTrue("user@foo.com" not in cache._entity_to_key) + + # If we update an existing entity, it keeps the two existing entities + cache.entity_has_changed("bar@baz.net", 5) + self.assertEqual( + set(["bar@baz.net", "user@elsewhere.org"]), set(cache._entity_to_key) + ) + + def test_get_all_entities_changed(self): + """ + StreamChangeCache.get_all_entities_changed will return all changed + entities since the given position. If the position is before the start + of the known stream, it returns None instead. + """ + cache = StreamChangeCache("#test", 1) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + self.assertEqual( + cache.get_all_entities_changed(1), + ["user@foo.com", "bar@baz.net", "user@elsewhere.org"], + ) + self.assertEqual( + cache.get_all_entities_changed(2), ["bar@baz.net", "user@elsewhere.org"] + ) + self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"]) + self.assertEqual(cache.get_all_entities_changed(0), None) + + def test_has_any_entity_changed(self): + """ + StreamChangeCache.has_any_entity_changed will return True if any + entities have been changed since the provided stream position, and + False if they have not. If the cache has entries and the provided + stream position is before it, it will return True, otherwise False if + the cache has no entries. + """ + cache = StreamChangeCache("#test", 1) + + # With no entities, it returns False for the past, present, and future. + self.assertFalse(cache.has_any_entity_changed(0)) + self.assertFalse(cache.has_any_entity_changed(1)) + self.assertFalse(cache.has_any_entity_changed(2)) + + # We add an entity + cache.entity_has_changed("user@foo.com", 2) + + # With an entity, it returns True for the past, the stream start + # position, and False for the stream position the entity was changed + # on and ones after it. + self.assertTrue(cache.has_any_entity_changed(0)) + self.assertTrue(cache.has_any_entity_changed(1)) + self.assertFalse(cache.has_any_entity_changed(2)) + self.assertFalse(cache.has_any_entity_changed(3)) + + def test_get_entities_changed(self): + """ + StreamChangeCache.get_entities_changed will return the entities in the + given list that have changed since the provided stream ID. If the + stream position is earlier than the earliest known position, it will + return all of the entities queried for. + """ + cache = StreamChangeCache("#test", 1) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + # Query all the entries, but mid-way through the stream. We should only + # get the ones after that point. + self.assertEqual( + cache.get_entities_changed( + ["user@foo.com", "bar@baz.net", "user@elsewhere.org"], stream_pos=2 + ), + set(["bar@baz.net", "user@elsewhere.org"]), + ) + + # Query all the entries mid-way through the stream, but include one + # that doesn't exist in it. We shouldn't get back the one that doesn't + # exist. + self.assertEqual( + cache.get_entities_changed( + [ + "user@foo.com", + "bar@baz.net", + "user@elsewhere.org", + "not@here.website", + ], + stream_pos=2, + ), + set(["bar@baz.net", "user@elsewhere.org"]), + ) + + # Query all the entries, but before the first known point. We will get + # all the entries we queried for, including ones that don't exist. + self.assertEqual( + cache.get_entities_changed( + [ + "user@foo.com", + "bar@baz.net", + "user@elsewhere.org", + "not@here.website", + ], + stream_pos=0, + ), + set( + [ + "user@foo.com", + "bar@baz.net", + "user@elsewhere.org", + "not@here.website", + ] + ), + ) + + # Query a subset of the entries mid-way through the stream. We should + # only get back the subset. + self.assertEqual( + cache.get_entities_changed( + [ + "bar@baz.net", + ], + stream_pos=2, + ), + set( + [ + "bar@baz.net", + ] + ), + ) + + def test_max_pos(self): + """ + StreamChangeCache.get_max_pos_of_last_change will return the most + recent point where the entity could have changed. If the entity is not + known, the stream start is provided instead. + """ + cache = StreamChangeCache("#test", 1) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + # Known entities will return the point where they were changed. + self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 2) + self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3) + self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4) + + # Unknown entities will return the stream start position. + self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), 1) diff --git a/tests/util/test_treecache.py b/tests/util/test_treecache.py index 7ab578a185..a5f2261208 100644 --- a/tests/util/test_treecache.py +++ b/tests/util/test_treecache.py @@ -14,10 +14,10 @@ # limitations under the License. -from .. import unittest - from synapse.util.caches.treecache import TreeCache +from .. import unittest + class TreeCacheTestCase(unittest.TestCase): def test_get_set_onelevel(self): diff --git a/tests/util/test_wheel_timer.py b/tests/util/test_wheel_timer.py index c44567e52e..03201a4d9b 100644 --- a/tests/util/test_wheel_timer.py +++ b/tests/util/test_wheel_timer.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .. import unittest - from synapse.util.wheel_timer import WheelTimer +from .. import unittest + class WheelTimerTestCase(unittest.TestCase): def test_single_insert_fetch(self): @@ -33,7 +33,7 @@ class WheelTimerTestCase(unittest.TestCase): self.assertListEqual(wheel.fetch(156), [obj]) self.assertListEqual(wheel.fetch(170), []) - def test_mutli_insert(self): + def test_multi_insert(self): wheel = WheelTimer(bucket_size=5) obj1 = object() @@ -58,7 +58,7 @@ class WheelTimerTestCase(unittest.TestCase): wheel.insert(100, obj, 50) self.assertListEqual(wheel.fetch(120), [obj]) - def test_insert_past_mutli(self): + def test_insert_past_multi(self): wheel = WheelTimer(bucket_size=5) obj1 = object() |