diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/federation/send_queue.py | 70 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 96 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 78 | ||||
-rw-r--r-- | synapse/module_api/__init__.py | 13 |
4 files changed, 74 insertions, 183 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index e3f0bc2471..d71f04e43e 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -76,9 +76,6 @@ class FederationRemoteSendQueue(AbstractFederationSender): # Pending presence map user_id -> UserPresenceState self.presence_map = {} # type: Dict[str, UserPresenceState] - # Stream position -> list[user_id] - self.presence_changed = SortedDict() # type: SortedDict[int, List[str]] - # Stores the destinations we need to explicitly send presence to about a # given user. # Stream position -> (user_id, destinations) @@ -96,7 +93,7 @@ class FederationRemoteSendQueue(AbstractFederationSender): self.edus = SortedDict() # type: SortedDict[int, Edu] - # stream ID for the next entry into presence_changed/keyed_edu_changed/edus. + # stream ID for the next entry into keyed_edu_changed/edus. self.pos = 1 # map from stream ID to the time that stream entry was generated, so that we @@ -117,7 +114,6 @@ class FederationRemoteSendQueue(AbstractFederationSender): for queue_name in [ "presence_map", - "presence_changed", "keyed_edu", "keyed_edu_changed", "edus", @@ -155,23 +151,12 @@ class FederationRemoteSendQueue(AbstractFederationSender): """Clear all the queues from before a given position""" with Measure(self.clock, "send_queue._clear"): # Delete things out of presence maps - keys = self.presence_changed.keys() - i = self.presence_changed.bisect_left(position_to_delete) - for key in keys[:i]: - del self.presence_changed[key] - - user_ids = { - user_id for uids in self.presence_changed.values() for user_id in uids - } - keys = self.presence_destinations.keys() i = self.presence_destinations.bisect_left(position_to_delete) for key in keys[:i]: del self.presence_destinations[key] - user_ids.update( - user_id for user_id, _ in self.presence_destinations.values() - ) + user_ids = {user_id for user_id, _ in self.presence_destinations.values()} to_del = [ user_id for user_id in self.presence_map if user_id not in user_ids @@ -244,23 +229,6 @@ class FederationRemoteSendQueue(AbstractFederationSender): """ # nothing to do here: the replication listener will handle it. - def send_presence(self, states: List[UserPresenceState]) -> None: - """As per FederationSender - - Args: - states - """ - pos = self._next_pos() - - # We only want to send presence for our own users, so lets always just - # filter here just in case. - local_states = [s for s in states if self.is_mine_id(s.user_id)] - - self.presence_map.update({state.user_id: state for state in local_states}) - self.presence_changed[pos] = [state.user_id for state in local_states] - - self.notifier.on_new_replication_data() - def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: @@ -325,18 +293,6 @@ class FederationRemoteSendQueue(AbstractFederationSender): # of the federation stream. rows = [] # type: List[Tuple[int, BaseFederationRow]] - # Fetch changed presence - i = self.presence_changed.bisect_right(from_token) - j = self.presence_changed.bisect_right(to_token) + 1 - dest_user_ids = [ - (pos, user_id) - for pos, user_id_list in self.presence_changed.items()[i:j] - for user_id in user_id_list - ] - - for (key, user_id) in dest_user_ids: - rows.append((key, PresenceRow(state=self.presence_map[user_id]))) - # Fetch presence to send to destinations i = self.presence_destinations.bisect_right(from_token) j = self.presence_destinations.bisect_right(to_token) + 1 @@ -427,22 +383,6 @@ class BaseFederationRow: raise NotImplementedError() -class PresenceRow( - BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState -): - TypeId = "p" - - @staticmethod - def from_data(data): - return PresenceRow(state=UserPresenceState.from_dict(data)) - - def to_data(self): - return self.state.as_dict() - - def add_to_buffer(self, buff): - buff.presence.append(self.state) - - class PresenceDestinationsRow( BaseFederationRow, namedtuple( @@ -506,7 +446,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu _rowtypes = ( - PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, @@ -518,7 +457,6 @@ TypeToRow = {Row.TypeId: Row for Row in _rowtypes} ParsedFederationStreamData = namedtuple( "ParsedFederationStreamData", ( - "presence", # list(UserPresenceState) "presence_destinations", # list of tuples of UserPresenceState and destinations "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] @@ -543,7 +481,6 @@ def process_rows_for_federation( # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence=[], presence_destinations=[], keyed_edus={}, edus={}, @@ -559,9 +496,6 @@ def process_rows_for_federation( parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - if buff.presence: - transaction_queue.send_presence(buff.presence) - for state, destinations in buff.presence_destinations: transaction_queue.send_presence_to_destinations( states=[state], destinations=destinations diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 952ad39f8c..6266accaf5 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -24,8 +24,6 @@ from synapse.events import EventBase from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu -from synapse.handlers.presence import get_interested_remotes -from synapse.logging.context import preserve_fn from synapse.metrics import ( LaterGauge, event_processing_loop_counter, @@ -34,7 +32,7 @@ from synapse.metrics import ( ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken -from synapse.util.metrics import Measure, measure_func +from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.events.presence_router import PresenceRouter @@ -80,15 +78,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def send_presence(self, states: List[UserPresenceState]) -> None: - """Send the new presence states to the appropriate destinations. - - This actually queues up the presence states ready for sending and - triggers a background task to process them and send out the transactions. - """ - raise NotImplementedError() - - @abc.abstractmethod def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: @@ -176,11 +165,6 @@ class FederationSender(AbstractFederationSender): ), ) - # Map of user_id -> UserPresenceState for all the pending presence - # to be sent out by user_id. Entries here get processed and put in - # pending_presence_by_dest - self.pending_presence = {} # type: Dict[str, UserPresenceState] - LaterGauge( "synapse_federation_transaction_queue_pending_pdus", "", @@ -201,8 +185,6 @@ class FederationSender(AbstractFederationSender): self._is_processing = False self._last_poked_id = -1 - self._processing_pending_presence = False - # map from room_id to a set of PerDestinationQueues which we believe are # awaiting a call to flush_read_receipts_for_room. The presence of an entry # here for a given room means that we are rate-limiting RR flushes to that room, @@ -546,48 +528,6 @@ class FederationSender(AbstractFederationSender): for queue in queues: queue.flush_read_receipts_for_room(room_id) - @preserve_fn # the caller should not yield on this - async def send_presence(self, states: List[UserPresenceState]) -> None: - """Send the new presence states to the appropriate destinations. - - This actually queues up the presence states ready for sending and - triggers a background task to process them and send out the transactions. - """ - if not self.hs.config.use_presence: - # No-op if presence is disabled. - return - - # First we queue up the new presence by user ID, so multiple presence - # updates in quick succession are correctly handled. - # We only want to send presence for our own users, so lets always just - # filter here just in case. - self.pending_presence.update( - {state.user_id: state for state in states if self.is_mine_id(state.user_id)} - ) - - # We then handle the new pending presence in batches, first figuring - # out the destinations we need to send each state to and then poking it - # to attempt a new transaction. We linearize this so that we don't - # accidentally mess up the ordering and send multiple presence updates - # in the wrong order - if self._processing_pending_presence: - return - - self._processing_pending_presence = True - try: - while True: - states_map = self.pending_presence - self.pending_presence = {} - - if not states_map: - break - - await self._process_presence_inner(list(states_map.values())) - except Exception: - logger.exception("Error sending presence states to servers") - finally: - self._processing_pending_presence = False - def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: @@ -608,40 +548,6 @@ class FederationSender(AbstractFederationSender): continue self._get_per_destination_queue(destination).send_presence(states) - @measure_func("txnqueue._process_presence") - async def _process_presence_inner(self, states: List[UserPresenceState]) -> None: - """Given a list of states populate self.pending_presence_by_dest and - poke to send a new transaction to each destination - """ - # We pull the presence router here instead of __init__ - # to prevent a dependency cycle: - # - # AuthHandler -> Notifier -> FederationSender - # -> PresenceRouter -> ModuleApi -> AuthHandler - if self._presence_router is None: - self._presence_router = self.hs.get_presence_router() - - assert self._presence_router is not None - - hosts_and_states = await get_interested_remotes( - self.store, - self._presence_router, - states, - self.state, - ) - - for destinations, states in hosts_and_states: - for destination in destinations: - if destination == self.server_name: - continue - - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - continue - - self._get_per_destination_queue(destination).send_presence(states) - def build_and_send_edu( self, destination: str, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e120dd1f48..6460eb9952 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -123,6 +123,14 @@ class BasePresenceHandler(abc.ABC): def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.store = hs.get_datastore() + self.presence_router = hs.get_presence_router() + self.state = hs.get_state_handler() + + self._federation = None + if hs.should_send_federation() or not hs.config.worker_app: + self._federation = hs.get_federation_sender() + + self._send_federation = hs.should_send_federation() self._busy_presence_enabled = hs.config.experimental.msc3026_enabled @@ -249,6 +257,29 @@ class BasePresenceHandler(abc.ABC): """Process presence stream rows received over replication.""" pass + async def maybe_send_presence_to_interested_destinations( + self, states: List[UserPresenceState] + ): + """If this instance is a federation sender, send the states to all + destinations that are interested. + """ + + if not self._send_federation: + return + + # If this worker sends federation we must have a FederationSender. + assert self._federation + + hosts_and_states = await get_interested_remotes( + self.store, + self.presence_router, + states, + self.state, + ) + + for destinations, states in hosts_and_states: + self._federation.send_presence_to_destinations(states, destinations) + class _NullContextManager(ContextManager[None]): """A context manager which does nothing.""" @@ -263,7 +294,6 @@ class WorkerPresenceHandler(BasePresenceHandler): self.hs = hs self.is_mine_id = hs.is_mine_id - self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence # The number of ongoing syncs on this process, by user id. @@ -388,6 +418,9 @@ class WorkerPresenceHandler(BasePresenceHandler): users=users_to_states.keys(), ) + # If this is a federation sender, notify about presence updates. + await self.maybe_send_presence_to_interested_destinations(states) + async def process_replication_rows(self, token, rows): states = [ UserPresenceState( @@ -463,9 +496,6 @@ class PresenceHandler(BasePresenceHandler): self.server_name = hs.hostname self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() - self.federation = hs.get_federation_sender() - self.state = hs.get_state_handler() - self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence federation_registry = hs.get_federation_registry() @@ -672,6 +702,13 @@ class PresenceHandler(BasePresenceHandler): self.unpersisted_users_changes |= {s.user_id for s in new_states} self.unpersisted_users_changes -= set(to_notify.keys()) + # Check if we need to resend any presence states to remote hosts. We + # only do this for states that haven't been updated in a while to + # ensure that the remote host doesn't time the presence state out. + # + # Note that since these are states that have *not* been updated, + # they won't get sent down the normal presence replication stream, + # and so we have to explicitly send them via the federation stream. to_federation_ping = { user_id: state for user_id, state in to_federation_ping.items() @@ -680,7 +717,19 @@ class PresenceHandler(BasePresenceHandler): if to_federation_ping: federation_presence_out_counter.inc(len(to_federation_ping)) - self._push_to_remotes(to_federation_ping.values()) + hosts_and_states = await get_interested_remotes( + self.store, + self.presence_router, + list(to_federation_ping.values()), + self.state, + ) + + # Since this is master we know that we have a federation sender or + # queue, and so this will be defined. + assert self._federation + + for destinations, states in hosts_and_states: + self._federation.send_presence_to_destinations(states, destinations) async def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -920,15 +969,10 @@ class PresenceHandler(BasePresenceHandler): users=[UserID.from_string(u) for u in users_to_states], ) - self._push_to_remotes(states) - - def _push_to_remotes(self, states): - """Sends state updates to remote servers. - - Args: - states (list(UserPresenceState)) - """ - self.federation.send_presence(states) + # We only want to poke the local federation sender, if any, as other + # workers will receive the presence updates via the presence replication + # stream (which is updated by `store.update_presence`). + await self.maybe_send_presence_to_interested_destinations(states) async def incoming_presence(self, origin, content): """Called when we receive a `m.presence` EDU from a remote server.""" @@ -1164,9 +1208,13 @@ class PresenceHandler(BasePresenceHandler): user_presence_states ) + # Since this is master we know that we have a federation sender or + # queue, and so this will be defined. + assert self._federation + # Send out user presence updates for each destination for destination, user_state_set in presence_destinations.items(): - self.federation.send_presence_to_destinations( + self._federation.send_presence_to_destinations( destinations=[destination], states=user_state_set ) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index b7dbbfc27c..a1a2b9aecc 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -50,6 +50,7 @@ class ModuleApi: self._auth_handler = auth_handler self._server_name = hs.hostname self._presence_stream = hs.get_event_sources().sources["presence"] + self._state = hs.get_state_handler() # We expose these as properties below in order to attach a helpful docstring. self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient @@ -429,11 +430,13 @@ class ModuleApi: UserID.from_string(user), from_key=None, include_offline=False ) - # Send to remote destinations - await make_deferred_yieldable( - # We pull the federation sender here as we can only do so on workers - # that support sending presence - self._hs.get_federation_sender().send_presence(presence_events) + # Send to remote destinations. + + # We pull out the presence handler here to break a cyclic + # dependency between the presence router and module API. + presence_handler = self._hs.get_presence_handler() + await presence_handler.maybe_send_presence_to_interested_destinations( + presence_events ) |