From f40645e60b9cab69c953094848be61c0989a91cb Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 18 Aug 2020 16:20:49 -0400 Subject: Convert events worker database to async/await. (#8071) --- tests/server_notices/test_resource_limits_server_notices.py | 6 +++--- tests/storage/test_appservice.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'tests') diff --git a/tests/server_notices/test_resource_limits_server_notices.py b/tests/server_notices/test_resource_limits_server_notices.py index 2858d13558..23db821fb7 100644 --- a/tests/server_notices/test_resource_limits_server_notices.py +++ b/tests/server_notices/test_resource_limits_server_notices.py @@ -104,7 +104,7 @@ class TestResourceLimitsServerNotices(unittest.HomeserverTestCase): type=EventTypes.Message, content={"msgtype": ServerNoticeMsgType} ) self._rlsn._store.get_events = Mock( - return_value=defer.succeed({"123": mock_event}) + return_value=make_awaitable({"123": mock_event}) ) self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id)) # Would be better to check the content, but once == remove blocking event @@ -122,7 +122,7 @@ class TestResourceLimitsServerNotices(unittest.HomeserverTestCase): type=EventTypes.Message, content={"msgtype": ServerNoticeMsgType} ) self._rlsn._store.get_events = Mock( - return_value=defer.succeed({"123": mock_event}) + return_value=make_awaitable({"123": mock_event}) ) self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id)) @@ -217,7 +217,7 @@ class TestResourceLimitsServerNotices(unittest.HomeserverTestCase): type=EventTypes.Message, content={"msgtype": ServerNoticeMsgType} ) self._rlsn._store.get_events = Mock( - return_value=defer.succeed({"123": mock_event}) + return_value=make_awaitable({"123": mock_event}) ) self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id)) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index a425e66f37..17fbde284a 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -31,6 +31,7 @@ from synapse.storage.databases.main.appservice import ( ) from tests import unittest +from tests.test_utils import make_awaitable from tests.utils import setup_test_homeserver @@ -357,7 +358,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store.get_events_as_list = Mock(return_value=defer.succeed(events)) + self.store.get_events_as_list = Mock(return_value=make_awaitable(events)) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) -- cgit 1.5.1 From 76d21d14a042756b0c8a8f520dfd9ea09cf092c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Aug 2020 10:39:31 +0100 Subject: Separate `get_current_token` into two. (#8113) The function is used for two purposes: 1) for subscribers of streams to get a token they can use to get further updates with, and 2) for replication to track position of the writers of the stream. For streams with a single writer the two scenarios produce the same result, however the situation becomes complicated for streams with multiple writers. The current `MultiWriterIdGenerator` does not correctly handle the first case (which is not an issue as its only used for the `caches` stream which nothing subscribes to outside of replication). --- changelog.d/8113.misc | 1 + .../slave/storage/_slaved_id_tracker.py | 8 +++++ synapse/replication/tcp/streams/_base.py | 2 +- synapse/storage/databases/main/cache.py | 4 +-- synapse/storage/util/id_generators.py | 36 ++++++++++++++++------ tests/storage/test_id_generators.py | 16 +++++----- 6 files changed, 47 insertions(+), 20 deletions(-) create mode 100644 changelog.d/8113.misc (limited to 'tests') diff --git a/changelog.d/8113.misc b/changelog.d/8113.misc new file mode 100644 index 0000000000..00bec4f8ef --- /dev/null +++ b/changelog.d/8113.misc @@ -0,0 +1 @@ +Separate `get_current_token` into two since there are two different use cases for it. diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py index 9d1d173b2f..d43eaf3a29 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -33,3 +33,11 @@ class SlavedIdTracker(object): int """ return self._current + + def get_current_token_for_writer(self, instance_name: str) -> int: + """Returns the position of the given writer. + + For streams with single writers this is equivalent to + `get_current_token`. + """ + return self.get_current_token() diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 7a42de3f7d..1e92d52165 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -405,7 +405,7 @@ class CachesStream(Stream): store = hs.get_datastore() super().__init__( hs.get_instance_name(), - store.get_cache_stream_token, + store.get_cache_stream_token_for_writer, store.get_all_updated_caches, ) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 10de446065..1e7637a6f5 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -299,8 +299,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore): }, ) - def get_cache_stream_token(self, instance_name): + def get_cache_stream_token_for_writer(self, instance_name: str) -> int: if self._cache_id_gen: - return self._cache_id_gen.get_current_token(instance_name) + return self._cache_id_gen.get_current_token_for_writer(instance_name) else: return 0 diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index e2ddd01290..8276a755e5 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -158,6 +158,14 @@ class StreamIdGenerator(object): return self._current + def get_current_token_for_writer(self, instance_name: str) -> int: + """Returns the position of the given writer. + + For streams with single writers this is equivalent to + `get_current_token`. + """ + return self.get_current_token() + class ChainedIdGenerator(object): """Used to generate new stream ids where the stream must be kept in sync @@ -216,6 +224,14 @@ class ChainedIdGenerator(object): "Attempted to advance token on source for table %r", self._table ) + def get_current_token_for_writer(self, instance_name: str) -> Tuple[int, int]: + """Returns the position of the given writer. + + For streams with single writers this is equivalent to + `get_current_token`. + """ + return self.get_current_token() + class MultiWriterIdGenerator: """An ID generator that tracks a stream that can have multiple writers. @@ -298,7 +314,7 @@ class MultiWriterIdGenerator: # Assert the fetched ID is actually greater than what we currently # believe the ID to be. If not, then the sequence and table have got # out of sync somehow. - assert self.get_current_token() < next_id + assert self.get_current_token_for_writer(self._instance_name) < next_id with self._lock: self._unfinished_ids.add(next_id) @@ -344,16 +360,18 @@ class MultiWriterIdGenerator: curr = self._current_positions.get(self._instance_name, 0) self._current_positions[self._instance_name] = max(curr, next_id) - def get_current_token(self, instance_name: str = None) -> int: - """Gets the current position of a named writer (defaults to current - instance). - - Returns 0 if we don't have a position for the named writer (likely due - to it being a new writer). + def get_current_token(self) -> int: + """Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. """ - if instance_name is None: - instance_name = self._instance_name + # Currently we don't support this operation, as it's not obvious how to + # condense the stream positions of multiple writers into a single int. + raise NotImplementedError() + + def get_current_token_for_writer(self, instance_name: str) -> int: + """Returns the position of the given writer. + """ with self._lock: return self._current_positions.get(instance_name, 0) diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index e845410dae..7a05194653 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -88,7 +88,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): id_gen = self._create_id_generator() self.assertEqual(id_gen.get_positions(), {"master": 7}) - self.assertEqual(id_gen.get_current_token("master"), 7) + self.assertEqual(id_gen.get_current_token_for_writer("master"), 7) # Try allocating a new ID gen and check that we only see position # advanced after we leave the context manager. @@ -98,12 +98,12 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): self.assertEqual(stream_id, 8) self.assertEqual(id_gen.get_positions(), {"master": 7}) - self.assertEqual(id_gen.get_current_token("master"), 7) + self.assertEqual(id_gen.get_current_token_for_writer("master"), 7) self.get_success(_get_next_async()) self.assertEqual(id_gen.get_positions(), {"master": 8}) - self.assertEqual(id_gen.get_current_token("master"), 8) + self.assertEqual(id_gen.get_current_token_for_writer("master"), 8) def test_multi_instance(self): """Test that reads and writes from multiple processes are handled @@ -116,8 +116,8 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): second_id_gen = self._create_id_generator("second") self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7}) - self.assertEqual(first_id_gen.get_current_token("first"), 3) - self.assertEqual(first_id_gen.get_current_token("second"), 7) + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7) # Try allocating a new ID gen and check that we only see position # advanced after we leave the context manager. @@ -166,7 +166,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): id_gen = self._create_id_generator() self.assertEqual(id_gen.get_positions(), {"master": 7}) - self.assertEqual(id_gen.get_current_token("master"), 7) + self.assertEqual(id_gen.get_current_token_for_writer("master"), 7) # Try allocating a new ID gen and check that we only see position # advanced after we leave the context manager. @@ -176,9 +176,9 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase): self.assertEqual(stream_id, 8) self.assertEqual(id_gen.get_positions(), {"master": 7}) - self.assertEqual(id_gen.get_current_token("master"), 7) + self.assertEqual(id_gen.get_current_token_for_writer("master"), 7) self.get_success(self.db_pool.runInteraction("test", _get_next_txn)) self.assertEqual(id_gen.get_positions(), {"master": 8}) - self.assertEqual(id_gen.get_current_token("master"), 8) + self.assertEqual(id_gen.get_current_token_for_writer("master"), 8) -- cgit 1.5.1 From d294f0e7e18f4ba91b5ca6a944758d5b92d1ea2a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 19 Aug 2020 07:09:07 -0400 Subject: Remove the unused inlineCallbacks code-paths in the caching code (#8119) --- changelog.d/8119.misc | 1 + synapse/util/caches/descriptors.py | 54 ++++++----------------------------- tests/util/caches/test_descriptors.py | 12 ++++---- 3 files changed, 15 insertions(+), 52 deletions(-) create mode 100644 changelog.d/8119.misc (limited to 'tests') diff --git a/changelog.d/8119.misc b/changelog.d/8119.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8119.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index c2d72a82cf..49d9fddcf0 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -285,16 +285,9 @@ class Cache(object): class _CacheDescriptorBase(object): - def __init__( - self, orig: _CachedFunction, num_args, inlineCallbacks, cache_context=False - ): + def __init__(self, orig: _CachedFunction, num_args, cache_context=False): self.orig = orig - if inlineCallbacks: - self.function_to_call = defer.inlineCallbacks(orig) - else: - self.function_to_call = orig - arg_spec = inspect.getfullargspec(orig) all_args = arg_spec.args @@ -364,7 +357,7 @@ class CacheDescriptor(_CacheDescriptorBase): invalidated) by adding a special "cache_context" argument to the function and passing that as a kwarg to all caches called. For example:: - @cachedInlineCallbacks(cache_context=True) + @cached(cache_context=True) def foo(self, key, cache_context): r1 = yield self.bar1(key, on_invalidate=cache_context.invalidate) r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate) @@ -382,17 +375,11 @@ class CacheDescriptor(_CacheDescriptorBase): max_entries=1000, num_args=None, tree=False, - inlineCallbacks=False, cache_context=False, iterable=False, ): - super(CacheDescriptor, self).__init__( - orig, - num_args=num_args, - inlineCallbacks=inlineCallbacks, - cache_context=cache_context, - ) + super().__init__(orig, num_args=num_args, cache_context=cache_context) self.max_entries = max_entries self.tree = tree @@ -465,9 +452,7 @@ class CacheDescriptor(_CacheDescriptorBase): observer = defer.succeed(cached_result_d) except KeyError: - ret = defer.maybeDeferred( - preserve_fn(self.function_to_call), obj, *args, **kwargs - ) + ret = defer.maybeDeferred(preserve_fn(self.orig), obj, *args, **kwargs) def onErr(f): cache.invalidate(cache_key) @@ -510,9 +495,7 @@ class CacheListDescriptor(_CacheDescriptorBase): of results. """ - def __init__( - self, orig, cached_method_name, list_name, num_args=None, inlineCallbacks=False - ): + def __init__(self, orig, cached_method_name, list_name, num_args=None): """ Args: orig (function) @@ -521,12 +504,8 @@ class CacheListDescriptor(_CacheDescriptorBase): num_args (int): number of positional arguments (excluding ``self``, but including list_name) to use as cache keys. Defaults to all named args of the function. - inlineCallbacks (bool): Whether orig is a generator that should - be wrapped by defer.inlineCallbacks """ - super(CacheListDescriptor, self).__init__( - orig, num_args=num_args, inlineCallbacks=inlineCallbacks - ) + super().__init__(orig, num_args=num_args) self.list_name = list_name @@ -631,7 +610,7 @@ class CacheListDescriptor(_CacheDescriptorBase): cached_defers.append( defer.maybeDeferred( - preserve_fn(self.function_to_call), **args_to_call + preserve_fn(self.orig), **args_to_call ).addCallbacks(complete_all, errback) ) @@ -695,21 +674,7 @@ def cached( ) -def cachedInlineCallbacks( - max_entries=1000, num_args=None, tree=False, cache_context=False, iterable=False -): - return lambda orig: CacheDescriptor( - orig, - max_entries=max_entries, - num_args=num_args, - tree=tree, - inlineCallbacks=True, - cache_context=cache_context, - iterable=iterable, - ) - - -def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=False): +def cachedList(cached_method_name, list_name, num_args=None): """Creates a descriptor that wraps a function in a `CacheListDescriptor`. Used to do batch lookups for an already created cache. A single argument @@ -725,8 +690,6 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal do batch lookups in the cache. num_args (int): Number of arguments to use as the key in the cache (including list_name). Defaults to all named parameters. - inlineCallbacks (bool): Should the function be wrapped in an - `defer.inlineCallbacks`? Example: @@ -744,5 +707,4 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal cached_method_name=cached_method_name, list_name=list_name, num_args=num_args, - inlineCallbacks=inlineCallbacks, ) diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 4d2b9e0d64..0363735d4f 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -366,11 +366,11 @@ class CachedListDescriptorTestCase(unittest.TestCase): def fn(self, arg1, arg2): pass - @descriptors.cachedList("fn", "args1", inlineCallbacks=True) - def list_fn(self, args1, arg2): + @descriptors.cachedList("fn", "args1") + async def list_fn(self, args1, arg2): assert current_context().request == "c1" # we want this to behave like an asynchronous function - yield run_on_reactor() + await run_on_reactor() assert current_context().request == "c1" return self.mock(args1, arg2) @@ -416,10 +416,10 @@ class CachedListDescriptorTestCase(unittest.TestCase): def fn(self, arg1, arg2): pass - @descriptors.cachedList("fn", "args1", inlineCallbacks=True) - def list_fn(self, args1, arg2): + @descriptors.cachedList("fn", "args1") + async def list_fn(self, args1, arg2): # we want this to behave like an asynchronous function - yield run_on_reactor() + await run_on_reactor() return self.mock(args1, arg2) obj = Cls() -- cgit 1.5.1 From f594e434c35ab99bc71216cbb06082aa2b975980 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 19 Aug 2020 08:07:57 -0400 Subject: Switch the JSON byte producer from a pull to a push producer. (#8116) --- changelog.d/8116.feature | 1 + synapse/http/server.py | 75 +++++++++++++++++------------ tests/rest/client/v1/test_login.py | 16 ++---- tests/rest/client/v2_alpha/test_register.py | 4 +- tests/storage/test_cleanup_extrems.py | 3 +- 5 files changed, 53 insertions(+), 46 deletions(-) create mode 100644 changelog.d/8116.feature (limited to 'tests') diff --git a/changelog.d/8116.feature b/changelog.d/8116.feature new file mode 100644 index 0000000000..b1eaf1e78a --- /dev/null +++ b/changelog.d/8116.feature @@ -0,0 +1 @@ +Iteratively encode JSON to avoid blocking the reactor. diff --git a/synapse/http/server.py b/synapse/http/server.py index 37fdf14405..8d791bd2ca 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect): pass -@implementer(interfaces.IPullProducer) +@implementer(interfaces.IPushProducer) class _ByteProducer: """ Iteratively write bytes to the request. @@ -515,52 +515,64 @@ class _ByteProducer: ): self._request = request self._iterator = iterator + self._paused = False - def start(self) -> None: - self._request.registerProducer(self, False) + # Register the producer and start producing data. + self._request.registerProducer(self, True) + self.resumeProducing() def _send_data(self, data: List[bytes]) -> None: """ - Send a list of strings as a response to the request. + Send a list of bytes as a chunk of a response. """ if not data: return self._request.write(b"".join(data)) + def pauseProducing(self) -> None: + self._paused = True + def resumeProducing(self) -> None: # We've stopped producing in the meantime (note that this might be # re-entrant after calling write). if not self._request: return - # Get the next chunk and write it to the request. - # - # The output of the JSON encoder is coalesced until min_chunk_size is - # reached. (This is because JSON encoders produce a very small output - # per iteration.) - # - # Note that buffer stores a list of bytes (instead of appending to - # bytes) to hopefully avoid many allocations. - buffer = [] - buffered_bytes = 0 - while buffered_bytes < self.min_chunk_size: - try: - data = next(self._iterator) - buffer.append(data) - buffered_bytes += len(data) - except StopIteration: - # The entire JSON object has been serialized, write any - # remaining data, finalize the producer and the request, and - # clean-up any references. - self._send_data(buffer) - self._request.unregisterProducer() - self._request.finish() - self.stopProducing() - return - - self._send_data(buffer) + self._paused = False + + # Write until there's backpressure telling us to stop. + while not self._paused: + # Get the next chunk and write it to the request. + # + # The output of the JSON encoder is buffered and coalesced until + # min_chunk_size is reached. This is because JSON encoders produce + # very small output per iteration and the Request object converts + # each call to write() to a separate chunk. Without this there would + # be an explosion in bytes written (e.g. b"{" becoming "1\r\n{\r\n"). + # + # Note that buffer stores a list of bytes (instead of appending to + # bytes) to hopefully avoid many allocations. + buffer = [] + buffered_bytes = 0 + while buffered_bytes < self.min_chunk_size: + try: + data = next(self._iterator) + buffer.append(data) + buffered_bytes += len(data) + except StopIteration: + # The entire JSON object has been serialized, write any + # remaining data, finalize the producer and the request, and + # clean-up any references. + self._send_data(buffer) + self._request.unregisterProducer() + self._request.finish() + self.stopProducing() + return + + self._send_data(buffer) def stopProducing(self) -> None: + # Clear a circular reference. self._request = None @@ -620,8 +632,7 @@ def respond_with_json( if send_cors: set_cors_headers(request) - producer = _ByteProducer(request, encoder(json_object)) - producer.start() + _ByteProducer(request, encoder(json_object)) return NOT_DONE_YET diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py index db52725cfe..2668662c9e 100644 --- a/tests/rest/client/v1/test_login.py +++ b/tests/rest/client/v1/test_login.py @@ -62,8 +62,7 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): "identifier": {"type": "m.id.user", "user": "kermit" + str(i)}, "password": "monkey", } - request_data = json.dumps(params) - request, channel = self.make_request(b"POST", LOGIN_URL, request_data) + request, channel = self.make_request(b"POST", LOGIN_URL, params) self.render(request) if i == 5: @@ -76,14 +75,13 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): # than 1min. self.assertTrue(retry_after_ms < 6000) - self.reactor.advance(retry_after_ms / 1000.0) + self.reactor.advance(retry_after_ms / 1000.0 + 1.0) params = { "type": "m.login.password", "identifier": {"type": "m.id.user", "user": "kermit" + str(i)}, "password": "monkey", } - request_data = json.dumps(params) request, channel = self.make_request(b"POST", LOGIN_URL, params) self.render(request) @@ -111,8 +109,7 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): "identifier": {"type": "m.id.user", "user": "kermit"}, "password": "monkey", } - request_data = json.dumps(params) - request, channel = self.make_request(b"POST", LOGIN_URL, request_data) + request, channel = self.make_request(b"POST", LOGIN_URL, params) self.render(request) if i == 5: @@ -132,7 +129,6 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): "identifier": {"type": "m.id.user", "user": "kermit"}, "password": "monkey", } - request_data = json.dumps(params) request, channel = self.make_request(b"POST", LOGIN_URL, params) self.render(request) @@ -160,8 +156,7 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): "identifier": {"type": "m.id.user", "user": "kermit"}, "password": "notamonkey", } - request_data = json.dumps(params) - request, channel = self.make_request(b"POST", LOGIN_URL, request_data) + request, channel = self.make_request(b"POST", LOGIN_URL, params) self.render(request) if i == 5: @@ -174,14 +169,13 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): # than 1min. self.assertTrue(retry_after_ms < 6000) - self.reactor.advance(retry_after_ms / 1000.0) + self.reactor.advance(retry_after_ms / 1000.0 + 1.0) params = { "type": "m.login.password", "identifier": {"type": "m.id.user", "user": "kermit"}, "password": "notamonkey", } - request_data = json.dumps(params) request, channel = self.make_request(b"POST", LOGIN_URL, params) self.render(request) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 53a43038f0..2fc3a60fc5 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -160,7 +160,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase): else: self.assertEquals(channel.result["code"], b"200", channel.result) - self.reactor.advance(retry_after_ms / 1000.0) + self.reactor.advance(retry_after_ms / 1000.0 + 1.0) request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}") self.render(request) @@ -186,7 +186,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase): else: self.assertEquals(channel.result["code"], b"200", channel.result) - self.reactor.advance(retry_after_ms / 1000.0) + self.reactor.advance(retry_after_ms / 1000.0 + 1.0) request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}") self.render(request) diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py index 8e9a650f9f..43639ca286 100644 --- a/tests/storage/test_cleanup_extrems.py +++ b/tests/storage/test_cleanup_extrems.py @@ -353,6 +353,7 @@ class CleanupExtremDummyEventsTestCase(HomeserverTestCase): self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[ "3" ] = 300000 + self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion() # All entries within time frame self.assertEqual( @@ -362,7 +363,7 @@ class CleanupExtremDummyEventsTestCase(HomeserverTestCase): 3, ) # Oldest room to expire - self.pump(1) + self.pump(1.01) self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion() self.assertEqual( len( -- cgit 1.5.1