summary refs log tree commit diff
path: root/tests/replication
diff options
context:
space:
mode:
Diffstat (limited to 'tests/replication')
-rw-r--r--tests/replication/__init__.py1
-rw-r--r--tests/replication/_base.py145
-rw-r--r--tests/replication/slave/__init__.py1
-rw-r--r--tests/replication/slave/storage/__init__.py1
-rw-r--r--tests/replication/tcp/__init__.py1
-rw-r--r--tests/replication/tcp/streams/__init__.py1
-rw-r--r--tests/replication/tcp/streams/test_account_data.py1
-rw-r--r--tests/replication/tcp/streams/test_events.py5
-rw-r--r--tests/replication/tcp/streams/test_federation.py1
-rw-r--r--tests/replication/tcp/streams/test_receipts.py1
-rw-r--r--tests/replication/tcp/streams/test_typing.py1
-rw-r--r--tests/replication/tcp/test_commands.py1
-rw-r--r--tests/replication/tcp/test_remote_server_up.py1
-rw-r--r--tests/replication/test_auth.py1
-rw-r--r--tests/replication/test_client_reader_shard.py1
-rw-r--r--tests/replication/test_federation_ack.py1
-rw-r--r--tests/replication/test_federation_sender_shard.py1
-rw-r--r--tests/replication/test_multi_media_repo.py1
-rw-r--r--tests/replication/test_pusher_shard.py1
-rw-r--r--tests/replication/test_sharded_event_persister.py1
20 files changed, 26 insertions, 142 deletions
diff --git a/tests/replication/__init__.py b/tests/replication/__init__.py

index b7df13c9ee..f43a360a80 100644 --- a/tests/replication/__init__.py +++ b/tests/replication/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index aff19d9fb3..624bd1b927 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,22 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any, Callable, Dict, List, Optional, Tuple, Type +from typing import Any, Callable, Dict, List, Optional, Tuple -from twisted.internet.interfaces import IConsumer, IPullProducer, IReactorTime from twisted.internet.protocol import Protocol -from twisted.internet.task import LoopingCall -from twisted.web.http import HTTPChannel from twisted.web.resource import Resource -from twisted.web.server import Request, Site -from synapse.app.generic_worker import ( - GenericWorkerReplicationHandler, - GenericWorkerServer, -) +from synapse.app.generic_worker import GenericWorkerServer from synapse.http.server import JsonResource from synapse.http.site import SynapseRequest, SynapseSite from synapse.replication.http import ReplicationRestResource +from synapse.replication.tcp.client import ReplicationDataHandler from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.resource import ( @@ -36,7 +29,6 @@ from synapse.replication.tcp.resource import ( ServerReplicationStreamProtocol, ) from synapse.server import HomeServer -from synapse.util import Clock from tests import unittest from tests.server import FakeTransport @@ -157,7 +149,19 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): client_protocol = client_factory.buildProtocol(None) # Set up the server side protocol - channel = _PushHTTPChannel(self.reactor, SynapseRequest, self.site) + channel = self.site.buildProtocol(None) + + # hook into the channel's request factory so that we can keep a record + # of the requests + requests: List[SynapseRequest] = [] + real_request_factory = channel.requestFactory + + def request_factory(*args, **kwargs): + request = real_request_factory(*args, **kwargs) + requests.append(request) + return request + + channel.requestFactory = request_factory # Connect client to server and vice versa. client_to_server_transport = FakeTransport( @@ -179,7 +183,10 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): server_to_client_transport.loseConnection() client_to_server_transport.loseConnection() - return channel.request + # there should have been exactly one request + self.assertEqual(len(requests), 1) + + return requests[0] def assert_request_is_get_repl_stream_updates( self, request: SynapseRequest, stream_name: str @@ -352,6 +359,8 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): config=worker_hs.config.server.listeners[0], resource=resource, server_version_string="1", + max_request_body_size=4096, + reactor=self.reactor, ) if worker_hs.config.redis.redis_enabled: @@ -389,7 +398,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): client_protocol = client_factory.buildProtocol(None) # Set up the server side protocol - channel = _PushHTTPChannel(self.reactor, SynapseRequest, self._hs_to_site[hs]) + channel = self._hs_to_site[hs].buildProtocol(None) # Connect client to server and vice versa. client_to_server_transport = FakeTransport( @@ -432,7 +441,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): server_protocol.makeConnection(server_to_client_transport) -class TestReplicationDataHandler(GenericWorkerReplicationHandler): +class TestReplicationDataHandler(ReplicationDataHandler): """Drop-in for ReplicationDataHandler which just collects RDATA rows""" def __init__(self, hs: HomeServer): @@ -447,112 +456,6 @@ class TestReplicationDataHandler(GenericWorkerReplicationHandler): self.received_rdata_rows.append((stream_name, token, r)) -class _PushHTTPChannel(HTTPChannel): - """A HTTPChannel that wraps pull producers to push producers. - - This is a hack to get around the fact that HTTPChannel transparently wraps a - pull producer (which is what Synapse uses to reply to requests) with - `_PullToPush` to convert it to a push producer. Unfortunately `_PullToPush` - uses the standard reactor rather than letting us use our test reactor, which - makes it very hard to test. - """ - - def __init__( - self, reactor: IReactorTime, request_factory: Type[Request], site: Site - ): - super().__init__() - self.reactor = reactor - self.requestFactory = request_factory - self.site = site - - self._pull_to_push_producer = None # type: Optional[_PullToPushProducer] - - def registerProducer(self, producer, streaming): - # Convert pull producers to push producer. - if not streaming: - self._pull_to_push_producer = _PullToPushProducer( - self.reactor, producer, self - ) - producer = self._pull_to_push_producer - - super().registerProducer(producer, True) - - def unregisterProducer(self): - if self._pull_to_push_producer: - # We need to manually stop the _PullToPushProducer. - self._pull_to_push_producer.stop() - - def checkPersistence(self, request, version): - """Check whether the connection can be re-used""" - # We hijack this to always say no for ease of wiring stuff up in - # `handle_http_replication_attempt`. - request.responseHeaders.setRawHeaders(b"connection", [b"close"]) - return False - - def requestDone(self, request): - # Store the request for inspection. - self.request = request - super().requestDone(request) - - -class _PullToPushProducer: - """A push producer that wraps a pull producer.""" - - def __init__( - self, reactor: IReactorTime, producer: IPullProducer, consumer: IConsumer - ): - self._clock = Clock(reactor) - self._producer = producer - self._consumer = consumer - - # While running we use a looping call with a zero delay to call - # resumeProducing on given producer. - self._looping_call = None # type: Optional[LoopingCall] - - # We start writing next reactor tick. - self._start_loop() - - def _start_loop(self): - """Start the looping call to""" - - if not self._looping_call: - # Start a looping call which runs every tick. - self._looping_call = self._clock.looping_call(self._run_once, 0) - - def stop(self): - """Stops calling resumeProducing.""" - if self._looping_call: - self._looping_call.stop() - self._looping_call = None - - def pauseProducing(self): - """Implements IPushProducer""" - self.stop() - - def resumeProducing(self): - """Implements IPushProducer""" - self._start_loop() - - def stopProducing(self): - """Implements IPushProducer""" - self.stop() - self._producer.stopProducing() - - def _run_once(self): - """Calls resumeProducing on producer once.""" - - try: - self._producer.resumeProducing() - except Exception: - logger.exception("Failed to call resumeProducing") - try: - self._consumer.unregisterProducer() - except Exception: - pass - - self.stopProducing() - - class FakeRedisPubSubServer: """A fake Redis server for pub/sub.""" diff --git a/tests/replication/slave/__init__.py b/tests/replication/slave/__init__.py
index b7df13c9ee..f43a360a80 100644 --- a/tests/replication/slave/__init__.py +++ b/tests/replication/slave/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/slave/storage/__init__.py b/tests/replication/slave/storage/__init__.py
index b7df13c9ee..f43a360a80 100644 --- a/tests/replication/slave/storage/__init__.py +++ b/tests/replication/slave/storage/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/__init__.py b/tests/replication/tcp/__init__.py
index 1453d04571..743fb9904a 100644 --- a/tests/replication/tcp/__init__.py +++ b/tests/replication/tcp/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/__init__.py b/tests/replication/tcp/streams/__init__.py
index 1453d04571..743fb9904a 100644 --- a/tests/replication/tcp/streams/__init__.py +++ b/tests/replication/tcp/streams/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_account_data.py b/tests/replication/tcp/streams/test_account_data.py
index 153634d4ee..cdd052001b 100644 --- a/tests/replication/tcp/streams/test_account_data.py +++ b/tests/replication/tcp/streams/test_account_data.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py
index 77856fc304..f51fa0a79e 100644 --- a/tests/replication/tcp/streams/test_events.py +++ b/tests/replication/tcp/streams/test_events.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -240,7 +239,7 @@ class EventsStreamTestCase(BaseStreamTestCase): # the state rows are unsorted state_rows = [] # type: List[EventsStreamCurrentStateRow] - for stream_name, token, row in received_rows: + for stream_name, _, row in received_rows: self.assertEqual("events", stream_name) self.assertIsInstance(row, EventsStreamRow) self.assertEqual(row.type, "state") @@ -357,7 +356,7 @@ class EventsStreamTestCase(BaseStreamTestCase): # the state rows are unsorted state_rows = [] # type: List[EventsStreamCurrentStateRow] - for j in range(STATES_PER_USER + 1): + for _ in range(STATES_PER_USER + 1): stream_name, token, row = received_rows.pop(0) self.assertEqual("events", stream_name) self.assertIsInstance(row, EventsStreamRow) diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
index aa4bf1c7e3..ffec06a0d6 100644 --- a/tests/replication/tcp/streams/test_federation.py +++ b/tests/replication/tcp/streams/test_federation.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index 7d848e41ff..7f5d932f0b 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py
index 4a0b342264..ecd360c2d0 100644 --- a/tests/replication/tcp/streams/test_typing.py +++ b/tests/replication/tcp/streams/test_typing.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/test_commands.py b/tests/replication/tcp/test_commands.py
index 60c10a441a..cca7ebb719 100644 --- a/tests/replication/tcp/test_commands.py +++ b/tests/replication/tcp/test_commands.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/tcp/test_remote_server_up.py b/tests/replication/tcp/test_remote_server_up.py
index 1fe9d5b4d0..262c35cef3 100644 --- a/tests/replication/tcp/test_remote_server_up.py +++ b/tests/replication/tcp/test_remote_server_up.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_auth.py b/tests/replication/test_auth.py
index f8fd8a843c..1346e0e160 100644 --- a/tests/replication/test_auth.py +++ b/tests/replication/test_auth.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py
index 5da1d5dc4d..b9751efdc5 100644 --- a/tests/replication/test_client_reader_shard.py +++ b/tests/replication/test_client_reader_shard.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
index 44ad5eec57..04a869e295 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 8ca595c3ee..48ab3aa4e3 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py
index b0800f9840..76e6644353 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index 1f12bde1aa..1e4e3821b9 100644 --- a/tests/replication/test_pusher_shard.py +++ b/tests/replication/test_pusher_shard.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py
index 6c2e1674cb..d739eb6b17 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License");