diff options
author | Erik Johnston <erik@matrix.org> | 2021-04-19 10:50:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-19 10:50:49 +0100 |
commit | 2b7dd21655b1ed2db490853d2cdbf6fb38704d81 (patch) | |
tree | efc25e330f62856a8d6091cc78be9749e64c0879 /synapse/federation/sender | |
parent | User directory: use calculated room membership state instead (#9821) (diff) | |
download | synapse-2b7dd21655b1ed2db490853d2cdbf6fb38704d81.tar.xz |
Don't send normal presence updates over federation replication stream (#9828)
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r-- | synapse/federation/sender/__init__.py | 96 |
1 files changed, 1 insertions, 95 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 952ad39f8c..6266accaf5 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -24,8 +24,6 @@ from synapse.events import EventBase from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu -from synapse.handlers.presence import get_interested_remotes -from synapse.logging.context import preserve_fn from synapse.metrics import ( LaterGauge, event_processing_loop_counter, @@ -34,7 +32,7 @@ from synapse.metrics import ( ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken -from synapse.util.metrics import Measure, measure_func +from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.events.presence_router import PresenceRouter @@ -80,15 +78,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def send_presence(self, states: List[UserPresenceState]) -> None: - """Send the new presence states to the appropriate destinations. - - This actually queues up the presence states ready for sending and - triggers a background task to process them and send out the transactions. - """ - raise NotImplementedError() - - @abc.abstractmethod def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: @@ -176,11 +165,6 @@ class FederationSender(AbstractFederationSender): ), ) - # Map of user_id -> UserPresenceState for all the pending presence - # to be sent out by user_id. Entries here get processed and put in - # pending_presence_by_dest - self.pending_presence = {} # type: Dict[str, UserPresenceState] - LaterGauge( "synapse_federation_transaction_queue_pending_pdus", "", @@ -201,8 +185,6 @@ class FederationSender(AbstractFederationSender): self._is_processing = False self._last_poked_id = -1 - self._processing_pending_presence = False - # map from room_id to a set of PerDestinationQueues which we believe are # awaiting a call to flush_read_receipts_for_room. The presence of an entry # here for a given room means that we are rate-limiting RR flushes to that room, @@ -546,48 +528,6 @@ class FederationSender(AbstractFederationSender): for queue in queues: queue.flush_read_receipts_for_room(room_id) - @preserve_fn # the caller should not yield on this - async def send_presence(self, states: List[UserPresenceState]) -> None: - """Send the new presence states to the appropriate destinations. - - This actually queues up the presence states ready for sending and - triggers a background task to process them and send out the transactions. - """ - if not self.hs.config.use_presence: - # No-op if presence is disabled. - return - - # First we queue up the new presence by user ID, so multiple presence - # updates in quick succession are correctly handled. - # We only want to send presence for our own users, so lets always just - # filter here just in case. - self.pending_presence.update( - {state.user_id: state for state in states if self.is_mine_id(state.user_id)} - ) - - # We then handle the new pending presence in batches, first figuring - # out the destinations we need to send each state to and then poking it - # to attempt a new transaction. We linearize this so that we don't - # accidentally mess up the ordering and send multiple presence updates - # in the wrong order - if self._processing_pending_presence: - return - - self._processing_pending_presence = True - try: - while True: - states_map = self.pending_presence - self.pending_presence = {} - - if not states_map: - break - - await self._process_presence_inner(list(states_map.values())) - except Exception: - logger.exception("Error sending presence states to servers") - finally: - self._processing_pending_presence = False - def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: @@ -608,40 +548,6 @@ class FederationSender(AbstractFederationSender): continue self._get_per_destination_queue(destination).send_presence(states) - @measure_func("txnqueue._process_presence") - async def _process_presence_inner(self, states: List[UserPresenceState]) -> None: - """Given a list of states populate self.pending_presence_by_dest and - poke to send a new transaction to each destination - """ - # We pull the presence router here instead of __init__ - # to prevent a dependency cycle: - # - # AuthHandler -> Notifier -> FederationSender - # -> PresenceRouter -> ModuleApi -> AuthHandler - if self._presence_router is None: - self._presence_router = self.hs.get_presence_router() - - assert self._presence_router is not None - - hosts_and_states = await get_interested_remotes( - self.store, - self._presence_router, - states, - self.state, - ) - - for destinations, states in hosts_and_states: - for destination in destinations: - if destination == self.server_name: - continue - - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - continue - - self._get_per_destination_queue(destination).send_presence(states) - def build_and_send_edu( self, destination: str, |