diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 952ad39f8c..6266accaf5 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -24,8 +24,6 @@ from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
-from synapse.handlers.presence import get_interested_remotes
-from synapse.logging.context import preserve_fn
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
@@ -34,7 +32,7 @@ from synapse.metrics import (
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
-from synapse.util.metrics import Measure, measure_func
+from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
@@ -80,15 +78,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def send_presence(self, states: List[UserPresenceState]) -> None:
- """Send the new presence states to the appropriate destinations.
-
- This actually queues up the presence states ready for sending and
- triggers a background task to process them and send out the transactions.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
@@ -176,11 +165,6 @@ class FederationSender(AbstractFederationSender):
),
)
- # Map of user_id -> UserPresenceState for all the pending presence
- # to be sent out by user_id. Entries here get processed and put in
- # pending_presence_by_dest
- self.pending_presence = {} # type: Dict[str, UserPresenceState]
-
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
@@ -201,8 +185,6 @@ class FederationSender(AbstractFederationSender):
self._is_processing = False
self._last_poked_id = -1
- self._processing_pending_presence = False
-
# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
@@ -546,48 +528,6 @@ class FederationSender(AbstractFederationSender):
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
- @preserve_fn # the caller should not yield on this
- async def send_presence(self, states: List[UserPresenceState]) -> None:
- """Send the new presence states to the appropriate destinations.
-
- This actually queues up the presence states ready for sending and
- triggers a background task to process them and send out the transactions.
- """
- if not self.hs.config.use_presence:
- # No-op if presence is disabled.
- return
-
- # First we queue up the new presence by user ID, so multiple presence
- # updates in quick succession are correctly handled.
- # We only want to send presence for our own users, so lets always just
- # filter here just in case.
- self.pending_presence.update(
- {state.user_id: state for state in states if self.is_mine_id(state.user_id)}
- )
-
- # We then handle the new pending presence in batches, first figuring
- # out the destinations we need to send each state to and then poking it
- # to attempt a new transaction. We linearize this so that we don't
- # accidentally mess up the ordering and send multiple presence updates
- # in the wrong order
- if self._processing_pending_presence:
- return
-
- self._processing_pending_presence = True
- try:
- while True:
- states_map = self.pending_presence
- self.pending_presence = {}
-
- if not states_map:
- break
-
- await self._process_presence_inner(list(states_map.values()))
- except Exception:
- logger.exception("Error sending presence states to servers")
- finally:
- self._processing_pending_presence = False
-
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
@@ -608,40 +548,6 @@ class FederationSender(AbstractFederationSender):
continue
self._get_per_destination_queue(destination).send_presence(states)
- @measure_func("txnqueue._process_presence")
- async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
- """Given a list of states populate self.pending_presence_by_dest and
- poke to send a new transaction to each destination
- """
- # We pull the presence router here instead of __init__
- # to prevent a dependency cycle:
- #
- # AuthHandler -> Notifier -> FederationSender
- # -> PresenceRouter -> ModuleApi -> AuthHandler
- if self._presence_router is None:
- self._presence_router = self.hs.get_presence_router()
-
- assert self._presence_router is not None
-
- hosts_and_states = await get_interested_remotes(
- self.store,
- self._presence_router,
- states,
- self.state,
- )
-
- for destinations, states in hosts_and_states:
- for destination in destinations:
- if destination == self.server_name:
- continue
-
- if not self._federation_shard_config.should_handle(
- self._instance_name, destination
- ):
- continue
-
- self._get_per_destination_queue(destination).send_presence(states)
-
def build_and_send_edu(
self,
destination: str,
|