summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/send_queue.py70
-rw-r--r--synapse/federation/sender/__init__.py96
-rw-r--r--synapse/handlers/presence.py78
-rw-r--r--synapse/module_api/__init__.py13
4 files changed, 74 insertions, 183 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index e3f0bc2471..d71f04e43e 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -76,9 +76,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # Pending presence map user_id -> UserPresenceState
         self.presence_map = {}  # type: Dict[str, UserPresenceState]
 
-        # Stream position -> list[user_id]
-        self.presence_changed = SortedDict()  # type: SortedDict[int, List[str]]
-
         # Stores the destinations we need to explicitly send presence to about a
         # given user.
         # Stream position -> (user_id, destinations)
@@ -96,7 +93,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
 
         self.edus = SortedDict()  # type: SortedDict[int, Edu]
 
-        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
+        # stream ID for the next entry into keyed_edu_changed/edus.
         self.pos = 1
 
         # map from stream ID to the time that stream entry was generated, so that we
@@ -117,7 +114,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
 
         for queue_name in [
             "presence_map",
-            "presence_changed",
             "keyed_edu",
             "keyed_edu_changed",
             "edus",
@@ -155,23 +151,12 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         """Clear all the queues from before a given position"""
         with Measure(self.clock, "send_queue._clear"):
             # Delete things out of presence maps
-            keys = self.presence_changed.keys()
-            i = self.presence_changed.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.presence_changed[key]
-
-            user_ids = {
-                user_id for uids in self.presence_changed.values() for user_id in uids
-            }
-
             keys = self.presence_destinations.keys()
             i = self.presence_destinations.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.presence_destinations[key]
 
-            user_ids.update(
-                user_id for user_id, _ in self.presence_destinations.values()
-            )
+            user_ids = {user_id for user_id, _ in self.presence_destinations.values()}
 
             to_del = [
                 user_id for user_id in self.presence_map if user_id not in user_ids
@@ -244,23 +229,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         """
         # nothing to do here: the replication listener will handle it.
 
-    def send_presence(self, states: List[UserPresenceState]) -> None:
-        """As per FederationSender
-
-        Args:
-            states
-        """
-        pos = self._next_pos()
-
-        # We only want to send presence for our own users, so lets always just
-        # filter here just in case.
-        local_states = [s for s in states if self.is_mine_id(s.user_id)]
-
-        self.presence_map.update({state.user_id: state for state in local_states})
-        self.presence_changed[pos] = [state.user_id for state in local_states]
-
-        self.notifier.on_new_replication_data()
-
     def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
@@ -325,18 +293,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # of the federation stream.
         rows = []  # type: List[Tuple[int, BaseFederationRow]]
 
-        # Fetch changed presence
-        i = self.presence_changed.bisect_right(from_token)
-        j = self.presence_changed.bisect_right(to_token) + 1
-        dest_user_ids = [
-            (pos, user_id)
-            for pos, user_id_list in self.presence_changed.items()[i:j]
-            for user_id in user_id_list
-        ]
-
-        for (key, user_id) in dest_user_ids:
-            rows.append((key, PresenceRow(state=self.presence_map[user_id])))
-
         # Fetch presence to send to destinations
         i = self.presence_destinations.bisect_right(from_token)
         j = self.presence_destinations.bisect_right(to_token) + 1
@@ -427,22 +383,6 @@ class BaseFederationRow:
         raise NotImplementedError()
 
 
-class PresenceRow(
-    BaseFederationRow, namedtuple("PresenceRow", ("state",))  # UserPresenceState
-):
-    TypeId = "p"
-
-    @staticmethod
-    def from_data(data):
-        return PresenceRow(state=UserPresenceState.from_dict(data))
-
-    def to_data(self):
-        return self.state.as_dict()
-
-    def add_to_buffer(self, buff):
-        buff.presence.append(self.state)
-
-
 class PresenceDestinationsRow(
     BaseFederationRow,
     namedtuple(
@@ -506,7 +446,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
 
 
 _rowtypes = (
-    PresenceRow,
     PresenceDestinationsRow,
     KeyedEduRow,
     EduRow,
@@ -518,7 +457,6 @@ TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
 ParsedFederationStreamData = namedtuple(
     "ParsedFederationStreamData",
     (
-        "presence",  # list(UserPresenceState)
         "presence_destinations",  # list of tuples of UserPresenceState and destinations
         "keyed_edus",  # dict of destination -> { key -> Edu }
         "edus",  # dict of destination -> [Edu]
@@ -543,7 +481,6 @@ def process_rows_for_federation(
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence=[],
         presence_destinations=[],
         keyed_edus={},
         edus={},
@@ -559,9 +496,6 @@ def process_rows_for_federation(
         parsed_row = RowType.from_data(row.data)
         parsed_row.add_to_buffer(buff)
 
-    if buff.presence:
-        transaction_queue.send_presence(buff.presence)
-
     for state, destinations in buff.presence_destinations:
         transaction_queue.send_presence_to_destinations(
             states=[state], destinations=destinations
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 952ad39f8c..6266accaf5 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -24,8 +24,6 @@ from synapse.events import EventBase
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
-from synapse.handlers.presence import get_interested_remotes
-from synapse.logging.context import preserve_fn
 from synapse.metrics import (
     LaterGauge,
     event_processing_loop_counter,
@@ -34,7 +32,7 @@ from synapse.metrics import (
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
-from synapse.util.metrics import Measure, measure_func
+from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.events.presence_router import PresenceRouter
@@ -80,15 +78,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def send_presence(self, states: List[UserPresenceState]) -> None:
-        """Send the new presence states to the appropriate destinations.
-
-        This actually queues up the presence states ready for sending and
-        triggers a background task to process them and send out the transactions.
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
     def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
@@ -176,11 +165,6 @@ class FederationSender(AbstractFederationSender):
             ),
         )
 
-        # Map of user_id -> UserPresenceState for all the pending presence
-        # to be sent out by user_id. Entries here get processed and put in
-        # pending_presence_by_dest
-        self.pending_presence = {}  # type: Dict[str, UserPresenceState]
-
         LaterGauge(
             "synapse_federation_transaction_queue_pending_pdus",
             "",
@@ -201,8 +185,6 @@ class FederationSender(AbstractFederationSender):
         self._is_processing = False
         self._last_poked_id = -1
 
-        self._processing_pending_presence = False
-
         # map from room_id to a set of PerDestinationQueues which we believe are
         # awaiting a call to flush_read_receipts_for_room. The presence of an entry
         # here for a given room means that we are rate-limiting RR flushes to that room,
@@ -546,48 +528,6 @@ class FederationSender(AbstractFederationSender):
         for queue in queues:
             queue.flush_read_receipts_for_room(room_id)
 
-    @preserve_fn  # the caller should not yield on this
-    async def send_presence(self, states: List[UserPresenceState]) -> None:
-        """Send the new presence states to the appropriate destinations.
-
-        This actually queues up the presence states ready for sending and
-        triggers a background task to process them and send out the transactions.
-        """
-        if not self.hs.config.use_presence:
-            # No-op if presence is disabled.
-            return
-
-        # First we queue up the new presence by user ID, so multiple presence
-        # updates in quick succession are correctly handled.
-        # We only want to send presence for our own users, so lets always just
-        # filter here just in case.
-        self.pending_presence.update(
-            {state.user_id: state for state in states if self.is_mine_id(state.user_id)}
-        )
-
-        # We then handle the new pending presence in batches, first figuring
-        # out the destinations we need to send each state to and then poking it
-        # to attempt a new transaction. We linearize this so that we don't
-        # accidentally mess up the ordering and send multiple presence updates
-        # in the wrong order
-        if self._processing_pending_presence:
-            return
-
-        self._processing_pending_presence = True
-        try:
-            while True:
-                states_map = self.pending_presence
-                self.pending_presence = {}
-
-                if not states_map:
-                    break
-
-                await self._process_presence_inner(list(states_map.values()))
-        except Exception:
-            logger.exception("Error sending presence states to servers")
-        finally:
-            self._processing_pending_presence = False
-
     def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
@@ -608,40 +548,6 @@ class FederationSender(AbstractFederationSender):
                 continue
             self._get_per_destination_queue(destination).send_presence(states)
 
-    @measure_func("txnqueue._process_presence")
-    async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
-        """Given a list of states populate self.pending_presence_by_dest and
-        poke to send a new transaction to each destination
-        """
-        # We pull the presence router here instead of __init__
-        # to prevent a dependency cycle:
-        #
-        # AuthHandler -> Notifier -> FederationSender
-        # -> PresenceRouter -> ModuleApi -> AuthHandler
-        if self._presence_router is None:
-            self._presence_router = self.hs.get_presence_router()
-
-        assert self._presence_router is not None
-
-        hosts_and_states = await get_interested_remotes(
-            self.store,
-            self._presence_router,
-            states,
-            self.state,
-        )
-
-        for destinations, states in hosts_and_states:
-            for destination in destinations:
-                if destination == self.server_name:
-                    continue
-
-                if not self._federation_shard_config.should_handle(
-                    self._instance_name, destination
-                ):
-                    continue
-
-                self._get_per_destination_queue(destination).send_presence(states)
-
     def build_and_send_edu(
         self,
         destination: str,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e120dd1f48..6460eb9952 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -123,6 +123,14 @@ class BasePresenceHandler(abc.ABC):
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
+        self.presence_router = hs.get_presence_router()
+        self.state = hs.get_state_handler()
+
+        self._federation = None
+        if hs.should_send_federation() or not hs.config.worker_app:
+            self._federation = hs.get_federation_sender()
+
+        self._send_federation = hs.should_send_federation()
 
         self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
 
@@ -249,6 +257,29 @@ class BasePresenceHandler(abc.ABC):
         """Process presence stream rows received over replication."""
         pass
 
+    async def maybe_send_presence_to_interested_destinations(
+        self, states: List[UserPresenceState]
+    ):
+        """If this instance is a federation sender, send the states to all
+        destinations that are interested.
+        """
+
+        if not self._send_federation:
+            return
+
+        # If this worker sends federation we must have a FederationSender.
+        assert self._federation
+
+        hosts_and_states = await get_interested_remotes(
+            self.store,
+            self.presence_router,
+            states,
+            self.state,
+        )
+
+        for destinations, states in hosts_and_states:
+            self._federation.send_presence_to_destinations(states, destinations)
+
 
 class _NullContextManager(ContextManager[None]):
     """A context manager which does nothing."""
@@ -263,7 +294,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
         self.hs = hs
         self.is_mine_id = hs.is_mine_id
 
-        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         # The number of ongoing syncs on this process, by user id.
@@ -388,6 +418,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
             users=users_to_states.keys(),
         )
 
+        # If this is a federation sender, notify about presence updates.
+        await self.maybe_send_presence_to_interested_destinations(states)
+
     async def process_replication_rows(self, token, rows):
         states = [
             UserPresenceState(
@@ -463,9 +496,6 @@ class PresenceHandler(BasePresenceHandler):
         self.server_name = hs.hostname
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.federation = hs.get_federation_sender()
-        self.state = hs.get_state_handler()
-        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         federation_registry = hs.get_federation_registry()
@@ -672,6 +702,13 @@ class PresenceHandler(BasePresenceHandler):
             self.unpersisted_users_changes |= {s.user_id for s in new_states}
             self.unpersisted_users_changes -= set(to_notify.keys())
 
+            # Check if we need to resend any presence states to remote hosts. We
+            # only do this for states that haven't been updated in a while to
+            # ensure that the remote host doesn't time the presence state out.
+            #
+            # Note that since these are states that have *not* been updated,
+            # they won't get sent down the normal presence replication stream,
+            # and so we have to explicitly send them via the federation stream.
             to_federation_ping = {
                 user_id: state
                 for user_id, state in to_federation_ping.items()
@@ -680,7 +717,19 @@ class PresenceHandler(BasePresenceHandler):
             if to_federation_ping:
                 federation_presence_out_counter.inc(len(to_federation_ping))
 
-                self._push_to_remotes(to_federation_ping.values())
+                hosts_and_states = await get_interested_remotes(
+                    self.store,
+                    self.presence_router,
+                    list(to_federation_ping.values()),
+                    self.state,
+                )
+
+                # Since this is master we know that we have a federation sender or
+                # queue, and so this will be defined.
+                assert self._federation
+
+                for destinations, states in hosts_and_states:
+                    self._federation.send_presence_to_destinations(states, destinations)
 
     async def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -920,15 +969,10 @@ class PresenceHandler(BasePresenceHandler):
             users=[UserID.from_string(u) for u in users_to_states],
         )
 
-        self._push_to_remotes(states)
-
-    def _push_to_remotes(self, states):
-        """Sends state updates to remote servers.
-
-        Args:
-            states (list(UserPresenceState))
-        """
-        self.federation.send_presence(states)
+        # We only want to poke the local federation sender, if any, as other
+        # workers will receive the presence updates via the presence replication
+        # stream (which is updated by `store.update_presence`).
+        await self.maybe_send_presence_to_interested_destinations(states)
 
     async def incoming_presence(self, origin, content):
         """Called when we receive a `m.presence` EDU from a remote server."""
@@ -1164,9 +1208,13 @@ class PresenceHandler(BasePresenceHandler):
                     user_presence_states
                 )
 
+        # Since this is master we know that we have a federation sender or
+        # queue, and so this will be defined.
+        assert self._federation
+
         # Send out user presence updates for each destination
         for destination, user_state_set in presence_destinations.items():
-            self.federation.send_presence_to_destinations(
+            self._federation.send_presence_to_destinations(
                 destinations=[destination], states=user_state_set
             )
 
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index b7dbbfc27c..a1a2b9aecc 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -50,6 +50,7 @@ class ModuleApi:
         self._auth_handler = auth_handler
         self._server_name = hs.hostname
         self._presence_stream = hs.get_event_sources().sources["presence"]
+        self._state = hs.get_state_handler()
 
         # We expose these as properties below in order to attach a helpful docstring.
         self._http_client = hs.get_simple_http_client()  # type: SimpleHttpClient
@@ -429,11 +430,13 @@ class ModuleApi:
                     UserID.from_string(user), from_key=None, include_offline=False
                 )
 
-                # Send to remote destinations
-                await make_deferred_yieldable(
-                    # We pull the federation sender here as we can only do so on workers
-                    # that support sending presence
-                    self._hs.get_federation_sender().send_presence(presence_events)
+                # Send to remote destinations.
+
+                # We pull out the presence handler here to break a cyclic
+                # dependency between the presence router and module API.
+                presence_handler = self._hs.get_presence_handler()
+                await presence_handler.maybe_send_presence_to_interested_destinations(
+                    presence_events
                 )