From fdb13447167da0670dd6ad95fdf4a99cde450eb9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2020 14:40:47 +0000 Subject: Remove concept of a non-limited stream. (#7011) --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 5526015ddb..6912165622 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -747,7 +747,7 @@ class PresenceHandler(object): return False - async def get_all_presence_updates(self, last_id, current_id): + async def get_all_presence_updates(self, last_id, current_id, limit): """ Gets a list of presence update rows from between the given stream ids. Each row has: @@ -762,7 +762,7 @@ class PresenceHandler(object): """ # TODO(markjh): replicate the unpersisted changes. # This could use the in-memory stores for recent changes. - rows = await self.store.get_all_presence_updates(last_id, current_id) + rows = await self.store.get_all_presence_updates(last_id, current_id, limit) return rows def notify_new_event(self): -- cgit 1.5.1 From 71a1abb8a116372556fd577ff1b85c7cfbe3c2b3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 22 Apr 2020 22:39:04 +0100 Subject: Stop the master relaying USER_SYNC for other workers (#7318) Long story short: if we're handling presence on the current worker, we shouldn't be sending USER_SYNC commands over replication. In an attempt to figure out what is going on here, I ended up refactoring some bits of the presencehandler code, so the first 4 commits here are non-functional refactors to move this code slightly closer to sanity. (There's still plenty to do here :/). Suggest reviewing individual commits. Fixes (I hope) #7257. --- changelog.d/7318.misc | 1 + docs/tcp_replication.md | 6 +- synapse/api/constants.py | 2 + synapse/app/generic_worker.py | 85 ++++++++------- synapse/handlers/events.py | 20 ++-- synapse/handlers/initial_sync.py | 10 +- synapse/handlers/presence.py | 210 ++++++++++++++++++++---------------- synapse/replication/tcp/commands.py | 7 +- synapse/replication/tcp/handler.py | 15 +-- synapse/server.pyi | 2 +- 10 files changed, 199 insertions(+), 159 deletions(-) create mode 100644 changelog.d/7318.misc (limited to 'synapse/handlers/presence.py') diff --git a/changelog.d/7318.misc b/changelog.d/7318.misc new file mode 100644 index 0000000000..676f285377 --- /dev/null +++ b/changelog.d/7318.misc @@ -0,0 +1 @@ +Move catchup of replication streams logic to worker. diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md index 3be8e50c4c..b922d9cf7e 100644 --- a/docs/tcp_replication.md +++ b/docs/tcp_replication.md @@ -196,7 +196,7 @@ Asks the server for the current position of all streams. #### USER_SYNC (C) - A user has started or stopped syncing + A user has started or stopped syncing on this process. #### CLEAR_USER_SYNC (C) @@ -216,10 +216,6 @@ Asks the server for the current position of all streams. Inform the server a cache should be invalidated -#### SYNC (S, C) - - Used exclusively in tests - ### REMOTE_SERVER_UP (S, C) Inform other processes that a remote server may have come back online. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index fda2c2e5bb..bcaf2c3600 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -97,6 +97,8 @@ class EventTypes(object): Retention = "m.room.retention" + Presence = "m.presence" + class RejectedReason(object): AUTH_ERROR = "auth_error" diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 37afd2f810..2a56fe0bd5 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -17,6 +17,9 @@ import contextlib import logging import sys +from typing import Dict, Iterable + +from typing_extensions import ContextManager from twisted.internet import defer, reactor from twisted.web.resource import NoResource @@ -38,14 +41,14 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer -from synapse.handlers.presence import PresenceHandler, get_interested_parties +from synapse.handlers.presence import BasePresenceHandler, get_interested_parties from synapse.http.server import JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.slave.storage._base import BaseSlavedStore, __func__ +from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -225,23 +228,32 @@ class KeyUploadServlet(RestServlet): return 200, {"one_time_key_counts": result} +class _NullContextManager(ContextManager[None]): + """A context manager which does nothing.""" + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + UPDATE_SYNCING_USERS_MS = 10 * 1000 -class GenericWorkerPresence(object): +class GenericWorkerPresence(BasePresenceHandler): def __init__(self, hs): + super().__init__(hs) self.hs = hs self.is_mine_id = hs.is_mine_id self.http_client = hs.get_simple_http_client() - self.store = hs.get_datastore() - self.user_to_num_current_syncs = {} - self.clock = hs.get_clock() + + self._presence_enabled = hs.config.use_presence + + # The number of ongoing syncs on this process, by user id. + # Empty if _presence_enabled is false. + self._user_to_num_current_syncs = {} # type: Dict[str, int] + self.notifier = hs.get_notifier() self.instance_id = hs.get_instance_id() - active_presence = self.store.take_presence_startup_info() - self.user_to_current_state = {state.user_id: state for state in active_presence} - # user_id -> last_sync_ms. Lists the users that have stopped syncing # but we haven't notified the master of that yet self.users_going_offline = {} @@ -259,13 +271,13 @@ class GenericWorkerPresence(object): ) def _on_shutdown(self): - if self.hs.config.use_presence: + if self._presence_enabled: self.hs.get_tcp_replication().send_command( ClearUserSyncsCommand(self.instance_id) ) def send_user_sync(self, user_id, is_syncing, last_sync_ms): - if self.hs.config.use_presence: + if self._presence_enabled: self.hs.get_tcp_replication().send_user_sync( self.instance_id, user_id, is_syncing, last_sync_ms ) @@ -307,28 +319,33 @@ class GenericWorkerPresence(object): # TODO Hows this supposed to work? return defer.succeed(None) - get_states = __func__(PresenceHandler.get_states) - get_state = __func__(PresenceHandler.get_state) - current_state_for_users = __func__(PresenceHandler.current_state_for_users) + async def user_syncing( + self, user_id: str, affect_presence: bool + ) -> ContextManager[None]: + """Record that a user is syncing. + + Called by the sync and events servlets to record that a user has connected to + this worker and is waiting for some events. + """ + if not affect_presence or not self._presence_enabled: + return _NullContextManager() - def user_syncing(self, user_id, affect_presence): - if affect_presence: - curr_sync = self.user_to_num_current_syncs.get(user_id, 0) - self.user_to_num_current_syncs[user_id] = curr_sync + 1 + curr_sync = self._user_to_num_current_syncs.get(user_id, 0) + self._user_to_num_current_syncs[user_id] = curr_sync + 1 - # If we went from no in flight sync to some, notify replication - if self.user_to_num_current_syncs[user_id] == 1: - self.mark_as_coming_online(user_id) + # If we went from no in flight sync to some, notify replication + if self._user_to_num_current_syncs[user_id] == 1: + self.mark_as_coming_online(user_id) def _end(): # We check that the user_id is in user_to_num_current_syncs because # user_to_num_current_syncs may have been cleared if we are # shutting down. - if affect_presence and user_id in self.user_to_num_current_syncs: - self.user_to_num_current_syncs[user_id] -= 1 + if user_id in self._user_to_num_current_syncs: + self._user_to_num_current_syncs[user_id] -= 1 # If we went from one in flight sync to non, notify replication - if self.user_to_num_current_syncs[user_id] == 0: + if self._user_to_num_current_syncs[user_id] == 0: self.mark_as_going_offline(user_id) @contextlib.contextmanager @@ -338,7 +355,7 @@ class GenericWorkerPresence(object): finally: _end() - return defer.succeed(_user_syncing()) + return _user_syncing() @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): @@ -373,15 +390,12 @@ class GenericWorkerPresence(object): stream_id = token yield self.notify_from_replication(states, stream_id) - def get_currently_syncing_users(self): - if self.hs.config.use_presence: - return [ - user_id - for user_id, count in self.user_to_num_current_syncs.items() - if count > 0 - ] - else: - return set() + def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + return [ + user_id + for user_id, count in self._user_to_num_current_syncs.items() + if count > 0 + ] class GenericWorkerTyping(object): @@ -625,8 +639,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): self.store = hs.get_datastore() self.typing_handler = hs.get_typing_handler() - # NB this is a SynchrotronPresence, not a normal PresenceHandler - self.presence_handler = hs.get_presence_handler() + self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence self.notifier = hs.get_notifier() self.notify_pushers = hs.config.start_pushers diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index ec18a42a68..71a89f09c7 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -19,6 +19,7 @@ import random from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase +from synapse.handlers.presence import format_user_presence_state from synapse.logging.utils import log_function from synapse.types import UserID from synapse.visibility import filter_events_for_client @@ -97,6 +98,8 @@ class EventStreamHandler(BaseHandler): explicit_room_id=room_id, ) + time_now = self.clock.time_msec() + # When the user joins a new room, or another user joins a currently # joined room, we need to send down presence for those users. to_add = [] @@ -112,19 +115,20 @@ class EventStreamHandler(BaseHandler): users = await self.state.get_current_users_in_room( event.room_id ) - states = await presence_handler.get_states(users, as_event=True) - to_add.extend(states) else: + users = [event.state_key] - ev = await presence_handler.get_state( - UserID.from_string(event.state_key), as_event=True - ) - to_add.append(ev) + states = await presence_handler.get_states(users) + to_add.extend( + { + "type": EventTypes.Presence, + "content": format_user_presence_state(state, time_now), + } + for state in states + ) events.extend(to_add) - time_now = self.clock.time_msec() - chunks = await self._event_serializer.serialize_events( events, time_now, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index b116500c7d..f88bad5f25 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -381,10 +381,16 @@ class InitialSyncHandler(BaseHandler): return [] states = await presence_handler.get_states( - [m.user_id for m in room_members], as_event=True + [m.user_id for m in room_members] ) - return states + return [ + { + "type": EventTypes.Presence, + "content": format_user_presence_state(s, time_now), + } + for s in states + ] async def get_receipts(): receipts = await self.store.get_linearized_receipts_for_room( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6912165622..5cbefae177 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,10 +22,10 @@ The methods that define policy are: - PresenceHandler._handle_timeouts - should_notify """ - +import abc import logging from contextlib import contextmanager -from typing import Dict, List, Set +from typing import Dict, Iterable, List, Set from six import iteritems, itervalues @@ -41,7 +42,7 @@ from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState -from synapse.types import UserID, get_domain_from_id +from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import cached from synapse.util.metrics import Measure @@ -99,13 +100,106 @@ EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER -class PresenceHandler(object): +class BasePresenceHandler(abc.ABC): + """Parts of the PresenceHandler that are shared between workers and master""" + + def __init__(self, hs: "synapse.server.HomeServer"): + self.clock = hs.get_clock() + self.store = hs.get_datastore() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = {state.user_id: state for state in active_presence} + + @abc.abstractmethod + async def user_syncing( + self, user_id: str, affect_presence: bool + ) -> ContextManager[None]: + """Returns a context manager that should surround any stream requests + from the user. + + This allows us to keep track of who is currently streaming and who isn't + without having to have timers outside of this module to avoid flickering + when users disconnect/reconnect. + + Args: + user_id: the user that is starting a sync + affect_presence: If false this function will be a no-op. + Useful for streams that are not associated with an actual + client that is being used by a user. + """ + + @abc.abstractmethod + def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + """Get an iterable of syncing users on this worker, to send to the presence handler + + This is called when a replication connection is established. It should return + a list of user ids, which are then sent as USER_SYNC commands to inform the + process handling presence about those users. + + Returns: + An iterable of user_id strings. + """ + + async def get_state(self, target_user: UserID) -> UserPresenceState: + results = await self.get_states([target_user.to_string()]) + return results[0] + + async def get_states( + self, target_user_ids: Iterable[str] + ) -> List[UserPresenceState]: + """Get the presence state for users.""" + + updates_d = await self.current_state_for_users(target_user_ids) + updates = list(updates_d.values()) + + for user_id in set(target_user_ids) - {u.user_id for u in updates}: + updates.append(UserPresenceState.default(user_id)) + + return updates + + async def current_state_for_users( + self, user_ids: Iterable[str] + ) -> Dict[str, UserPresenceState]: + """Get the current presence state for multiple users. + + Returns: + dict: `user_id` -> `UserPresenceState` + """ + states = { + user_id: self.user_to_current_state.get(user_id, None) + for user_id in user_ids + } + + missing = [user_id for user_id, state in iteritems(states) if not state] + if missing: + # There are things not in our in memory cache. Lets pull them out of + # the database. + res = await self.store.get_presence_for_users(missing) + states.update(res) + + missing = [user_id for user_id, state in iteritems(states) if not state] + if missing: + new = { + user_id: UserPresenceState.default(user_id) for user_id in missing + } + states.update(new) + self.user_to_current_state.update(new) + + return states + + @abc.abstractmethod + async def set_state( + self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + ) -> None: + """Set the presence state of the user. """ + + +class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "synapse.server.HomeServer"): + super().__init__(hs) self.hs = hs self.is_mine_id = hs.is_mine_id self.server_name = hs.hostname - self.clock = hs.get_clock() - self.store = hs.get_datastore() self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() @@ -115,13 +209,6 @@ class PresenceHandler(object): federation_registry.register_edu_handler("m.presence", self.incoming_presence) - active_presence = self.store.take_presence_startup_info() - - # A dictionary of the current state of users. This is prefilled with - # non-offline presence from the DB. We should fetch from the DB if - # we can't find a users presence in here. - self.user_to_current_state = {state.user_id: state for state in active_presence} - LaterGauge( "synapse_handlers_presence_user_to_current_state_size", "", @@ -130,7 +217,7 @@ class PresenceHandler(object): ) now = self.clock.time_msec() - for state in active_presence: + for state in self.user_to_current_state.values(): self.wheel_timer.insert( now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER ) @@ -361,10 +448,18 @@ class PresenceHandler(object): timers_fired_counter.inc(len(states)) + syncing_user_ids = { + user_id + for user_id, count in self.user_to_num_current_syncs.items() + if count + } + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) + changes = handle_timeouts( states, is_mine_fn=self.is_mine_id, - syncing_user_ids=self.get_currently_syncing_users(), + syncing_user_ids=syncing_user_ids, now=now, ) @@ -462,22 +557,9 @@ class PresenceHandler(object): return _user_syncing() - def get_currently_syncing_users(self): - """Get the set of user ids that are currently syncing on this HS. - Returns: - set(str): A set of user_id strings. - """ - if self.hs.config.use_presence: - syncing_user_ids = { - user_id - for user_id, count in self.user_to_num_current_syncs.items() - if count - } - for user_ids in self.external_process_to_current_syncs.values(): - syncing_user_ids.update(user_ids) - return syncing_user_ids - else: - return set() + def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + # since we are the process handling presence, there is nothing to do here. + return [] async def update_external_syncs_row( self, process_id, user_id, is_syncing, sync_time_msec @@ -554,34 +636,6 @@ class PresenceHandler(object): res = await self.current_state_for_users([user_id]) return res[user_id] - async def current_state_for_users(self, user_ids): - """Get the current presence state for multiple users. - - Returns: - dict: `user_id` -> `UserPresenceState` - """ - states = { - user_id: self.user_to_current_state.get(user_id, None) - for user_id in user_ids - } - - missing = [user_id for user_id, state in iteritems(states) if not state] - if missing: - # There are things not in our in memory cache. Lets pull them out of - # the database. - res = await self.store.get_presence_for_users(missing) - states.update(res) - - missing = [user_id for user_id, state in iteritems(states) if not state] - if missing: - new = { - user_id: UserPresenceState.default(user_id) for user_id in missing - } - states.update(new) - self.user_to_current_state.update(new) - - return states - async def _persist_and_notify(self, states): """Persist states in the database, poke the notifier and send to interested remote servers @@ -669,40 +723,6 @@ class PresenceHandler(object): federation_presence_counter.inc(len(updates)) await self._update_states(updates) - async def get_state(self, target_user, as_event=False): - results = await self.get_states([target_user.to_string()], as_event=as_event) - - return results[0] - - async def get_states(self, target_user_ids, as_event=False): - """Get the presence state for users. - - Args: - target_user_ids (list) - as_event (bool): Whether to format it as a client event or not. - - Returns: - list - """ - - updates = await self.current_state_for_users(target_user_ids) - updates = list(updates.values()) - - for user_id in set(target_user_ids) - {u.user_id for u in updates}: - updates.append(UserPresenceState.default(user_id)) - - now = self.clock.time_msec() - if as_event: - return [ - { - "type": "m.presence", - "content": format_user_presence_state(state, now), - } - for state in updates - ] - else: - return updates - async def set_state(self, target_user, state, ignore_status_msg=False): """Set the presence state of the user. """ @@ -889,7 +909,7 @@ class PresenceHandler(object): user_ids = await self.state.get_current_users_in_room(room_id) user_ids = list(filter(self.is_mine_id, user_ids)) - states = await self.current_state_for_users(user_ids) + states_d = await self.current_state_for_users(user_ids) # Filter out old presence, i.e. offline presence states where # the user hasn't been active for a week. We can change this @@ -899,7 +919,7 @@ class PresenceHandler(object): now = self.clock.time_msec() states = [ state - for state in states.values() + for state in states_d.values() if state.state != PresenceState.OFFLINE or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 or state.status_msg is not None diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index f26aee83cb..c7880d4b63 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -210,7 +210,10 @@ class ReplicateCommand(Command): class UserSyncCommand(Command): """Sent by the client to inform the server that a user has started or - stopped syncing. Used to calculate presence on the master. + stopped syncing on this process. + + This is used by the process handling presence (typically the master) to + calculate who is online and who is not. Includes a timestamp of when the last user sync was. @@ -218,7 +221,7 @@ class UserSyncCommand(Command): USER_SYNC - Where is either "start" or "stop" + Where is either "start" or "end" """ NAME = "USER_SYNC" diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 5b5ee2c13e..0db5a3a24d 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -337,13 +337,6 @@ class ReplicationCommandHandler: if self._is_master: self._notifier.notify_remote_server_up(cmd.data) - def get_currently_syncing_users(self): - """Get the list of currently syncing users (if any). This is called - when a connection has been established and we need to send the - currently syncing users. - """ - return self._presence_handler.get_currently_syncing_users() - def new_connection(self, connection: AbstractConnection): """Called when we have a new connection. """ @@ -361,9 +354,11 @@ class ReplicationCommandHandler: if self._factory: self._factory.resetDelay() - # Tell the server if we have any users currently syncing (should only - # happen on synchrotrons) - currently_syncing = self.get_currently_syncing_users() + # Tell the other end if we have any users currently syncing. + currently_syncing = ( + self._presence_handler.get_currently_syncing_users_for_replication() + ) + now = self._clock.time_msec() for user_id in currently_syncing: connection.send_command( diff --git a/synapse/server.pyi b/synapse/server.pyi index 9013e9bac9..f1a5717028 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -97,7 +97,7 @@ class HomeServer(object): pass def get_notifier(self) -> synapse.notifier.Notifier: pass - def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler: + def get_presence_handler(self) -> synapse.handlers.presence.BasePresenceHandler: pass def get_clock(self) -> synapse.util.Clock: pass -- cgit 1.5.1 From 02d97fc3baa8056c7ef823f98d114e307979d9cc Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Fri, 15 May 2020 11:44:00 +0100 Subject: Ignore incoming presence updates when presence is disabled (#7508) --- changelog.d/7508.bugfix | 1 + synapse/handlers/presence.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 changelog.d/7508.bugfix (limited to 'synapse/handlers/presence.py') diff --git a/changelog.d/7508.bugfix b/changelog.d/7508.bugfix new file mode 100644 index 0000000000..c78e77c631 --- /dev/null +++ b/changelog.d/7508.bugfix @@ -0,0 +1 @@ +Ignore incoming presence events from other homeservers if presence is disabled locally. \ No newline at end of file diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 5cbefae177..9ea11c0754 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -204,6 +204,7 @@ class PresenceHandler(BasePresenceHandler): self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() self.state = hs.get_state_handler() + self._presence_enabled = hs.config.use_presence federation_registry = hs.get_federation_registry() @@ -676,13 +677,14 @@ class PresenceHandler(BasePresenceHandler): async def incoming_presence(self, origin, content): """Called when we receive a `m.presence` EDU from a remote server. """ + if not self._presence_enabled: + return + now = self.clock.time_msec() updates = [] for push in content.get("push", []): # A "push" contains a list of presence that we are probably interested # in. - # TODO: Actually check if we're interested, rather than blindly - # accepting presence updates. user_id = push.get("user_id", None) if not user_id: logger.info( -- cgit 1.5.1 From e5c67d04dbe5ed45d659e826a5dfcd5044a4e374 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2020 16:11:35 +0100 Subject: Add option to move event persistence off master (#7517) --- changelog.d/7517.feature | 1 + scripts/synapse_port_db | 3 + synapse/app/generic_worker.py | 53 +++++++++- synapse/config/logger.py | 1 + synapse/config/workers.py | 30 +++++- synapse/handlers/federation.py | 16 +-- synapse/handlers/message.py | 12 ++- synapse/handlers/presence.py | 6 ++ synapse/handlers/room.py | 7 ++ synapse/handlers/room_member.py | 39 ++++++-- synapse/replication/http/__init__.py | 4 +- synapse/replication/http/_base.py | 3 + synapse/replication/http/membership.py | 40 +++++++- synapse/replication/http/presence.py | 116 ++++++++++++++++++++++ synapse/replication/tcp/handler.py | 10 ++ synapse/rest/admin/rooms.py | 11 +- synapse/storage/data_stores/__init__.py | 6 +- synapse/storage/data_stores/main/devices.py | 30 ++++-- synapse/storage/data_stores/main/events.py | 34 ++++++- synapse/storage/data_stores/main/events_worker.py | 2 +- synapse/storage/data_stores/main/roommember.py | 25 ----- synapse/storage/persist_events.py | 6 ++ 22 files changed, 382 insertions(+), 73 deletions(-) create mode 100644 changelog.d/7517.feature create mode 100644 synapse/replication/http/presence.py (limited to 'synapse/handlers/presence.py') diff --git a/changelog.d/7517.feature b/changelog.d/7517.feature new file mode 100644 index 0000000000..6062f0061c --- /dev/null +++ b/changelog.d/7517.feature @@ -0,0 +1 @@ +Add option to move event persistence off master. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index acd9ac4b75..9a0fbc61d8 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -196,6 +196,9 @@ class MockHomeserver: def get_reactor(self): return reactor + def get_instance_name(self): + return "master" + class Porter(object): def __init__(self, **kwargs): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index a37520000a..2906b93f6a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -39,7 +39,11 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer -from synapse.handlers.presence import BasePresenceHandler, get_interested_parties +from synapse.handlers.presence import ( + BasePresenceHandler, + PresenceState, + get_interested_parties, +) from synapse.http.server import JsonResource, OptionsResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite @@ -47,6 +51,10 @@ from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource +from synapse.replication.http.presence import ( + ReplicationBumpPresenceActiveTime, + ReplicationPresenceSetState, +) from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -247,6 +255,9 @@ class GenericWorkerPresence(BasePresenceHandler): # but we haven't notified the master of that yet self.users_going_offline = {} + self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) + self._set_state_client = ReplicationPresenceSetState.make_client(hs) + self._send_stop_syncing_loop = self.clock.looping_call( self.send_stop_syncing, UPDATE_SYNCING_USERS_MS ) @@ -304,10 +315,6 @@ class GenericWorkerPresence(BasePresenceHandler): self.users_going_offline.pop(user_id, None) self.send_user_sync(user_id, False, last_sync_ms) - def set_state(self, user, state, ignore_status_msg=False): - # TODO Hows this supposed to work? - return defer.succeed(None) - async def user_syncing( self, user_id: str, affect_presence: bool ) -> ContextManager[None]: @@ -386,6 +393,42 @@ class GenericWorkerPresence(BasePresenceHandler): if count > 0 ] + async def set_state(self, target_user, state, ignore_status_msg=False): + """Set the presence state of the user. + """ + presence = state["presence"] + + valid_presence = ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + ) + if presence not in valid_presence: + raise SynapseError(400, "Invalid presence state") + + user_id = target_user.to_string() + + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + await self._set_state_client( + user_id=user_id, state=state, ignore_status_msg=ignore_status_msg + ) + + async def bump_presence_active_time(self, user): + """We've seen the user do something that indicates they're interacting + with the app. + """ + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + user_id = user.to_string() + await self._bump_active_client(user_id=user_id) + class GenericWorkerTyping(object): def __init__(self, hs): diff --git a/synapse/config/logger.py b/synapse/config/logger.py index a25c70e928..49f6c32beb 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -257,5 +257,6 @@ def setup_logging( logging.warning("***** STARTING SERVER *****") logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse)) logging.info("Server hostname: %s", config.server_name) + logging.info("Instance name: %s", hs.get_instance_name()) return logger diff --git a/synapse/config/workers.py b/synapse/config/workers.py index c80c338584..ed06b91a54 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import attr -from ._base import Config +from ._base import Config, ConfigError @attr.s @@ -27,6 +27,17 @@ class InstanceLocationConfig: port = attr.ib(type=int) +@attr.s +class WriterLocations: + """Specifies the instances that write various streams. + + Attributes: + events: The instance that writes to the event and backfill streams. + """ + + events = attr.ib(default="master", type=str) + + class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. They have their own pid_file and listener configuration. They use the @@ -83,11 +94,26 @@ class WorkerConfig(Config): bind_addresses.append("") # A map from instance name to host/port of their HTTP replication endpoint. - instance_map = config.get("instance_map", {}) or {} + instance_map = config.get("instance_map") or {} self.instance_map = { name: InstanceLocationConfig(**c) for name, c in instance_map.items() } + # Map from type of streams to source, c.f. WriterLocations. + writers = config.get("stream_writers") or {} + self.writers = WriterLocations(**writers) + + # Check that the configured writer for events also appears in + # `instance_map`. + if ( + self.writers.events != "master" + and self.writers.events not in self.instance_map + ): + raise ConfigError( + "Instance %r is configured to write events but does not appear in `instance_map` config." + % (self.writers.events,) + ) + def read_arguments(self, args): # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e354c803db..75ec90d267 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -126,11 +126,10 @@ class FederationHandler(BaseHandler): self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config self.http_client = hs.get_simple_http_client() + self._instance_name = hs.get_instance_name() self._replication = hs.get_replication_data_handler() - self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client( - hs - ) + self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client( hs ) @@ -1243,6 +1242,10 @@ class FederationHandler(BaseHandler): content: The event content to use for the join event. """ + # TODO: We should be able to call this on workers, but the upgrading of + # room stuff after join currently doesn't work on workers. + assert self.config.worker.worker_app is None + logger.debug("Joining %s to %s", joinee, room_id) origin, event, room_version_obj = await self._make_and_verify_event( @@ -1314,7 +1317,7 @@ class FederationHandler(BaseHandler): # # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - "master", "events", max_stream_id + self.config.worker.writers.events, "events", max_stream_id ) # Check whether this room is the result of an upgrade of a room we already know @@ -2854,8 +2857,9 @@ class FederationHandler(BaseHandler): backfilled: Whether these events are a result of backfilling or not """ - if self.config.worker_app: - result = await self._send_events_to_master( + if self.config.worker.writers.events != self._instance_name: + result = await self._send_events( + instance_name=self.config.worker.writers.events, store=self.store, event_and_contexts=event_and_contexts, backfilled=backfilled, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f445e2aa2a..ea25f0515a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -366,10 +366,11 @@ class EventCreationHandler(object): self.notifier = hs.get_notifier() self.config = hs.config self.require_membership_for_aliases = hs.config.require_membership_for_aliases + self._instance_name = hs.get_instance_name() self.room_invite_state_types = self.hs.config.room_invite_state_types - self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) + self.send_event = ReplicationSendEventRestServlet.make_client(hs) # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) @@ -835,8 +836,9 @@ class EventCreationHandler(object): success = False try: # If we're a worker we need to hit out to the master. - if self.config.worker_app: - result = await self.send_event_to_master( + if self.config.worker.writers.events != self._instance_name: + result = await self.send_event( + instance_name=self.config.worker.writers.events, event_id=event.event_id, store=self.store, requester=requester, @@ -902,9 +904,9 @@ class EventCreationHandler(object): """Called when we have fully built the event, have already calculated the push actions for the event, and checked auth. - This should only be run on master. + This should only be run on the instance in charge of persisting events. """ - assert not self.config.worker_app + assert self.config.worker.writers.events == self._instance_name if ratelimit: # We check if this is a room admin redacting an event so that we diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ea11c0754..3594f3b00f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -193,6 +193,12 @@ class BasePresenceHandler(abc.ABC): ) -> None: """Set the presence state of the user. """ + @abc.abstractmethod + async def bump_presence_active_time(self, user: UserID): + """We've seen the user do something that indicates they're interacting + with the app. + """ + class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "synapse.server.HomeServer"): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 2698a129ca..61db3ccc43 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -89,6 +89,8 @@ class RoomCreationHandler(BaseHandler): self.room_member_handler = hs.get_room_member_handler() self.config = hs.config + self._replication = hs.get_replication_data_handler() + # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") @@ -752,6 +754,11 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() + # Always wait for room creation to progate before returning + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", last_stream_id + ) + return result, last_stream_id async def _send_events_for_new_room( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 691b6705b2..0f7af982f0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -26,6 +26,9 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.replication.http.membership import ( + ReplicationLocallyRejectInviteRestServlet, +) from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room @@ -44,11 +47,6 @@ class RoomMemberHandler(object): __metaclass__ = abc.ABCMeta def __init__(self, hs): - """ - - Args: - hs (synapse.server.HomeServer): - """ self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() @@ -71,6 +69,17 @@ class RoomMemberHandler(object): self._enable_lookup = hs.config.enable_3pid_lookup self.allow_per_room_profiles = self.config.allow_per_room_profiles + self._event_stream_writer_instance = hs.config.worker.writers.events + self._is_on_event_persistence_instance = ( + self._event_stream_writer_instance == hs.get_instance_name() + ) + if self._is_on_event_persistence_instance: + self.persist_event_storage = hs.get_storage().persistence + else: + self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client( + hs + ) + # This is only used to get at ratelimit function, and # maybe_kick_guest_users. It's fine there are multiple of these as # it doesn't store state. @@ -121,6 +130,22 @@ class RoomMemberHandler(object): """ raise NotImplementedError() + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + if self._is_on_event_persistence_instance: + return await self.persist_event_storage.locally_reject_invite( + user_id, room_id + ) + else: + result = await self._locally_reject_client( + instance_name=self._event_stream_writer_instance, + user_id=user_id, + room_id=room_id, + ) + return result["stream_id"] + @abc.abstractmethod async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has joined the @@ -1015,9 +1040,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): # logger.warning("Failed to reject invite: %s", e) - stream_id = await self.store.locally_reject_invite( - target.to_string(), room_id - ) + stream_id = await self.locally_reject_invite(target.to_string(), room_id) return None, stream_id async def _user_joined_room(self, target: UserID, room_id: str) -> None: diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index a909744e93..19b69e0e11 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -19,6 +19,7 @@ from synapse.replication.http import ( federation, login, membership, + presence, register, send_event, streams, @@ -35,10 +36,11 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) federation.register_servlets(hs, self) + presence.register_servlets(hs, self) + membership.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: - membership.register_servlets(hs, self) login.register_servlets(hs, self) register.register_servlets(hs, self) devices.register_servlets(hs, self) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index c3136a4eb9..793cef6c26 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -142,6 +142,7 @@ class ReplicationEndpoint(object): """ clock = hs.get_clock() client = hs.get_simple_http_client() + local_instance_name = hs.get_instance_name() master_host = hs.config.worker_replication_host master_port = hs.config.worker_replication_http_port @@ -151,6 +152,8 @@ class ReplicationEndpoint(object): @trace(opname="outgoing_replication_request") @defer.inlineCallbacks def send_request(instance_name="master", **kwargs): + if instance_name == local_instance_name: + raise Exception("Trying to send HTTP request to self") if instance_name == "master": host = master_host port = master_port diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 050fd34562..a7174c4a8f 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -14,12 +14,16 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID from synapse.util.distributor import user_joined_room, user_left_room +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -106,6 +110,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + self.member_handler = hs.get_room_member_handler() @staticmethod def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): @@ -149,12 +154,44 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): # logger.warning("Failed to reject invite: %s", e) - stream_id = await self.store.locally_reject_invite(user_id, room_id) + stream_id = await self.member_handler.locally_reject_invite( + user_id, room_id + ) event_id = None return 200, {"event_id": event_id, "stream_id": stream_id} +class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint): + """Rejects the invite for the user and room locally. + + Request format: + + POST /_synapse/replication/locally_reject_invite/:room_id/:user_id + + {} + """ + + NAME = "locally_reject_invite" + PATH_ARGS = ("room_id", "user_id") + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.member_handler = hs.get_room_member_handler() + + @staticmethod + def _serialize_payload(room_id, user_id): + return {} + + async def _handle_request(self, request, room_id, user_id): + logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id) + + stream_id = await self.member_handler.locally_reject_invite(user_id, room_id) + + return 200, {"stream_id": stream_id} + + class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): """Notifies that a user has joined or left the room @@ -208,3 +245,4 @@ def register_servlets(hs, http_server): ReplicationRemoteJoinRestServlet(hs).register(http_server) ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) + ReplicationLocallyRejectInviteRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py new file mode 100644 index 0000000000..ea1b33331b --- /dev/null +++ b/synapse/replication/http/presence.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import UserID + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationBumpPresenceActiveTime(ReplicationEndpoint): + """We've seen the user do something that indicates they're interacting + with the app. + + The POST looks like: + + POST /_synapse/replication/bump_presence_active_time/ + + 200 OK + + {} + """ + + NAME = "bump_presence_active_time" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._presence_handler = hs.get_presence_handler() + + @staticmethod + def _serialize_payload(user_id): + return {} + + async def _handle_request(self, request, user_id): + await self._presence_handler.bump_presence_active_time( + UserID.from_string(user_id) + ) + + return ( + 200, + {}, + ) + + +class ReplicationPresenceSetState(ReplicationEndpoint): + """Set the presence state for a user. + + The POST looks like: + + POST /_synapse/replication/presence_set_state/ + + { + "state": { ... }, + "ignore_status_msg": false, + } + + 200 OK + + {} + """ + + NAME = "presence_set_state" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._presence_handler = hs.get_presence_handler() + + @staticmethod + def _serialize_payload(user_id, state, ignore_status_msg=False): + return { + "state": state, + "ignore_status_msg": ignore_status_msg, + } + + async def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + await self._presence_handler.set_state( + UserID.from_string(user_id), content["state"], content["ignore_status_msg"] + ) + + return ( + 200, + {}, + ) + + +def register_servlets(hs, http_server): + ReplicationBumpPresenceActiveTime(hs).register(http_server) + ReplicationPresenceSetState(hs).register(http_server) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index acfa66a7a8..03300e5336 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -38,7 +38,9 @@ from synapse.replication.tcp.commands import ( from synapse.replication.tcp.protocol import AbstractConnection from synapse.replication.tcp.streams import ( STREAMS_MAP, + BackfillStream, CachesStream, + EventsStream, FederationStream, Stream, ) @@ -87,6 +89,14 @@ class ReplicationCommandHandler: self._streams_to_replicate.append(stream) continue + if isinstance(stream, (EventsStream, BackfillStream)): + # Only add EventStream and BackfillStream as a source on the + # instance in charge of event persistence. + if hs.config.worker.writers.events == hs.get_instance_name(): + self._streams_to_replicate.append(stream) + + continue + # Only add any other streams if we're on master. if hs.config.worker_app is not None: continue diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 0a13e1ed34..8173baef8f 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -100,7 +100,9 @@ class ShutdownRoomRestServlet(RestServlet): # we try and auto join below. # # TODO: Currently the events stream is written to from master - await self._replication.wait_for_stream_position("master", "events", stream_id) + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", stream_id + ) users = await self.state.get_current_users_in_room(room_id) kicked_users = [] @@ -113,7 +115,7 @@ class ShutdownRoomRestServlet(RestServlet): try: target_requester = create_requester(user_id) - await self.room_member_handler.update_membership( + _, stream_id = await self.room_member_handler.update_membership( requester=target_requester, target=target_requester.user, room_id=room_id, @@ -123,6 +125,11 @@ class ShutdownRoomRestServlet(RestServlet): require_consent=False, ) + # Wait for leave to come in over replication before trying to forget. + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", stream_id + ) + await self.room_member_handler.forget(target_requester.user, room_id) await self.room_member_handler.update_membership( diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py index 791961b296..599ee470d4 100644 --- a/synapse/storage/data_stores/__init__.py +++ b/synapse/storage/data_stores/__init__.py @@ -66,9 +66,9 @@ class DataStores(object): self.main = main_store_class(database, db_conn, hs) - # If we're on a process that can persist events (currently - # master), also instantiate a `PersistEventsStore` - if hs.config.worker.worker_app is None: + # If we're on a process that can persist events also + # instantiate a `PersistEventsStore` + if hs.config.worker.writers.events == hs.get_instance_name(): self.persist_events = PersistEventsStore( hs, database, self.main ) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 0e8378714a..417ac8dc7c 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -689,6 +689,25 @@ class DeviceWorkerStore(SQLBaseStore): desc="make_remote_user_device_cache_as_stale", ) + def mark_remote_user_device_list_as_unsubscribed(self, user_id): + """Mark that we no longer track device lists for remote user. + """ + + def _mark_remote_user_device_list_as_unsubscribed_txn(txn): + self.db.simple_delete_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + ) + self._invalidate_cache_and_stream( + txn, self.get_device_list_last_stream_id_for_remote, (user_id,) + ) + + return self.db.runInteraction( + "mark_remote_user_device_list_as_unsubscribed", + _mark_remote_user_device_list_as_unsubscribed_txn, + ) + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): @@ -969,17 +988,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): desc="update_device", ) - @defer.inlineCallbacks - def mark_remote_user_device_list_as_unsubscribed(self, user_id): - """Mark that we no longer track device lists for remote user. - """ - yield self.db.simple_delete( - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - desc="mark_remote_user_device_list_as_unsubscribed", - ) - self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) - def update_remote_device_list_cache_entry( self, user_id, device_id, content, stream_id ): diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index a97f8b3934..a6572571b4 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -138,10 +138,10 @@ class PersistEventsStore: self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator - # This should only exist on master for now + # This should only exist on instances that are configured to write assert ( - hs.config.worker.worker_app is None - ), "Can only instantiate PersistEventsStore on master" + hs.config.worker.writers.events == hs.get_instance_name() + ), "Can only instantiate EventsStore on master" @_retry_on_integrity_error @defer.inlineCallbacks @@ -1590,3 +1590,31 @@ class PersistEventsStore: if not ev.internal_metadata.is_outlier() ], ) + + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + + sql = ( + "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + def f(txn, stream_ordering): + txn.execute(sql, (stream_ordering, True, room_id, user_id)) + + # We also clear this entry from `local_current_membership`. + # Ideally we'd point to a leave event, but we don't have one, so + # nevermind. + self.db.simple_delete_txn( + txn, + table="local_current_membership", + keyvalues={"room_id": room_id, "user_id": user_id}, + ) + + with self._stream_id_gen.get_next() as stream_ordering: + await self.db.runInteraction("locally_reject_invite", f, stream_ordering) + + return stream_ordering diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index b880a71782..213d69100a 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) - if hs.config.worker_app is None: + if hs.config.worker.writers.events == hs.get_instance_name(): # We are the process in charge of generating stream ids for events, # so instantiate ID generators based on the database self._stream_id_gen = StreamIdGenerator( diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index 7c5ca81ae0..137ebac833 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -1046,31 +1046,6 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): def __init__(self, database: Database, db_conn, hs): super(RoomMemberStore, self).__init__(database, db_conn, hs) - @defer.inlineCallbacks - def locally_reject_invite(self, user_id, room_id): - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, (stream_ordering, True, room_id, user_id)) - - # We also clear this entry from `local_current_membership`. - # Ideally we'd point to a leave event, but we don't have one, so - # nevermind. - self.db.simple_delete_txn( - txn, - table="local_current_membership", - keyvalues={"room_id": room_id, "user_id": user_id}, - ) - - with self._stream_id_gen.get_next() as stream_ordering: - yield self.db.runInteraction("locally_reject_invite", f, stream_ordering) - - return stream_ordering - def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 12e1ffb9a2..f159400a87 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -786,3 +786,9 @@ class EventsPersistenceStorage(object): for user_id in left_users: await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id) + + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + return await self.persist_events_store.locally_reject_invite(user_id, room_id) -- cgit 1.5.1