From 491f0dab1ba5456f52b0710461fbaabc594ff1f5 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 14 Jul 2020 13:36:23 +0200 Subject: Add delete room admin endpoint (#7613) The Delete Room admin API allows server admins to remove rooms from server and block these rooms. `DELETE /_synapse/admin/v1/rooms/` It is a combination and improvement of "[Shutdown room](https://github.com/matrix-org/synapse/blob/develop/docs/admin_api/shutdown_room.md)" and "[Purge room](https://github.com/matrix-org/synapse/blob/develop/docs/admin_api/purge_room.md)" API. Fixes: #6425 It also fixes a bug in [synapse/storage/data_stores/main/room.py](synapse/storage/data_stores/main/room.py) in ` get_room_with_stats`. It should return `None` if the room is unknown. But it returns an `IndexError`. https://github.com/matrix-org/synapse/blob/901b1fa561e3cc661d78aa96d59802cf2078cb0d/synapse/storage/data_stores/main/room.py#L99-L105 Related to: - #5575 - https://github.com/Awesome-Technologies/synapse-admin/issues/17 Signed-off-by: Dirk Klimpel dirk@klimpel.org --- synapse/server.pyi | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/server.pyi') diff --git a/synapse/server.pyi b/synapse/server.pyi index fe8024d2d4..58cd099e6d 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -71,6 +71,8 @@ class HomeServer(object): pass def get_room_member_handler(self) -> synapse.handlers.room_member.RoomMemberHandler: pass + def get_room_shutdown_handler(self) -> synapse.handlers.room.RoomShutdownHandler: + pass def get_event_creation_handler( self, ) -> synapse.handlers.message.EventCreationHandler: -- cgit 1.5.1 From f13061d5153eca9bd7054d5b89ade41f3a430f3b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jul 2020 15:27:35 +0100 Subject: Fix client reader sharding tests (#7853) * Fix client reader sharding tests * Newsfile * Fix typing * Update changelog.d/7853.misc Co-authored-by: Patrick Cloke * Move mocking of http_client to tests Co-authored-by: Patrick Cloke --- changelog.d/7853.misc | 1 + synapse/http/client.py | 24 ++- synapse/server.pyi | 5 + tests/replication/_base.py | 168 ++++++++++++++++++- tests/replication/test_client_reader_shard.py | 59 ++----- tests/replication/test_federation_sender_shard.py | 191 ++++++++-------------- tests/server.py | 26 ++- 7 files changed, 300 insertions(+), 174 deletions(-) create mode 100644 changelog.d/7853.misc (limited to 'synapse/server.pyi') diff --git a/changelog.d/7853.misc b/changelog.d/7853.misc new file mode 100644 index 0000000000..b4f614084d --- /dev/null +++ b/changelog.d/7853.misc @@ -0,0 +1 @@ +Add support for handling registration requests across multiple client reader workers. diff --git a/synapse/http/client.py b/synapse/http/client.py index 505872ee90..b80681135e 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -31,6 +31,7 @@ from twisted.internet.interfaces import ( IReactorPluggableNameResolver, IResolutionReceiver, ) +from twisted.internet.task import Cooperator from twisted.python.failure import Failure from twisted.web._newclient import ResponseDone from twisted.web.client import Agent, HTTPConnectionPool, readBody @@ -69,6 +70,21 @@ def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist): return False +_EPSILON = 0.00000001 + + +def _make_scheduler(reactor): + """Makes a schedular suitable for a Cooperator using the given reactor. + + (This is effectively just a copy from `twisted.internet.task`) + """ + + def _scheduler(x): + return reactor.callLater(_EPSILON, x) + + return _scheduler + + class IPBlacklistingResolver(object): """ A proxy for reactor.nameResolver which only produces non-blacklisted IP @@ -212,6 +228,10 @@ class SimpleHttpClient(object): if hs.config.user_agent_suffix: self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix) + # We use this for our body producers to ensure that they use the correct + # reactor. + self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor())) + self.user_agent = self.user_agent.encode("ascii") if self._ip_blacklist: @@ -292,7 +312,9 @@ class SimpleHttpClient(object): try: body_producer = None if data is not None: - body_producer = QuieterFileBodyProducer(BytesIO(data)) + body_producer = QuieterFileBodyProducer( + BytesIO(data), cooperator=self._cooperator, + ) request_deferred = treq.request( method, diff --git a/synapse/server.pyi b/synapse/server.pyi index 58cd099e6d..cd50c721b8 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -20,6 +20,7 @@ import synapse.handlers.room import synapse.handlers.room_member import synapse.handlers.set_password import synapse.http.client +import synapse.http.matrixfederationclient import synapse.notifier import synapse.push.pusherpool import synapse.replication.tcp.client @@ -143,3 +144,7 @@ class HomeServer(object): pass def get_replication_streams(self) -> Dict[str, Stream]: pass + def get_http_client( + self, + ) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient: + pass diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 9d4f0bbe44..06575ba0a6 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Any, List, Optional, Tuple +from typing import Any, Callable, List, Optional, Tuple import attr @@ -26,8 +26,9 @@ from synapse.app.generic_worker import ( GenericWorkerReplicationHandler, GenericWorkerServer, ) +from synapse.http.server import JsonResource from synapse.http.site import SynapseRequest -from synapse.replication.http import streams +from synapse.replication.http import ReplicationRestResource, streams from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory @@ -35,7 +36,7 @@ from synapse.server import HomeServer from synapse.util import Clock from tests import unittest -from tests.server import FakeTransport +from tests.server import FakeTransport, render logger = logging.getLogger(__name__) @@ -180,6 +181,159 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): self.assertEqual(request.method, b"GET") +class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): + """Base class for tests running multiple workers. + + Automatically handle HTTP replication requests from workers to master, + unlike `BaseStreamTestCase`. + """ + + servlets = [] # type: List[Callable[[HomeServer, JsonResource], None]] + + def setUp(self): + super().setUp() + + # build a replication server + self.server_factory = ReplicationStreamProtocolFactory(self.hs) + self.streamer = self.hs.get_replication_streamer() + + store = self.hs.get_datastore() + self.database = store.db + + self.reactor.lookups["testserv"] = "1.2.3.4" + + self._worker_hs_to_resource = {} + + # When we see a connection attempt to the master replication listener we + # automatically set up the connection. This is so that tests don't + # manually have to go and explicitly set it up each time (plus sometimes + # it is impossible to write the handling explicitly in the tests). + self.reactor.add_tcp_client_callback( + "1.2.3.4", 8765, self._handle_http_replication_attempt + ) + + def create_test_json_resource(self): + """Overrides `HomeserverTestCase.create_test_json_resource`. + """ + # We override this so that it automatically registers all the HTTP + # replication servlets, without having to explicitly do that in all + # subclassses. + + resource = ReplicationRestResource(self.hs) + + for servlet in self.servlets: + servlet(self.hs, resource) + + return resource + + def make_worker_hs( + self, worker_app: str, extra_config: dict = {}, **kwargs + ) -> HomeServer: + """Make a new worker HS instance, correctly connecting replcation + stream to the master HS. + + Args: + worker_app: Type of worker, e.g. `synapse.app.federation_sender`. + extra_config: Any extra config to use for this instances. + **kwargs: Options that get passed to `self.setup_test_homeserver`, + useful to e.g. pass some mocks for things like `http_client` + + Returns: + The new worker HomeServer instance. + """ + + config = self._get_worker_hs_config() + config["worker_app"] = worker_app + config.update(extra_config) + + worker_hs = self.setup_test_homeserver( + homeserverToUse=GenericWorkerServer, + config=config, + reactor=self.reactor, + **kwargs + ) + + store = worker_hs.get_datastore() + store.db._db_pool = self.database._db_pool + + repl_handler = ReplicationCommandHandler(worker_hs) + client = ClientReplicationStreamProtocol( + worker_hs, "client", "test", self.clock, repl_handler, + ) + server = self.server_factory.buildProtocol(None) + + client_transport = FakeTransport(server, self.reactor) + client.makeConnection(client_transport) + + server_transport = FakeTransport(client, self.reactor) + server.makeConnection(server_transport) + + # Set up a resource for the worker + resource = ReplicationRestResource(self.hs) + + for servlet in self.servlets: + servlet(worker_hs, resource) + + self._worker_hs_to_resource[worker_hs] = resource + + return worker_hs + + def _get_worker_hs_config(self) -> dict: + config = self.default_config() + config["worker_replication_host"] = "testserv" + config["worker_replication_http_port"] = "8765" + return config + + def render_on_worker(self, worker_hs: HomeServer, request: SynapseRequest): + render(request, self._worker_hs_to_resource[worker_hs], self.reactor) + + def replicate(self): + """Tell the master side of replication that something has happened, and then + wait for the replication to occur. + """ + self.streamer.on_notifier_poke() + self.pump() + + def _handle_http_replication_attempt(self): + """Handles a connection attempt to the master replication HTTP + listener. + """ + + # We should have at least one outbound connection attempt, where the + # last is one to the HTTP repication IP/port. + clients = self.reactor.tcpClients + self.assertGreaterEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients.pop() + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8765) + + # Set up client side protocol + client_protocol = client_factory.buildProtocol(None) + + request_factory = OneShotRequestFactory() + + # Set up the server side protocol + channel = _PushHTTPChannel(self.reactor) + channel.requestFactory = request_factory + channel.site = self.site + + # Connect client to server and vice versa. + client_to_server_transport = FakeTransport( + channel, self.reactor, client_protocol + ) + client_protocol.makeConnection(client_to_server_transport) + + server_to_client_transport = FakeTransport( + client_protocol, self.reactor, channel + ) + channel.makeConnection(server_to_client_transport) + + # Note: at this point we've wired everything up, but we need to return + # before the data starts flowing over the connections as this is called + # inside `connecTCP` before the connection has been passed back to the + # code that requested the TCP connection. + + class TestReplicationDataHandler(GenericWorkerReplicationHandler): """Drop-in for ReplicationDataHandler which just collects RDATA rows""" @@ -241,6 +395,14 @@ class _PushHTTPChannel(HTTPChannel): # We need to manually stop the _PullToPushProducer. self._pull_to_push_producer.stop() + def checkPersistence(self, request, version): + """Check whether the connection can be re-used + """ + # We hijack this to always say no for ease of wiring stuff up in + # `handle_http_replication_attempt`. + request.responseHeaders.setRawHeaders(b"connection", [b"close"]) + return False + class _PullToPushProducer: """A push producer that wraps a pull producer. diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py index b7d753e0a3..86c03fd89c 100644 --- a/tests/replication/test_client_reader_shard.py +++ b/tests/replication/test_client_reader_shard.py @@ -15,63 +15,26 @@ import logging from synapse.api.constants import LoginType -from synapse.app.generic_worker import GenericWorkerServer -from synapse.http.server import JsonResource from synapse.http.site import SynapseRequest -from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.rest.client.v2_alpha import register -from tests import unittest +from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.rest.client.v2_alpha.test_auth import DummyRecaptchaChecker -from tests.server import FakeChannel, render +from tests.server import FakeChannel logger = logging.getLogger(__name__) -class ClientReaderTestCase(unittest.HomeserverTestCase): +class ClientReaderTestCase(BaseMultiWorkerStreamTestCase): """Base class for tests of the replication streams""" - servlets = [ - register.register_servlets, - ] + servlets = [register.register_servlets] def prepare(self, reactor, clock, hs): - # build a replication server - self.server_factory = ReplicationStreamProtocolFactory(hs) - self.streamer = hs.get_replication_streamer() - - store = hs.get_datastore() - self.database = store.db - self.recaptcha_checker = DummyRecaptchaChecker(hs) auth_handler = hs.get_auth_handler() auth_handler.checkers[LoginType.RECAPTCHA] = self.recaptcha_checker - self.reactor.lookups["testserv"] = "1.2.3.4" - - def make_worker_hs(self, extra_config={}): - config = self._get_worker_hs_config() - config.update(extra_config) - - worker_hs = self.setup_test_homeserver( - homeserverToUse=GenericWorkerServer, config=config, reactor=self.reactor, - ) - - store = worker_hs.get_datastore() - store.db._db_pool = self.database._db_pool - - # Register the expected servlets, essentially this is HomeserverTestCase.create_test_json_resource. - resource = JsonResource(self.hs) - - for servlet in self.servlets: - servlet(worker_hs, resource) - - # Essentially HomeserverTestCase.render. - def _render(request): - render(request, self.resource, self.reactor) - - return worker_hs, _render - def _get_worker_hs_config(self) -> dict: config = self.default_config() config["worker_app"] = "synapse.app.client_reader" @@ -82,14 +45,14 @@ class ClientReaderTestCase(unittest.HomeserverTestCase): def test_register_single_worker(self): """Test that registration works when using a single client reader worker. """ - _, worker_render = self.make_worker_hs() + worker_hs = self.make_worker_hs("synapse.app.client_reader") request_1, channel_1 = self.make_request( "POST", "register", {"username": "user", "type": "m.login.password", "password": "bar"}, ) # type: SynapseRequest, FakeChannel - worker_render(request_1) + self.render_on_worker(worker_hs, request_1) self.assertEqual(request_1.code, 401) # Grab the session @@ -99,7 +62,7 @@ class ClientReaderTestCase(unittest.HomeserverTestCase): request_2, channel_2 = self.make_request( "POST", "register", {"auth": {"session": session, "type": "m.login.dummy"}} ) # type: SynapseRequest, FakeChannel - worker_render(request_2) + self.render_on_worker(worker_hs, request_2) self.assertEqual(request_2.code, 200) # We're given a registered user. @@ -108,15 +71,15 @@ class ClientReaderTestCase(unittest.HomeserverTestCase): def test_register_multi_worker(self): """Test that registration works when using multiple client reader workers. """ - _, worker_render_1 = self.make_worker_hs() - _, worker_render_2 = self.make_worker_hs() + worker_hs_1 = self.make_worker_hs("synapse.app.client_reader") + worker_hs_2 = self.make_worker_hs("synapse.app.client_reader") request_1, channel_1 = self.make_request( "POST", "register", {"username": "user", "type": "m.login.password", "password": "bar"}, ) # type: SynapseRequest, FakeChannel - worker_render_1(request_1) + self.render_on_worker(worker_hs_1, request_1) self.assertEqual(request_1.code, 401) # Grab the session @@ -126,7 +89,7 @@ class ClientReaderTestCase(unittest.HomeserverTestCase): request_2, channel_2 = self.make_request( "POST", "register", {"auth": {"session": session, "type": "m.login.dummy"}} ) # type: SynapseRequest, FakeChannel - worker_render_2(request_2) + self.render_on_worker(worker_hs_2, request_2) self.assertEqual(request_2.code, 200) # We're given a registered user. diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 519a2dc510..8d4dbf232e 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -19,132 +19,40 @@ from mock import Mock from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.app.generic_worker import GenericWorkerServer from synapse.events.builder import EventBuilderFactory -from synapse.replication.http import streams -from synapse.replication.tcp.handler import ReplicationCommandHandler -from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol -from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.rest.admin import register_servlets_for_client_rest_resource from synapse.rest.client.v1 import login, room from synapse.types import UserID -from tests import unittest -from tests.server import FakeTransport +from tests.replication._base import BaseMultiWorkerStreamTestCase logger = logging.getLogger(__name__) -class BaseStreamTestCase(unittest.HomeserverTestCase): - """Base class for tests of the replication streams""" - +class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): servlets = [ - streams.register_servlets, + login.register_servlets, + register_servlets_for_client_rest_resource, + room.register_servlets, ] - def prepare(self, reactor, clock, hs): - # build a replication server - self.server_factory = ReplicationStreamProtocolFactory(hs) - self.streamer = hs.get_replication_streamer() - - store = hs.get_datastore() - self.database = store.db - - self.reactor.lookups["testserv"] = "1.2.3.4" - def default_config(self): conf = super().default_config() conf["send_federation"] = False return conf - def make_worker_hs(self, extra_config={}): - config = self._get_worker_hs_config() - config.update(extra_config) - - mock_federation_client = Mock(spec=["put_json"]) - mock_federation_client.put_json.side_effect = lambda *_, **__: defer.succeed({}) - - worker_hs = self.setup_test_homeserver( - http_client=mock_federation_client, - homeserverToUse=GenericWorkerServer, - config=config, - reactor=self.reactor, - ) - - store = worker_hs.get_datastore() - store.db._db_pool = self.database._db_pool - - repl_handler = ReplicationCommandHandler(worker_hs) - client = ClientReplicationStreamProtocol( - worker_hs, "client", "test", self.clock, repl_handler, - ) - server = self.server_factory.buildProtocol(None) - - client_transport = FakeTransport(server, self.reactor) - client.makeConnection(client_transport) - - server_transport = FakeTransport(client, self.reactor) - server.makeConnection(server_transport) - - return worker_hs - - def _get_worker_hs_config(self) -> dict: - config = self.default_config() - config["worker_app"] = "synapse.app.federation_sender" - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" - return config - - def replicate(self): - """Tell the master side of replication that something has happened, and then - wait for the replication to occur. - """ - self.streamer.on_notifier_poke() - self.pump() - - def create_room_with_remote_server(self, user, token, remote_server="other_server"): - room = self.helper.create_room_as(user, tok=token) - store = self.hs.get_datastore() - federation = self.hs.get_handlers().federation_handler - - prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room)) - room_version = self.get_success(store.get_room_version(room)) - - factory = EventBuilderFactory(self.hs) - factory.hostname = remote_server - - user_id = UserID("user", remote_server).to_string() - - event_dict = { - "type": EventTypes.Member, - "state_key": user_id, - "content": {"membership": Membership.JOIN}, - "sender": user_id, - "room_id": room, - } - - builder = factory.for_room_version(room_version, event_dict) - join_event = self.get_success(builder.build(prev_event_ids)) - - self.get_success(federation.on_send_join_request(remote_server, join_event)) - self.replicate() - - return room - - -class FederationSenderTestCase(BaseStreamTestCase): - servlets = [ - login.register_servlets, - register_servlets_for_client_rest_resource, - room.register_servlets, - ] - def test_send_event_single_sender(self): """Test that using a single federation sender worker correctly sends a new event. """ - worker_hs = self.make_worker_hs({"send_federation": True}) - mock_client = worker_hs.get_http_client() + mock_client = Mock(spec=["put_json"]) + mock_client.put_json.side_effect = lambda *_, **__: defer.succeed({}) + + self.make_worker_hs( + "synapse.app.federation_sender", + {"send_federation": True}, + http_client=mock_client, + ) user = self.register_user("user", "pass") token = self.login("user", "pass") @@ -165,23 +73,29 @@ class FederationSenderTestCase(BaseStreamTestCase): """Test that using two federation sender workers correctly sends new events. """ - worker1 = self.make_worker_hs( + mock_client1 = Mock(spec=["put_json"]) + mock_client1.put_json.side_effect = lambda *_, **__: defer.succeed({}) + self.make_worker_hs( + "synapse.app.federation_sender", { "send_federation": True, "worker_name": "sender1", "federation_sender_instances": ["sender1", "sender2"], - } + }, + http_client=mock_client1, ) - mock_client1 = worker1.get_http_client() - worker2 = self.make_worker_hs( + mock_client2 = Mock(spec=["put_json"]) + mock_client2.put_json.side_effect = lambda *_, **__: defer.succeed({}) + self.make_worker_hs( + "synapse.app.federation_sender", { "send_federation": True, "worker_name": "sender2", "federation_sender_instances": ["sender1", "sender2"], - } + }, + http_client=mock_client2, ) - mock_client2 = worker2.get_http_client() user = self.register_user("user2", "pass") token = self.login("user2", "pass") @@ -191,8 +105,8 @@ class FederationSenderTestCase(BaseStreamTestCase): for i in range(20): server_name = "other_server_%d" % (i,) room = self.create_room_with_remote_server(user, token, server_name) - mock_client1.reset_mock() - mock_client2.reset_mock() + mock_client1.reset_mock() # type: ignore[attr-defined] + mock_client2.reset_mock() # type: ignore[attr-defined] self.create_and_send_event(room, UserID.from_string(user)) self.replicate() @@ -222,23 +136,29 @@ class FederationSenderTestCase(BaseStreamTestCase): """Test that using two federation sender workers correctly sends new typing EDUs. """ - worker1 = self.make_worker_hs( + mock_client1 = Mock(spec=["put_json"]) + mock_client1.put_json.side_effect = lambda *_, **__: defer.succeed({}) + self.make_worker_hs( + "synapse.app.federation_sender", { "send_federation": True, "worker_name": "sender1", "federation_sender_instances": ["sender1", "sender2"], - } + }, + http_client=mock_client1, ) - mock_client1 = worker1.get_http_client() - worker2 = self.make_worker_hs( + mock_client2 = Mock(spec=["put_json"]) + mock_client2.put_json.side_effect = lambda *_, **__: defer.succeed({}) + self.make_worker_hs( + "synapse.app.federation_sender", { "send_federation": True, "worker_name": "sender2", "federation_sender_instances": ["sender1", "sender2"], - } + }, + http_client=mock_client2, ) - mock_client2 = worker2.get_http_client() user = self.register_user("user3", "pass") token = self.login("user3", "pass") @@ -250,8 +170,8 @@ class FederationSenderTestCase(BaseStreamTestCase): for i in range(20): server_name = "other_server_%d" % (i,) room = self.create_room_with_remote_server(user, token, server_name) - mock_client1.reset_mock() - mock_client2.reset_mock() + mock_client1.reset_mock() # type: ignore[attr-defined] + mock_client2.reset_mock() # type: ignore[attr-defined] self.get_success( typing_handler.started_typing( @@ -284,3 +204,32 @@ class FederationSenderTestCase(BaseStreamTestCase): self.assertTrue(sent_on_1) self.assertTrue(sent_on_2) + + def create_room_with_remote_server(self, user, token, remote_server="other_server"): + room = self.helper.create_room_as(user, tok=token) + store = self.hs.get_datastore() + federation = self.hs.get_handlers().federation_handler + + prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room)) + room_version = self.get_success(store.get_room_version(room)) + + factory = EventBuilderFactory(self.hs) + factory.hostname = remote_server + + user_id = UserID("user", remote_server).to_string() + + event_dict = { + "type": EventTypes.Member, + "state_key": user_id, + "content": {"membership": Membership.JOIN}, + "sender": user_id, + "room_id": room, + } + + builder = factory.for_room_version(room_version, event_dict) + join_event = self.get_success(builder.build(prev_event_ids)) + + self.get_success(federation.on_send_join_request(remote_server, join_event)) + self.replicate() + + return room diff --git a/tests/server.py b/tests/server.py index a5e57c52fa..b6e0b14e78 100644 --- a/tests/server.py +++ b/tests/server.py @@ -237,6 +237,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock): def __init__(self): self.threadpool = ThreadPool(self) + self._tcp_callbacks = {} self._udp = [] lookups = self.lookups = {} @@ -268,6 +269,29 @@ class ThreadedMemoryReactorClock(MemoryReactorClock): def getThreadPool(self): return self.threadpool + def add_tcp_client_callback(self, host, port, callback): + """Add a callback that will be invoked when we receive a connection + attempt to the given IP/port using `connectTCP`. + + Note that the callback gets run before we return the connection to the + client, which means callbacks cannot block while waiting for writes. + """ + self._tcp_callbacks[(host, port)] = callback + + def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): + """Fake L{IReactorTCP.connectTCP}. + """ + + conn = super().connectTCP( + host, port, factory, timeout=timeout, bindAddress=None + ) + + callback = self._tcp_callbacks.get((host, port)) + if callback: + callback() + + return conn + class ThreadPool: """ @@ -486,7 +510,7 @@ class FakeTransport(object): try: self.other.dataReceived(to_write) except Exception as e: - logger.warning("Exception writing to protocol: %s", e) + logger.exception("Exception writing to protocol: %s", e) return self.buffer = self.buffer[len(to_write) :] -- cgit 1.5.1 From f2e38ca86711a8f80cf45d3182e426ed8967fc81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Jul 2020 15:12:54 +0100 Subject: Allow moving typing off master (#7869) --- changelog.d/7869.feature | 1 + synapse/app/generic_worker.py | 36 +---- synapse/config/workers.py | 19 +-- synapse/federation/federation_server.py | 125 +++++++++------- synapse/handlers/typing.py | 241 +++++++++++++++++++++---------- synapse/replication/tcp/handler.py | 9 ++ synapse/replication/tcp/streams/_base.py | 7 +- synapse/rest/client/v1/room.py | 9 ++ synapse/server.py | 13 +- synapse/server.pyi | 2 + 10 files changed, 284 insertions(+), 178 deletions(-) create mode 100644 changelog.d/7869.feature (limited to 'synapse/server.pyi') diff --git a/changelog.d/7869.feature b/changelog.d/7869.feature new file mode 100644 index 0000000000..1982049a52 --- /dev/null +++ b/changelog.d/7869.feature @@ -0,0 +1 @@ +Add experimental support for moving typing off master. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index e90695f026..c0853eef22 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -111,6 +111,7 @@ from synapse.rest.client.v1.room import ( RoomSendEventRestServlet, RoomStateEventRestServlet, RoomStateRestServlet, + RoomTypingRestServlet, ) from synapse.rest.client.v1.voip import VoipRestServlet from synapse.rest.client.v2_alpha import groups, sync, user_directory @@ -451,37 +452,6 @@ class GenericWorkerPresence(BasePresenceHandler): await self._bump_active_client(user_id=user_id) -class GenericWorkerTyping(object): - def __init__(self, hs): - self._latest_room_serial = 0 - self._reset() - - def _reset(self): - """ - Reset the typing handler's data caches. - """ - # map room IDs to serial numbers - self._room_serials = {} - # map room IDs to sets of users currently typing - self._room_typing = {} - - def process_replication_rows(self, token, rows): - if self._latest_room_serial > token: - # The master has gone backwards. To prevent inconsistent data, just - # clear everything. - self._reset() - - # Set the latest serial token to whatever the server gave us. - self._latest_room_serial = token - - for row in rows: - self._room_serials[row.room_id] = token - self._room_typing[row.room_id] = row.user_ids - - def get_current_token(self) -> int: - return self._latest_room_serial - - class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. @@ -558,6 +528,7 @@ class GenericWorkerServer(HomeServer): KeyUploadServlet(self).register(resource) AccountDataServlet(self).register(resource) RoomAccountDataServlet(self).register(resource) + RoomTypingRestServlet(self).register(resource) sync.register_servlets(self, resource) events.register_servlets(self, resource) @@ -669,9 +640,6 @@ class GenericWorkerServer(HomeServer): def build_presence_handler(self): return GenericWorkerPresence(self) - def build_typing_handler(self): - return GenericWorkerTyping(self) - class GenericWorkerReplicationHandler(ReplicationDataHandler): def __init__(self, hs): diff --git a/synapse/config/workers.py b/synapse/config/workers.py index dbc661630c..2574cd3aa1 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -34,9 +34,11 @@ class WriterLocations: Attributes: events: The instance that writes to the event and backfill streams. + events: The instance that writes to the typing stream. """ events = attr.ib(default="master", type=str) + typing = attr.ib(default="master", type=str) class WorkerConfig(Config): @@ -93,16 +95,15 @@ class WorkerConfig(Config): writers = config.get("stream_writers") or {} self.writers = WriterLocations(**writers) - # Check that the configured writer for events also appears in + # Check that the configured writer for events and typing also appears in # `instance_map`. - if ( - self.writers.events != "master" - and self.writers.events not in self.instance_map - ): - raise ConfigError( - "Instance %r is configured to write events but does not appear in `instance_map` config." - % (self.writers.events,) - ) + for stream in ("events", "typing"): + instance = getattr(self.writers, stream) + if instance != "master" and instance not in self.instance_map: + raise ConfigError( + "Instance %r is configured to write %s but does not appear in `instance_map` config." + % (instance, stream) + ) def read_arguments(self, args): # We support a bunch of command line arguments that override options in diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8c53330c49..23625ba995 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -15,7 +15,18 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + List, + Match, + Optional, + Tuple, + Union, +) from canonicaljson import json from prometheus_client import Counter, Histogram @@ -56,6 +67,9 @@ from synapse.util import glob_to_regex, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache +if TYPE_CHECKING: + from synapse.server import HomeServer + # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. TRANSACTION_CONCURRENCY_LIMIT = 10 @@ -768,11 +782,30 @@ class FederationHandlerRegistry(object): query type for incoming federation traffic. """ - def __init__(self): - self.edu_handlers = {} - self.query_handlers = {} + def __init__(self, hs: "HomeServer"): + self.config = hs.config + self.http_client = hs.get_simple_http_client() + self.clock = hs.get_clock() + self._instance_name = hs.get_instance_name() - def register_edu_handler(self, edu_type: str, handler: Callable[[str, dict], None]): + # These are safe to load in monolith mode, but will explode if we try + # and use them. However we have guards before we use them to ensure that + # we don't route to ourselves, and in monolith mode that will always be + # the case. + self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) + self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) + + self.edu_handlers = ( + {} + ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]] + self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]] + + # Map from type to instance name that we should route EDU handling to. + self._edu_type_to_instance = {} # type: Dict[str, str] + + def register_edu_handler( + self, edu_type: str, handler: Callable[[str, dict], Awaitable[None]] + ): """Sets the handler callable that will be used to handle an incoming federation EDU of the given type. @@ -809,66 +842,56 @@ class FederationHandlerRegistry(object): self.query_handlers[query_type] = handler + def register_instance_for_edu(self, edu_type: str, instance_name: str): + """Register that the EDU handler is on a different instance than master. + """ + self._edu_type_to_instance[edu_type] = instance_name + async def on_edu(self, edu_type: str, origin: str, content: dict): + if not self.config.use_presence and edu_type == "m.presence": + return + + # Check if we have a handler on this instance handler = self.edu_handlers.get(edu_type) - if not handler: - logger.warning("No handler registered for EDU type %s", edu_type) + if handler: + with start_active_span_from_edu(content, "handle_edu"): + try: + await handler(origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception: + logger.exception("Failed to handle edu %r", edu_type) return - with start_active_span_from_edu(content, "handle_edu"): + # Check if we can route it somewhere else that isn't us + route_to = self._edu_type_to_instance.get(edu_type, "master") + if route_to != self._instance_name: try: - await handler(origin, content) + await self._send_edu( + instance_name=route_to, + edu_type=edu_type, + origin=origin, + content=content, + ) except SynapseError as e: logger.info("Failed to handle edu %r: %r", edu_type, e) except Exception: logger.exception("Failed to handle edu %r", edu_type) - - def on_query(self, query_type: str, args: dict) -> defer.Deferred: - handler = self.query_handlers.get(query_type) - if not handler: - logger.warning("No handler registered for query type %s", query_type) - raise NotFoundError("No handler for Query type '%s'" % (query_type,)) - - return handler(args) - - -class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): - """A FederationHandlerRegistry for worker processes. - - When receiving EDU or queries it will check if an appropriate handler has - been registered on the worker, if there isn't one then it calls off to the - master process. - """ - - def __init__(self, hs): - self.config = hs.config - self.http_client = hs.get_simple_http_client() - self.clock = hs.get_clock() - - self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) - self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) - - super(ReplicationFederationHandlerRegistry, self).__init__() - - async def on_edu(self, edu_type: str, origin: str, content: dict): - """Overrides FederationHandlerRegistry - """ - if not self.config.use_presence and edu_type == "m.presence": return - handler = self.edu_handlers.get(edu_type) - if handler: - return await super(ReplicationFederationHandlerRegistry, self).on_edu( - edu_type, origin, content - ) - - return await self._send_edu(edu_type=edu_type, origin=origin, content=content) + # Oh well, let's just log and move on. + logger.warning("No handler registered for EDU type %s", edu_type) async def on_query(self, query_type: str, args: dict): - """Overrides FederationHandlerRegistry - """ handler = self.query_handlers.get(query_type) if handler: return await handler(args) - return await self._get_query_client(query_type=query_type, args=args) + # Check if we can route it somewhere else that isn't us + if self._instance_name == "master": + return await self._get_query_client(query_type=query_type, args=args) + + # Uh oh, no handler! Let's raise an exception so the request returns an + # error. + logger.warning("No handler registered for query type %s", query_type) + raise NotFoundError("No handler for Query type '%s'" % (query_type,)) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 846ddbdc6c..a86ac0150e 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -15,15 +15,19 @@ import logging from collections import namedtuple -from typing import List, Tuple +from typing import TYPE_CHECKING, List, Set, Tuple from synapse.api.errors import AuthError, SynapseError -from synapse.logging.context import run_in_background +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.tcp.streams import TypingStream from synapse.types import UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -39,48 +43,48 @@ FEDERATION_TIMEOUT = 60 * 1000 FEDERATION_PING_INTERVAL = 40 * 1000 -class TypingHandler(object): - def __init__(self, hs): +class FollowerTypingHandler: + """A typing handler on a different process than the writer that is updated + via replication. + """ + + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.server_name = hs.config.server_name - self.auth = hs.get_auth() - self.is_mine_id = hs.is_mine_id - self.notifier = hs.get_notifier() - self.state = hs.get_state_handler() - - self.hs = hs - self.clock = hs.get_clock() - self.wheel_timer = WheelTimer(bucket_size=5000) + self.is_mine_id = hs.is_mine_id - self.federation = hs.get_federation_sender() + self.federation = None + if hs.should_send_federation(): + self.federation = hs.get_federation_sender() - hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu) + if hs.config.worker.writers.typing != hs.get_instance_name(): + hs.get_federation_registry().register_instance_for_edu( + "m.typing", hs.config.worker.writers.typing, + ) - hs.get_distributor().observe("user_left_room", self.user_left_room) + # map room IDs to serial numbers + self._room_serials = {} + # map room IDs to sets of users currently typing + self._room_typing = {} - self._member_typing_until = {} # clock time we expect to stop self._member_last_federation_poke = {} - + self.wheel_timer = WheelTimer(bucket_size=5000) self._latest_room_serial = 0 - self._reset() - - # caches which room_ids changed at which serials - self._typing_stream_change_cache = StreamChangeCache( - "TypingStreamChangeCache", self._latest_room_serial - ) self.clock.looping_call(self._handle_timeouts, 5000) def _reset(self): - """ - Reset the typing handler's data caches. + """Reset the typing handler's data caches. """ # map room IDs to serial numbers self._room_serials = {} # map room IDs to sets of users currently typing self._room_typing = {} + self._member_last_federation_poke = {} + self.wheel_timer = WheelTimer(bucket_size=5000) + def _handle_timeouts(self): logger.debug("Checking for typing timeouts") @@ -89,30 +93,140 @@ class TypingHandler(object): members = set(self.wheel_timer.fetch(now)) for member in members: - if not self.is_typing(member): - # Nothing to do if they're no longer typing - continue - - until = self._member_typing_until.get(member, None) - if not until or until <= now: - logger.info("Timing out typing for: %s", member.user_id) - self._stopped_typing(member) - continue - - # Check if we need to resend a keep alive over federation for this - # user. - if self.hs.is_mine_id(member.user_id): - last_fed_poke = self._member_last_federation_poke.get(member, None) - if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: - run_in_background(self._push_remote, member=member, typing=True) - - # Add a paranoia timer to ensure that we always have a timer for - # each person typing. - self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000) + self._handle_timeout_for_member(now, member) + + def _handle_timeout_for_member(self, now: int, member: RoomMember): + if not self.is_typing(member): + # Nothing to do if they're no longer typing + return + + # Check if we need to resend a keep alive over federation for this + # user. + if self.federation and self.is_mine_id(member.user_id): + last_fed_poke = self._member_last_federation_poke.get(member, None) + if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: + run_as_background_process( + "typing._push_remote", self._push_remote, member=member, typing=True + ) + + # Add a paranoia timer to ensure that we always have a timer for + # each person typing. + self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000) def is_typing(self, member): return member.user_id in self._room_typing.get(member.room_id, []) + async def _push_remote(self, member, typing): + if not self.federation: + return + + try: + users = await self.store.get_users_in_room(member.room_id) + self._member_last_federation_poke[member] = self.clock.time_msec() + + now = self.clock.time_msec() + self.wheel_timer.insert( + now=now, obj=member, then=now + FEDERATION_PING_INTERVAL + ) + + for domain in {get_domain_from_id(u) for u in users}: + if domain != self.server_name: + logger.debug("sending typing update to %s", domain) + self.federation.build_and_send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": member.room_id, + "user_id": member.user_id, + "typing": typing, + }, + key=member, + ) + except Exception: + logger.exception("Error pushing typing notif to remotes") + + def process_replication_rows( + self, token: int, rows: List[TypingStream.TypingStreamRow] + ): + """Should be called whenever we receive updates for typing stream. + """ + + if self._latest_room_serial > token: + # The master has gone backwards. To prevent inconsistent data, just + # clear everything. + self._reset() + + # Set the latest serial token to whatever the server gave us. + self._latest_room_serial = token + + for row in rows: + self._room_serials[row.room_id] = token + + prev_typing = set(self._room_typing.get(row.room_id, [])) + now_typing = set(row.user_ids) + self._room_typing[row.room_id] = row.user_ids + + run_as_background_process( + "_handle_change_in_typing", + self._handle_change_in_typing, + row.room_id, + prev_typing, + now_typing, + ) + + async def _handle_change_in_typing( + self, room_id: str, prev_typing: Set[str], now_typing: Set[str] + ): + """Process a change in typing of a room from replication, sending EDUs + for any local users. + """ + for user_id in now_typing - prev_typing: + if self.is_mine_id(user_id): + await self._push_remote(RoomMember(room_id, user_id), True) + + for user_id in prev_typing - now_typing: + if self.is_mine_id(user_id): + await self._push_remote(RoomMember(room_id, user_id), False) + + def get_current_token(self): + return self._latest_room_serial + + +class TypingWriterHandler(FollowerTypingHandler): + def __init__(self, hs): + super().__init__(hs) + + assert hs.config.worker.writers.typing == hs.get_instance_name() + + self.auth = hs.get_auth() + self.notifier = hs.get_notifier() + + self.hs = hs + + hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu) + + hs.get_distributor().observe("user_left_room", self.user_left_room) + + self._member_typing_until = {} # clock time we expect to stop + + # caches which room_ids changed at which serials + self._typing_stream_change_cache = StreamChangeCache( + "TypingStreamChangeCache", self._latest_room_serial + ) + + def _handle_timeout_for_member(self, now: int, member: RoomMember): + super()._handle_timeout_for_member(now, member) + + if not self.is_typing(member): + # Nothing to do if they're no longer typing + return + + until = self._member_typing_until.get(member, None) + if not until or until <= now: + logger.info("Timing out typing for: %s", member.user_id) + self._stopped_typing(member) + return + async def started_typing(self, target_user, auth_user, room_id, timeout): target_user_id = target_user.to_string() auth_user_id = auth_user.to_string() @@ -179,35 +293,11 @@ class TypingHandler(object): def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - run_in_background(self._push_remote, member, typing) - - self._push_update_local(member=member, typing=typing) - - async def _push_remote(self, member, typing): - try: - users = await self.store.get_users_in_room(member.room_id) - self._member_last_federation_poke[member] = self.clock.time_msec() - - now = self.clock.time_msec() - self.wheel_timer.insert( - now=now, obj=member, then=now + FEDERATION_PING_INTERVAL + run_as_background_process( + "typing._push_remote", self._push_remote, member, typing ) - for domain in {get_domain_from_id(u) for u in users}: - if domain != self.server_name: - logger.debug("sending typing update to %s", domain) - self.federation.build_and_send_edu( - destination=domain, - edu_type="m.typing", - content={ - "room_id": member.room_id, - "user_id": member.user_id, - "typing": typing, - }, - key=member, - ) - except Exception: - logger.exception("Error pushing typing notif to remotes") + self._push_update_local(member=member, typing=typing) async def _recv_edu(self, origin, content): room_id = content["room_id"] @@ -304,8 +394,11 @@ class TypingHandler(object): return rows, current_id, limited - def get_current_token(self): - return self._latest_room_serial + def process_replication_rows( + self, token: int, rows: List[TypingStream.TypingStreamRow] + ): + # The writing process should never get updates from replication. + raise Exception("Typing writer instance got typing info over replication") class TypingNotificationEventSource(object): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 80f5df60f9..30d8de48fa 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -42,6 +42,7 @@ from synapse.replication.tcp.streams import ( EventsStream, FederationStream, Stream, + TypingStream, ) from synapse.util.async_helpers import Linearizer @@ -96,6 +97,14 @@ class ReplicationCommandHandler: continue + if isinstance(stream, TypingStream): + # Only add TypingStream as a source on the instance in charge of + # typing. + if hs.config.worker.writers.typing == hs.get_instance_name(): + self._streams_to_replicate.append(stream) + + continue + # Only add any other streams if we're on master. if hs.config.worker_app is not None: continue diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 9076bbe9f1..7a42de3f7d 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -294,11 +294,12 @@ class TypingStream(Stream): def __init__(self, hs): typing_handler = hs.get_typing_handler() - if hs.config.worker_app is None: - # on the master, query the typing handler + writer_instance = hs.config.worker.writers.typing + if writer_instance == hs.get_instance_name(): + # On the writer, query the typing handler update_function = typing_handler.get_all_typing_updates else: - # Query master process + # Query the typing writer process update_function = make_http_update_function(hs, self.NAME) super().__init__( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index ea5912d4e4..26d5a51cb2 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -817,9 +817,18 @@ class RoomTypingRestServlet(RestServlet): self.typing_handler = hs.get_typing_handler() self.auth = hs.get_auth() + # If we're not on the typing writer instance we should scream if we get + # requests. + self._is_typing_writer = ( + hs.config.worker.writers.typing == hs.get_instance_name() + ) + async def on_PUT(self, request, room_id, user_id): requester = await self.auth.get_user_by_req(request) + if not self._is_typing_writer: + raise Exception("Got /typing request on instance that is not typing writer") + room_id = urlparse.unquote(room_id) target_user = UserID.from_string(urlparse.unquote(user_id)) diff --git a/synapse/server.py b/synapse/server.py index 0e6ea96b33..8e41112530 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -44,7 +44,6 @@ from synapse.federation.federation_client import FederationClient from synapse.federation.federation_server import ( FederationHandlerRegistry, FederationServer, - ReplicationFederationHandlerRegistry, ) from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.sender import FederationSender @@ -84,7 +83,7 @@ from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler -from synapse.handlers.typing import TypingHandler +from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler from synapse.handlers.user_directory import UserDirectoryHandler from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient from synapse.http.matrixfederationclient import MatrixFederationHttpClient @@ -378,7 +377,10 @@ class HomeServer(object): return PresenceHandler(self) def build_typing_handler(self): - return TypingHandler(self) + if self.config.worker.writers.typing == self.get_instance_name(): + return TypingWriterHandler(self) + else: + return FollowerTypingHandler(self) def build_sync_handler(self): return SyncHandler(self) @@ -534,10 +536,7 @@ class HomeServer(object): return RoomMemberMasterHandler(self) def build_federation_registry(self): - if self.config.worker_app: - return ReplicationFederationHandlerRegistry(self) - else: - return FederationHandlerRegistry() + return FederationHandlerRegistry(self) def build_server_notices_manager(self): if self.config.worker_app: diff --git a/synapse/server.pyi b/synapse/server.pyi index cd50c721b8..90a673778f 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -148,3 +148,5 @@ class HomeServer(object): self, ) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient: pass + def should_send_federation(self) -> bool: + pass -- cgit 1.5.1 From 84d099ae1192af0f38d26f9a32e38bd4c0ad304e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jul 2020 14:10:53 +0100 Subject: Fix typing replication not being handled on master (#7959) Handling of incoming typing stream updates from replication was not hooked up on master, effecting set ups where typing was handled on a different worker. This is really only a problem if the master process is also handling sync requests, which is unlikely for those that are at the stage of moving typing off. The other observable effect is that if a worker restarts or a replication connect drops then the typing worker will issue a `POSITION typing`, triggering master process to try and stream *all* typing updates from position 0. Fixes #7907 --- changelog.d/7959.bugfix | 1 + synapse/app/generic_worker.py | 7 ------- synapse/replication/tcp/client.py | 8 ++++++++ synapse/server.pyi | 3 +++ 4 files changed, 12 insertions(+), 7 deletions(-) create mode 100644 changelog.d/7959.bugfix (limited to 'synapse/server.pyi') diff --git a/changelog.d/7959.bugfix b/changelog.d/7959.bugfix new file mode 100644 index 0000000000..1982049a52 --- /dev/null +++ b/changelog.d/7959.bugfix @@ -0,0 +1 @@ +Add experimental support for moving typing off master. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index c1b76d827b..ec0dbddb8c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -87,7 +87,6 @@ from synapse.replication.tcp.streams import ( ReceiptsStream, TagAccountDataStream, ToDeviceStream, - TypingStream, ) from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client.v1 import events @@ -644,7 +643,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): super(GenericWorkerReplicationHandler, self).__init__(hs) self.store = hs.get_datastore() - self.typing_handler = hs.get_typing_handler() self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence self.notifier = hs.get_notifier() @@ -681,11 +679,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): await self.pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} ) - elif stream_name == TypingStream.NAME: - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] - ) elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 4985e40b1f..fcf8ebf1e7 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -24,6 +24,7 @@ from twisted.internet.protocol import ReconnectingClientFactory from synapse.api.constants import EventTypes from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol +from synapse.replication.tcp.streams import TypingStream from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, @@ -104,6 +105,7 @@ class ReplicationDataHandler: self._clock = hs.get_clock() self._streams = hs.get_replication_streams() self._instance_name = hs.get_instance_name() + self._typing_handler = hs.get_typing_handler() # Map from stream to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. @@ -127,6 +129,12 @@ class ReplicationDataHandler: """ self.store.process_replication_rows(stream_name, instance_name, token, rows) + if stream_name == TypingStream.NAME: + self._typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows] + ) + if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. diff --git a/synapse/server.pyi b/synapse/server.pyi index 90a673778f..1aba408c21 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -31,6 +31,7 @@ import synapse.server_notices.server_notices_sender import synapse.state import synapse.storage from synapse.events.builder import EventBuilderFactory +from synapse.handlers.typing import FollowerTypingHandler from synapse.replication.tcp.streams import Stream class HomeServer(object): @@ -150,3 +151,5 @@ class HomeServer(object): pass def should_send_federation(self) -> bool: pass + def get_typing_handler(self) -> FollowerTypingHandler: + pass -- cgit 1.5.1 From 0f1afbe8dc853c8726de797ede10cad2fe336b16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 18:00:17 +0100 Subject: Change HomeServer definition to work with typing. Duplicating function signatures between server.py and server.pyi is silly. This commit changes that by changing all `build_*` methods to `get_*` methods and changing the `_make_dependency_method` to work work as a descriptor that caches the produced value. There are some changes in other files that were made to fix the typing in server.py. --- synapse/app/generic_worker.py | 8 +- synapse/handlers/oidc_handler.py | 8 +- synapse/secrets.py | 8 +- synapse/server.py | 455 +++++++++++++++++----------------- synapse/server.pyi | 155 ------------ synapse/storage/databases/__init__.py | 28 ++- tox.ini | 1 + 7 files changed, 264 insertions(+), 399 deletions(-) delete mode 100644 synapse/server.pyi (limited to 'synapse/server.pyi') diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 7957586d69..739b013d4c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -125,7 +125,7 @@ from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.rest.client.versions import VersionsRestServlet from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource -from synapse.server import HomeServer +from synapse.server import HomeServer, cache_in_self from synapse.storage.databases.main.censor_events import CensorEventsStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore from synapse.storage.databases.main.monthly_active_users import ( @@ -635,10 +635,12 @@ class GenericWorkerServer(HomeServer): async def remove_pusher(self, app_id, push_key, user_id): self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) - def build_replication_data_handler(self): + @cache_in_self + def get_replication_data_handler(self): return GenericWorkerReplicationHandler(self) - def build_presence_handler(self): + @cache_in_self + def get_presence_handler(self): return GenericWorkerPresence(self) diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py index 87f0c5e197..fa5ee5de8f 100644 --- a/synapse/handlers/oidc_handler.py +++ b/synapse/handlers/oidc_handler.py @@ -14,7 +14,7 @@ # limitations under the License. import json import logging -from typing import Dict, Generic, List, Optional, Tuple, TypeVar +from typing import TYPE_CHECKING, Dict, Generic, List, Optional, Tuple, TypeVar from urllib.parse import urlencode import attr @@ -39,9 +39,11 @@ from synapse.http.server import respond_with_html from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable from synapse.push.mailer import load_jinja2_templates -from synapse.server import HomeServer from synapse.types import UserID, map_username_to_mxid_localpart +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) SESSION_COOKIE_NAME = b"oidc_session" @@ -91,7 +93,7 @@ class OidcHandler: """Handles requests related to the OpenID Connect login flow. """ - def __init__(self, hs: HomeServer): + def __init__(self, hs: "HomeServer"): self._callback_url = hs.config.oidc_callback_url # type: str self._scopes = hs.config.oidc_scopes # type: List[str] self._client_auth = ClientAuth( diff --git a/synapse/secrets.py b/synapse/secrets.py index 5f43f81eb0..ff86950a54 100644 --- a/synapse/secrets.py +++ b/synapse/secrets.py @@ -25,8 +25,12 @@ import sys if sys.version_info[0:2] >= (3, 6): import secrets - def Secrets(): - return secrets + class Secrets: + def token_bytes(self, nbytes=32): + return secrets.token_bytes(nbytes) + + def token_hex(self, nbytes=32): + return secrets.token_hex(nbytes) else: diff --git a/synapse/server.py b/synapse/server.py index 81d7f26f9c..aec1ba408c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -22,10 +22,14 @@ # Imports required for the default HomeServer() implementation import abc +import functools import logging import os +from typing import Any, Callable, Dict, List, Optional, TypeVar, cast +import twisted from twisted.mail.smtp import sendmail +from twisted.web.iweb import IPolicyForHTTPS from synapse.api.auth import Auth from synapse.api.filtering import Filtering @@ -65,6 +69,7 @@ from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.message import EventCreationHandler, MessageHandler +from synapse.handlers.oidc_handler import OidcHandler from synapse.handlers.pagination import PaginationHandler from synapse.handlers.password_policy import PasswordPolicyHandler from synapse.handlers.presence import PresenceHandler @@ -80,6 +85,7 @@ from synapse.handlers.room import ( from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler +from synapse.handlers.saml_handler import SamlHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler @@ -93,7 +99,7 @@ from synapse.push.pusherpool import PusherPool from synapse.replication.tcp.client import ReplicationDataHandler from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.resource import ReplicationStreamer -from synapse.replication.tcp.streams import STREAMS_MAP +from synapse.replication.tcp.streams import STREAMS_MAP, Stream from synapse.rest.media.v1.media_repository import ( MediaRepository, MediaRepositoryResource, @@ -107,6 +113,7 @@ from synapse.server_notices.worker_server_notices_sender import ( from synapse.state import StateHandler, StateResolutionHandler from synapse.storage import Databases, DataStore, Storage from synapse.streams.events import EventSources +from synapse.types import DomainSpecificString from synapse.util import Clock from synapse.util.distributor import Distributor from synapse.util.stringutils import random_string @@ -114,23 +121,58 @@ from synapse.util.stringutils import random_string logger = logging.getLogger(__name__) -class HomeServer(object): +T = TypeVar("T", bound=Callable[..., Any]) + + +def cache_in_self(builder: T) -> T: + """Wraps a function called e.g. `get_foo`, checking if `self.foo` exists and + returning if so. If not, calls the given function and sets `self.foo` to it. + + Also ensures that dependency cycles throw an exception correctly, rather + than overflowing the stack. + """ + + if not builder.__name__.startswith("get_"): + raise Exception( + "@cache_in_self can only be used on functions starting with `get_`" + ) + + depname = builder.__name__[len("get_") :] + + building = [False] + + @functools.wraps(builder) + def _get(self): + try: + return getattr(self, depname) + except AttributeError: + pass + + # Prevent cyclic dependencies from deadlocking + if building[0]: + raise ValueError("Cyclic dependency while building %s" % (depname,)) + + building[0] = True + try: + dep = builder(self) + setattr(self, depname, dep) + finally: + building[0] = False + + return dep + + return cast(T, _get) + + +class HomeServer(metaclass=abc.ABCMeta): """A basic homeserver object without lazy component builders. This will need all of the components it requires to either be passed as constructor arguments, or the relevant methods overriding to create them. Typically this would only be used for unit tests. - For every dependency in the DEPENDENCIES list below, this class creates one - method, - def get_DEPENDENCY(self) - which returns the value of that dependency. If no value has yet been set - nor was provided to the constructor, it will attempt to call a lazy builder - method called - def build_DEPENDENCY(self) - which must be implemented by the subclass. This code may call any of the - required "get" methods on the instance to obtain the sub-dependencies that - one requires. + Dependencies should be added by creating a `def get_(self)` + function, wrapping it in `@cache_in_self`. Attributes: config (synapse.config.homeserver.HomeserverConfig): @@ -138,86 +180,6 @@ class HomeServer(object): we are listening on to provide HTTP services. """ - __metaclass__ = abc.ABCMeta - - DEPENDENCIES = [ - "http_client", - "federation_client", - "federation_server", - "handlers", - "auth", - "room_creation_handler", - "room_shutdown_handler", - "state_handler", - "state_resolution_handler", - "presence_handler", - "sync_handler", - "typing_handler", - "room_list_handler", - "acme_handler", - "auth_handler", - "device_handler", - "stats_handler", - "e2e_keys_handler", - "e2e_room_keys_handler", - "event_handler", - "event_stream_handler", - "initial_sync_handler", - "application_service_api", - "application_service_scheduler", - "application_service_handler", - "device_message_handler", - "profile_handler", - "event_creation_handler", - "deactivate_account_handler", - "set_password_handler", - "notifier", - "event_sources", - "keyring", - "pusherpool", - "event_builder_factory", - "filtering", - "http_client_context_factory", - "simple_http_client", - "proxied_http_client", - "media_repository", - "media_repository_resource", - "federation_transport_client", - "federation_sender", - "receipts_handler", - "macaroon_generator", - "tcp_replication", - "read_marker_handler", - "action_generator", - "user_directory_handler", - "groups_local_handler", - "groups_server_handler", - "groups_attestation_signing", - "groups_attestation_renewer", - "secrets", - "spam_checker", - "third_party_event_rules", - "room_member_handler", - "federation_registry", - "server_notices_manager", - "server_notices_sender", - "message_handler", - "pagination_handler", - "room_context_handler", - "sendmail", - "registration_handler", - "account_validity_handler", - "cas_handler", - "saml_handler", - "oidc_handler", - "event_client_serializer", - "password_policy_handler", - "storage", - "replication_streamer", - "replication_data_handler", - "replication_streams", - ] - REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"] # This is overridden in derived application classes @@ -232,16 +194,17 @@ class HomeServer(object): config: The full config for the homeserver. """ if not reactor: - from twisted.internet import reactor + from twisted.internet import reactor as _reactor + + reactor = _reactor self._reactor = reactor self.hostname = hostname # the key we use to sign events and requests self.signing_key = config.key.signing_key[0] self.config = config - self._building = {} - self._listening_services = [] - self.start_time = None + self._listening_services = [] # type: List[twisted.internet.tcp.Port] + self.start_time = None # type: Optional[int] self._instance_id = random_string(5) self._instance_name = config.worker_name or "master" @@ -255,13 +218,13 @@ class HomeServer(object): burst_count=config.rc_registration.burst_count, ) - self.datastores = None + self.datastores = None # type: Optional[Databases] # Other kwargs are explicit dependencies for depname in kwargs: setattr(self, depname, kwargs[depname]) - def get_instance_id(self): + def get_instance_id(self) -> str: """A unique ID for this synapse process instance. This is used to distinguish running instances in worker-based @@ -277,13 +240,13 @@ class HomeServer(object): """ return self._instance_name - def setup(self): + def setup(self) -> None: logger.info("Setting up.") self.start_time = int(self.get_clock().time()) self.datastores = Databases(self.DATASTORE_CLASS, self) logger.info("Finished setting up.") - def setup_master(self): + def setup_master(self) -> None: """ Some handlers have side effects on instantiation (like registering background updates). This function causes them to be fetched, and @@ -292,192 +255,242 @@ class HomeServer(object): for i in self.REQUIRED_ON_MASTER_STARTUP: getattr(self, "get_" + i)() - def get_reactor(self): + def get_reactor(self) -> twisted.internet.base.ReactorBase: """ Fetch the Twisted reactor in use by this HomeServer. """ return self._reactor - def get_ip_from_request(self, request): + def get_ip_from_request(self, request) -> str: # X-Forwarded-For is handled by our custom request type. return request.getClientIP() - def is_mine(self, domain_specific_string): + def is_mine(self, domain_specific_string: DomainSpecificString) -> bool: return domain_specific_string.domain == self.hostname - def is_mine_id(self, string): + def is_mine_id(self, string: str) -> bool: return string.split(":", 1)[1] == self.hostname - def get_clock(self): + def get_clock(self) -> Clock: return self.clock def get_datastore(self) -> DataStore: + if not self.datastores: + raise Exception("HomeServer.setup must be called before getting datastores") + return self.datastores.main - def get_datastores(self): + def get_datastores(self) -> Databases: + if not self.datastores: + raise Exception("HomeServer.setup must be called before getting datastores") + return self.datastores - def get_config(self): + def get_config(self) -> HomeServerConfig: return self.config - def get_distributor(self): + def get_distributor(self) -> Distributor: return self.distributor def get_registration_ratelimiter(self) -> Ratelimiter: return self.registration_ratelimiter - def build_federation_client(self): + @cache_in_self + def get_federation_client(self) -> FederationClient: return FederationClient(self) - def build_federation_server(self): + @cache_in_self + def get_federation_server(self) -> FederationServer: return FederationServer(self) - def build_handlers(self): + @cache_in_self + def get_handlers(self) -> Handlers: return Handlers(self) - def build_notifier(self): + @cache_in_self + def get_notifier(self) -> Notifier: return Notifier(self) - def build_auth(self): + @cache_in_self + def get_auth(self) -> Auth: return Auth(self) - def build_http_client_context_factory(self): + @cache_in_self + def get_http_client_context_factory(self) -> IPolicyForHTTPS: return ( InsecureInterceptableContextFactory() if self.config.use_insecure_ssl_client_just_for_testing_do_not_use else RegularPolicyForHTTPS() ) - def build_simple_http_client(self): + @cache_in_self + def get_simple_http_client(self) -> SimpleHttpClient: return SimpleHttpClient(self) - def build_proxied_http_client(self): + @cache_in_self + def get_proxied_http_client(self) -> SimpleHttpClient: return SimpleHttpClient( self, http_proxy=os.getenvb(b"http_proxy"), https_proxy=os.getenvb(b"HTTPS_PROXY"), ) - def build_room_creation_handler(self): + @cache_in_self + def get_room_creation_handler(self) -> RoomCreationHandler: return RoomCreationHandler(self) - def build_room_shutdown_handler(self): + @cache_in_self + def get_room_shutdown_handler(self) -> RoomShutdownHandler: return RoomShutdownHandler(self) - def build_sendmail(self): + @cache_in_self + def get_sendmail(self) -> sendmail: return sendmail - def build_state_handler(self): + @cache_in_self + def get_state_handler(self) -> StateHandler: return StateHandler(self) - def build_state_resolution_handler(self): + @cache_in_self + def get_state_resolution_handler(self) -> StateResolutionHandler: return StateResolutionHandler(self) - def build_presence_handler(self): + @cache_in_self + def get_presence_handler(self) -> PresenceHandler: return PresenceHandler(self) - def build_typing_handler(self): + @cache_in_self + def get_typing_handler(self): if self.config.worker.writers.typing == self.get_instance_name(): return TypingWriterHandler(self) else: return FollowerTypingHandler(self) - def build_sync_handler(self): + @cache_in_self + def get_sync_handler(self) -> SyncHandler: return SyncHandler(self) - def build_room_list_handler(self): + @cache_in_self + def get_room_list_handler(self) -> RoomListHandler: return RoomListHandler(self) - def build_auth_handler(self): + @cache_in_self + def get_auth_handler(self) -> AuthHandler: return AuthHandler(self) - def build_macaroon_generator(self): + @cache_in_self + def get_macaroon_generator(self) -> MacaroonGenerator: return MacaroonGenerator(self) - def build_device_handler(self): + @cache_in_self + def get_device_handler(self): if self.config.worker_app: return DeviceWorkerHandler(self) else: return DeviceHandler(self) - def build_device_message_handler(self): + @cache_in_self + def get_device_message_handler(self) -> DeviceMessageHandler: return DeviceMessageHandler(self) - def build_e2e_keys_handler(self): + @cache_in_self + def get_e2e_keys_handler(self) -> E2eKeysHandler: return E2eKeysHandler(self) - def build_e2e_room_keys_handler(self): + @cache_in_self + def get_e2e_room_keys_handler(self) -> E2eRoomKeysHandler: return E2eRoomKeysHandler(self) - def build_acme_handler(self): + @cache_in_self + def get_acme_handler(self) -> AcmeHandler: return AcmeHandler(self) - def build_application_service_api(self): + @cache_in_self + def get_application_service_api(self) -> ApplicationServiceApi: return ApplicationServiceApi(self) - def build_application_service_scheduler(self): + @cache_in_self + def get_application_service_scheduler(self) -> ApplicationServiceScheduler: return ApplicationServiceScheduler(self) - def build_application_service_handler(self): + @cache_in_self + def get_application_service_handler(self) -> ApplicationServicesHandler: return ApplicationServicesHandler(self) - def build_event_handler(self): + @cache_in_self + def get_event_handler(self) -> EventHandler: return EventHandler(self) - def build_event_stream_handler(self): + @cache_in_self + def get_event_stream_handler(self) -> EventStreamHandler: return EventStreamHandler(self) - def build_initial_sync_handler(self): + @cache_in_self + def get_initial_sync_handler(self) -> InitialSyncHandler: return InitialSyncHandler(self) - def build_profile_handler(self): + @cache_in_self + def get_profile_handler(self): if self.config.worker_app: return BaseProfileHandler(self) else: return MasterProfileHandler(self) - def build_event_creation_handler(self): + @cache_in_self + def get_event_creation_handler(self) -> EventCreationHandler: return EventCreationHandler(self) - def build_deactivate_account_handler(self): + @cache_in_self + def get_deactivate_account_handler(self) -> DeactivateAccountHandler: return DeactivateAccountHandler(self) - def build_set_password_handler(self): + @cache_in_self + def get_set_password_handler(self) -> SetPasswordHandler: return SetPasswordHandler(self) - def build_event_sources(self): + @cache_in_self + def get_event_sources(self) -> EventSources: return EventSources(self) - def build_keyring(self): + @cache_in_self + def get_keyring(self) -> Keyring: return Keyring(self) - def build_event_builder_factory(self): + @cache_in_self + def get_event_builder_factory(self) -> EventBuilderFactory: return EventBuilderFactory(self) - def build_filtering(self): + @cache_in_self + def get_filtering(self) -> Filtering: return Filtering(self) - def build_pusherpool(self): + @cache_in_self + def get_pusherpool(self) -> PusherPool: return PusherPool(self) - def build_http_client(self): + @cache_in_self + def get_http_client(self) -> MatrixFederationHttpClient: tls_client_options_factory = context_factory.FederationPolicyForHTTPS( self.config ) return MatrixFederationHttpClient(self, tls_client_options_factory) - def build_media_repository_resource(self): + @cache_in_self + def get_media_repository_resource(self) -> MediaRepositoryResource: # build the media repo resource. This indirects through the HomeServer # to ensure that we only have a single instance of return MediaRepositoryResource(self) - def build_media_repository(self): + @cache_in_self + def get_media_repository(self) -> MediaRepository: return MediaRepository(self) - def build_federation_transport_client(self): + @cache_in_self + def get_federation_transport_client(self) -> TransportLayerClient: return TransportLayerClient(self) - def build_federation_sender(self): + @cache_in_self + def get_federation_sender(self): if self.should_send_federation(): return FederationSender(self) elif not self.config.worker_app: @@ -485,156 +498,150 @@ class HomeServer(object): else: raise Exception("Workers cannot send federation traffic") - def build_receipts_handler(self): + @cache_in_self + def get_receipts_handler(self) -> ReceiptsHandler: return ReceiptsHandler(self) - def build_read_marker_handler(self): + @cache_in_self + def get_read_marker_handler(self) -> ReadMarkerHandler: return ReadMarkerHandler(self) - def build_tcp_replication(self): + @cache_in_self + def get_tcp_replication(self) -> ReplicationCommandHandler: return ReplicationCommandHandler(self) - def build_action_generator(self): + @cache_in_self + def get_action_generator(self) -> ActionGenerator: return ActionGenerator(self) - def build_user_directory_handler(self): + @cache_in_self + def get_user_directory_handler(self) -> UserDirectoryHandler: return UserDirectoryHandler(self) - def build_groups_local_handler(self): + @cache_in_self + def get_groups_local_handler(self): if self.config.worker_app: return GroupsLocalWorkerHandler(self) else: return GroupsLocalHandler(self) - def build_groups_server_handler(self): + @cache_in_self + def get_groups_server_handler(self): if self.config.worker_app: return GroupsServerWorkerHandler(self) else: return GroupsServerHandler(self) - def build_groups_attestation_signing(self): + @cache_in_self + def get_groups_attestation_signing(self) -> GroupAttestationSigning: return GroupAttestationSigning(self) - def build_groups_attestation_renewer(self): + @cache_in_self + def get_groups_attestation_renewer(self) -> GroupAttestionRenewer: return GroupAttestionRenewer(self) - def build_secrets(self): + @cache_in_self + def get_secrets(self) -> Secrets: return Secrets() - def build_stats_handler(self): + @cache_in_self + def get_stats_handler(self) -> StatsHandler: return StatsHandler(self) - def build_spam_checker(self): + @cache_in_self + def get_spam_checker(self): return SpamChecker(self) - def build_third_party_event_rules(self): + @cache_in_self + def get_third_party_event_rules(self) -> ThirdPartyEventRules: return ThirdPartyEventRules(self) - def build_room_member_handler(self): + @cache_in_self + def get_room_member_handler(self): if self.config.worker_app: return RoomMemberWorkerHandler(self) return RoomMemberMasterHandler(self) - def build_federation_registry(self): + @cache_in_self + def get_federation_registry(self) -> FederationHandlerRegistry: return FederationHandlerRegistry(self) - def build_server_notices_manager(self): + @cache_in_self + def get_server_notices_manager(self): if self.config.worker_app: raise Exception("Workers cannot send server notices") return ServerNoticesManager(self) - def build_server_notices_sender(self): + @cache_in_self + def get_server_notices_sender(self): if self.config.worker_app: return WorkerServerNoticesSender(self) return ServerNoticesSender(self) - def build_message_handler(self): + @cache_in_self + def get_message_handler(self) -> MessageHandler: return MessageHandler(self) - def build_pagination_handler(self): + @cache_in_self + def get_pagination_handler(self) -> PaginationHandler: return PaginationHandler(self) - def build_room_context_handler(self): + @cache_in_self + def get_room_context_handler(self) -> RoomContextHandler: return RoomContextHandler(self) - def build_registration_handler(self): + @cache_in_self + def get_registration_handler(self) -> RegistrationHandler: return RegistrationHandler(self) - def build_account_validity_handler(self): + @cache_in_self + def get_account_validity_handler(self) -> AccountValidityHandler: return AccountValidityHandler(self) - def build_cas_handler(self): + @cache_in_self + def get_cas_handler(self) -> CasHandler: return CasHandler(self) - def build_saml_handler(self): - from synapse.handlers.saml_handler import SamlHandler + @cache_in_self + def get_saml_handler(self) -> SamlHandler: return SamlHandler(self) - def build_oidc_handler(self): - from synapse.handlers.oidc_handler import OidcHandler + @cache_in_self + def get_oidc_handler(self) -> OidcHandler: return OidcHandler(self) - def build_event_client_serializer(self): + @cache_in_self + def get_event_client_serializer(self) -> EventClientSerializer: return EventClientSerializer(self) - def build_password_policy_handler(self): + @cache_in_self + def get_password_policy_handler(self) -> PasswordPolicyHandler: return PasswordPolicyHandler(self) - def build_storage(self) -> Storage: - return Storage(self, self.datastores) + @cache_in_self + def get_storage(self) -> Storage: + return Storage(self, self.get_datastores()) - def build_replication_streamer(self) -> ReplicationStreamer: + @cache_in_self + def get_replication_streamer(self) -> ReplicationStreamer: return ReplicationStreamer(self) - def build_replication_data_handler(self): + @cache_in_self + def get_replication_data_handler(self) -> ReplicationDataHandler: return ReplicationDataHandler(self) - def build_replication_streams(self): + @cache_in_self + def get_replication_streams(self) -> Dict[str, Stream]: return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()} - def remove_pusher(self, app_id, push_key, user_id): - return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) + async def remove_pusher(self, app_id: str, push_key: str, user_id: str): + return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id) - def should_send_federation(self): + def should_send_federation(self) -> bool: "Should this server be sending federation traffic directly?" return self.config.send_federation and ( not self.config.worker_app or self.config.worker_app == "synapse.app.federation_sender" ) - - -def _make_dependency_method(depname): - def _get(hs): - try: - return getattr(hs, depname) - except AttributeError: - pass - - try: - builder = getattr(hs, "build_%s" % (depname)) - except AttributeError: - raise NotImplementedError( - "%s has no %s nor a builder for it" % (type(hs).__name__, depname) - ) - - # Prevent cyclic dependencies from deadlocking - if depname in hs._building: - raise ValueError("Cyclic dependency while building %s" % (depname,)) - - hs._building[depname] = 1 - try: - dep = builder() - setattr(hs, depname, dep) - finally: - del hs._building[depname] - - return dep - - setattr(HomeServer, "get_%s" % (depname), _get) - - -# Build magic accessors for every dependency -for depname in HomeServer.DEPENDENCIES: - _make_dependency_method(depname) diff --git a/synapse/server.pyi b/synapse/server.pyi deleted file mode 100644 index 1aba408c21..0000000000 --- a/synapse/server.pyi +++ /dev/null @@ -1,155 +0,0 @@ -from typing import Dict - -import twisted.internet - -import synapse.api.auth -import synapse.config.homeserver -import synapse.crypto.keyring -import synapse.federation.federation_server -import synapse.federation.sender -import synapse.federation.transport.client -import synapse.handlers -import synapse.handlers.auth -import synapse.handlers.deactivate_account -import synapse.handlers.device -import synapse.handlers.e2e_keys -import synapse.handlers.message -import synapse.handlers.presence -import synapse.handlers.register -import synapse.handlers.room -import synapse.handlers.room_member -import synapse.handlers.set_password -import synapse.http.client -import synapse.http.matrixfederationclient -import synapse.notifier -import synapse.push.pusherpool -import synapse.replication.tcp.client -import synapse.replication.tcp.handler -import synapse.rest.media.v1.media_repository -import synapse.server_notices.server_notices_manager -import synapse.server_notices.server_notices_sender -import synapse.state -import synapse.storage -from synapse.events.builder import EventBuilderFactory -from synapse.handlers.typing import FollowerTypingHandler -from synapse.replication.tcp.streams import Stream - -class HomeServer(object): - @property - def config(self) -> synapse.config.homeserver.HomeServerConfig: - pass - @property - def hostname(self) -> str: - pass - def get_auth(self) -> synapse.api.auth.Auth: - pass - def get_auth_handler(self) -> synapse.handlers.auth.AuthHandler: - pass - def get_datastore(self) -> synapse.storage.DataStore: - pass - def get_device_handler(self) -> synapse.handlers.device.DeviceHandler: - pass - def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler: - pass - def get_handlers(self) -> synapse.handlers.Handlers: - pass - def get_state_handler(self) -> synapse.state.StateHandler: - pass - def get_state_resolution_handler(self) -> synapse.state.StateResolutionHandler: - pass - def get_simple_http_client(self) -> synapse.http.client.SimpleHttpClient: - """Fetch an HTTP client implementation which doesn't do any blacklisting - or support any HTTP_PROXY settings""" - pass - def get_proxied_http_client(self) -> synapse.http.client.SimpleHttpClient: - """Fetch an HTTP client implementation which doesn't do any blacklisting - but does support HTTP_PROXY settings""" - pass - def get_deactivate_account_handler( - self, - ) -> synapse.handlers.deactivate_account.DeactivateAccountHandler: - pass - def get_room_creation_handler(self) -> synapse.handlers.room.RoomCreationHandler: - pass - def get_room_member_handler(self) -> synapse.handlers.room_member.RoomMemberHandler: - pass - def get_room_shutdown_handler(self) -> synapse.handlers.room.RoomShutdownHandler: - pass - def get_event_creation_handler( - self, - ) -> synapse.handlers.message.EventCreationHandler: - pass - def get_set_password_handler( - self, - ) -> synapse.handlers.set_password.SetPasswordHandler: - pass - def get_federation_sender(self) -> synapse.federation.sender.FederationSender: - pass - def get_federation_transport_client( - self, - ) -> synapse.federation.transport.client.TransportLayerClient: - pass - def get_media_repository_resource( - self, - ) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource: - pass - def get_media_repository( - self, - ) -> synapse.rest.media.v1.media_repository.MediaRepository: - pass - def get_server_notices_manager( - self, - ) -> synapse.server_notices.server_notices_manager.ServerNoticesManager: - pass - def get_server_notices_sender( - self, - ) -> synapse.server_notices.server_notices_sender.ServerNoticesSender: - pass - def get_notifier(self) -> synapse.notifier.Notifier: - pass - def get_presence_handler(self) -> synapse.handlers.presence.BasePresenceHandler: - pass - def get_clock(self) -> synapse.util.Clock: - pass - def get_reactor(self) -> twisted.internet.base.ReactorBase: - pass - def get_keyring(self) -> synapse.crypto.keyring.Keyring: - pass - def get_tcp_replication( - self, - ) -> synapse.replication.tcp.handler.ReplicationCommandHandler: - pass - def get_replication_data_handler( - self, - ) -> synapse.replication.tcp.client.ReplicationDataHandler: - pass - def get_federation_registry( - self, - ) -> synapse.federation.federation_server.FederationHandlerRegistry: - pass - def is_mine_id(self, domain_id: str) -> bool: - pass - def get_instance_id(self) -> str: - pass - def get_instance_name(self) -> str: - pass - def get_event_builder_factory(self) -> EventBuilderFactory: - pass - def get_storage(self) -> synapse.storage.Storage: - pass - def get_registration_handler(self) -> synapse.handlers.register.RegistrationHandler: - pass - def get_macaroon_generator(self) -> synapse.handlers.auth.MacaroonGenerator: - pass - def get_pusherpool(self) -> synapse.push.pusherpool.PusherPool: - pass - def get_replication_streams(self) -> Dict[str, Stream]: - pass - def get_http_client( - self, - ) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient: - pass - def should_send_federation(self) -> bool: - pass - def get_typing_handler(self) -> FollowerTypingHandler: - pass diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index b163eebf39..4406e58273 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -38,9 +38,9 @@ class Databases(object): # store. self.databases = [] - self.main = None - self.state = None - self.persist_events = None + main = None + state = None + persist_events = None for database_config in hs.config.database.databases: db_name = database_config.name @@ -61,27 +61,25 @@ class Databases(object): # Sanity check we don't try and configure the main store on # multiple databases. - if self.main: + if main: raise Exception("'main' data store already configured") - self.main = main_store_class(database, db_conn, hs) + main = main_store_class(database, db_conn, hs) # If we're on a process that can persist events also # instantiate a `PersistEventsStore` if hs.config.worker.writers.events == hs.get_instance_name(): - self.persist_events = PersistEventsStore( - hs, database, self.main - ) + persist_events = PersistEventsStore(hs, database, main) if "state" in database_config.databases: logger.info("Starting 'state' data store") # Sanity check we don't try and configure the state store on # multiple databases. - if self.state: + if state: raise Exception("'state' data store already configured") - self.state = StateGroupDataStore(database, db_conn, hs) + state = StateGroupDataStore(database, db_conn, hs) db_conn.commit() @@ -90,8 +88,14 @@ class Databases(object): logger.info("Database %r prepared", db_name) # Sanity check that we have actually configured all the required stores. - if not self.main: + if not main: raise Exception("No 'main' data store configured") - if not self.state: + if not state: raise Exception("No 'main' data store configured") + + # We use local variables here to ensure that the databases do not have + # optional types. + self.main = main + self.state = state + self.persist_events = persist_events diff --git a/tox.ini b/tox.ini index 9a052c1e33..7971a7f9a8 100644 --- a/tox.ini +++ b/tox.ini @@ -202,6 +202,7 @@ commands = mypy \ synapse/push/push_rule_evaluator.py \ synapse/replication \ synapse/rest \ + synapse/server.py \ synapse/server_notices \ synapse/spam_checker_api \ synapse/storage/databases/main/ui_auth.py \ -- cgit 1.5.1