diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d821dcbf6a..deb40f4610 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,12 +26,7 @@ from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
-from synapse.handlers.presence import get_interested_remotes
-from synapse.logging.context import (
- make_deferred_yieldable,
- preserve_fn,
- run_in_background,
-)
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
@@ -41,7 +35,7 @@ from synapse.metrics import (
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
-from synapse.util.metrics import Measure, measure_func
+from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
@@ -87,15 +81,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def send_presence(self, states: List[UserPresenceState]) -> None:
- """Send the new presence states to the appropriate destinations.
-
- This actually queues up the presence states ready for sending and
- triggers a background task to process them and send out the transactions.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
@@ -183,11 +168,6 @@ class FederationSender(AbstractFederationSender):
),
)
- # Map of user_id -> UserPresenceState for all the pending presence
- # to be sent out by user_id. Entries here get processed and put in
- # pending_presence_by_dest
- self.pending_presence = {} # type: Dict[str, UserPresenceState]
-
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
@@ -208,8 +188,6 @@ class FederationSender(AbstractFederationSender):
self._is_processing = False
self._last_poked_id = -1
- self._processing_pending_presence = False
-
# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
@@ -519,48 +497,6 @@ class FederationSender(AbstractFederationSender):
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
- @preserve_fn # the caller should not yield on this
- async def send_presence(self, states: List[UserPresenceState]) -> None:
- """Send the new presence states to the appropriate destinations.
-
- This actually queues up the presence states ready for sending and
- triggers a background task to process them and send out the transactions.
- """
- if not self.hs.config.use_presence:
- # No-op if presence is disabled.
- return
-
- # First we queue up the new presence by user ID, so multiple presence
- # updates in quick succession are correctly handled.
- # We only want to send presence for our own users, so lets always just
- # filter here just in case.
- self.pending_presence.update(
- {state.user_id: state for state in states if self.is_mine_id(state.user_id)}
- )
-
- # We then handle the new pending presence in batches, first figuring
- # out the destinations we need to send each state to and then poking it
- # to attempt a new transaction. We linearize this so that we don't
- # accidentally mess up the ordering and send multiple presence updates
- # in the wrong order
- if self._processing_pending_presence:
- return
-
- self._processing_pending_presence = True
- try:
- while True:
- states_map = self.pending_presence
- self.pending_presence = {}
-
- if not states_map:
- break
-
- await self._process_presence_inner(list(states_map.values()))
- except Exception:
- logger.exception("Error sending presence states to servers")
- finally:
- self._processing_pending_presence = False
-
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
@@ -572,6 +508,10 @@ class FederationSender(AbstractFederationSender):
# No-op if presence is disabled.
return
+ # Ensure we only send out presence states for local users.
+ for state in states:
+ assert self.is_mine_id(state.user_id)
+
for destination in destinations:
if destination == self.server_name:
continue
@@ -581,40 +521,6 @@ class FederationSender(AbstractFederationSender):
continue
self._get_per_destination_queue(destination).send_presence(states)
- @measure_func("txnqueue._process_presence")
- async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
- """Given a list of states populate self.pending_presence_by_dest and
- poke to send a new transaction to each destination
- """
- # We pull the presence router here instead of __init__
- # to prevent a dependency cycle:
- #
- # AuthHandler -> Notifier -> FederationSender
- # -> PresenceRouter -> ModuleApi -> AuthHandler
- if self._presence_router is None:
- self._presence_router = self.hs.get_presence_router()
-
- assert self._presence_router is not None
-
- hosts_and_states = await get_interested_remotes(
- self.store,
- self._presence_router,
- states,
- self.state,
- )
-
- for destinations, states in hosts_and_states:
- for destination in destinations:
- if destination == self.server_name:
- continue
-
- if not self._federation_shard_config.should_handle(
- self._instance_name, destination
- ):
- continue
-
- self._get_per_destination_queue(destination).send_presence(states)
-
def build_and_send_edu(
self,
destination: str,
|