From 54fef094b31e0401d6d35efdf7d5d6b0b9e5d51f Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 31 Oct 2019 10:23:24 +0000 Subject: Remove usage of deprecated logger.warn method from codebase (#6271) Replace every instance of `logger.warn` with `logger.warning` as the former is deprecated. --- synapse/app/pusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d84732ee3c..01a5ffc363 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -114,7 +114,7 @@ class PusherServer(HomeServer): ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn( + logger.warning( ( "Metrics listener configured, but " "enable_metrics is not True!" @@ -123,7 +123,7 @@ class PusherServer(HomeServer): else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: - logger.warn("Unrecognized listener type: %s", listener["type"]) + logger.warning("Unrecognized listener type: %s", listener["type"]) self.get_tcp_replication().start_replication(self) -- cgit 1.4.1 From 2ac78438d8c7992c9c1ab347830c5498205f7008 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 10 Dec 2019 12:31:03 +0000 Subject: Make the PusherSlaveStore inherit from the slave RoomStore So that it has access to the get_retention_policy_for_room function which is required by filter_events_for_client. --- synapse/app/pusher.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 01a5ffc363..dd52a9fc2d 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -33,6 +33,7 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage import DataStore @@ -45,7 +46,11 @@ logger = logging.getLogger("synapse.app.pusher") class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore + SlavedEventStore, + SlavedPusherStore, + SlavedReceiptsStore, + SlavedAccountDataStore, + RoomStore, ): update_pusher_last_stream_ordering_and_success = __func__( DataStore.update_pusher_last_stream_ordering_and_success -- cgit 1.4.1 From bc5cb8bfe8c5b0b551de9a97912cd00caeaf6b48 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Dec 2019 17:42:18 +0000 Subject: Remove database config parsing from apps. --- synapse/app/admin_cmd.py | 5 ----- synapse/app/appservice.py | 5 ----- synapse/app/client_reader.py | 5 ----- synapse/app/event_creator.py | 5 ----- synapse/app/federation_reader.py | 5 ----- synapse/app/federation_sender.py | 5 ----- synapse/app/frontend_proxy.py | 5 ----- synapse/app/homeserver.py | 7 +------ synapse/app/media_repository.py | 5 ----- synapse/app/pusher.py | 5 ----- synapse/app/synchrotron.py | 5 ----- synapse/app/user_dir.py | 5 ----- synapse/server.py | 10 +++++++++- tests/test_types.py | 19 ++++++++----------- tests/utils.py | 2 -- 15 files changed, 18 insertions(+), 75 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 04751a6a5e..51a909419f 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -45,7 +45,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer -from synapse.storage.engines import create_engine from synapse.util.logcontext import LoggingContext from synapse.util.versionstring import get_version_string @@ -229,14 +228,10 @@ def start(config_options): synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - ss = AdminCmdServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 02b900f382..e82e0f11e3 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -34,7 +34,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer -from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string @@ -143,8 +142,6 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - if config.notify_appservices: sys.stderr.write( "\nThe appservices must be disabled in the main synapse process" @@ -159,10 +156,8 @@ def start(config_options): ps = AppserviceServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ps, config, use_worker_options=True) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index dadb487d5f..3edfe19567 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -62,7 +62,6 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.rest.client.versions import VersionsRestServlet from synapse.server import HomeServer -from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string @@ -181,14 +180,10 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - ss = ClientReaderServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index d110599a35..d0ddbe38fc 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -57,7 +57,6 @@ from synapse.rest.client.v1.room import ( ) from synapse.server import HomeServer from synapse.storage.data_stores.main.user_directory import UserDirectoryStore -from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string @@ -180,14 +179,10 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - ss = EventCreatorServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 418c086254..311523e0ed 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -46,7 +46,6 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer -from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string @@ -162,14 +161,10 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - ss = FederationReaderServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index f24920a7d6..83c436229c 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -41,7 +41,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.streams._base import ReceiptsStream from synapse.server import HomeServer from synapse.storage.database import Database -from synapse.storage.engines import create_engine from synapse.types import ReadReceipt from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree @@ -174,8 +173,6 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - if config.send_federation: sys.stderr.write( "\nThe send_federation must be disabled in the main synapse process" @@ -190,10 +187,8 @@ def start(config_options): ss = FederationSenderServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index e647459d0e..30e435eead 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -39,7 +39,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v2_alpha._base import client_patterns from synapse.server import HomeServer -from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string @@ -234,14 +233,10 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - ss = FrontendProxyServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index df65d0a989..79341784c5 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -69,7 +69,7 @@ from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.well_known import WellKnownResource from synapse.server import HomeServer from synapse.storage import DataStore -from synapse.storage.engines import IncorrectDatabaseSetup, create_engine +from synapse.storage.engines import IncorrectDatabaseSetup from synapse.storage.prepare_database import UpgradeDatabaseException from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.httpresourcetree import create_resource_tree @@ -328,15 +328,10 @@ def setup(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection - hs = SynapseHomeServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) synapse.config.logger.setup_logging(hs, config, use_worker_options=False) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 2c6dd3ef02..4c80f257e2 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -40,7 +40,6 @@ from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.server import HomeServer from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore -from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string @@ -157,14 +156,10 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - ss = MediaRepositoryServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 01a5ffc363..e157cbf64b 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -36,7 +36,6 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage import DataStore -from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string @@ -198,14 +197,10 @@ def start(config_options): # Force the pushers to start since they will be disabled in the main config config.start_pushers = True - database_engine = create_engine(config.database_config) - ps = PusherServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ps, config, use_worker_options=True) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 288ee64b42..dd2132e608 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -55,7 +55,6 @@ from synapse.rest.client.v1.room import RoomInitialSyncRestServlet from synapse.rest.client.v2_alpha import sync from synapse.server import HomeServer from synapse.storage.data_stores.main.presence import UserPresenceState -from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.stringutils import random_string @@ -437,14 +436,10 @@ def start(config_options): synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - ss = SynchrotronServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), ) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index c01fb34a9b..1257098f92 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -44,7 +44,6 @@ from synapse.rest.client.v2_alpha import user_directory from synapse.server import HomeServer from synapse.storage.data_stores.main.user_directory import UserDirectoryStore from synapse.storage.database import Database -from synapse.storage.engines import create_engine from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole @@ -200,8 +199,6 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - if config.update_user_directory: sys.stderr.write( "\nThe update_user_directory must be disabled in the main synapse process" @@ -216,10 +213,8 @@ def start(config_options): ss = UserDirectoryServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) diff --git a/synapse/server.py b/synapse/server.py index 2db3dab221..a8b5459ff3 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -97,6 +97,7 @@ from synapse.server_notices.worker_server_notices_sender import ( ) from synapse.state import StateHandler, StateResolutionHandler from synapse.storage import DataStores, Storage +from synapse.storage.engines import create_engine from synapse.streams.events import EventSources from synapse.util import Clock from synapse.util.distributor import Distributor @@ -209,7 +210,7 @@ class HomeServer(object): # instantiated during setup() for future return by get_datastore() DATASTORE_CLASS = abc.abstractproperty() - def __init__(self, hostname, reactor=None, **kwargs): + def __init__(self, hostname, config, reactor=None, **kwargs): """ Args: hostname : The hostname for the server. @@ -219,6 +220,7 @@ class HomeServer(object): self._reactor = reactor self.hostname = hostname + self.config = config self._building = {} self._listening_services = [] self.start_time = None @@ -229,6 +231,12 @@ class HomeServer(object): self.admin_redaction_ratelimiter = Ratelimiter() self.registration_ratelimiter = Ratelimiter() + self.database_engine = create_engine(config.database_config) + config.database_config.setdefault("args", {})[ + "cp_openfun" + ] = self.database_engine.on_new_connection + self.db_config = config.database_config + self.datastores = None # Other kwargs are explicit dependencies diff --git a/tests/test_types.py b/tests/test_types.py index 9ab5f829b0..8d97c751ea 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -17,18 +17,15 @@ from synapse.api.errors import SynapseError from synapse.types import GroupID, RoomAlias, UserID, map_username_to_mxid_localpart from tests import unittest -from tests.utils import TestHomeServer -mock_homeserver = TestHomeServer(hostname="my.domain") - -class UserIDTestCase(unittest.TestCase): +class UserIDTestCase(unittest.HomeserverTestCase): def test_parse(self): - user = UserID.from_string("@1234abcd:my.domain") + user = UserID.from_string("@1234abcd:test") self.assertEquals("1234abcd", user.localpart) - self.assertEquals("my.domain", user.domain) - self.assertEquals(True, mock_homeserver.is_mine(user)) + self.assertEquals("test", user.domain) + self.assertEquals(True, self.hs.is_mine(user)) def test_pase_empty(self): with self.assertRaises(SynapseError): @@ -48,13 +45,13 @@ class UserIDTestCase(unittest.TestCase): self.assertTrue(userA != userB) -class RoomAliasTestCase(unittest.TestCase): +class RoomAliasTestCase(unittest.HomeserverTestCase): def test_parse(self): - room = RoomAlias.from_string("#channel:my.domain") + room = RoomAlias.from_string("#channel:test") self.assertEquals("channel", room.localpart) - self.assertEquals("my.domain", room.domain) - self.assertEquals(True, mock_homeserver.is_mine(room)) + self.assertEquals("test", room.domain) + self.assertEquals(True, self.hs.is_mine(room)) def test_build(self): room = RoomAlias("channel", "my.domain") diff --git a/tests/utils.py b/tests/utils.py index c57da59191..585f305b9a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -260,9 +260,7 @@ def setup_test_homeserver( hs = homeserverToUse( name, config=config, - db_config=config.database_config, version_string="Synapse/tests", - database_engine=db_engine, tls_server_context_factory=Mock(), tls_client_options_factory=Mock(), reactor=reactor, -- cgit 1.4.1 From 48c3a96886de64f3141ad68b8163cd2fc0c197ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Jan 2020 09:16:12 +0000 Subject: Port synapse.replication.tcp to async/await (#6666) * Port synapse.replication.tcp to async/await * Newsfile * Correctly document type of on_ functions as async * Don't be overenthusiastic with the asyncing.... --- changelog.d/6666.misc | 1 + synapse/app/admin_cmd.py | 3 +- synapse/app/appservice.py | 5 +-- synapse/app/federation_sender.py | 5 +-- synapse/app/pusher.py | 5 +-- synapse/app/synchrotron.py | 5 +-- synapse/app/user_dir.py | 5 +-- synapse/federation/send_queue.py | 4 +- synapse/handlers/typing.py | 2 +- synapse/replication/tcp/client.py | 11 ++--- synapse/replication/tcp/protocol.py | 72 ++++++++++++++----------------- synapse/replication/tcp/resource.py | 31 ++++++------- synapse/replication/tcp/streams/_base.py | 25 +++++------ synapse/replication/tcp/streams/events.py | 9 ++-- tests/replication/tcp/streams/_base.py | 2 +- 15 files changed, 80 insertions(+), 105 deletions(-) create mode 100644 changelog.d/6666.misc (limited to 'synapse/app/pusher.py') diff --git a/changelog.d/6666.misc b/changelog.d/6666.misc new file mode 100644 index 0000000000..e79c23d2d2 --- /dev/null +++ b/changelog.d/6666.misc @@ -0,0 +1 @@ +Port `synapse.replication.tcp` to async/await. diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 8e36bc57d3..1c7c6ec0c8 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -84,8 +84,7 @@ class AdminCmdServer(HomeServer): class AdminCmdReplicationHandler(ReplicationClientHandler): - @defer.inlineCallbacks - def on_rdata(self, stream_name, token, rows): + async def on_rdata(self, stream_name, token, rows): pass def get_streams_to_replicate(self): diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index e82e0f11e3..2217d4a4fb 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -115,9 +115,8 @@ class ASReplicationHandler(ReplicationClientHandler): super(ASReplicationHandler, self).__init__(hs.get_datastore()) self.appservice_handler = hs.get_application_service_handler() - @defer.inlineCallbacks - def on_rdata(self, stream_name, token, rows): - yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) + async def on_rdata(self, stream_name, token, rows): + await super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 83c436229c..a57cf991ac 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -145,9 +145,8 @@ class FederationSenderReplicationHandler(ReplicationClientHandler): super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) self.send_handler = FederationSenderHandler(hs, self) - @defer.inlineCallbacks - def on_rdata(self, stream_name, token, rows): - yield super(FederationSenderReplicationHandler, self).on_rdata( + async def on_rdata(self, stream_name, token, rows): + await super(FederationSenderReplicationHandler, self).on_rdata( stream_name, token, rows ) self.send_handler.process_replication_rows(stream_name, token, rows) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 09e639040a..e46b6ac598 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -141,9 +141,8 @@ class PusherReplicationHandler(ReplicationClientHandler): self.pusher_pool = hs.get_pusherpool() - @defer.inlineCallbacks - def on_rdata(self, stream_name, token, rows): - yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) + async def on_rdata(self, stream_name, token, rows): + await super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 03031ee34d..3218da07bd 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -358,9 +358,8 @@ class SyncReplicationHandler(ReplicationClientHandler): self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() - @defer.inlineCallbacks - def on_rdata(self, stream_name, token, rows): - yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) + async def on_rdata(self, stream_name, token, rows): + await super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 1257098f92..ba536d6f04 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -172,9 +172,8 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) self.user_directory = hs.get_user_directory_handler() - @defer.inlineCallbacks - def on_rdata(self, stream_name, token, rows): - yield super(UserDirectoryReplicationHandler, self).on_rdata( + async def on_rdata(self, stream_name, token, rows): + await super(UserDirectoryReplicationHandler, self).on_rdata( stream_name, token, rows ) if stream_name == EventsStream.NAME: diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index ced4925a98..174f6e42be 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -259,7 +259,9 @@ class FederationRemoteSendQueue(object): def federation_ack(self, token): self._clear_queue_before_pos(token) - def get_replication_rows(self, from_token, to_token, limit, federation_ack=None): + async def get_replication_rows( + self, from_token, to_token, limit, federation_ack=None + ): """Get rows to be sent over federation between the two tokens Args: diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index b635c339ed..d5ca9cb07b 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -257,7 +257,7 @@ class TypingHandler(object): "typing_key", self._latest_room_serial, rooms=[member.room_id] ) - def get_all_typing_updates(self, last_id, current_id): + async def get_all_typing_updates(self, last_id, current_id): if last_id == current_id: return [] diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index aa7fd90e26..52a0aefe68 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -110,7 +110,7 @@ class ReplicationClientHandler(AbstractReplicationClientHandler): port = hs.config.worker_replication_port hs.get_reactor().connectTCP(host, port, self.factory) - def on_rdata(self, stream_name, token, rows): + async def on_rdata(self, stream_name, token, rows): """Called to handle a batch of replication data with a given stream token. By default this just pokes the slave store. Can be overridden in subclasses to @@ -121,20 +121,17 @@ class ReplicationClientHandler(AbstractReplicationClientHandler): token (int): stream token for this batch of rows rows (list): a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. - - Returns: - Deferred|None """ logger.debug("Received rdata %s -> %s", stream_name, token) - return self.store.process_replication_rows(stream_name, token, rows) + self.store.process_replication_rows(stream_name, token, rows) - def on_position(self, stream_name, token): + async def on_position(self, stream_name, token): """Called when we get new position data. By default this just pokes the slave store. Can be overriden in subclasses to handle more. """ - return self.store.process_replication_rows(stream_name, token, []) + self.store.process_replication_rows(stream_name, token, []) def on_sync(self, data): """When we received a SYNC we wake up any deferreds that were waiting diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index db0353c996..5f4bdf84d2 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -81,12 +81,11 @@ from synapse.replication.tcp.commands import ( SyncCommand, UserSyncCommand, ) +from synapse.replication.tcp.streams import STREAMS_MAP from synapse.types import Collection from synapse.util import Clock from synapse.util.stringutils import random_string -from .streams import STREAMS_MAP - connection_close_counter = Counter( "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"] ) @@ -241,19 +240,16 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd ) - def handle_command(self, cmd): + async def handle_command(self, cmd: Command): """Handle a command we have received over the replication stream. - By default delegates to on_ + By default delegates to on_, which should return an awaitable. Args: - cmd (synapse.replication.tcp.commands.Command): received command - - Returns: - Deferred + cmd: received command """ handler = getattr(self, "on_%s" % (cmd.NAME,)) - return handler(cmd) + await handler(cmd) def close(self): logger.warning("[%s] Closing connection", self.id()) @@ -326,10 +322,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): for cmd in pending: self.send_command(cmd) - def on_PING(self, line): + async def on_PING(self, line): self.received_ping = True - def on_ERROR(self, cmd): + async def on_ERROR(self, cmd): logger.error("[%s] Remote reported error: %r", self.id(), cmd.data) def pauseProducing(self): @@ -429,16 +425,16 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): BaseReplicationStreamProtocol.connectionMade(self) self.streamer.new_connection(self) - def on_NAME(self, cmd): + async def on_NAME(self, cmd): logger.info("[%s] Renamed to %r", self.id(), cmd.data) self.name = cmd.data - def on_USER_SYNC(self, cmd): - return self.streamer.on_user_sync( + async def on_USER_SYNC(self, cmd): + await self.streamer.on_user_sync( self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms ) - def on_REPLICATE(self, cmd): + async def on_REPLICATE(self, cmd): stream_name = cmd.stream_name token = cmd.token @@ -449,23 +445,23 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): for stream in iterkeys(self.streamer.streams_by_name) ] - return make_deferred_yieldable( + await make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) else: - return self.subscribe_to_stream(stream_name, token) + await self.subscribe_to_stream(stream_name, token) - def on_FEDERATION_ACK(self, cmd): - return self.streamer.federation_ack(cmd.token) + async def on_FEDERATION_ACK(self, cmd): + self.streamer.federation_ack(cmd.token) - def on_REMOVE_PUSHER(self, cmd): - return self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) + async def on_REMOVE_PUSHER(self, cmd): + await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) - def on_INVALIDATE_CACHE(self, cmd): - return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + async def on_INVALIDATE_CACHE(self, cmd): + self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) - def on_USER_IP(self, cmd): - return self.streamer.on_user_ip( + async def on_USER_IP(self, cmd): + self.streamer.on_user_ip( cmd.user_id, cmd.access_token, cmd.ip, @@ -474,8 +470,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): cmd.last_seen, ) - @defer.inlineCallbacks - def subscribe_to_stream(self, stream_name, token): + async def subscribe_to_stream(self, stream_name, token): """Subscribe the remote to a stream. This invloves checking if they've missed anything and sending those @@ -487,7 +482,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): try: # Get missing updates - updates, current_token = yield self.streamer.get_stream_updates( + updates, current_token = await self.streamer.get_stream_updates( stream_name, token ) @@ -572,7 +567,7 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta): """ @abc.abstractmethod - def on_rdata(self, stream_name, token, rows): + async def on_rdata(self, stream_name, token, rows): """Called to handle a batch of replication data with a given stream token. Args: @@ -580,14 +575,11 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta): token (int): stream token for this batch of rows rows (list): a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. - - Returns: - Deferred|None """ raise NotImplementedError() @abc.abstractmethod - def on_position(self, stream_name, token): + async def on_position(self, stream_name, token): """Called when we get new position data.""" raise NotImplementedError() @@ -676,12 +668,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): if not self.streams_connecting: self.handler.finished_connecting() - def on_SERVER(self, cmd): + async def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) self.send_error("Wrong remote") - def on_RDATA(self, cmd): + async def on_RDATA(self, cmd): stream_name = cmd.stream_name inbound_rdata_count.labels(stream_name).inc() @@ -701,19 +693,19 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # Check if this is the last of a batch of updates rows = self.pending_batches.pop(stream_name, []) rows.append(row) - return self.handler.on_rdata(stream_name, cmd.token, rows) + await self.handler.on_rdata(stream_name, cmd.token, rows) - def on_POSITION(self, cmd): + async def on_POSITION(self, cmd): # When we get a `POSITION` command it means we've finished getting # missing updates for the given stream, and are now up to date. self.streams_connecting.discard(cmd.stream_name) if not self.streams_connecting: self.handler.finished_connecting() - return self.handler.on_position(cmd.stream_name, cmd.token) + await self.handler.on_position(cmd.stream_name, cmd.token) - def on_SYNC(self, cmd): - return self.handler.on_sync(cmd.data) + async def on_SYNC(self, cmd): + self.handler.on_sync(cmd.data) def replicate(self, stream_name, token): """Send the subscription request to the server diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index cbfdaf5773..b1752e88cd 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -23,7 +23,6 @@ from six import itervalues from prometheus_client import Counter -from twisted.internet import defer from twisted.internet.protocol import Factory from synapse.metrics import LaterGauge @@ -155,8 +154,7 @@ class ReplicationStreamer(object): run_as_background_process("replication_notifier", self._run_notifier_loop) - @defer.inlineCallbacks - def _run_notifier_loop(self): + async def _run_notifier_loop(self): self.is_looping = True try: @@ -185,7 +183,7 @@ class ReplicationStreamer(object): continue if self._replication_torture_level: - yield self.clock.sleep( + await self.clock.sleep( self._replication_torture_level / 1000.0 ) @@ -196,7 +194,7 @@ class ReplicationStreamer(object): stream.upto_token, ) try: - updates, current_token = yield stream.get_updates() + updates, current_token = await stream.get_updates() except Exception: logger.info("Failed to handle stream %s", stream.NAME) raise @@ -233,7 +231,7 @@ class ReplicationStreamer(object): self.is_looping = False @measure_func("repl.get_stream_updates") - def get_stream_updates(self, stream_name, token): + async def get_stream_updates(self, stream_name, token): """For a given stream get all updates since token. This is called when a client first subscribes to a stream. """ @@ -241,7 +239,7 @@ class ReplicationStreamer(object): if not stream: raise Exception("unknown stream %s", stream_name) - return stream.get_updates_since(token) + return await stream.get_updates_since(token) @measure_func("repl.federation_ack") def federation_ack(self, token): @@ -252,22 +250,20 @@ class ReplicationStreamer(object): self.federation_sender.federation_ack(token) @measure_func("repl.on_user_sync") - @defer.inlineCallbacks - def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms): + async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms): """A client has started/stopped syncing on a worker. """ user_sync_counter.inc() - yield self.presence_handler.update_external_syncs_row( + await self.presence_handler.update_external_syncs_row( conn_id, user_id, is_syncing, last_sync_ms ) @measure_func("repl.on_remove_pusher") - @defer.inlineCallbacks - def on_remove_pusher(self, app_id, push_key, user_id): + async def on_remove_pusher(self, app_id, push_key, user_id): """A client has asked us to remove a pusher """ remove_pusher_counter.inc() - yield self.store.delete_pusher_by_app_id_pushkey_user_id( + await self.store.delete_pusher_by_app_id_pushkey_user_id( app_id=app_id, pushkey=push_key, user_id=user_id ) @@ -281,15 +277,16 @@ class ReplicationStreamer(object): getattr(self.store, cache_func).invalidate(tuple(keys)) @measure_func("repl.on_user_ip") - @defer.inlineCallbacks - def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): + async def on_user_ip( + self, user_id, access_token, ip, user_agent, device_id, last_seen + ): """The client saw a user request """ user_ip_cache_counter.inc() - yield self.store.insert_client_ip( + await self.store.insert_client_ip( user_id, access_token, ip, user_agent, device_id, last_seen ) - yield self._server_notices_sender.on_user_ip(user_id) + await self._server_notices_sender.on_user_ip(user_id) def send_sync_to_all_connections(self, data): """Sends a SYNC command to all clients. diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 4ab0334fc1..e03e77199b 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -19,8 +19,6 @@ import logging from collections import namedtuple from typing import Any -from twisted.internet import defer - logger = logging.getLogger(__name__) @@ -144,8 +142,7 @@ class Stream(object): self.upto_token = self.current_token() self.last_token = self.upto_token - @defer.inlineCallbacks - def get_updates(self): + async def get_updates(self): """Gets all updates since the last time this function was called (or since the stream was constructed if it hadn't been called before), until the `upto_token` @@ -156,13 +153,12 @@ class Stream(object): list of ``(token, row)`` entries. ``row`` will be json-serialised and sent over the replication steam. """ - updates, current_token = yield self.get_updates_since(self.last_token) + updates, current_token = await self.get_updates_since(self.last_token) self.last_token = current_token return updates, current_token - @defer.inlineCallbacks - def get_updates_since(self, from_token): + async def get_updates_since(self, from_token): """Like get_updates except allows specifying from when we should stream updates @@ -182,15 +178,16 @@ class Stream(object): if from_token == current_token: return [], current_token + logger.info("get_updates_since: %s", self.__class__) if self._LIMITED: - rows = yield self.update_function( + rows = await self.update_function( from_token, current_token, limit=MAX_EVENTS_BEHIND + 1 ) # never turn more than MAX_EVENTS_BEHIND + 1 into updates. rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1) else: - rows = yield self.update_function(from_token, current_token) + rows = await self.update_function(from_token, current_token) updates = [(row[0], row[1:]) for row in rows] @@ -295,9 +292,8 @@ class PushRulesStream(Stream): push_rules_token, _ = self.store.get_push_rules_stream_token() return push_rules_token - @defer.inlineCallbacks - def update_function(self, from_token, to_token, limit): - rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit) + async def update_function(self, from_token, to_token, limit): + rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit) return [(row[0], row[2]) for row in rows] @@ -413,9 +409,8 @@ class AccountDataStream(Stream): super(AccountDataStream, self).__init__(hs) - @defer.inlineCallbacks - def update_function(self, from_token, to_token, limit): - global_results, room_results = yield self.store.get_all_updated_account_data( + async def update_function(self, from_token, to_token, limit): + global_results, room_results = await self.store.get_all_updated_account_data( from_token, from_token, to_token, limit ) diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 0843e5aa90..b3afabb8cd 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -19,8 +19,6 @@ from typing import Tuple, Type import attr -from twisted.internet import defer - from ._base import Stream @@ -122,16 +120,15 @@ class EventsStream(Stream): super(EventsStream, self).__init__(hs) - @defer.inlineCallbacks - def update_function(self, from_token, current_token, limit=None): - event_rows = yield self._store.get_all_new_forward_event_rows( + async def update_function(self, from_token, current_token, limit=None): + event_rows = await self._store.get_all_new_forward_event_rows( from_token, current_token, limit ) event_updates = ( (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows ) - state_rows = yield self._store.get_all_updated_current_state_deltas( + state_rows = await self._store.get_all_updated_current_state_deltas( from_token, current_token, limit ) state_updates = ( diff --git a/tests/replication/tcp/streams/_base.py b/tests/replication/tcp/streams/_base.py index 1d14e77255..e96ad4ca4e 100644 --- a/tests/replication/tcp/streams/_base.py +++ b/tests/replication/tcp/streams/_base.py @@ -73,6 +73,6 @@ class TestReplicationClientHandler(object): def finished_connecting(self): pass - def on_rdata(self, stream_name, token, rows): + async def on_rdata(self, stream_name, token, rows): for r in rows: self.received_rdata_rows.append((stream_name, token, r)) -- cgit 1.4.1 From 509e381afa8c656e72f5fef3d651a9819794174a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 21 Feb 2020 07:15:07 -0500 Subject: Clarify list/set/dict/tuple comprehensions and enforce via flake8 (#6957) Ensure good comprehension hygiene using flake8-comprehensions. --- CONTRIBUTING.md | 2 +- changelog.d/6957.misc | 1 + docs/code_style.md | 2 +- scripts-dev/convert_server_keys.py | 2 +- synapse/app/_base.py | 2 +- synapse/app/federation_sender.py | 4 +-- synapse/app/pusher.py | 2 +- synapse/config/server.py | 4 +-- synapse/config/tls.py | 2 +- synapse/crypto/keyring.py | 6 ++-- synapse/federation/send_queue.py | 4 +-- synapse/groups/groups_server.py | 2 +- synapse/handlers/device.py | 2 +- synapse/handlers/directory.py | 4 +-- synapse/handlers/federation.py | 18 +++++----- synapse/handlers/presence.py | 6 ++-- synapse/handlers/receipts.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/search.py | 8 ++--- synapse/handlers/sync.py | 22 ++++++------ synapse/handlers/typing.py | 4 +-- synapse/logging/utils.py | 2 +- synapse/metrics/__init__.py | 2 +- synapse/metrics/background_process_metrics.py | 4 +-- synapse/push/bulk_push_rule_evaluator.py | 8 ++--- synapse/push/emailpusher.py | 2 +- synapse/push/mailer.py | 20 +++++------ synapse/push/pusherpool.py | 2 +- synapse/rest/admin/_base.py | 4 +-- synapse/rest/client/v1/push_rule.py | 6 ++-- synapse/rest/client/v1/pusher.py | 4 +-- synapse/rest/client/v2_alpha/sync.py | 2 +- synapse/rest/key/v2/remote_key_resource.py | 2 +- synapse/rest/media/v1/_base.py | 40 ++++++++++------------ synapse/state/v1.py | 10 +++--- synapse/state/v2.py | 8 ++--- synapse/storage/_base.py | 2 +- synapse/storage/background_updates.py | 2 +- synapse/storage/data_stores/main/appservice.py | 14 ++++---- synapse/storage/data_stores/main/client_ips.py | 4 +-- synapse/storage/data_stores/main/devices.py | 13 ++++--- .../storage/data_stores/main/event_federation.py | 2 +- synapse/storage/data_stores/main/events.py | 8 ++--- .../storage/data_stores/main/events_bg_updates.py | 2 +- synapse/storage/data_stores/main/events_worker.py | 6 ++-- synapse/storage/data_stores/main/push_rule.py | 8 ++--- synapse/storage/data_stores/main/receipts.py | 4 +-- synapse/storage/data_stores/main/roommember.py | 4 +-- synapse/storage/data_stores/main/state.py | 8 ++--- synapse/storage/data_stores/main/stream.py | 8 ++--- .../storage/data_stores/main/user_erasure_store.py | 4 +-- synapse/storage/data_stores/state/store.py | 4 +-- synapse/storage/database.py | 4 +-- synapse/storage/persist_events.py | 8 ++--- synapse/storage/prepare_database.py | 6 ++-- synapse/util/frozenutils.py | 2 +- synapse/visibility.py | 4 +-- tests/config/test_generate.py | 2 +- tests/federation/test_federation_server.py | 2 +- tests/handlers/test_presence.py | 4 +-- tests/handlers/test_typing.py | 6 ++-- tests/handlers/test_user_directory.py | 12 +++---- tests/push/test_email.py | 6 ++-- tests/push/test_http.py | 8 ++--- tests/rest/client/v2_alpha/test_sync.py | 28 ++++++++------- tests/storage/test__base.py | 4 +-- tests/storage/test_appservice.py | 36 +++++++++---------- tests/storage/test_cleanup_extrems.py | 10 +++--- tests/storage/test_event_metrics.py | 36 +++++++++---------- tests/storage/test_state.py | 2 +- tests/test_state.py | 18 +++------- tests/util/test_stream_change_cache.py | 18 +++------- tox.ini | 1 + 73 files changed, 251 insertions(+), 276 deletions(-) create mode 100644 changelog.d/6957.misc (limited to 'synapse/app/pusher.py') diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4b01b6ac8c..253a0ca648 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -60,7 +60,7 @@ python 3.6 and to install each tool: ``` # Install the dependencies -pip install -U black flake8 isort +pip install -U black flake8 flake8-comprehensions isort # Run the linter script ./scripts-dev/lint.sh diff --git a/changelog.d/6957.misc b/changelog.d/6957.misc new file mode 100644 index 0000000000..4f98030110 --- /dev/null +++ b/changelog.d/6957.misc @@ -0,0 +1 @@ +Use flake8-comprehensions to enforce good hygiene of list/set/dict comprehensions. diff --git a/docs/code_style.md b/docs/code_style.md index 71aecd41f7..6ef6f80290 100644 --- a/docs/code_style.md +++ b/docs/code_style.md @@ -30,7 +30,7 @@ The necessary tools are detailed below. Install `flake8` with: - pip install --upgrade flake8 + pip install --upgrade flake8 flake8-comprehensions Check all application and test code with: diff --git a/scripts-dev/convert_server_keys.py b/scripts-dev/convert_server_keys.py index 179be61c30..06b4c1e2ff 100644 --- a/scripts-dev/convert_server_keys.py +++ b/scripts-dev/convert_server_keys.py @@ -103,7 +103,7 @@ def main(): yaml.safe_dump(result, sys.stdout, default_flow_style=False) - rows = list(row for server, json in result.items() for row in rows_v2(server, json)) + rows = [row for server, json in result.items() for row in rows_v2(server, json)] cursor = connection.cursor() cursor.executemany( diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 109b1e2fb5..9ffd23c6df 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -141,7 +141,7 @@ def start_reactor( def quit_with_error(error_string): message_lines = error_string.split("\n") - line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2 + line_length = max(len(l) for l in message_lines if len(l) < 80) + 2 sys.stderr.write("*" * line_length + "\n") for line in message_lines: sys.stderr.write(" %s\n" % (line.rstrip(),)) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 63a91f1177..b7fcf80ddc 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -262,7 +262,7 @@ class FederationSenderHandler(object): # ... as well as device updates and messages elif stream_name == DeviceListsStream.NAME: - hosts = set(row.destination for row in rows) + hosts = {row.destination for row in rows} for host in hosts: self.federation_sender.send_device_messages(host) @@ -270,7 +270,7 @@ class FederationSenderHandler(object): # The to_device stream includes stuff to be pushed to both local # clients and remote servers, so we ignore entities that start with # '@' (since they'll be local users rather than destinations). - hosts = set(row.entity for row in rows if not row.entity.startswith("@")) + hosts = {row.entity for row in rows if not row.entity.startswith("@")} for host in hosts: self.federation_sender.send_device_messages(host) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index e46b6ac598..84e9f8d5e2 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -158,7 +158,7 @@ class PusherReplicationHandler(ReplicationClientHandler): yield self.pusher_pool.on_new_notifications(token, token) elif stream_name == "receipts": yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) + token, token, {row.room_id for row in rows} ) except Exception: logger.exception("Error poking pushers") diff --git a/synapse/config/server.py b/synapse/config/server.py index 0ec1b0fadd..7525765fee 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -1066,12 +1066,12 @@ KNOWN_RESOURCES = ( def _check_resource_config(listeners): - resource_names = set( + resource_names = { res_name for listener in listeners for res in listener.get("resources", []) for res_name in res.get("names", []) - ) + } for resource in resource_names: if resource not in KNOWN_RESOURCES: diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 97a12d51f6..a65538562b 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -260,7 +260,7 @@ class TlsConfig(Config): crypto.FILETYPE_ASN1, self.tls_certificate ) sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest()) - sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints) + sha256_fingerprints = {f["sha256"] for f in self.tls_fingerprints} if sha256_fingerprint not in sha256_fingerprints: self.tls_fingerprints.append({"sha256": sha256_fingerprint}) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 6fe5a6a26a..983f0ead8c 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -326,9 +326,7 @@ class Keyring(object): verify_requests (list[VerifyJsonRequest]): list of verify requests """ - remaining_requests = set( - (rq for rq in verify_requests if not rq.key_ready.called) - ) + remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called} @defer.inlineCallbacks def do_iterations(): @@ -396,7 +394,7 @@ class Keyring(object): results = yield fetcher.get_keys(missing_keys) - completed = list() + completed = [] for verify_request in remaining_requests: server_name = verify_request.server_name diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 001bb304ae..876fb0e245 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -129,9 +129,9 @@ class FederationRemoteSendQueue(object): for key in keys[:i]: del self.presence_changed[key] - user_ids = set( + user_ids = { user_id for uids in self.presence_changed.values() for user_id in uids - ) + } keys = self.presence_destinations.keys() i = self.presence_destinations.bisect_left(position_to_delete) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index c106abae21..4f0dc0a209 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -608,7 +608,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): user_results = yield self.store.get_users_in_group( group_id, include_private=True ) - if user_id in [user_result["user_id"] for user_result in user_results]: + if user_id in (user_result["user_id"] for user_result in user_results): raise SynapseError(400, "User already in group") content = { diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 50cea3f378..a514c30714 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -742,6 +742,6 @@ class DeviceListUpdater(object): # We clobber the seen updates since we've re-synced from a given # point. - self._seen_updates[user_id] = set([stream_id]) + self._seen_updates[user_id] = {stream_id} defer.returnValue(result) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 921d887b24..0b23ca919a 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -72,7 +72,7 @@ class DirectoryHandler(BaseHandler): # TODO(erikj): Check if there is a current association. if not servers: users = yield self.state.get_current_users_in_room(room_id) - servers = set(get_domain_from_id(u) for u in users) + servers = {get_domain_from_id(u) for u in users} if not servers: raise SynapseError(400, "Failed to get server list") @@ -255,7 +255,7 @@ class DirectoryHandler(BaseHandler): ) users = yield self.state.get_current_users_in_room(room_id) - extra_servers = set(get_domain_from_id(u) for u in users) + extra_servers = {get_domain_from_id(u) for u in users} servers = set(extra_servers) | set(servers) # If this server is in the list of servers, return it first. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eb20ef4aec..a689065f89 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -659,11 +659,11 @@ class FederationHandler(BaseHandler): # this can happen if a remote server claims that the state or # auth_events at an event in room A are actually events in room B - bad_events = list( + bad_events = [ (event_id, event.room_id) for event_id, event in fetched_events.items() if event.room_id != room_id - ) + ] for bad_event_id, bad_room_id in bad_events: # This is a bogus situation, but since we may only discover it a long time @@ -856,7 +856,7 @@ class FederationHandler(BaseHandler): # Don't bother processing events we already have. seen_events = await self.store.have_events_in_timeline( - set(e.event_id for e in events) + {e.event_id for e in events} ) events = [e for e in events if e.event_id not in seen_events] @@ -866,7 +866,7 @@ class FederationHandler(BaseHandler): event_map = {e.event_id: e for e in events} - event_ids = set(e.event_id for e in events) + event_ids = {e.event_id for e in events} # build a list of events whose prev_events weren't in the batch. # (XXX: this will include events whose prev_events we already have; that doesn't @@ -892,13 +892,13 @@ class FederationHandler(BaseHandler): state_events.update({s.event_id: s for s in state}) events_to_state[e_id] = state - required_auth = set( + required_auth = { a_id for event in events + list(state_events.values()) + list(auth_events.values()) for a_id in event.auth_event_ids() - ) + } auth_events.update( {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map} ) @@ -1247,7 +1247,7 @@ class FederationHandler(BaseHandler): async def on_event_auth(self, event_id: str) -> List[EventBase]: event = await self.store.get_event(event_id) auth = await self.store.get_auth_chain( - [auth_id for auth_id in event.auth_event_ids()], include_given=True + list(event.auth_event_ids()), include_given=True ) return list(auth) @@ -2152,7 +2152,7 @@ class FederationHandler(BaseHandler): # Now get the current auth_chain for the event. local_auth_chain = await self.store.get_auth_chain( - [auth_id for auth_id in event.auth_event_ids()], include_given=True + list(event.auth_event_ids()), include_given=True ) # TODO: Check if we would now reject event_id. If so we need to tell @@ -2654,7 +2654,7 @@ class FederationHandler(BaseHandler): member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) else: - destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) + destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)} yield self.federation_client.forward_third_party_invite( destinations, room_id, event_dict ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 202aa9294f..0d6cf2b008 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -313,7 +313,7 @@ class PresenceHandler(object): notified_presence_counter.inc(len(to_notify)) yield self._persist_and_notify(list(to_notify.values())) - self.unpersisted_users_changes |= set(s.user_id for s in new_states) + self.unpersisted_users_changes |= {s.user_id for s in new_states} self.unpersisted_users_changes -= set(to_notify.keys()) to_federation_ping = { @@ -698,7 +698,7 @@ class PresenceHandler(object): updates = yield self.current_state_for_users(target_user_ids) updates = list(updates.values()) - for user_id in set(target_user_ids) - set(u.user_id for u in updates): + for user_id in set(target_user_ids) - {u.user_id for u in updates}: updates.append(UserPresenceState.default(user_id)) now = self.clock.time_msec() @@ -886,7 +886,7 @@ class PresenceHandler(object): hosts = yield self.state.get_current_hosts_in_room(room_id) # Filter out ourselves. - hosts = set(host for host in hosts if host != self.server_name) + hosts = {host for host in hosts if host != self.server_name} self.federation.send_presence_to_destinations( states=[state], destinations=hosts diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 9283c039e3..8bc100db42 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -94,7 +94,7 @@ class ReceiptsHandler(BaseHandler): # no new receipts return False - affected_room_ids = list(set([r.room_id for r in receipts])) + affected_room_ids = list({r.room_id for r in receipts}) self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids) # Note that the min here shouldn't be relied upon to be accurate. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 76e8f61b74..8ee870f0bb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -355,7 +355,7 @@ class RoomCreationHandler(BaseHandler): # If so, mark the new room as non-federatable as well creation_content["m.federate"] = False - initial_state = dict() + initial_state = {} # Replicate relevant room events types_to_copy = ( diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 110097eab9..ec1542d416 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -184,7 +184,7 @@ class SearchHandler(BaseHandler): membership_list=[Membership.JOIN], # membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban], ) - room_ids = set(r.room_id for r in rooms) + room_ids = {r.room_id for r in rooms} # If doing a subset of all rooms seearch, check if any of the rooms # are from an upgraded room, and search their contents as well @@ -374,12 +374,12 @@ class SearchHandler(BaseHandler): ).to_string() if include_profile: - senders = set( + senders = { ev.sender for ev in itertools.chain( res["events_before"], [event], res["events_after"] ) - ) + } if res["events_after"]: last_event_id = res["events_after"][-1].event_id @@ -421,7 +421,7 @@ class SearchHandler(BaseHandler): state_results = {} if include_state: - rooms = set(e.room_id for e in allowed_events) + rooms = {e.room_id for e in allowed_events} for room_id in rooms: state = yield self.state_handler.get_current_state(room_id) state_results[room_id] = list(state.values()) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4324bc702e..669dbc8a48 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -682,11 +682,9 @@ class SyncHandler(object): # FIXME: order by stream ordering rather than as returned by SQL if joined_user_ids or invited_user_ids: - summary["m.heroes"] = sorted( - [user_id for user_id in (joined_user_ids + invited_user_ids)] - )[0:5] + summary["m.heroes"] = sorted(joined_user_ids + invited_user_ids)[0:5] else: - summary["m.heroes"] = sorted([user_id for user_id in gone_user_ids])[0:5] + summary["m.heroes"] = sorted(gone_user_ids)[0:5] if not sync_config.filter_collection.lazy_load_members(): return summary @@ -697,9 +695,9 @@ class SyncHandler(object): # track which members the client should already know about via LL: # Ones which are already in state... - existing_members = set( + existing_members = { user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member - ) + } # ...or ones which are in the timeline... for ev in batch.events: @@ -773,10 +771,10 @@ class SyncHandler(object): # We only request state for the members needed to display the # timeline: - members_to_fetch = set( + members_to_fetch = { event.sender # FIXME: we also care about invite targets etc. for event in batch.events - ) + } if full_state: # always make sure we LL ourselves so we know we're in the room @@ -1993,10 +1991,10 @@ def _calculate_state( ) } - c_ids = set(e for e in itervalues(current)) - ts_ids = set(e for e in itervalues(timeline_start)) - p_ids = set(e for e in itervalues(previous)) - tc_ids = set(e for e in itervalues(timeline_contains)) + c_ids = set(itervalues(current)) + ts_ids = set(itervalues(timeline_start)) + p_ids = set(itervalues(previous)) + tc_ids = set(itervalues(timeline_contains)) # If we are lazyloading room members, we explicitly add the membership events # for the senders in the timeline into the state block returned by /sync, diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5406618431..391bceb0c4 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -198,7 +198,7 @@ class TypingHandler(object): now=now, obj=member, then=now + FEDERATION_PING_INTERVAL ) - for domain in set(get_domain_from_id(u) for u in users): + for domain in {get_domain_from_id(u) for u in users}: if domain != self.server_name: logger.debug("sending typing update to %s", domain) self.federation.build_and_send_edu( @@ -231,7 +231,7 @@ class TypingHandler(object): return users = yield self.state.get_current_users_in_room(room_id) - domains = set(get_domain_from_id(u) for u in users) + domains = {get_domain_from_id(u) for u in users} if self.server_name in domains: logger.info("Got typing update from %s: %r", user_id, content) diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py index 6073fc2725..0c2527bd86 100644 --- a/synapse/logging/utils.py +++ b/synapse/logging/utils.py @@ -148,7 +148,7 @@ def trace_function(f): pathname=pathname, lineno=lineno, msg=msg, - args=tuple(), + args=(), exc_info=None, ) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 0b45e1f52a..0dba997a23 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -240,7 +240,7 @@ class BucketCollector(object): res.append(["+Inf", sum(data.values())]) metric = HistogramMetricFamily( - self.name, "", buckets=res, sum_value=sum([x * y for x, y in data.items()]) + self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items()) ) yield metric diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index c53d2a0d40..b65bcd8806 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -80,13 +80,13 @@ _background_process_db_sched_duration = Counter( # map from description to a counter, so that we can name our logcontexts # incrementally. (It actually duplicates _background_process_start_count, but # it's much simpler to do so than to try to combine them.) -_background_process_counts = dict() # type: dict[str, int] +_background_process_counts = {} # type: dict[str, int] # map from description to the currently running background processes. # # it's kept as a dict of sets rather than a big set so that we can keep track # of process descriptions that no longer have any active processes. -_background_processes = dict() # type: dict[str, set[_BackgroundProcess]] +_background_processes = {} # type: dict[str, set[_BackgroundProcess]] # A lock that covers the above dicts _bg_metrics_lock = threading.Lock() diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7d9f5a38d9..433ca2f416 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -400,11 +400,11 @@ class RulesForRoom(object): if logger.isEnabledFor(logging.DEBUG): logger.debug("Found members %r: %r", self.room_id, members.values()) - interested_in_user_ids = set( + interested_in_user_ids = { user_id for user_id, membership in itervalues(members) if membership == Membership.JOIN - ) + } logger.debug("Joined: %r", interested_in_user_ids) @@ -412,9 +412,9 @@ class RulesForRoom(object): interested_in_user_ids, on_invalidate=self.invalidate_all_cb ) - user_ids = set( + user_ids = { uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher - ) + } logger.debug("With pushers: %r", user_ids) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 8c818a86bf..ba4551d619 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -204,7 +204,7 @@ class EmailPusher(object): yield self.send_notification(unprocessed, reason) yield self.save_last_stream_ordering_and_success( - max([ea["stream_ordering"] for ea in unprocessed]) + max(ea["stream_ordering"] for ea in unprocessed) ) # we update the throttle on all the possible unprocessed push actions diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index b13b646bfd..4ccaf178ce 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -526,12 +526,10 @@ class Mailer(object): # If the room doesn't have a name, say who the messages # are from explicitly to avoid, "messages in the Bob room" sender_ids = list( - set( - [ - notif_events[n["event_id"]].sender - for n in notifs_by_room[room_id] - ] - ) + { + notif_events[n["event_id"]].sender + for n in notifs_by_room[room_id] + } ) member_events = yield self.store.get_events( @@ -558,12 +556,10 @@ class Mailer(object): # If the reason room doesn't have a name, say who the messages # are from explicitly to avoid, "messages in the Bob room" sender_ids = list( - set( - [ - notif_events[n["event_id"]].sender - for n in notifs_by_room[reason["room_id"]] - ] - ) + { + notif_events[n["event_id"]].sender + for n in notifs_by_room[reason["room_id"]] + } ) member_events = yield self.store.get_events( diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index b9dca5bc63..01789a9fb4 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -191,7 +191,7 @@ class PusherPool: min_stream_id - 1, max_stream_id ) # This returns a tuple, user_id is at index 3 - users_affected = set([r[3] for r in updated_receipts]) + users_affected = {r[3] for r in updated_receipts} for u in users_affected: if u in self.pushers: diff --git a/synapse/rest/admin/_base.py b/synapse/rest/admin/_base.py index 459482eb6d..a96f75ce26 100644 --- a/synapse/rest/admin/_base.py +++ b/synapse/rest/admin/_base.py @@ -29,7 +29,7 @@ def historical_admin_path_patterns(path_regex): Note that this should only be used for existing endpoints: new ones should just register for the /_synapse/admin path. """ - return list( + return [ re.compile(prefix + path_regex) for prefix in ( "^/_synapse/admin/v1", @@ -37,7 +37,7 @@ def historical_admin_path_patterns(path_regex): "^/_matrix/client/unstable/admin", "^/_matrix/client/r0/admin", ) - ) + ] def admin_patterns(path_regex: str): diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 4f74600239..9fd4908136 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -49,7 +49,7 @@ class PushRuleRestServlet(RestServlet): if self._is_worker: raise Exception("Cannot handle PUT /push_rules on worker") - spec = _rule_spec_from_path([x for x in path.split("/")]) + spec = _rule_spec_from_path(path.split("/")) try: priority_class = _priority_class_from_spec(spec) except InvalidRuleException as e: @@ -110,7 +110,7 @@ class PushRuleRestServlet(RestServlet): if self._is_worker: raise Exception("Cannot handle DELETE /push_rules on worker") - spec = _rule_spec_from_path([x for x in path.split("/")]) + spec = _rule_spec_from_path(path.split("/")) requester = await self.auth.get_user_by_req(request) user_id = requester.user.to_string() @@ -138,7 +138,7 @@ class PushRuleRestServlet(RestServlet): rules = format_push_rules_for_user(requester.user, rules) - path = [x for x in path.split("/")][1:] + path = path.split("/")[1:] if path == []: # we're a reference impl: pedantry is our job. diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index 6f6b7aed6e..550a2f1b44 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -54,9 +54,9 @@ class PushersRestServlet(RestServlet): pushers = await self.hs.get_datastore().get_pushers_by_user_id(user.to_string()) - filtered_pushers = list( + filtered_pushers = [ {k: v for k, v in p.items() if k in ALLOWED_KEYS} for p in pushers - ) + ] return 200, {"pushers": filtered_pushers} diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index d8292ce29f..8fa68dd37f 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -72,7 +72,7 @@ class SyncRestServlet(RestServlet): """ PATTERNS = client_patterns("/sync$") - ALLOWED_PRESENCE = set(["online", "offline", "unavailable"]) + ALLOWED_PRESENCE = {"online", "offline", "unavailable"} def __init__(self, hs): super(SyncRestServlet, self).__init__() diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 9d6813a047..4b6d030a57 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -149,7 +149,7 @@ class RemoteKey(DirectServeResource): time_now_ms = self.clock.time_msec() - cache_misses = dict() # type: Dict[str, Set[str]] + cache_misses = {} # type: Dict[str, Set[str]] for (server_name, key_id, from_server), results in cached.items(): results = [(result["ts_added_ms"], result) for result in results] diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 65bbf00073..ba28dd089d 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -135,27 +135,25 @@ def add_file_headers(request, media_type, file_size, upload_name): # separators as defined in RFC2616. SP and HT are handled separately. # see _can_encode_filename_as_token. -_FILENAME_SEPARATOR_CHARS = set( - ( - "(", - ")", - "<", - ">", - "@", - ",", - ";", - ":", - "\\", - '"', - "/", - "[", - "]", - "?", - "=", - "{", - "}", - ) -) +_FILENAME_SEPARATOR_CHARS = { + "(", + ")", + "<", + ">", + "@", + ",", + ";", + ":", + "\\", + '"', + "/", + "[", + "]", + "?", + "=", + "{", + "}", +} def _can_encode_filename_as_token(x): diff --git a/synapse/state/v1.py b/synapse/state/v1.py index 24b7c0faef..9bf98d06f2 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -69,9 +69,9 @@ def resolve_events_with_store( unconflicted_state, conflicted_state = _seperate(state_sets) - needed_events = set( + needed_events = { event_id for event_ids in itervalues(conflicted_state) for event_id in event_ids - ) + } needed_event_count = len(needed_events) if event_map is not None: needed_events -= set(iterkeys(event_map)) @@ -261,11 +261,11 @@ def _resolve_state_events(conflicted_state, auth_events): def _resolve_auth_events(events, auth_events): - reverse = [i for i in reversed(_ordered_events(events))] + reverse = list(reversed(_ordered_events(events))) - auth_keys = set( + auth_keys = { key for event in events for key in event_auth.auth_types_for_event(event) - ) + } new_auth_events = {} for key in auth_keys: diff --git a/synapse/state/v2.py b/synapse/state/v2.py index 75fe58305a..0ffe6d8c14 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -105,7 +105,7 @@ def resolve_events_with_store( % (room_id, event.event_id, event.room_id,) ) - full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map) + full_conflicted_set = {eid for eid in full_conflicted_set if eid in event_map} logger.debug("%d full_conflicted_set entries", len(full_conflicted_set)) @@ -233,7 +233,7 @@ def _get_auth_chain_difference(state_sets, event_map, state_res_store): auth_sets = [] for state_set in state_sets: - auth_ids = set( + auth_ids = { eid for key, eid in iteritems(state_set) if ( @@ -246,7 +246,7 @@ def _get_auth_chain_difference(state_sets, event_map, state_res_store): ) ) and eid not in common - ) + } auth_chain = yield state_res_store.get_auth_chain(auth_ids, common) auth_ids.update(auth_chain) @@ -275,7 +275,7 @@ def _seperate(state_sets): conflicted_state = {} for key in set(itertools.chain.from_iterable(state_sets)): - event_ids = set(state_set.get(key) for state_set in state_sets) + event_ids = {state_set.get(key) for state_set in state_sets} if len(event_ids) == 1: unconflicted_state[key] = event_ids.pop() else: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index da3b99f93d..13de5f1f62 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -56,7 +56,7 @@ class SQLBaseStore(metaclass=ABCMeta): members_changed (iterable[str]): The user_ids of members that have changed """ - for host in set(get_domain_from_id(u) for u in members_changed): + for host in {get_domain_from_id(u) for u in members_changed}: self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) self._attempt_to_invalidate_cache("was_host_joined", (room_id, host)) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index bd547f35cf..eb1a7e5002 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -189,7 +189,7 @@ class BackgroundUpdater(object): keyvalues=None, retcols=("update_name", "depends_on"), ) - in_flight = set(update["update_name"] for update in updates) + in_flight = {update["update_name"] for update in updates} for update in updates: if update["depends_on"] not in in_flight: self._background_update_queue.append(update["update_name"]) diff --git a/synapse/storage/data_stores/main/appservice.py b/synapse/storage/data_stores/main/appservice.py index b2f39649fd..efbc06c796 100644 --- a/synapse/storage/data_stores/main/appservice.py +++ b/synapse/storage/data_stores/main/appservice.py @@ -135,7 +135,7 @@ class ApplicationServiceTransactionWorkerStore( may be empty. """ results = yield self.db.simple_select_list( - "application_services_state", dict(state=state), ["as_id"] + "application_services_state", {"state": state}, ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore as_list = self.get_app_services() @@ -158,7 +158,7 @@ class ApplicationServiceTransactionWorkerStore( """ result = yield self.db.simple_select_one( "application_services_state", - dict(as_id=service.id), + {"as_id": service.id}, ["state"], allow_none=True, desc="get_appservice_state", @@ -177,7 +177,7 @@ class ApplicationServiceTransactionWorkerStore( A Deferred which resolves when the state was set successfully. """ return self.db.simple_upsert( - "application_services_state", dict(as_id=service.id), dict(state=state) + "application_services_state", {"as_id": service.id}, {"state": state} ) def create_appservice_txn(self, service, events): @@ -253,13 +253,15 @@ class ApplicationServiceTransactionWorkerStore( self.db.simple_upsert_txn( txn, "application_services_state", - dict(as_id=service.id), - dict(last_txn=txn_id), + {"as_id": service.id}, + {"last_txn": txn_id}, ) # Delete txn self.db.simple_delete_txn( - txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id) + txn, + "application_services_txns", + {"txn_id": txn_id, "as_id": service.id}, ) return self.db.runInteraction( diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index 13f4c9c72e..e1ccb27142 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -530,7 +530,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): ((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"])) for row in rows ) - return list( + return [ { "access_token": access_token, "ip": ip, @@ -538,7 +538,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): "last_seen": last_seen, } for (access_token, ip), (user_agent, last_seen) in iteritems(results) - ) + ] @wrap_as_background_process("prune_old_user_ips") async def _prune_old_user_ips(self): diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index b7617efb80..d55733a4cd 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -137,7 +137,7 @@ class DeviceWorkerStore(SQLBaseStore): # get the cross-signing keys of the users in the list, so that we can # determine which of the device changes were cross-signing keys - users = set(r[0] for r in updates) + users = {r[0] for r in updates} master_key_by_user = {} self_signing_key_by_user = {} for user in users: @@ -446,7 +446,7 @@ class DeviceWorkerStore(SQLBaseStore): a set of user_ids and results_map is a mapping of user_id -> device_id -> device_info """ - user_ids = set(user_id for user_id, _ in query_list) + user_ids = {user_id for user_id, _ in query_list} user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) # We go and check if any of the users need to have their device lists @@ -454,10 +454,9 @@ class DeviceWorkerStore(SQLBaseStore): users_needing_resync = yield self.get_user_ids_requiring_device_list_resync( user_ids ) - user_ids_in_cache = ( - set(user_id for user_id, stream_id in user_map.items() if stream_id) - - users_needing_resync - ) + user_ids_in_cache = { + user_id for user_id, stream_id in user_map.items() if stream_id + } - users_needing_resync user_ids_not_in_cache = user_ids - user_ids_in_cache results = {} @@ -604,7 +603,7 @@ class DeviceWorkerStore(SQLBaseStore): rows = yield self.db.execute( "get_users_whose_signatures_changed", None, sql, user_id, from_key ) - return set(user for row in rows for user in json.loads(row[0])) + return {user for row in rows for user in json.loads(row[0])} else: return set() diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 750ec1b70d..49a7b8b433 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -426,7 +426,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas query, (room_id, event_id, False, limit - len(event_results)) ) - new_results = set(t[0] for t in txn) - seen_events + new_results = {t[0] for t in txn} - seen_events new_front |= new_results seen_events |= new_results diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index c9d0d68c3a..8ae23df00a 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -145,7 +145,7 @@ class EventsStore( return txn.fetchall() res = yield self.db.runInteraction("read_forward_extremities", fetch) - self._current_forward_extremities_amount = c_counter(list(x[0] for x in res)) + self._current_forward_extremities_amount = c_counter([x[0] for x in res]) @_retry_on_integrity_error @defer.inlineCallbacks @@ -598,11 +598,11 @@ class EventsStore( # We find out which membership events we may have deleted # and which we have added, then we invlidate the caches for all # those users. - members_changed = set( + members_changed = { state_key for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member - ) + } for member in members_changed: txn.call_after( @@ -1615,7 +1615,7 @@ class EventsStore( """ ) - referenced_state_groups = set(sg for sg, in txn) + referenced_state_groups = {sg for sg, in txn} logger.info( "[purge] found %i referenced state groups", len(referenced_state_groups) ) diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index 5177b71016..f54c8b1ee0 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -402,7 +402,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): keyvalues={}, retcols=("room_id",), ) - room_ids = set(row["room_id"] for row in rows) + room_ids = {row["room_id"] for row in rows} for room_id in room_ids: txn.call_after( self.get_latest_event_ids_in_room.invalidate, (room_id,) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 7251e819f5..47a3a26072 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -494,9 +494,9 @@ class EventsWorkerStore(SQLBaseStore): """ with Measure(self._clock, "_fetch_event_list"): try: - events_to_fetch = set( + events_to_fetch = { event_id for events, _ in event_list for event_id in events - ) + } row_dict = self.db.new_transaction( conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch @@ -804,7 +804,7 @@ class EventsWorkerStore(SQLBaseStore): desc="have_events_in_timeline", ) - return set(r["event_id"] for r in rows) + return {r["event_id"] for r in rows} @defer.inlineCallbacks def have_seen_events(self, event_ids): diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index e2673ae073..62ac88d9f2 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -276,21 +276,21 @@ class PushRulesWorkerStore( # We ignore app service users for now. This is so that we don't fill # up the `get_if_users_have_pushers` cache with AS entries that we # know don't have pushers, nor even read receipts. - local_users_in_room = set( + local_users_in_room = { u for u in users_in_room if self.hs.is_mine_id(u) and not self.get_if_app_services_interested_in_user(u) - ) + } # users in the room who have pushers need to get push rules run because # that's how their pushers work if_users_with_pushers = yield self.get_if_users_have_pushers( local_users_in_room, on_invalidate=cache_context.invalidate ) - user_ids = set( + user_ids = { uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher - ) + } users_with_receipts = yield self.get_users_with_read_receipts_in_room( room_id, on_invalidate=cache_context.invalidate diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index 96e54d145e..0d932a0672 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore): @cachedInlineCallbacks() def get_users_with_read_receipts_in_room(self, room_id): receipts = yield self.get_receipts_for_room(room_id, "m.read") - return set(r["user_id"] for r in receipts) + return {r["user_id"] for r in receipts} @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): @@ -283,7 +283,7 @@ class ReceiptsWorkerStore(SQLBaseStore): args.append(limit) txn.execute(sql, args) - return list(r[0:5] + (json.loads(r[5]),) for r in txn) + return [r[0:5] + (json.loads(r[5]),) for r in txn] return self.db.runInteraction( "get_all_updated_receipts", get_all_updated_receipts_txn diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index d5ced05701..d5bd0cb5cf 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -465,7 +465,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql % (clause,), args) - return set(row[0] for row in txn) + return {row[0] for row in txn} return await self.db.runInteraction( "get_users_server_still_shares_room_with", @@ -826,7 +826,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): GROUP BY room_id, user_id; """ txn.execute(sql, (user_id,)) - return set(row[0] for row in txn if row[1] == 0) + return {row[0] for row in txn if row[1] == 0} return self.db.runInteraction( "get_forgotten_rooms_for_user", _get_forgotten_rooms_for_user_txn diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index 3d34103e67..3a3b9a8e72 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -321,7 +321,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): desc="get_referenced_state_groups", ) - return set(row["state_group"] for row in rows) + return {row["state_group"] for row in rows} class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): @@ -367,7 +367,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): """ txn.execute(sql, (last_room_id, batch_size)) - room_ids = list(row[0] for row in txn) + room_ids = [row[0] for row in txn] if not room_ids: return True, set() @@ -384,7 +384,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name)) - joined_room_ids = set(row[0] for row in txn) + joined_room_ids = {row[0] for row in txn} left_rooms = set(room_ids) - joined_room_ids @@ -404,7 +404,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): retcols=("state_key",), ) - potentially_left_users = set(row["state_key"] for row in rows) + potentially_left_users = {row["state_key"] for row in rows} # Now lets actually delete the rooms from the DB. self.db.simple_delete_many_txn( diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 056b25b13a..ada5cce6c2 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -346,11 +346,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): from_key (str): The room_key portion of a StreamToken """ from_key = RoomStreamToken.parse_stream_token(from_key).stream - return set( + return { room_id for room_id in room_ids if self._events_stream_cache.has_entity_changed(room_id, from_key) - ) + } @defer.inlineCallbacks def get_room_events_stream_for_room( @@ -679,11 +679,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) events_before = yield self.get_events_as_list( - [e for e in results["before"]["event_ids"]], get_prev_content=True + list(results["before"]["event_ids"]), get_prev_content=True ) events_after = yield self.get_events_as_list( - [e for e in results["after"]["event_ids"]], get_prev_content=True + list(results["after"]["event_ids"]), get_prev_content=True ) return { diff --git a/synapse/storage/data_stores/main/user_erasure_store.py b/synapse/storage/data_stores/main/user_erasure_store.py index af8025bc17..ec6b8a4ffd 100644 --- a/synapse/storage/data_stores/main/user_erasure_store.py +++ b/synapse/storage/data_stores/main/user_erasure_store.py @@ -63,9 +63,9 @@ class UserErasureWorkerStore(SQLBaseStore): retcols=("user_id",), desc="are_users_erased", ) - erased_users = set(row["user_id"] for row in rows) + erased_users = {row["user_id"] for row in rows} - res = dict((u, u in erased_users) for u in user_ids) + res = {u: u in erased_users for u in user_ids} return res diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py index c4ee9b7ccb..57a5267663 100644 --- a/synapse/storage/data_stores/state/store.py +++ b/synapse/storage/data_stores/state/store.py @@ -520,11 +520,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): retcols=("state_group",), ) - remaining_state_groups = set( + remaining_state_groups = { row["state_group"] for row in rows if row["state_group"] not in state_groups_to_delete - ) + } logger.info( "[purge] de-delta-ing %i remaining state groups", diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 6dcb5c04da..1953614401 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -554,8 +554,8 @@ class Database(object): Returns: A list of dicts where the key is the column header. """ - col_headers = list(intern(str(column[0])) for column in cursor.description) - results = list(dict(zip(col_headers, row)) for row in cursor) + col_headers = [intern(str(column[0])) for column in cursor.description] + results = [dict(zip(col_headers, row)) for row in cursor] return results def execute(self, desc, decoder, query, *args): diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index b950550f23..0f9ac1cf09 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -602,14 +602,14 @@ class EventsPersistenceStorage(object): event_id_to_state_group.update(event_to_groups) # State groups of old_latest_event_ids - old_state_groups = set( + old_state_groups = { event_id_to_state_group[evid] for evid in old_latest_event_ids - ) + } # State groups of new_latest_event_ids - new_state_groups = set( + new_state_groups = { event_id_to_state_group[evid] for evid in new_latest_event_ids - ) + } # If they old and new groups are the same then we don't need to do # anything. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c285ef52a0..fc69c32a0a 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -345,9 +345,9 @@ def _upgrade_existing_database( "Could not open delta dir for version %d: %s" % (v, directory) ) - duplicates = set( + duplicates = { file_name for file_name, count in file_name_counter.items() if count > 1 - ) + } if duplicates: # We don't support using the same file name in the same delta version. raise PrepareDatabaseException( @@ -454,7 +454,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) ), (modname,), ) - applied_deltas = set(d for d, in cur) + applied_deltas = {d for d, in cur} for (name, stream) in names_and_streams: if name in applied_deltas: continue diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index 635b897d6c..f2ccd5e7c6 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -30,7 +30,7 @@ def freeze(o): return o try: - return tuple([freeze(i) for i in o]) + return tuple(freeze(i) for i in o) except TypeError: pass diff --git a/synapse/visibility.py b/synapse/visibility.py index d0abd8f04f..e60d9756b7 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -75,7 +75,7 @@ def filter_events_for_client( """ # Filter out events that have been soft failed so that we don't relay them # to clients. - events = list(e for e in events if not e.internal_metadata.is_soft_failed()) + events = [e for e in events if not e.internal_metadata.is_soft_failed()] types = ((EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id)) event_id_to_state = yield storage.state.get_state_for_events( @@ -97,7 +97,7 @@ def filter_events_for_client( erased_senders = yield storage.main.are_users_erased((e.sender for e in events)) if apply_retention_policies: - room_ids = set(e.room_id for e in events) + room_ids = {e.room_id for e in events} retention_policies = {} for room_id in room_ids: diff --git a/tests/config/test_generate.py b/tests/config/test_generate.py index 2684e662de..463855ecc8 100644 --- a/tests/config/test_generate.py +++ b/tests/config/test_generate.py @@ -48,7 +48,7 @@ class ConfigGenerationTestCase(unittest.TestCase): ) self.assertSetEqual( - set(["homeserver.yaml", "lemurs.win.log.config", "lemurs.win.signing.key"]), + {"homeserver.yaml", "lemurs.win.log.config", "lemurs.win.signing.key"}, set(os.listdir(self.dir)), ) diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index e7d8699040..296dc887be 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -83,7 +83,7 @@ class StateQueryTests(unittest.FederatingHomeserverTestCase): ) ) - self.assertEqual(members, set(["@user:other.example.com", u1])) + self.assertEqual(members, {"@user:other.example.com", u1}) self.assertEqual(len(channel.json_body["pdus"]), 6) def test_needs_to_be_in_room(self): diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index c171038df8..64915bafcd 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -338,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, syncing_user_ids=set([user_id]), now=now + state, is_mine=True, syncing_user_ids={user_id}, now=now ) self.assertIsNotNone(new_state) @@ -579,7 +579,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): ) self.assertEqual(expected_state.state, PresenceState.ONLINE) self.federation_sender.send_presence_to_destinations.assert_called_once_with( - destinations=set(("server2", "server3")), states=[expected_state] + destinations={"server2", "server3"}, states=[expected_state] ) def _add_new_user(self, room_id, user_id): diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 140cc0a3c2..07b204666e 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -129,12 +129,12 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): hs.get_auth().check_user_in_room = check_user_in_room def get_joined_hosts_for_room(room_id): - return set(member.domain for member in self.room_members) + return {member.domain for member in self.room_members} self.datastore.get_joined_hosts_for_room = get_joined_hosts_for_room def get_current_users_in_room(room_id): - return set(str(u) for u in self.room_members) + return {str(u) for u in self.room_members} hs.get_state_handler().get_current_users_in_room = get_current_users_in_room @@ -257,7 +257,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): member = RoomMember(ROOM_ID, U_APPLE.to_string()) self.handler._member_typing_until[member] = 1002000 - self.handler._room_typing[ROOM_ID] = set([U_APPLE.to_string()]) + self.handler._room_typing[ROOM_ID] = {U_APPLE.to_string()} self.assertEquals(self.event_source.get_current_key(), 0) diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 0a4765fff4..7b92bdbc47 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -114,7 +114,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): public_users = self.get_users_in_public_rooms() self.assertEqual( - self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + self._compress_shared(shares_private), {(u1, u2, room), (u2, u1, room)} ) self.assertEqual(public_users, []) @@ -169,7 +169,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): public_users = self.get_users_in_public_rooms() self.assertEqual( - self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + self._compress_shared(shares_private), {(u1, u2, room), (u2, u1, room)} ) self.assertEqual(public_users, []) @@ -226,7 +226,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): public_users = self.get_users_in_public_rooms() self.assertEqual( - self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + self._compress_shared(shares_private), {(u1, u2, room), (u2, u1, room)} ) self.assertEqual(public_users, []) @@ -358,12 +358,12 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): public_users = self.get_users_in_public_rooms() # User 1 and User 2 are in the same public room - self.assertEqual(set(public_users), set([(u1, room), (u2, room)])) + self.assertEqual(set(public_users), {(u1, room), (u2, room)}) # User 1 and User 3 share private rooms self.assertEqual( self._compress_shared(shares_private), - set([(u1, u3, private_room), (u3, u1, private_room)]), + {(u1, u3, private_room), (u3, u1, private_room)}, ) def test_initial_share_all_users(self): @@ -398,7 +398,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): # No users share rooms self.assertEqual(public_users, []) - self.assertEqual(self._compress_shared(shares_private), set([])) + self.assertEqual(self._compress_shared(shares_private), set()) # Despite not sharing a room, search_all_users means we get a search # result. diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 80187406bc..83032cc9ea 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -163,7 +163,7 @@ class EmailPusherTests(HomeserverTestCase): # Get the stream ordering before it gets sent pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -174,7 +174,7 @@ class EmailPusherTests(HomeserverTestCase): # It hasn't succeeded yet, so the stream ordering shouldn't have moved pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -192,7 +192,7 @@ class EmailPusherTests(HomeserverTestCase): # The stream ordering has increased pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index fe3441f081..baf9c785f4 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -102,7 +102,7 @@ class HTTPPusherTests(HomeserverTestCase): # Get the stream ordering before it gets sent pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -113,7 +113,7 @@ class HTTPPusherTests(HomeserverTestCase): # It hasn't succeeded yet, so the stream ordering shouldn't have moved pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -132,7 +132,7 @@ class HTTPPusherTests(HomeserverTestCase): # The stream ordering has increased pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) @@ -152,7 +152,7 @@ class HTTPPusherTests(HomeserverTestCase): # The stream ordering has increased, again pushers = self.get_success( - self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) + self.hs.get_datastore().get_pushers_by({"user_name": user_id}) ) pushers = list(pushers) self.assertEqual(len(pushers), 1) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 9c13a13786..fa3a3ec1bd 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -40,16 +40,14 @@ class FilterTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) self.assertTrue( - set( - [ - "next_batch", - "rooms", - "presence", - "account_data", - "to_device", - "device_lists", - ] - ).issubset(set(channel.json_body.keys())) + { + "next_batch", + "rooms", + "presence", + "account_data", + "to_device", + "device_lists", + }.issubset(set(channel.json_body.keys())) ) def test_sync_presence_disabled(self): @@ -63,9 +61,13 @@ class FilterTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) self.assertTrue( - set( - ["next_batch", "rooms", "account_data", "to_device", "device_lists"] - ).issubset(set(channel.json_body.keys())) + { + "next_batch", + "rooms", + "account_data", + "to_device", + "device_lists", + }.issubset(set(channel.json_body.keys())) ) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index d491ea2924..e37260a820 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -373,7 +373,7 @@ class UpsertManyTests(unittest.HomeserverTestCase): ) self.assertEqual( set(self._dump_to_tuple(res)), - set([(1, "user1", "hello"), (2, "user2", "there")]), + {(1, "user1", "hello"), (2, "user2", "there")}, ) # Update only user2 @@ -400,5 +400,5 @@ class UpsertManyTests(unittest.HomeserverTestCase): ) self.assertEqual( set(self._dump_to_tuple(res)), - set([(1, "user1", "hello"), (2, "user2", "bleb")]), + {(1, "user1", "hello"), (2, "user2", "bleb")}, ) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index fd52512696..31710949a8 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -69,14 +69,14 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): pass def _add_appservice(self, as_token, id, url, hs_token, sender): - as_yaml = dict( - url=url, - as_token=as_token, - hs_token=hs_token, - id=id, - sender_localpart=sender, - namespaces={}, - ) + as_yaml = { + "url": url, + "as_token": as_token, + "hs_token": hs_token, + "id": id, + "sender_localpart": sender, + "namespaces": {}, + } # use the token as the filename with open(as_token, "w") as outfile: outfile.write(yaml.dump(as_yaml)) @@ -135,14 +135,14 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) def _add_service(self, url, as_token, id): - as_yaml = dict( - url=url, - as_token=as_token, - hs_token="something", - id=id, - sender_localpart="a_sender", - namespaces={}, - ) + as_yaml = { + "url": url, + "as_token": as_token, + "hs_token": "something", + "id": id, + "sender_localpart": "a_sender", + "namespaces": {}, + } # use the token as the filename with open(as_token, "w") as outfile: outfile.write(yaml.dump(as_yaml)) @@ -384,8 +384,8 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) self.assertEquals(2, len(services)) self.assertEquals( - set([self.as_list[2]["id"], self.as_list[0]["id"]]), - set([services[0].id, services[1].id]), + {self.as_list[2]["id"], self.as_list[0]["id"]}, + {services[0].id, services[1].id}, ) diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py index 029ac26454..0e04b2cf92 100644 --- a/tests/storage/test_cleanup_extrems.py +++ b/tests/storage/test_cleanup_extrems.py @@ -134,7 +134,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) ) - self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b))) + self.assertEqual(set(latest_event_ids), {event_id_a, event_id_b}) # Run the background update and check it did the right thing self.run_background_update() @@ -172,7 +172,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) ) - self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b))) + self.assertEqual(set(latest_event_ids), {event_id_a, event_id_b}) # Run the background update and check it did the right thing self.run_background_update() @@ -227,9 +227,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) ) - self.assertEqual( - set(latest_event_ids), set((event_id_a, event_id_b, event_id_c)) - ) + self.assertEqual(set(latest_event_ids), {event_id_a, event_id_b, event_id_c}) # Run the background update and check it did the right thing self.run_background_update() @@ -237,7 +235,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): latest_event_ids = self.get_success( self.store.get_latest_event_ids_in_room(self.room_id) ) - self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c])) + self.assertEqual(set(latest_event_ids), {event_id_b, event_id_c}) class CleanupExtremDummyEventsTestCase(HomeserverTestCase): diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index f26ff57a18..a7b7fd36d3 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -59,24 +59,22 @@ class ExtremStatisticsTestCase(HomeserverTestCase): ) ) - expected = set( - [ - b'synapse_forward_extremities_bucket{le="1.0"} 0.0', - b'synapse_forward_extremities_bucket{le="2.0"} 2.0', - b'synapse_forward_extremities_bucket{le="3.0"} 2.0', - b'synapse_forward_extremities_bucket{le="5.0"} 2.0', - b'synapse_forward_extremities_bucket{le="7.0"} 3.0', - b'synapse_forward_extremities_bucket{le="10.0"} 3.0', - b'synapse_forward_extremities_bucket{le="15.0"} 3.0', - b'synapse_forward_extremities_bucket{le="20.0"} 3.0', - b'synapse_forward_extremities_bucket{le="50.0"} 3.0', - b'synapse_forward_extremities_bucket{le="100.0"} 3.0', - b'synapse_forward_extremities_bucket{le="200.0"} 3.0', - b'synapse_forward_extremities_bucket{le="500.0"} 3.0', - b'synapse_forward_extremities_bucket{le="+Inf"} 3.0', - b"synapse_forward_extremities_count 3.0", - b"synapse_forward_extremities_sum 10.0", - ] - ) + expected = { + b'synapse_forward_extremities_bucket{le="1.0"} 0.0', + b'synapse_forward_extremities_bucket{le="2.0"} 2.0', + b'synapse_forward_extremities_bucket{le="3.0"} 2.0', + b'synapse_forward_extremities_bucket{le="5.0"} 2.0', + b'synapse_forward_extremities_bucket{le="7.0"} 3.0', + b'synapse_forward_extremities_bucket{le="10.0"} 3.0', + b'synapse_forward_extremities_bucket{le="15.0"} 3.0', + b'synapse_forward_extremities_bucket{le="20.0"} 3.0', + b'synapse_forward_extremities_bucket{le="50.0"} 3.0', + b'synapse_forward_extremities_bucket{le="100.0"} 3.0', + b'synapse_forward_extremities_bucket{le="200.0"} 3.0', + b'synapse_forward_extremities_bucket{le="500.0"} 3.0', + b'synapse_forward_extremities_bucket{le="+Inf"} 3.0', + b"synapse_forward_extremities_count 3.0", + b"synapse_forward_extremities_sum 10.0", + } self.assertEqual(items, expected) diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 04d58fbf24..0b88308ff4 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -394,7 +394,7 @@ class StateStoreTestCase(tests.unittest.TestCase): ) = self.state_datastore._state_group_cache.get(group) self.assertEqual(is_all, False) - self.assertEqual(known_absent, set([(e1.type, e1.state_key)])) + self.assertEqual(known_absent, {(e1.type, e1.state_key)}) self.assertDictEqual(state_dict_ids, {(e1.type, e1.state_key): e1.event_id}) ############################################ diff --git a/tests/test_state.py b/tests/test_state.py index d1578fe581..66f22f6813 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -254,9 +254,7 @@ class StateTestCase(unittest.TestCase): ctx_d = context_store["D"] prev_state_ids = yield ctx_d.get_prev_state_ids() - self.assertSetEqual( - {"START", "A", "C"}, {e_id for e_id in prev_state_ids.values()} - ) + self.assertSetEqual({"START", "A", "C"}, set(prev_state_ids.values())) self.assertEqual(ctx_c.state_group, ctx_d.state_group_before_event) self.assertEqual(ctx_d.state_group_before_event, ctx_d.state_group) @@ -313,9 +311,7 @@ class StateTestCase(unittest.TestCase): ctx_e = context_store["E"] prev_state_ids = yield ctx_e.get_prev_state_ids() - self.assertSetEqual( - {"START", "A", "B", "C"}, {e for e in prev_state_ids.values()} - ) + self.assertSetEqual({"START", "A", "B", "C"}, set(prev_state_ids.values())) self.assertEqual(ctx_c.state_group, ctx_e.state_group_before_event) self.assertEqual(ctx_e.state_group_before_event, ctx_e.state_group) @@ -388,9 +384,7 @@ class StateTestCase(unittest.TestCase): ctx_d = context_store["D"] prev_state_ids = yield ctx_d.get_prev_state_ids() - self.assertSetEqual( - {"A1", "A2", "A3", "A5", "B"}, {e for e in prev_state_ids.values()} - ) + self.assertSetEqual({"A1", "A2", "A3", "A5", "B"}, set(prev_state_ids.values())) self.assertEqual(ctx_b.state_group, ctx_d.state_group_before_event) self.assertEqual(ctx_d.state_group_before_event, ctx_d.state_group) @@ -482,7 +476,7 @@ class StateTestCase(unittest.TestCase): current_state_ids = yield context.get_current_state_ids() self.assertEqual( - set([e.event_id for e in old_state]), set(current_state_ids.values()) + {e.event_id for e in old_state}, set(current_state_ids.values()) ) self.assertEqual(group_name, context.state_group) @@ -513,9 +507,7 @@ class StateTestCase(unittest.TestCase): prev_state_ids = yield context.get_prev_state_ids() - self.assertEqual( - set([e.event_id for e in old_state]), set(prev_state_ids.values()) - ) + self.assertEqual({e.event_id for e in old_state}, set(prev_state_ids.values())) self.assertIsNotNone(context.state_group) diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index f2be63706b..72a9de5370 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -67,7 +67,7 @@ class StreamChangeCacheTests(unittest.TestCase): # If we update an existing entity, it keeps the two existing entities cache.entity_has_changed("bar@baz.net", 5) self.assertEqual( - set(["bar@baz.net", "user@elsewhere.org"]), set(cache._entity_to_key) + {"bar@baz.net", "user@elsewhere.org"}, set(cache._entity_to_key) ) def test_get_all_entities_changed(self): @@ -137,7 +137,7 @@ class StreamChangeCacheTests(unittest.TestCase): cache.get_entities_changed( ["user@foo.com", "bar@baz.net", "user@elsewhere.org"], stream_pos=2 ), - set(["bar@baz.net", "user@elsewhere.org"]), + {"bar@baz.net", "user@elsewhere.org"}, ) # Query all the entries mid-way through the stream, but include one @@ -153,7 +153,7 @@ class StreamChangeCacheTests(unittest.TestCase): ], stream_pos=2, ), - set(["bar@baz.net", "user@elsewhere.org"]), + {"bar@baz.net", "user@elsewhere.org"}, ) # Query all the entries, but before the first known point. We will get @@ -168,21 +168,13 @@ class StreamChangeCacheTests(unittest.TestCase): ], stream_pos=0, ), - set( - [ - "user@foo.com", - "bar@baz.net", - "user@elsewhere.org", - "not@here.website", - ] - ), + {"user@foo.com", "bar@baz.net", "user@elsewhere.org", "not@here.website"}, ) # Query a subset of the entries mid-way through the stream. We should # only get back the subset. self.assertEqual( - cache.get_entities_changed(["bar@baz.net"], stream_pos=2), - set(["bar@baz.net"]), + cache.get_entities_changed(["bar@baz.net"], stream_pos=2), {"bar@baz.net"}, ) def test_max_pos(self): diff --git a/tox.ini b/tox.ini index b9132a3177..b715ea0bff 100644 --- a/tox.ini +++ b/tox.ini @@ -123,6 +123,7 @@ skip_install = True basepython = python3.6 deps = flake8 + flake8-comprehensions black==19.10b0 # We pin so that our tests don't start failing on new releases of black. commands = python -m black --check --diff . -- cgit 1.4.1 From bbf8886a05be6a929556d6f09a1b6ce053a3c403 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Feb 2020 16:56:55 +0000 Subject: Merge worker apps into one. (#6964) --- changelog.d/6964.misc | 1 + synapse/app/appservice.py | 156 +---- synapse/app/client_reader.py | 190 +----- synapse/app/event_creator.py | 186 +----- synapse/app/federation_reader.py | 172 +----- synapse/app/federation_sender.py | 303 +-------- synapse/app/frontend_proxy.py | 236 +------ synapse/app/generic_worker.py | 917 ++++++++++++++++++++++++++++ synapse/app/media_repository.py | 157 +---- synapse/app/pusher.py | 209 +------ synapse/app/synchrotron.py | 449 +------------- synapse/app/user_dir.py | 211 +------ synapse/replication/slave/storage/events.py | 20 + synapse/storage/data_stores/main/pusher.py | 156 ++--- tests/app/test_frontend_proxy.py | 12 +- tests/app/test_openid_listener.py | 4 +- 16 files changed, 1052 insertions(+), 2327 deletions(-) create mode 100644 changelog.d/6964.misc create mode 100644 synapse/app/generic_worker.py (limited to 'synapse/app/pusher.py') diff --git a/changelog.d/6964.misc b/changelog.d/6964.misc new file mode 100644 index 0000000000..ec5c004bbe --- /dev/null +++ b/changelog.d/6964.misc @@ -0,0 +1 @@ +Merge worker apps together. diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 2217d4a4fb..add43147b3 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -13,161 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - -from twisted.internet import defer, reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.directory import DirectoryStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.server import HomeServer -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.appservice") - - -class AppserviceSlaveStore( - DirectoryStore, - SlavedEventStore, - SlavedApplicationServiceStore, - SlavedRegistrationStore, -): - pass - - -class AppserviceServer(HomeServer): - DATASTORE_CLASS = AppserviceSlaveStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - ) - - logger.info("Synapse appservice now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return ASReplicationHandler(self) +import sys -class ASReplicationHandler(ReplicationClientHandler): - def __init__(self, hs): - super(ASReplicationHandler, self).__init__(hs.get_datastore()) - self.appservice_handler = hs.get_application_service_handler() - - async def on_rdata(self, stream_name, token, rows): - await super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) - - if stream_name == "events": - max_stream_id = self.store.get_room_max_stream_ordering() - run_in_background(self._notify_app_services, max_stream_id) - - @defer.inlineCallbacks - def _notify_app_services(self, room_stream_id): - try: - yield self.appservice_handler.notify_interested_services(room_stream_id) - except Exception: - logger.exception("Error notifying application services of event") - - -def start(config_options): - try: - config = HomeServerConfig.load_config("Synapse appservice", config_options) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.appservice" - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - if config.notify_appservices: - sys.stderr.write( - "\nThe appservices must be disabled in the main synapse process" - "\nbefore they can be run in a separate worker." - "\nPlease add ``notify_appservices: false`` to the main config" - "\n" - ) - sys.exit(1) - - # Force the pushers to start since they will be disabled in the main config - config.notify_appservices = True - - ps = AppserviceServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ps, config, use_worker_options=True) - - ps.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ps, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-appservice", config) - +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 7fa91a3b11..add43147b3 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -13,195 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - -from twisted.internet import reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.http.server import JsonResource -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore -from synapse.replication.slave.storage.directory import DirectoryStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.groups import SlavedGroupServerStore -from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.profile import SlavedProfileStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import SlavedTransactionStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v1.login import LoginRestServlet -from synapse.rest.client.v1.push_rule import PushRuleRestServlet -from synapse.rest.client.v1.room import ( - JoinedRoomMemberListRestServlet, - PublicRoomListRestServlet, - RoomEventContextServlet, - RoomMemberListRestServlet, - RoomMessageListRestServlet, - RoomStateRestServlet, -) -from synapse.rest.client.v1.voip import VoipRestServlet -from synapse.rest.client.v2_alpha import groups -from synapse.rest.client.v2_alpha.account import ThreepidRestServlet -from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet -from synapse.rest.client.v2_alpha.register import RegisterRestServlet -from synapse.rest.client.versions import VersionsRestServlet -from synapse.server import HomeServer -from synapse.storage.data_stores.main.monthly_active_users import ( - MonthlyActiveUsersWorkerStore, -) -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.client_reader") - - -class ClientReaderSlavedStore( - SlavedDeviceInboxStore, - SlavedDeviceStore, - SlavedReceiptsStore, - SlavedPushRuleStore, - SlavedGroupServerStore, - SlavedAccountDataStore, - SlavedEventStore, - SlavedKeyStore, - RoomStore, - DirectoryStore, - SlavedApplicationServiceStore, - SlavedRegistrationStore, - SlavedTransactionStore, - SlavedProfileStore, - SlavedClientIpStore, - MonthlyActiveUsersWorkerStore, - BaseSlavedStore, -): - pass - - -class ClientReaderServer(HomeServer): - DATASTORE_CLASS = ClientReaderSlavedStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - elif name == "client": - resource = JsonResource(self, canonical_json=False) - - PublicRoomListRestServlet(self).register(resource) - RoomMemberListRestServlet(self).register(resource) - JoinedRoomMemberListRestServlet(self).register(resource) - RoomStateRestServlet(self).register(resource) - RoomEventContextServlet(self).register(resource) - RoomMessageListRestServlet(self).register(resource) - RegisterRestServlet(self).register(resource) - LoginRestServlet(self).register(resource) - ThreepidRestServlet(self).register(resource) - KeyQueryServlet(self).register(resource) - KeyChangesServlet(self).register(resource) - VoipRestServlet(self).register(resource) - PushRuleRestServlet(self).register(resource) - VersionsRestServlet(self).register(resource) - - groups.register_servlets(self, resource) - - resources.update({"/_matrix/client": resource}) - - root_resource = create_resource_tree(resources, NoResource()) - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - ) - - logger.info("Synapse client reader now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return ReplicationClientHandler(self.get_datastore()) - - -def start(config_options): - try: - config = HomeServerConfig.load_config("Synapse client reader", config_options) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.client_reader" - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - ss = ClientReaderServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ss, config, use_worker_options=True) - - ss.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-client-reader", config) +import sys +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 58e5b354f6..e9c098c4e7 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -13,191 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - -from twisted.internet import reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.http.server import JsonResource -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore -from synapse.replication.slave.storage.directory import DirectoryStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.profile import SlavedProfileStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore -from synapse.replication.slave.storage.pushers import SlavedPusherStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import SlavedTransactionStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v1.profile import ( - ProfileAvatarURLRestServlet, - ProfileDisplaynameRestServlet, - ProfileRestServlet, -) -from synapse.rest.client.v1.room import ( - JoinRoomAliasServlet, - RoomMembershipRestServlet, - RoomSendEventRestServlet, - RoomStateEventRestServlet, -) -from synapse.server import HomeServer -from synapse.storage.data_stores.main.monthly_active_users import ( - MonthlyActiveUsersWorkerStore, -) -from synapse.storage.data_stores.main.user_directory import UserDirectoryStore -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.event_creator") - - -class EventCreatorSlavedStore( - # FIXME(#3714): We need to add UserDirectoryStore as we write directly - # rather than going via the correct worker. - UserDirectoryStore, - DirectoryStore, - SlavedTransactionStore, - SlavedProfileStore, - SlavedAccountDataStore, - SlavedPusherStore, - SlavedReceiptsStore, - SlavedPushRuleStore, - SlavedDeviceStore, - SlavedClientIpStore, - SlavedApplicationServiceStore, - SlavedEventStore, - SlavedRegistrationStore, - RoomStore, - MonthlyActiveUsersWorkerStore, - BaseSlavedStore, -): - pass - - -class EventCreatorServer(HomeServer): - DATASTORE_CLASS = EventCreatorSlavedStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - elif name == "client": - resource = JsonResource(self, canonical_json=False) - RoomSendEventRestServlet(self).register(resource) - RoomMembershipRestServlet(self).register(resource) - RoomStateEventRestServlet(self).register(resource) - JoinRoomAliasServlet(self).register(resource) - ProfileAvatarURLRestServlet(self).register(resource) - ProfileDisplaynameRestServlet(self).register(resource) - ProfileRestServlet(self).register(resource) - resources.update( - { - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - } - ) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - ) - - logger.info("Synapse event creator now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return ReplicationClientHandler(self.get_datastore()) - - -def start(config_options): - try: - config = HomeServerConfig.load_config("Synapse event creator", config_options) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.event_creator" - - assert config.worker_replication_http_port is not None - - # This should only be done on the user directory worker or the master - config.update_user_directory = False - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - ss = EventCreatorServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ss, config, use_worker_options=True) - - ss.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-event-creator", config) +import sys +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index d055d11b23..add43147b3 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -13,177 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - -from twisted.internet import reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.federation.transport.server import TransportLayerServer -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore -from synapse.replication.slave.storage.directory import DirectoryStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.groups import SlavedGroupServerStore -from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.profile import SlavedProfileStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore -from synapse.replication.slave.storage.pushers import SlavedPusherStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import SlavedTransactionStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.key.v2 import KeyApiV2Resource -from synapse.server import HomeServer -from synapse.storage.data_stores.main.monthly_active_users import ( - MonthlyActiveUsersWorkerStore, -) -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.federation_reader") - - -class FederationReaderSlavedStore( - SlavedAccountDataStore, - SlavedProfileStore, - SlavedApplicationServiceStore, - SlavedPusherStore, - SlavedPushRuleStore, - SlavedReceiptsStore, - SlavedEventStore, - SlavedKeyStore, - SlavedRegistrationStore, - SlavedGroupServerStore, - SlavedDeviceStore, - RoomStore, - DirectoryStore, - SlavedTransactionStore, - MonthlyActiveUsersWorkerStore, - BaseSlavedStore, -): - pass - - -class FederationReaderServer(HomeServer): - DATASTORE_CLASS = FederationReaderSlavedStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - elif name == "federation": - resources.update({FEDERATION_PREFIX: TransportLayerServer(self)}) - if name == "openid" and "federation" not in res["names"]: - # Only load the openid resource separately if federation resource - # is not specified since federation resource includes openid - # resource. - resources.update( - { - FEDERATION_PREFIX: TransportLayerServer( - self, servlet_groups=["openid"] - ) - } - ) - - if name in ["keys", "federation"]: - resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - reactor=self.get_reactor(), - ) - logger.info("Synapse federation reader now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return ReplicationClientHandler(self.get_datastore()) - - -def start(config_options): - try: - config = HomeServerConfig.load_config( - "Synapse federation reader", config_options - ) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.federation_reader" - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - ss = FederationReaderServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ss, config, use_worker_options=True) - - ss.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-federation-reader", config) +import sys +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index b7fcf80ddc..add43147b3 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -13,308 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - -from twisted.internet import defer, reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.federation import send_queue -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.transactions import SlavedTransactionStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams._base import ( - DeviceListsStream, - ReceiptsStream, - ToDeviceStream, -) -from synapse.server import HomeServer -from synapse.storage.database import Database -from synapse.types import ReadReceipt -from synapse.util.async_helpers import Linearizer -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.federation_sender") - - -class FederationSenderSlaveStore( - SlavedDeviceInboxStore, - SlavedTransactionStore, - SlavedReceiptsStore, - SlavedEventStore, - SlavedRegistrationStore, - SlavedDeviceStore, - SlavedPresenceStore, -): - def __init__(self, database: Database, db_conn, hs): - super(FederationSenderSlaveStore, self).__init__(database, db_conn, hs) - - # We pull out the current federation stream position now so that we - # always have a known value for the federation position in memory so - # that we don't have to bounce via a deferred once when we start the - # replication streams. - self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) - - def _get_federation_out_pos(self, db_conn): - sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?" - sql = self.database_engine.convert_param_style(sql) - - txn = db_conn.cursor() - txn.execute(sql, ("federation",)) - rows = txn.fetchall() - txn.close() - - return rows[0][0] if rows else -1 - - -class FederationSenderServer(HomeServer): - DATASTORE_CLASS = FederationSenderSlaveStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - ) - - logger.info("Synapse federation_sender now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return FederationSenderReplicationHandler(self) - - -class FederationSenderReplicationHandler(ReplicationClientHandler): - def __init__(self, hs): - super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) - self.send_handler = FederationSenderHandler(hs, self) - - async def on_rdata(self, stream_name, token, rows): - await super(FederationSenderReplicationHandler, self).on_rdata( - stream_name, token, rows - ) - self.send_handler.process_replication_rows(stream_name, token, rows) - - def get_streams_to_replicate(self): - args = super( - FederationSenderReplicationHandler, self - ).get_streams_to_replicate() - args.update(self.send_handler.stream_positions()) - return args - - def on_remote_server_up(self, server: str): - """Called when get a new REMOTE_SERVER_UP command.""" - - # Let's wake up the transaction queue for the server in case we have - # pending stuff to send to it. - self.send_handler.wake_destination(server) - - -def start(config_options): - try: - config = HomeServerConfig.load_config( - "Synapse federation sender", config_options - ) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - assert config.worker_app == "synapse.app.federation_sender" - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - if config.send_federation: - sys.stderr.write( - "\nThe send_federation must be disabled in the main synapse process" - "\nbefore they can be run in a separate worker." - "\nPlease add ``send_federation: false`` to the main config" - "\n" - ) - sys.exit(1) - - # Force the pushers to start since they will be disabled in the main config - config.send_federation = True - - ss = FederationSenderServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ss, config, use_worker_options=True) - - ss.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-federation-sender", config) - - -class FederationSenderHandler(object): - """Processes the replication stream and forwards the appropriate entries - to the federation sender. - """ - - def __init__(self, hs: FederationSenderServer, replication_client): - self.store = hs.get_datastore() - self._is_mine_id = hs.is_mine_id - self.federation_sender = hs.get_federation_sender() - self.replication_client = replication_client - - self.federation_position = self.store.federation_out_pos_startup - self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") - - self._last_ack = self.federation_position - - self._room_serials = {} - self._room_typing = {} - - def on_start(self): - # There may be some events that are persisted but haven't been sent, - # so send them now. - self.federation_sender.notify_new_events( - self.store.get_room_max_stream_ordering() - ) - - def wake_destination(self, server: str): - self.federation_sender.wake_destination(server) - - def stream_positions(self): - return {"federation": self.federation_position} - - def process_replication_rows(self, stream_name, token, rows): - # The federation stream contains things that we want to send out, e.g. - # presence, typing, etc. - if stream_name == "federation": - send_queue.process_rows_for_federation(self.federation_sender, rows) - run_in_background(self.update_token, token) - - # We also need to poke the federation sender when new events happen - elif stream_name == "events": - self.federation_sender.notify_new_events(token) - - # ... and when new receipts happen - elif stream_name == ReceiptsStream.NAME: - run_as_background_process( - "process_receipts_for_federation", self._on_new_receipts, rows - ) - - # ... as well as device updates and messages - elif stream_name == DeviceListsStream.NAME: - hosts = {row.destination for row in rows} - for host in hosts: - self.federation_sender.send_device_messages(host) - - elif stream_name == ToDeviceStream.NAME: - # The to_device stream includes stuff to be pushed to both local - # clients and remote servers, so we ignore entities that start with - # '@' (since they'll be local users rather than destinations). - hosts = {row.entity for row in rows if not row.entity.startswith("@")} - for host in hosts: - self.federation_sender.send_device_messages(host) - - @defer.inlineCallbacks - def _on_new_receipts(self, rows): - """ - Args: - rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]): - new receipts to be processed - """ - for receipt in rows: - # we only want to send on receipts for our own users - if not self._is_mine_id(receipt.user_id): - continue - receipt_info = ReadReceipt( - receipt.room_id, - receipt.receipt_type, - receipt.user_id, - [receipt.event_id], - receipt.data, - ) - yield self.federation_sender.send_read_receipt(receipt_info) - - @defer.inlineCallbacks - def update_token(self, token): - try: - self.federation_position = token - - # We linearize here to ensure we don't have races updating the token - with (yield self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) - - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack( - self.federation_position - ) - self._last_ack = self.federation_position - except Exception: - logger.exception("Error updating federation stream position") +import sys +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 30e435eead..add43147b3 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -13,241 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - -from twisted.internet import defer, reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.api.errors import HttpResponseException, SynapseError -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.http.server import JsonResource -from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v2_alpha._base import client_patterns -from synapse.server import HomeServer -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.frontend_proxy") - - -class PresenceStatusStubServlet(RestServlet): - PATTERNS = client_patterns("/presence/(?P[^/]*)/status") - - def __init__(self, hs): - super(PresenceStatusStubServlet, self).__init__() - self.http_client = hs.get_simple_http_client() - self.auth = hs.get_auth() - self.main_uri = hs.config.worker_main_http_uri - - @defer.inlineCallbacks - def on_GET(self, request, user_id): - # Pass through the auth headers, if any, in case the access token - # is there. - auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) - headers = {"Authorization": auth_headers} - - try: - result = yield self.http_client.get_json( - self.main_uri + request.uri.decode("ascii"), headers=headers - ) - except HttpResponseException as e: - raise e.to_synapse_error() - - return 200, result - - @defer.inlineCallbacks - def on_PUT(self, request, user_id): - yield self.auth.get_user_by_req(request) - return 200, {} - - -class KeyUploadServlet(RestServlet): - PATTERNS = client_patterns("/keys/upload(/(?P[^/]+))?$") - - def __init__(self, hs): - """ - Args: - hs (synapse.server.HomeServer): server - """ - super(KeyUploadServlet, self).__init__() - self.auth = hs.get_auth() - self.store = hs.get_datastore() - self.http_client = hs.get_simple_http_client() - self.main_uri = hs.config.worker_main_http_uri - - @defer.inlineCallbacks - def on_POST(self, request, device_id): - requester = yield self.auth.get_user_by_req(request, allow_guest=True) - user_id = requester.user.to_string() - body = parse_json_object_from_request(request) - - if device_id is not None: - # passing the device_id here is deprecated; however, we allow it - # for now for compatibility with older clients. - if requester.device_id is not None and device_id != requester.device_id: - logger.warning( - "Client uploading keys for a different device " - "(logged in as %s, uploading for %s)", - requester.device_id, - device_id, - ) - else: - device_id = requester.device_id - - if device_id is None: - raise SynapseError( - 400, "To upload keys, you must pass device_id when authenticating" - ) - - if body: - # They're actually trying to upload something, proxy to main synapse. - # Pass through the auth headers, if any, in case the access token - # is there. - auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) - headers = {"Authorization": auth_headers} - result = yield self.http_client.post_json_get_json( - self.main_uri + request.uri.decode("ascii"), body, headers=headers - ) - - return 200, result - else: - # Just interested in counts. - result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - return 200, {"one_time_key_counts": result} - - -class FrontendProxySlavedStore( - SlavedDeviceStore, - SlavedClientIpStore, - SlavedApplicationServiceStore, - SlavedRegistrationStore, - BaseSlavedStore, -): - pass +import sys -class FrontendProxyServer(HomeServer): - DATASTORE_CLASS = FrontendProxySlavedStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - elif name == "client": - resource = JsonResource(self, canonical_json=False) - KeyUploadServlet(self).register(resource) - - # If presence is disabled, use the stub servlet that does - # not allow sending presence - if not self.config.use_presence: - PresenceStatusStubServlet(self).register(resource) - - resources.update( - { - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - } - ) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - reactor=self.get_reactor(), - ) - - logger.info("Synapse client reader now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return ReplicationClientHandler(self.get_datastore()) - - -def start(config_options): - try: - config = HomeServerConfig.load_config("Synapse frontend proxy", config_options) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.frontend_proxy" - - assert config.worker_main_http_uri is not None - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - ss = FrontendProxyServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ss, config, use_worker_options=True) - - ss.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-frontend-proxy", config) - +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py new file mode 100644 index 0000000000..30efd39092 --- /dev/null +++ b/synapse/app/generic_worker.py @@ -0,0 +1,917 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import contextlib +import logging +import sys + +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource + +import synapse +import synapse.events +from synapse.api.constants import EventTypes +from synapse.api.errors import HttpResponseException, SynapseError +from synapse.api.urls import ( + CLIENT_API_PREFIX, + FEDERATION_PREFIX, + LEGACY_MEDIA_PREFIX, + MEDIA_PREFIX, + SERVER_KEY_V2_PREFIX, +) +from synapse.app import _base +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.federation import send_queue +from synapse.federation.transport.server import TransportLayerServer +from synapse.handlers.presence import PresenceHandler, get_interested_parties +from synapse.http.server import JsonResource +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext, run_in_background +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.slave.storage._base import BaseSlavedStore, __func__ +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.directory import DirectoryStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore +from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.profile import SlavedProfileStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams._base import ( + DeviceListsStream, + ReceiptsStream, + ToDeviceStream, +) +from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow +from synapse.rest.admin import register_servlets_for_media_repo +from synapse.rest.client.v1 import events +from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.rest.client.v1.login import LoginRestServlet +from synapse.rest.client.v1.profile import ( + ProfileAvatarURLRestServlet, + ProfileDisplaynameRestServlet, + ProfileRestServlet, +) +from synapse.rest.client.v1.push_rule import PushRuleRestServlet +from synapse.rest.client.v1.room import ( + JoinedRoomMemberListRestServlet, + JoinRoomAliasServlet, + PublicRoomListRestServlet, + RoomEventContextServlet, + RoomInitialSyncRestServlet, + RoomMemberListRestServlet, + RoomMembershipRestServlet, + RoomMessageListRestServlet, + RoomSendEventRestServlet, + RoomStateEventRestServlet, + RoomStateRestServlet, +) +from synapse.rest.client.v1.voip import VoipRestServlet +from synapse.rest.client.v2_alpha import groups, sync, user_directory +from synapse.rest.client.v2_alpha._base import client_patterns +from synapse.rest.client.v2_alpha.account import ThreepidRestServlet +from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet +from synapse.rest.client.v2_alpha.register import RegisterRestServlet +from synapse.rest.client.versions import VersionsRestServlet +from synapse.rest.key.v2 import KeyApiV2Resource +from synapse.server import HomeServer +from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore +from synapse.storage.data_stores.main.monthly_active_users import ( + MonthlyActiveUsersWorkerStore, +) +from synapse.storage.data_stores.main.presence import UserPresenceState +from synapse.storage.data_stores.main.user_directory import UserDirectoryStore +from synapse.types import ReadReceipt +from synapse.util.async_helpers import Linearizer +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.manhole import manhole +from synapse.util.stringutils import random_string +from synapse.util.versionstring import get_version_string + +logger = logging.getLogger("synapse.app.generic_worker") + + +class PresenceStatusStubServlet(RestServlet): + """If presence is disabled this servlet can be used to stub out setting + presence status, while proxying the getters to the master instance. + """ + + PATTERNS = client_patterns("/presence/(?P[^/]*)/status") + + def __init__(self, hs): + super(PresenceStatusStubServlet, self).__init__() + self.http_client = hs.get_simple_http_client() + self.auth = hs.get_auth() + self.main_uri = hs.config.worker_main_http_uri + + async def on_GET(self, request, user_id): + # Pass through the auth headers, if any, in case the access token + # is there. + auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) + headers = {"Authorization": auth_headers} + + try: + result = await self.http_client.get_json( + self.main_uri + request.uri.decode("ascii"), headers=headers + ) + except HttpResponseException as e: + raise e.to_synapse_error() + + return 200, result + + async def on_PUT(self, request, user_id): + await self.auth.get_user_by_req(request) + return 200, {} + + +class KeyUploadServlet(RestServlet): + """An implementation of the `KeyUploadServlet` that responds to read only + requests, but otherwise proxies through to the master instance. + """ + + PATTERNS = client_patterns("/keys/upload(/(?P[^/]+))?$") + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(KeyUploadServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.http_client = hs.get_simple_http_client() + self.main_uri = hs.config.worker_main_http_uri + + async def on_POST(self, request, device_id): + requester = await self.auth.get_user_by_req(request, allow_guest=True) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) + + if device_id is not None: + # passing the device_id here is deprecated; however, we allow it + # for now for compatibility with older clients. + if requester.device_id is not None and device_id != requester.device_id: + logger.warning( + "Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, + device_id, + ) + else: + device_id = requester.device_id + + if device_id is None: + raise SynapseError( + 400, "To upload keys, you must pass device_id when authenticating" + ) + + if body: + # They're actually trying to upload something, proxy to main synapse. + # Pass through the auth headers, if any, in case the access token + # is there. + auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) + headers = {"Authorization": auth_headers} + result = await self.http_client.post_json_get_json( + self.main_uri + request.uri.decode("ascii"), body, headers=headers + ) + + return 200, result + else: + # Just interested in counts. + result = await self.store.count_e2e_one_time_keys(user_id, device_id) + return 200, {"one_time_key_counts": result} + + +UPDATE_SYNCING_USERS_MS = 10 * 1000 + + +class GenericWorkerPresence(object): + def __init__(self, hs): + self.hs = hs + self.is_mine_id = hs.is_mine_id + self.http_client = hs.get_simple_http_client() + self.store = hs.get_datastore() + self.user_to_num_current_syncs = {} + self.clock = hs.get_clock() + self.notifier = hs.get_notifier() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = {state.user_id: state for state in active_presence} + + # user_id -> last_sync_ms. Lists the users that have stopped syncing + # but we haven't notified the master of that yet + self.users_going_offline = {} + + self._send_stop_syncing_loop = self.clock.looping_call( + self.send_stop_syncing, UPDATE_SYNCING_USERS_MS + ) + + self.process_id = random_string(16) + logger.info("Presence process_id is %r", self.process_id) + + def send_user_sync(self, user_id, is_syncing, last_sync_ms): + if self.hs.config.use_presence: + self.hs.get_tcp_replication().send_user_sync( + user_id, is_syncing, last_sync_ms + ) + + def mark_as_coming_online(self, user_id): + """A user has started syncing. Send a UserSync to the master, unless they + had recently stopped syncing. + + Args: + user_id (str) + """ + going_offline = self.users_going_offline.pop(user_id, None) + if not going_offline: + # Safe to skip because we haven't yet told the master they were offline + self.send_user_sync(user_id, True, self.clock.time_msec()) + + def mark_as_going_offline(self, user_id): + """A user has stopped syncing. We wait before notifying the master as + its likely they'll come back soon. This allows us to avoid sending + a stopped syncing immediately followed by a started syncing notification + to the master + + Args: + user_id (str) + """ + self.users_going_offline[user_id] = self.clock.time_msec() + + def send_stop_syncing(self): + """Check if there are any users who have stopped syncing a while ago + and haven't come back yet. If there are poke the master about them. + """ + now = self.clock.time_msec() + for user_id, last_sync_ms in list(self.users_going_offline.items()): + if now - last_sync_ms > UPDATE_SYNCING_USERS_MS: + self.users_going_offline.pop(user_id, None) + self.send_user_sync(user_id, False, last_sync_ms) + + def set_state(self, user, state, ignore_status_msg=False): + # TODO Hows this supposed to work? + return defer.succeed(None) + + get_states = __func__(PresenceHandler.get_states) + get_state = __func__(PresenceHandler.get_state) + current_state_for_users = __func__(PresenceHandler.current_state_for_users) + + def user_syncing(self, user_id, affect_presence): + if affect_presence: + curr_sync = self.user_to_num_current_syncs.get(user_id, 0) + self.user_to_num_current_syncs[user_id] = curr_sync + 1 + + # If we went from no in flight sync to some, notify replication + if self.user_to_num_current_syncs[user_id] == 1: + self.mark_as_coming_online(user_id) + + def _end(): + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down. + if affect_presence and user_id in self.user_to_num_current_syncs: + self.user_to_num_current_syncs[user_id] -= 1 + + # If we went from one in flight sync to non, notify replication + if self.user_to_num_current_syncs[user_id] == 0: + self.mark_as_going_offline(user_id) + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + return defer.succeed(_user_syncing()) + + @defer.inlineCallbacks + def notify_from_replication(self, states, stream_id): + parties = yield get_interested_parties(self.store, states) + room_ids_to_states, users_to_states = parties + + self.notifier.on_new_event( + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=users_to_states.keys(), + ) + + @defer.inlineCallbacks + def process_replication_rows(self, token, rows): + states = [ + UserPresenceState( + row.user_id, + row.state, + row.last_active_ts, + row.last_federation_update_ts, + row.last_user_sync_ts, + row.status_msg, + row.currently_active, + ) + for row in rows + ] + + for state in states: + self.user_to_current_state[state.user_id] = state + + stream_id = token + yield self.notify_from_replication(states, stream_id) + + def get_currently_syncing_users(self): + if self.hs.config.use_presence: + return [ + user_id + for user_id, count in self.user_to_num_current_syncs.items() + if count > 0 + ] + else: + return set() + + +class GenericWorkerTyping(object): + def __init__(self, hs): + self._latest_room_serial = 0 + self._reset() + + def _reset(self): + """ + Reset the typing handler's data caches. + """ + # map room IDs to serial numbers + self._room_serials = {} + # map room IDs to sets of users currently typing + self._room_typing = {} + + def stream_positions(self): + # We must update this typing token from the response of the previous + # sync. In particular, the stream id may "reset" back to zero/a low + # value which we *must* use for the next replication request. + return {"typing": self._latest_room_serial} + + def process_replication_rows(self, token, rows): + if self._latest_room_serial > token: + # The master has gone backwards. To prevent inconsistent data, just + # clear everything. + self._reset() + + # Set the latest serial token to whatever the server gave us. + self._latest_room_serial = token + + for row in rows: + self._room_serials[row.room_id] = token + self._room_typing[row.room_id] = row.user_ids + + +class GenericWorkerSlavedStore( + # FIXME(#3714): We need to add UserDirectoryStore as we write directly + # rather than going via the correct worker. + UserDirectoryStore, + SlavedDeviceInboxStore, + SlavedDeviceStore, + SlavedReceiptsStore, + SlavedPushRuleStore, + SlavedGroupServerStore, + SlavedAccountDataStore, + SlavedPusherStore, + SlavedEventStore, + SlavedKeyStore, + RoomStore, + DirectoryStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + SlavedTransactionStore, + SlavedProfileStore, + SlavedClientIpStore, + SlavedPresenceStore, + SlavedFilteringStore, + MonthlyActiveUsersWorkerStore, + MediaRepositoryStore, + BaseSlavedStore, +): + def __init__(self, database, db_conn, hs): + super(GenericWorkerSlavedStore, self).__init__(database, db_conn, hs) + + # We pull out the current federation stream position now so that we + # always have a known value for the federation position in memory so + # that we don't have to bounce via a deferred once when we start the + # replication streams. + self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) + + def _get_federation_out_pos(self, db_conn): + sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?" + sql = self.database_engine.convert_param_style(sql) + + txn = db_conn.cursor() + txn.execute(sql, ("federation",)) + rows = txn.fetchall() + txn.close() + + return rows[0][0] if rows else -1 + + +class GenericWorkerServer(HomeServer): + DATASTORE_CLASS = GenericWorkerSlavedStore + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + + PublicRoomListRestServlet(self).register(resource) + RoomMemberListRestServlet(self).register(resource) + JoinedRoomMemberListRestServlet(self).register(resource) + RoomStateRestServlet(self).register(resource) + RoomEventContextServlet(self).register(resource) + RoomMessageListRestServlet(self).register(resource) + RegisterRestServlet(self).register(resource) + LoginRestServlet(self).register(resource) + ThreepidRestServlet(self).register(resource) + KeyQueryServlet(self).register(resource) + KeyChangesServlet(self).register(resource) + VoipRestServlet(self).register(resource) + PushRuleRestServlet(self).register(resource) + VersionsRestServlet(self).register(resource) + RoomSendEventRestServlet(self).register(resource) + RoomMembershipRestServlet(self).register(resource) + RoomStateEventRestServlet(self).register(resource) + JoinRoomAliasServlet(self).register(resource) + ProfileAvatarURLRestServlet(self).register(resource) + ProfileDisplaynameRestServlet(self).register(resource) + ProfileRestServlet(self).register(resource) + KeyUploadServlet(self).register(resource) + + sync.register_servlets(self, resource) + events.register_servlets(self, resource) + InitialSyncRestServlet(self).register(resource) + RoomInitialSyncRestServlet(self).register(resource) + + user_directory.register_servlets(self, resource) + + # If presence is disabled, use the stub servlet that does + # not allow sending presence + if not self.config.use_presence: + PresenceStatusStubServlet(self).register(resource) + + groups.register_servlets(self, resource) + + resources.update({CLIENT_API_PREFIX: resource}) + elif name == "federation": + resources.update({FEDERATION_PREFIX: TransportLayerServer(self)}) + elif name == "media": + media_repo = self.get_media_repository_resource() + + # We need to serve the admin servlets for media on the + # worker. + admin_resource = JsonResource(self, canonical_json=False) + register_servlets_for_media_repo(self, admin_resource) + + resources.update( + { + MEDIA_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, + "/_synapse/admin": admin_resource, + } + ) + + if name == "openid" and "federation" not in res["names"]: + # Only load the openid resource separately if federation resource + # is not specified since federation resource includes openid + # resource. + resources.update( + { + FEDERATION_PREFIX: TransportLayerServer( + self, servlet_groups=["openid"] + ) + } + ) + + if name in ["keys", "federation"]: + resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) + + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, + ), + reactor=self.get_reactor(), + ) + + logger.info("Synapse worker now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", password="rabbithole", globals={"hs": self} + ), + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warning( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) + else: + _base.listen_metrics(listener["bind_addresses"], listener["port"]) + else: + logger.warning("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def remove_pusher(self, app_id, push_key, user_id): + self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) + + def build_tcp_replication(self): + return GenericWorkerReplicationHandler(self) + + def build_presence_handler(self): + return GenericWorkerPresence(self) + + def build_typing_handler(self): + return GenericWorkerTyping(self) + + +class GenericWorkerReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore()) + + self.store = hs.get_datastore() + self.typing_handler = hs.get_typing_handler() + # NB this is a SynchrotronPresence, not a normal PresenceHandler + self.presence_handler = hs.get_presence_handler() + self.notifier = hs.get_notifier() + + self.notify_pushers = hs.config.start_pushers + self.pusher_pool = hs.get_pusherpool() + + if hs.config.send_federation: + self.send_handler = FederationSenderHandler(hs, self) + else: + self.send_handler = None + + async def on_rdata(self, stream_name, token, rows): + await super(GenericWorkerReplicationHandler, self).on_rdata( + stream_name, token, rows + ) + run_in_background(self.process_and_notify, stream_name, token, rows) + + def get_streams_to_replicate(self): + args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate() + args.update(self.typing_handler.stream_positions()) + if self.send_handler: + args.update(self.send_handler.stream_positions()) + return args + + def get_currently_syncing_users(self): + return self.presence_handler.get_currently_syncing_users() + + async def process_and_notify(self, stream_name, token, rows): + try: + if self.send_handler: + self.send_handler.process_replication_rows(stream_name, token, rows) + + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + if row.type != EventsStreamEventRow.TypeId: + continue + assert isinstance(row, EventsStreamRow) + + event = await self.store.get_event( + row.data.event_id, allow_rejected=True + ) + if event.rejected_reason: + continue + + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + + await self.pusher_pool.on_new_notifications(token, token) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows] + ) + elif stream_name in ("account_data", "tag_account_data"): + self.notifier.on_new_event( + "account_data_key", token, users=[row.user_id for row in rows] + ) + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows] + ) + await self.pusher_pool.on_new_receipts( + token, token, {row.room_id for row in rows} + ) + elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows] + ) + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event("to_device_key", token, users=entities) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = await self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) + elif stream_name == "presence": + await self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows] + ) + elif stream_name == "pushers": + for row in rows: + if row.deleted: + self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + await self.start_pusher(row.user_id, row.app_id, row.pushkey) + except Exception: + logger.exception("Error processing replication") + + def stop_pusher(self, user_id, app_id, pushkey): + if not self.notify_pushers: + return + + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = self.pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + async def start_pusher(self, user_id, app_id, pushkey): + if not self.notify_pushers: + return + + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return await self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) + + def on_remote_server_up(self, server: str): + """Called when get a new REMOTE_SERVER_UP command.""" + + # Let's wake up the transaction queue for the server in case we have + # pending stuff to send to it. + if self.send_handler: + self.send_handler.wake_destination(server) + + +class FederationSenderHandler(object): + """Processes the replication stream and forwards the appropriate entries + to the federation sender. + """ + + def __init__(self, hs: GenericWorkerServer, replication_client): + self.store = hs.get_datastore() + self._is_mine_id = hs.is_mine_id + self.federation_sender = hs.get_federation_sender() + self.replication_client = replication_client + + self.federation_position = self.store.federation_out_pos_startup + self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + + self._last_ack = self.federation_position + + self._room_serials = {} + self._room_typing = {} + + def on_start(self): + # There may be some events that are persisted but haven't been sent, + # so send them now. + self.federation_sender.notify_new_events( + self.store.get_room_max_stream_ordering() + ) + + def wake_destination(self, server: str): + self.federation_sender.wake_destination(server) + + def stream_positions(self): + return {"federation": self.federation_position} + + def process_replication_rows(self, stream_name, token, rows): + # The federation stream contains things that we want to send out, e.g. + # presence, typing, etc. + if stream_name == "federation": + send_queue.process_rows_for_federation(self.federation_sender, rows) + run_in_background(self.update_token, token) + + # We also need to poke the federation sender when new events happen + elif stream_name == "events": + self.federation_sender.notify_new_events(token) + + # ... and when new receipts happen + elif stream_name == ReceiptsStream.NAME: + run_as_background_process( + "process_receipts_for_federation", self._on_new_receipts, rows + ) + + # ... as well as device updates and messages + elif stream_name == DeviceListsStream.NAME: + hosts = {row.destination for row in rows} + for host in hosts: + self.federation_sender.send_device_messages(host) + + elif stream_name == ToDeviceStream.NAME: + # The to_device stream includes stuff to be pushed to both local + # clients and remote servers, so we ignore entities that start with + # '@' (since they'll be local users rather than destinations). + hosts = {row.entity for row in rows if not row.entity.startswith("@")} + for host in hosts: + self.federation_sender.send_device_messages(host) + + async def _on_new_receipts(self, rows): + """ + Args: + rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]): + new receipts to be processed + """ + for receipt in rows: + # we only want to send on receipts for our own users + if not self._is_mine_id(receipt.user_id): + continue + receipt_info = ReadReceipt( + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + [receipt.event_id], + receipt.data, + ) + await self.federation_sender.send_read_receipt(receipt_info) + + async def update_token(self, token): + try: + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (await self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + await self.store.update_federation_out_pos( + "federation", self.federation_position + ) + + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack( + self.federation_position + ) + self._last_ack = self.federation_position + except Exception: + logger.exception("Error updating federation stream position") + + +def start(config_options): + try: + config = HomeServerConfig.load_config("Synapse worker", config_options) + except ConfigError as e: + sys.stderr.write("\n" + str(e) + "\n") + sys.exit(1) + + # For backwards compatibility let any of the old app names. + assert config.worker_app in ( + "synapse.app.appservice", + "synapse.app.client_reader", + "synapse.app.event_creator", + "synapse.app.federation_reader", + "synapse.app.federation_sender", + "synapse.app.frontend_proxy", + "synapse.app.generic_worker", + "synapse.app.media_repository", + "synapse.app.pusher", + "synapse.app.synchrotron", + "synapse.app.user_dir", + ) + + if config.worker_app == "synapse.app.appservice": + if config.notify_appservices: + sys.stderr.write( + "\nThe appservices must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``notify_appservices: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the appservice to start since they will be disabled in the main config + config.notify_appservices = True + + if config.worker_app == "synapse.app.pusher": + if config.start_pushers: + sys.stderr.write( + "\nThe pushers must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``start_pushers: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.start_pushers = True + + if config.worker_app == "synapse.app.user_dir": + if config.update_user_directory: + sys.stderr.write( + "\nThe update_user_directory must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``update_user_directory: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.update_user_directory = True + + if config.worker_app == "synapse.app.federation_sender": + if config.send_federation: + sys.stderr.write( + "\nThe send_federation must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``send_federation: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.send_federation = True + + synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts + + ss = GenericWorkerServer( + config.server_name, + config=config, + version_string="Synapse/" + get_version_string(synapse), + ) + + setup_logging(ss, config, use_worker_options=True) + + ss.setup() + reactor.addSystemEventTrigger( + "before", "startup", _base.start, ss, config.worker_listeners + ) + + _base.start_worker_reactor("synapse-generic-worker", config) + + +if __name__ == "__main__": + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 5b5832214a..add43147b3 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -13,162 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - -from twisted.internet import reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.api.urls import LEGACY_MEDIA_PREFIX, MEDIA_PREFIX -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.http.server import JsonResource -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import SlavedTransactionStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.admin import register_servlets_for_media_repo -from synapse.server import HomeServer -from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.media_repository") - - -class MediaRepositorySlavedStore( - RoomStore, - SlavedApplicationServiceStore, - SlavedRegistrationStore, - SlavedClientIpStore, - SlavedTransactionStore, - BaseSlavedStore, - MediaRepositoryStore, -): - pass - - -class MediaRepositoryServer(HomeServer): - DATASTORE_CLASS = MediaRepositorySlavedStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - elif name == "media": - media_repo = self.get_media_repository_resource() - - # We need to serve the admin servlets for media on the - # worker. - admin_resource = JsonResource(self, canonical_json=False) - register_servlets_for_media_repo(self, admin_resource) - - resources.update( - { - MEDIA_PREFIX: media_repo, - LEGACY_MEDIA_PREFIX: media_repo, - "/_synapse/admin": admin_resource, - } - ) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - ) - logger.info("Synapse media repository now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return ReplicationClientHandler(self.get_datastore()) - - -def start(config_options): - try: - config = HomeServerConfig.load_config( - "Synapse media repository", config_options - ) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.media_repository" - - if config.enable_media_repo: - _base.quit_with_error( - "enable_media_repo must be disabled in the main synapse process\n" - "before the media repo can be run in a separate worker.\n" - "Please add ``enable_media_repo: false`` to the main config\n" - ) - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - ss = MediaRepositoryServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ss, config, use_worker_options=True) - - ss.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-media-repository", config) +import sys +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 84e9f8d5e2..add43147b3 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -13,213 +13,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - -from twisted.internet import defer, reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage._base import __func__ -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.pushers import SlavedPusherStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.server import HomeServer -from synapse.storage import DataStore -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.pusher") - - -class PusherSlaveStore( - SlavedEventStore, - SlavedPusherStore, - SlavedReceiptsStore, - SlavedAccountDataStore, - RoomStore, -): - update_pusher_last_stream_ordering_and_success = __func__( - DataStore.update_pusher_last_stream_ordering_and_success - ) - - update_pusher_failing_since = __func__(DataStore.update_pusher_failing_since) - - update_pusher_last_stream_ordering = __func__( - DataStore.update_pusher_last_stream_ordering - ) - - get_throttle_params_by_room = __func__(DataStore.get_throttle_params_by_room) - - set_throttle_params = __func__(DataStore.set_throttle_params) - - get_time_of_last_push_action_before = __func__( - DataStore.get_time_of_last_push_action_before - ) - - get_profile_displayname = __func__(DataStore.get_profile_displayname) - - -class PusherServer(HomeServer): - DATASTORE_CLASS = PusherSlaveStore - - def remove_pusher(self, app_id, push_key, user_id): - self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - ) - - logger.info("Synapse pusher now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - def build_tcp_replication(self): - return PusherReplicationHandler(self) - - -class PusherReplicationHandler(ReplicationClientHandler): - def __init__(self, hs): - super(PusherReplicationHandler, self).__init__(hs.get_datastore()) - - self.pusher_pool = hs.get_pusherpool() - - async def on_rdata(self, stream_name, token, rows): - await super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - run_in_background(self.poke_pushers, stream_name, token, rows) - - @defer.inlineCallbacks - def poke_pushers(self, stream_name, token, rows): - try: - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications(token, token) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, {row.room_id for row in rows} - ) - except Exception: - logger.exception("Error poking pushers") - - def stop_pusher(self, user_id, app_id, pushkey): - key = "%s:%s" % (app_id, pushkey) - pushers_for_user = self.pusher_pool.pushers.get(user_id, {}) - pusher = pushers_for_user.pop(key, None) - if pusher is None: - return - logger.info("Stopping pusher %r / %r", user_id, key) - pusher.on_stop() - - def start_pusher(self, user_id, app_id, pushkey): - key = "%s:%s" % (app_id, pushkey) - logger.info("Starting pusher %r / %r", user_id, key) - return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) - - -def start(config_options): - try: - config = HomeServerConfig.load_config("Synapse pusher", config_options) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.pusher" - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - if config.start_pushers: - sys.stderr.write( - "\nThe pushers must be disabled in the main synapse process" - "\nbefore they can be run in a separate worker." - "\nPlease add ``start_pushers: false`` to the main config" - "\n" - ) - sys.exit(1) - - # Force the pushers to start since they will be disabled in the main config - config.start_pushers = True - - ps = PusherServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ps, config, use_worker_options=True) - - ps.setup() - - def start(): - _base.start(ps, config.worker_listeners) - ps.get_pusherpool().start() - - reactor.addSystemEventTrigger("before", "startup", start) - - _base.start_worker_reactor("synapse-pusher", config) +import sys +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): - ps = start(sys.argv[1:]) + start(sys.argv[1:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 8982c0676e..add43147b3 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -13,454 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import contextlib -import logging -import sys - -from six import iteritems - -from twisted.internet import defer, reactor -from twisted.web.resource import NoResource - -import synapse -from synapse.api.constants import EventTypes -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.handlers.presence import PresenceHandler, get_interested_parties -from synapse.http.server import JsonResource -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage._base import BaseSlavedStore, __func__ -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.filtering import SlavedFilteringStore -from synapse.replication.slave.storage.groups import SlavedGroupServerStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow -from synapse.rest.client.v1 import events -from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet -from synapse.rest.client.v1.room import RoomInitialSyncRestServlet -from synapse.rest.client.v2_alpha import sync -from synapse.server import HomeServer -from synapse.storage.data_stores.main.monthly_active_users import ( - MonthlyActiveUsersWorkerStore, -) -from synapse.storage.data_stores.main.presence import UserPresenceState -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.stringutils import random_string -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.synchrotron") - - -class SynchrotronSlavedStore( - SlavedReceiptsStore, - SlavedAccountDataStore, - SlavedApplicationServiceStore, - SlavedRegistrationStore, - SlavedFilteringStore, - SlavedPresenceStore, - SlavedGroupServerStore, - SlavedDeviceInboxStore, - SlavedDeviceStore, - SlavedPushRuleStore, - SlavedEventStore, - SlavedClientIpStore, - RoomStore, - MonthlyActiveUsersWorkerStore, - BaseSlavedStore, -): - pass - - -UPDATE_SYNCING_USERS_MS = 10 * 1000 - - -class SynchrotronPresence(object): - def __init__(self, hs): - self.hs = hs - self.is_mine_id = hs.is_mine_id - self.http_client = hs.get_simple_http_client() - self.store = hs.get_datastore() - self.user_to_num_current_syncs = {} - self.clock = hs.get_clock() - self.notifier = hs.get_notifier() - - active_presence = self.store.take_presence_startup_info() - self.user_to_current_state = {state.user_id: state for state in active_presence} - - # user_id -> last_sync_ms. Lists the users that have stopped syncing - # but we haven't notified the master of that yet - self.users_going_offline = {} - - self._send_stop_syncing_loop = self.clock.looping_call( - self.send_stop_syncing, 10 * 1000 - ) - - self.process_id = random_string(16) - logger.info("Presence process_id is %r", self.process_id) - - def send_user_sync(self, user_id, is_syncing, last_sync_ms): - if self.hs.config.use_presence: - self.hs.get_tcp_replication().send_user_sync( - user_id, is_syncing, last_sync_ms - ) - - def mark_as_coming_online(self, user_id): - """A user has started syncing. Send a UserSync to the master, unless they - had recently stopped syncing. - - Args: - user_id (str) - """ - going_offline = self.users_going_offline.pop(user_id, None) - if not going_offline: - # Safe to skip because we haven't yet told the master they were offline - self.send_user_sync(user_id, True, self.clock.time_msec()) - - def mark_as_going_offline(self, user_id): - """A user has stopped syncing. We wait before notifying the master as - its likely they'll come back soon. This allows us to avoid sending - a stopped syncing immediately followed by a started syncing notification - to the master - - Args: - user_id (str) - """ - self.users_going_offline[user_id] = self.clock.time_msec() - - def send_stop_syncing(self): - """Check if there are any users who have stopped syncing a while ago - and haven't come back yet. If there are poke the master about them. - """ - now = self.clock.time_msec() - for user_id, last_sync_ms in list(self.users_going_offline.items()): - if now - last_sync_ms > 10 * 1000: - self.users_going_offline.pop(user_id, None) - self.send_user_sync(user_id, False, last_sync_ms) - - def set_state(self, user, state, ignore_status_msg=False): - # TODO Hows this supposed to work? - return defer.succeed(None) - - get_states = __func__(PresenceHandler.get_states) - get_state = __func__(PresenceHandler.get_state) - current_state_for_users = __func__(PresenceHandler.current_state_for_users) - - def user_syncing(self, user_id, affect_presence): - if affect_presence: - curr_sync = self.user_to_num_current_syncs.get(user_id, 0) - self.user_to_num_current_syncs[user_id] = curr_sync + 1 - - # If we went from no in flight sync to some, notify replication - if self.user_to_num_current_syncs[user_id] == 1: - self.mark_as_coming_online(user_id) - - def _end(): - # We check that the user_id is in user_to_num_current_syncs because - # user_to_num_current_syncs may have been cleared if we are - # shutting down. - if affect_presence and user_id in self.user_to_num_current_syncs: - self.user_to_num_current_syncs[user_id] -= 1 - - # If we went from one in flight sync to non, notify replication - if self.user_to_num_current_syncs[user_id] == 0: - self.mark_as_going_offline(user_id) - - @contextlib.contextmanager - def _user_syncing(): - try: - yield - finally: - _end() - - return defer.succeed(_user_syncing()) - - @defer.inlineCallbacks - def notify_from_replication(self, states, stream_id): - parties = yield get_interested_parties(self.store, states) - room_ids_to_states, users_to_states = parties - - self.notifier.on_new_event( - "presence_key", - stream_id, - rooms=room_ids_to_states.keys(), - users=users_to_states.keys(), - ) - - @defer.inlineCallbacks - def process_replication_rows(self, token, rows): - states = [ - UserPresenceState( - row.user_id, - row.state, - row.last_active_ts, - row.last_federation_update_ts, - row.last_user_sync_ts, - row.status_msg, - row.currently_active, - ) - for row in rows - ] - - for state in states: - self.user_to_current_state[state.user_id] = state - - stream_id = token - yield self.notify_from_replication(states, stream_id) - - def get_currently_syncing_users(self): - if self.hs.config.use_presence: - return [ - user_id - for user_id, count in iteritems(self.user_to_num_current_syncs) - if count > 0 - ] - else: - return set() - -class SynchrotronTyping(object): - def __init__(self, hs): - self._latest_room_serial = 0 - self._reset() - - def _reset(self): - """ - Reset the typing handler's data caches. - """ - # map room IDs to serial numbers - self._room_serials = {} - # map room IDs to sets of users currently typing - self._room_typing = {} - - def stream_positions(self): - # We must update this typing token from the response of the previous - # sync. In particular, the stream id may "reset" back to zero/a low - # value which we *must* use for the next replication request. - return {"typing": self._latest_room_serial} - - def process_replication_rows(self, token, rows): - if self._latest_room_serial > token: - # The master has gone backwards. To prevent inconsistent data, just - # clear everything. - self._reset() - - # Set the latest serial token to whatever the server gave us. - self._latest_room_serial = token - - for row in rows: - self._room_serials[row.room_id] = token - self._room_typing[row.room_id] = row.user_ids - - -class SynchrotronApplicationService(object): - def notify_interested_services(self, event): - pass - - -class SynchrotronServer(HomeServer): - DATASTORE_CLASS = SynchrotronSlavedStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - elif name == "client": - resource = JsonResource(self, canonical_json=False) - sync.register_servlets(self, resource) - events.register_servlets(self, resource) - InitialSyncRestServlet(self).register(resource) - RoomInitialSyncRestServlet(self).register(resource) - resources.update( - { - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - } - ) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - ) - - logger.info("Synapse synchrotron now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return SyncReplicationHandler(self) - - def build_presence_handler(self): - return SynchrotronPresence(self) - - def build_typing_handler(self): - return SynchrotronTyping(self) - - -class SyncReplicationHandler(ReplicationClientHandler): - def __init__(self, hs): - super(SyncReplicationHandler, self).__init__(hs.get_datastore()) - - self.store = hs.get_datastore() - self.typing_handler = hs.get_typing_handler() - # NB this is a SynchrotronPresence, not a normal PresenceHandler - self.presence_handler = hs.get_presence_handler() - self.notifier = hs.get_notifier() - - async def on_rdata(self, stream_name, token, rows): - await super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - run_in_background(self.process_and_notify, stream_name, token, rows) - - def get_streams_to_replicate(self): - args = super(SyncReplicationHandler, self).get_streams_to_replicate() - args.update(self.typing_handler.stream_positions()) - return args - - def get_currently_syncing_users(self): - return self.presence_handler.get_currently_syncing_users() - - async def process_and_notify(self, stream_name, token, rows): - try: - if stream_name == "events": - # We shouldn't get multiple rows per token for events stream, so - # we don't need to optimise this for multiple rows. - for row in rows: - if row.type != EventsStreamEventRow.TypeId: - continue - assert isinstance(row, EventsStreamRow) - - event = await self.store.get_event( - row.data.event_id, allow_rejected=True - ) - if event.rejected_reason: - continue - - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event( - event, token, max_token, extra_users - ) - elif stream_name == "push_rules": - self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows] - ) - elif stream_name in ("account_data", "tag_account_data"): - self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows] - ) - elif stream_name == "receipts": - self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows] - ) - elif stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] - ) - elif stream_name == "to_device": - entities = [row.entity for row in rows if row.entity.startswith("@")] - if entities: - self.notifier.on_new_event("to_device_key", token, users=entities) - elif stream_name == "device_lists": - all_room_ids = set() - for row in rows: - room_ids = await self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) - self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) - elif stream_name == "presence": - await self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": - self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows] - ) - except Exception: - logger.exception("Error processing replication") - - -def start(config_options): - try: - config = HomeServerConfig.load_config("Synapse synchrotron", config_options) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.synchrotron" - - synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts - - ss = SynchrotronServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - application_service_handler=SynchrotronApplicationService(), - ) - - setup_logging(ss, config, use_worker_options=True) - - ss.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-synchrotron", config) +import sys +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index ba536d6f04..503d44f687 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -14,217 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import sys -from twisted.internet import defer, reactor -from twisted.web.resource import NoResource - -import synapse -from synapse import events -from synapse.app import _base -from synapse.config._base import ConfigError -from synapse.config.homeserver import HomeServerConfig -from synapse.config.logger import setup_logging -from synapse.http.server import JsonResource -from synapse.http.site import SynapseSite -from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams.events import ( - EventsStream, - EventsStreamCurrentStateRow, -) -from synapse.rest.client.v2_alpha import user_directory -from synapse.server import HomeServer -from synapse.storage.data_stores.main.user_directory import UserDirectoryStore -from synapse.storage.database import Database -from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import manhole -from synapse.util.versionstring import get_version_string - -logger = logging.getLogger("synapse.app.user_dir") - - -class UserDirectorySlaveStore( - SlavedEventStore, - SlavedApplicationServiceStore, - SlavedRegistrationStore, - SlavedClientIpStore, - UserDirectoryStore, - BaseSlavedStore, -): - def __init__(self, database: Database, db_conn, hs): - super(UserDirectorySlaveStore, self).__init__(database, db_conn, hs) - - events_max = self._stream_id_gen.get_current_token() - curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict( - db_conn, - "current_state_delta_stream", - entity_column="room_id", - stream_column="stream_id", - max_value=events_max, # As we share the stream id with events token - limit=1000, - ) - self._curr_state_delta_stream_cache = StreamChangeCache( - "_curr_state_delta_stream_cache", - min_curr_state_delta_id, - prefilled_cache=curr_state_delta_prefill, - ) - - def stream_positions(self): - result = super(UserDirectorySlaveStore, self).stream_positions() - return result - - def process_replication_rows(self, stream_name, token, rows): - if stream_name == EventsStream.NAME: - self._stream_id_gen.advance(token) - for row in rows: - if row.type != EventsStreamCurrentStateRow.TypeId: - continue - self._curr_state_delta_stream_cache.entity_has_changed( - row.data.room_id, token - ) - return super(UserDirectorySlaveStore, self).process_replication_rows( - stream_name, token, rows - ) - - -class UserDirectoryServer(HomeServer): - DATASTORE_CLASS = UserDirectorySlaveStore - - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) - resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) - elif name == "client": - resource = JsonResource(self, canonical_json=False) - user_directory.register_servlets(self, resource) - resources.update( - { - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - } - ) - - root_resource = create_resource_tree(resources, NoResource()) - - _base.listen_tcp( - bind_addresses, - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - self.version_string, - ), - ) - - logger.info("Synapse user_dir now listening on port %d", port) - - def start_listening(self, listeners): - for listener in listeners: - if listener["type"] == "http": - self._listen_http(listener) - elif listener["type"] == "manhole": - _base.listen_tcp( - listener["bind_addresses"], - listener["port"], - manhole( - username="matrix", password="rabbithole", globals={"hs": self} - ), - ) - elif listener["type"] == "metrics": - if not self.get_config().enable_metrics: - logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) - ) - else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) - else: - logger.warning("Unrecognized listener type: %s", listener["type"]) - - self.get_tcp_replication().start_replication(self) - - def build_tcp_replication(self): - return UserDirectoryReplicationHandler(self) - - -class UserDirectoryReplicationHandler(ReplicationClientHandler): - def __init__(self, hs): - super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) - self.user_directory = hs.get_user_directory_handler() - - async def on_rdata(self, stream_name, token, rows): - await super(UserDirectoryReplicationHandler, self).on_rdata( - stream_name, token, rows - ) - if stream_name == EventsStream.NAME: - run_in_background(self._notify_directory) - - @defer.inlineCallbacks - def _notify_directory(self): - try: - yield self.user_directory.notify_new_event() - except Exception: - logger.exception("Error notifiying user directory of state update") - - -def start(config_options): - try: - config = HomeServerConfig.load_config("Synapse user directory", config_options) - except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") - sys.exit(1) - - assert config.worker_app == "synapse.app.user_dir" - - events.USE_FROZEN_DICTS = config.use_frozen_dicts - - if config.update_user_directory: - sys.stderr.write( - "\nThe update_user_directory must be disabled in the main synapse process" - "\nbefore they can be run in a separate worker." - "\nPlease add ``update_user_directory: false`` to the main config" - "\n" - ) - sys.exit(1) - - # Force the pushers to start since they will be disabled in the main config - config.update_user_directory = True - - ss = UserDirectoryServer( - config.server_name, - config=config, - version_string="Synapse/" + get_version_string(synapse), - ) - - setup_logging(ss, config, use_worker_options=True) - - ss.setup() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners - ) - - _base.start_worker_reactor("synapse-user-dir", config) - +from synapse.app.generic_worker import start +from synapse.util.logcontext import LoggingContext if __name__ == "__main__": with LoggingContext("main"): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 3aa6cb8b96..e73342c657 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -32,6 +32,7 @@ from synapse.storage.data_stores.main.state import StateGroupWorkerStore from synapse.storage.data_stores.main.stream import StreamWorkerStore from synapse.storage.data_stores.main.user_erasure_store import UserErasureWorkerStore from synapse.storage.database import Database +from synapse.util.caches.stream_change_cache import StreamChangeCache from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker @@ -68,6 +69,21 @@ class SlavedEventStore( super(SlavedEventStore, self).__init__(database, db_conn, hs) + events_max = self._stream_id_gen.get_current_token() + curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict( + db_conn, + "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", + min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. @@ -120,6 +136,10 @@ class SlavedEventStore( backfilled=False, ) elif row.type == EventsStreamCurrentStateRow.TypeId: + self._curr_state_delta_stream_cache.entity_has_changed( + row.data.room_id, token + ) + if data.type == EventTypes.Member: self.get_rooms_for_user_with_stream_ordering.invalidate( (data.state_key,) diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py index 6b03233262..547b9d69cb 100644 --- a/synapse/storage/data_stores/main/pusher.py +++ b/synapse/storage/data_stores/main/pusher.py @@ -197,6 +197,84 @@ class PusherWorkerStore(SQLBaseStore): return result + @defer.inlineCallbacks + def update_pusher_last_stream_ordering( + self, app_id, pushkey, user_id, last_stream_ordering + ): + yield self.db.simple_update_one( + "pushers", + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + {"last_stream_ordering": last_stream_ordering}, + desc="update_pusher_last_stream_ordering", + ) + + @defer.inlineCallbacks + def update_pusher_last_stream_ordering_and_success( + self, app_id, pushkey, user_id, last_stream_ordering, last_success + ): + """Update the last stream ordering position we've processed up to for + the given pusher. + + Args: + app_id (str) + pushkey (str) + last_stream_ordering (int) + last_success (int) + + Returns: + Deferred[bool]: True if the pusher still exists; False if it has been deleted. + """ + updated = yield self.db.simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={ + "last_stream_ordering": last_stream_ordering, + "last_success": last_success, + }, + desc="update_pusher_last_stream_ordering_and_success", + ) + + return bool(updated) + + @defer.inlineCallbacks + def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): + yield self.db.simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={"failing_since": failing_since}, + desc="update_pusher_failing_since", + ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self.db.simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room", + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"], + } + + return params_by_room + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + # no need to lock because `pusher_throttle` has a primary key on + # (pusher, room_id) so simple_upsert will retry + yield self.db.simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params", + lock=False, + ) + class PusherStore(PusherWorkerStore): def get_pushers_stream_token(self): @@ -282,81 +360,3 @@ class PusherStore(PusherWorkerStore): with self._pushers_id_gen.get_next() as stream_id: yield self.db.runInteraction("delete_pusher", delete_pusher_txn, stream_id) - - @defer.inlineCallbacks - def update_pusher_last_stream_ordering( - self, app_id, pushkey, user_id, last_stream_ordering - ): - yield self.db.simple_update_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - {"last_stream_ordering": last_stream_ordering}, - desc="update_pusher_last_stream_ordering", - ) - - @defer.inlineCallbacks - def update_pusher_last_stream_ordering_and_success( - self, app_id, pushkey, user_id, last_stream_ordering, last_success - ): - """Update the last stream ordering position we've processed up to for - the given pusher. - - Args: - app_id (str) - pushkey (str) - last_stream_ordering (int) - last_success (int) - - Returns: - Deferred[bool]: True if the pusher still exists; False if it has been deleted. - """ - updated = yield self.db.simple_update( - table="pushers", - keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - updatevalues={ - "last_stream_ordering": last_stream_ordering, - "last_success": last_success, - }, - desc="update_pusher_last_stream_ordering_and_success", - ) - - return bool(updated) - - @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self.db.simple_update( - table="pushers", - keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - updatevalues={"failing_since": failing_since}, - desc="update_pusher_failing_since", - ) - - @defer.inlineCallbacks - def get_throttle_params_by_room(self, pusher_id): - res = yield self.db.simple_select_list( - "pusher_throttle", - {"pusher": pusher_id}, - ["room_id", "last_sent_ts", "throttle_ms"], - desc="get_throttle_params_by_room", - ) - - params_by_room = {} - for row in res: - params_by_room[row["room_id"]] = { - "last_sent_ts": row["last_sent_ts"], - "throttle_ms": row["throttle_ms"], - } - - return params_by_room - - @defer.inlineCallbacks - def set_throttle_params(self, pusher_id, room_id, params): - # no need to lock because `pusher_throttle` has a primary key on - # (pusher, room_id) so simple_upsert will retry - yield self.db.simple_upsert( - "pusher_throttle", - {"pusher": pusher_id, "room_id": room_id}, - params, - desc="set_throttle_params", - lock=False, - ) diff --git a/tests/app/test_frontend_proxy.py b/tests/app/test_frontend_proxy.py index 8bdbc608a9..160e55aca9 100644 --- a/tests/app/test_frontend_proxy.py +++ b/tests/app/test_frontend_proxy.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.app.frontend_proxy import FrontendProxyServer +from synapse.app.generic_worker import GenericWorkerServer from tests.unittest import HomeserverTestCase @@ -22,7 +22,7 @@ class FrontendProxyTests(HomeserverTestCase): def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver( - http_client=None, homeserverToUse=FrontendProxyServer + http_client=None, homeserverToUse=GenericWorkerServer ) return hs @@ -46,9 +46,7 @@ class FrontendProxyTests(HomeserverTestCase): # Grab the resource from the site that was told to listen self.assertEqual(len(self.reactor.tcpServers), 1) site = self.reactor.tcpServers[0][1] - self.resource = ( - site.resource.children[b"_matrix"].children[b"client"].children[b"r0"] - ) + self.resource = site.resource.children[b"_matrix"].children[b"client"] request, channel = self.make_request("PUT", "presence/a/status") self.render(request) @@ -76,9 +74,7 @@ class FrontendProxyTests(HomeserverTestCase): # Grab the resource from the site that was told to listen self.assertEqual(len(self.reactor.tcpServers), 1) site = self.reactor.tcpServers[0][1] - self.resource = ( - site.resource.children[b"_matrix"].children[b"client"].children[b"r0"] - ) + self.resource = site.resource.children[b"_matrix"].children[b"client"] request, channel = self.make_request("PUT", "presence/a/status") self.render(request) diff --git a/tests/app/test_openid_listener.py b/tests/app/test_openid_listener.py index 48792d1480..1fe048048b 100644 --- a/tests/app/test_openid_listener.py +++ b/tests/app/test_openid_listener.py @@ -16,7 +16,7 @@ from mock import Mock, patch from parameterized import parameterized -from synapse.app.federation_reader import FederationReaderServer +from synapse.app.generic_worker import GenericWorkerServer from synapse.app.homeserver import SynapseHomeServer from tests.unittest import HomeserverTestCase @@ -25,7 +25,7 @@ from tests.unittest import HomeserverTestCase class FederationReaderOpenIDListenerTests(HomeserverTestCase): def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver( - http_client=None, homeserverToUse=FederationReaderServer + http_client=None, homeserverToUse=GenericWorkerServer ) return hs -- cgit 1.4.1