Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/__init__.py             |   1
-rw-r--r--  synapse/handlers/_base.py                |   1
-rw-r--r--  synapse/handlers/account_data.py         |   1
-rw-r--r--  synapse/handlers/account_validity.py     | 102
-rw-r--r--  synapse/handlers/acme.py                 |   1
-rw-r--r--  synapse/handlers/acme_issuing_service.py |   1
-rw-r--r--  synapse/handlers/admin.py                |   1
-rw-r--r--  synapse/handlers/appservice.py           |   1
-rw-r--r--  synapse/handlers/auth.py                 |   3
-rw-r--r--  synapse/handlers/cas_handler.py          |   1
-rw-r--r--  synapse/handlers/deactivate_account.py   |   5
-rw-r--r--  synapse/handlers/device.py               |  14
-rw-r--r--  synapse/handlers/devicemessage.py        |   1
-rw-r--r--  synapse/handlers/directory.py            |   1
-rw-r--r--  synapse/handlers/e2e_keys.py             |   1
-rw-r--r--  synapse/handlers/e2e_room_keys.py        |   1
-rw-r--r--  synapse/handlers/events.py               |   1
-rw-r--r--  synapse/handlers/federation.py           | 181
-rw-r--r--  synapse/handlers/groups_local.py         |   1
-rw-r--r--  synapse/handlers/identity.py             |  30
-rw-r--r--  synapse/handlers/initial_sync.py         |   1
-rw-r--r--  synapse/handlers/message.py              |   1
-rw-r--r--  synapse/handlers/oidc_handler.py         |   1
-rw-r--r--  synapse/handlers/pagination.py           |   1
-rw-r--r--  synapse/handlers/password_policy.py      |   1
-rw-r--r--  synapse/handlers/presence.py             | 533
-rw-r--r--  synapse/handlers/profile.py              |   1
-rw-r--r--  synapse/handlers/read_marker.py          |   1
-rw-r--r--  synapse/handlers/receipts.py             |   1
-rw-r--r--  synapse/handlers/register.py             |   1
-rw-r--r--  synapse/handlers/room.py                 |   1
-rw-r--r--  synapse/handlers/room_list.py            |   1
-rw-r--r--  synapse/handlers/room_member.py          |   1
-rw-r--r--  synapse/handlers/room_member_worker.py   |   1
-rw-r--r--  synapse/handlers/saml_handler.py         |   1
-rw-r--r--  synapse/handlers/search.py               |   1
-rw-r--r--  synapse/handlers/set_password.py         |   1
-rw-r--r--  synapse/handlers/space_summary.py        |   1
-rw-r--r--  synapse/handlers/sso.py                  |   1
-rw-r--r--  synapse/handlers/state_deltas.py         |   1
-rw-r--r--  synapse/handlers/stats.py                |   1
-rw-r--r--  synapse/handlers/sync.py                 |   1
-rw-r--r--  synapse/handlers/typing.py               |   1
-rw-r--r--  synapse/handlers/ui_auth/__init__.py     |   1
-rw-r--r--  synapse/handlers/ui_auth/checkers.py     |   1
-rw-r--r--  synapse/handlers/user_directory.py       |  16
46 files changed, 751 insertions, 171 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index bfebb0f644..5e83dba2ed 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index fb899aa90d..d800e16912 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 1ce6d697ed..affb54e0ee 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2021 The Matrix.org Foundation C.I.C.
 #
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index bee1447c2e..5b927f10b3 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2019 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,7 +17,7 @@ import email.utils
 import logging
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from synapse.api.errors import StoreError, SynapseError
 from synapse.logging.context import make_deferred_yieldable
@@ -40,28 +39,44 @@ class AccountValidityHandler:
         self.sendmail = self.hs.get_sendmail()
         self.clock = self.hs.get_clock()
 
-        self._account_validity = self.hs.config.account_validity
+        self._account_validity_enabled = (
+            hs.config.account_validity.account_validity_enabled
+        )
+        self._account_validity_renew_by_email_enabled = (
+            hs.config.account_validity.account_validity_renew_by_email_enabled
+        )
+
+        self._account_validity_period = None
+        if self._account_validity_enabled:
+            self._account_validity_period = (
+                hs.config.account_validity.account_validity_period
+            )
 
         if (
-            self._account_validity.enabled
-            and self._account_validity.renew_by_email_enabled
+            self._account_validity_enabled
+            and self._account_validity_renew_by_email_enabled
         ):
             # Don't do email-specific configuration if renewal by email is disabled.
-            self._template_html = self.config.account_validity_template_html
-            self._template_text = self.config.account_validity_template_text
+            self._template_html = (
+                hs.config.account_validity.account_validity_template_html
+            )
+            self._template_text = (
+                hs.config.account_validity.account_validity_template_text
+            )
+            account_validity_renew_email_subject = (
+                hs.config.account_validity.account_validity_renew_email_subject
+            )
 
             try:
-                app_name = self.hs.config.email_app_name
+                app_name = hs.config.email_app_name
 
-                self._subject = self._account_validity.renew_email_subject % {
-                    "app": app_name
-                }
+                self._subject = account_validity_renew_email_subject % {"app": app_name}
 
-                self._from_string = self.hs.config.email_notif_from % {"app": app_name}
+                self._from_string = hs.config.email_notif_from % {"app": app_name}
             except Exception:
                 # If substitution failed, fall back to the bare strings.
-                self._subject = self._account_validity.renew_email_subject
-                self._from_string = self.hs.config.email_notif_from
+                self._subject = account_validity_renew_email_subject
+                self._from_string = hs.config.email_notif_from
 
             self._raw_from = email.utils.parseaddr(self._from_string)[1]
 
@@ -221,50 +236,87 @@ class AccountValidityHandler:
                 attempts += 1
         raise StoreError(500, "Couldn't generate a unique string as refresh string.")
 
-    async def renew_account(self, renewal_token: str) -> bool:
+    async def renew_account(self, renewal_token: str) -> Tuple[bool, bool, int]:
         """Renews the account attached to a given renewal token by pushing back the
         expiration date by the current validity period in the server's configuration.
 
+        If it turns out that the token is valid but has already been used, then the
+        token is considered stale. A token is stale if the 'token_used_ts_ms' db column
+        is non-null.
+
         Args:
             renewal_token: Token sent with the renewal request.
         Returns:
-            Whether the provided token is valid.
+            A tuple containing:
+              * A bool representing whether the token is valid and unused.
+              * A bool which is `True` if the token is valid, but stale.
+              * An int representing the user's expiry timestamp as milliseconds since the
+                epoch, or 0 if the token was invalid.
         """
         try:
-            user_id = await self.store.get_user_from_renewal_token(renewal_token)
+            (
+                user_id,
+                current_expiration_ts,
+                token_used_ts,
+            ) = await self.store.get_user_from_renewal_token(renewal_token)
         except StoreError:
-            return False
+            return False, False, 0
+
+        # Check whether this token has already been used.
+        if token_used_ts:
+            logger.info(
+                "User '%s' attempted to use previously used token '%s' to renew account",
+                user_id,
+                renewal_token,
+            )
+            return False, True, current_expiration_ts
 
         logger.debug("Renewing an account for user %s", user_id)
-        await self.renew_account_for_user(user_id)
 
-        return True
+        # Renew the account. Pass the renewal_token here so that it is not cleared.
+        # We want to keep the token around in case the user attempts to renew their
+        # account with the same token twice (clicking the email link twice).
+        #
+        # In that case, the token will be accepted, but the account's expiration ts
+        # will remain unchanged.
+        new_expiration_ts = await self.renew_account_for_user(
+            user_id, renewal_token=renewal_token
+        )
+
+        return True, False, new_expiration_ts
 
     async def renew_account_for_user(
         self,
         user_id: str,
         expiration_ts: Optional[int] = None,
         email_sent: bool = False,
+        renewal_token: Optional[str] = None,
     ) -> int:
         """Renews the account attached to a given user by pushing back the
         expiration date by the current validity period in the server's
         configuration.
 
         Args:
-            renewal_token: Token sent with the renewal request.
+            user_id: The ID of the user to renew.
             expiration_ts: New expiration date. Defaults to now + validity period.
-            email_sen: Whether an email has been sent for this validity period.
-                Defaults to False.
+            email_sent: Whether an email has been sent for this validity period.
+            renewal_token: Token sent with the renewal request. The user's token
+                will be cleared if this is None.
 
         Returns:
             New expiration date for this account, as a timestamp in
             milliseconds since epoch.
         """
+        now = self.clock.time_msec()
         if expiration_ts is None:
-            expiration_ts = self.clock.time_msec() + self._account_validity.period
+            expiration_ts = now + self._account_validity_period
 
         await self.store.set_account_validity_for_user(
-            user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent
+            user_id=user_id,
+            expiration_ts=expiration_ts,
+            email_sent=email_sent,
+            renewal_token=renewal_token,
+            token_used_ts=now,
         )
 
         return expiration_ts
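
For context, a minimal sketch of how a caller might consume the new three-part return value of renew_account; the surrounding servlet code and status handling here are illustrative assumptions, not part of this commit:

    # Hypothetical caller, e.g. a renewal servlet.
    token_valid, token_stale, expiration_ts = await account_validity_handler.renew_account(
        renewal_token
    )
    if token_valid or token_stale:
        # Stale (already-used) tokens are still acknowledged, but the
        # account's expiration timestamp is left unchanged.
        status_code = 200
    else:
        # Unknown token; expiration_ts is 0 in this case.
        status_code = 404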
diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py
index 2a25af6288..16ab93f580 100644
--- a/synapse/handlers/acme.py
+++ b/synapse/handlers/acme.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2019 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/acme_issuing_service.py b/synapse/handlers/acme_issuing_service.py
index ae2a9dd9c2..a972d3fa0a 100644
--- a/synapse/handlers/acme_issuing_service.py
+++ b/synapse/handlers/acme_issuing_service.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2019 New Vector Ltd
 # Copyright 2019 The Matrix.org Foundation C.I.C.
 #
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index c494de49a3..f72ded038e 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9fb7ee335d..d7bc4e23ed 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 08e413bc98..36f2450e2e 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
 # Copyright 2019 - 2020 The Matrix.org Foundation C.I.C.
@@ -1249,7 +1248,7 @@ class AuthHandler(BaseHandler):
 
         # see if any of our auth providers want to know about this
         for provider in self.password_providers:
-            for token, token_id, device_id in tokens_and_devices:
+            for token, _, device_id in tokens_and_devices:
                 await provider.on_logged_out(
                     user_id=user_id, device_id=device_id, access_token=token
                 )
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 5060936f94..7346ccfe93 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 2bcd8f5435..45d2404dde 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2017, 2018 New Vector Ltd
 # Copyright 2019 The Matrix.org Foundation C.I.C.
 #
@@ -50,7 +49,9 @@ class DeactivateAccountHandler(BaseHandler):
         if hs.config.run_background_tasks:
             hs.get_reactor().callWhenRunning(self._start_user_parting)
 
-        self._account_validity_enabled = hs.config.account_validity.enabled
+        self._account_validity_enabled = (
+            hs.config.account_validity.account_validity_enabled
+        )
 
     async def deactivate_account(
         self,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 7e76db3e2a..c1d7800981 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
 # Copyright 2019 New Vector Ltd
 # Copyright 2019,2020 The Matrix.org Foundation C.I.C.
@@ -157,8 +156,7 @@ class DeviceWorkerHandler(BaseHandler):
             # The user may have left the room
             # TODO: Check if they actually did or if we were just invited.
             if room_id not in room_ids:
-                for key, event_id in current_state_ids.items():
-                    etype, state_key = key
+                for etype, state_key in current_state_ids.keys():
                     if etype != EventTypes.Member:
                         continue
                     possibly_left.add(state_key)
@@ -180,8 +178,7 @@ class DeviceWorkerHandler(BaseHandler):
                 log_kv(
                     {"event": "encountered empty previous state", "room_id": room_id}
                 )
-                for key, event_id in current_state_ids.items():
-                    etype, state_key = key
+                for etype, state_key in current_state_ids.keys():
                     if etype != EventTypes.Member:
                         continue
                     possibly_changed.add(state_key)
@@ -199,8 +196,7 @@ class DeviceWorkerHandler(BaseHandler):
             for state_dict in prev_state_ids.values():
                 member_event = state_dict.get((EventTypes.Member, user_id), None)
                 if not member_event or member_event != current_member_id:
-                    for key, event_id in current_state_ids.items():
-                        etype, state_key = key
+                    for etype, state_key in current_state_ids.keys():
                         if etype != EventTypes.Member:
                             continue
                         possibly_changed.add(state_key)
@@ -715,7 +711,7 @@ class DeviceListUpdater:
                 # This can happen since we batch updates
                 return
 
-            for device_id, stream_id, prev_ids, content in pending_updates:
+            for device_id, stream_id, prev_ids, _ in pending_updates:
                 logger.debug(
                     "Handling update %r/%r, ID: %r, prev: %r ",
                     user_id,
@@ -741,7 +737,7 @@ class DeviceListUpdater:
             else:
                 # Simply update the single device, since we know that is the only
                 # change (because of the single prev_id matching the current cache)
-                for device_id, stream_id, prev_ids, content in pending_updates:
+                for device_id, stream_id, _, content in pending_updates:
                     await self.store.update_remote_device_list_cache_entry(
                         user_id, device_id, content, stream_id
                     )
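
The three loops rewritten above all rely on the state map being keyed by (event type, state key) tuples, so the tuple can be unpacked directly while iterating the keys; a minimal illustration with made-up values:

    # The event IDs (the dict values) were never used, so iterating keys()
    # and unpacking directly replaces the old items() loop plus manual unpack.
    current_state_ids = {
        ("m.room.member", "@alice:example.org"): "$membership_event",
        ("m.room.name", ""): "$name_event",
    }
    for etype, state_key in current_state_ids.keys():
        if etype != "m.room.member":
            continue
        print("possibly changed:", state_key)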
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index c971eeb4d2..c5d631de07 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index abcf86352d..90932316f3 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 92b18378fc..974487800d 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
 # Copyright 2018-2019 New Vector Ltd
 # Copyright 2019 The Matrix.org Foundation C.I.C.
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index a910d246d6..31742236a9 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2017, 2018 New Vector Ltd
 # Copyright 2019 Matrix.org Foundation C.I.C.
 #
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f46cab7325..d82144d7fa 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 67888898ff..dbdd7d2db3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 # Copyright 2017-2018 New Vector Ltd
 # Copyright 2019 The Matrix.org Foundation C.I.C.
@@ -104,7 +103,7 @@ logger = logging.getLogger(__name__)
 
 @attr.s(slots=True)
 class _NewEventInfo:
-    """Holds information about a received event, ready for passing to _handle_new_events
+    """Holds information about a received event, ready for passing to _auth_and_persist_events
 
     Attributes:
         event: the received event
@@ -808,7 +807,10 @@ class FederationHandler(BaseHandler):
         logger.debug("Processing event: %s", event)
 
         try:
-            await self._handle_new_event(origin, event, state=state)
+            context = await self.state_handler.compute_event_context(
+                event, old_state=state
+            )
+            await self._auth_and_persist_event(origin, event, context, state=state)
         except AuthError as e:
             raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
 
@@ -1011,7 +1013,9 @@ class FederationHandler(BaseHandler):
             )
 
         if ev_infos:
-            await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
+            await self._auth_and_persist_events(
+                dest, room_id, ev_infos, backfilled=True
+            )
 
         # Step 2: Persist the rest of the events in the chunk one by one
         events.sort(key=lambda e: e.depth)
@@ -1024,10 +1028,12 @@ class FederationHandler(BaseHandler):
             # non-outliers
             assert not event.internal_metadata.is_outlier()
 
+            context = await self.state_handler.compute_event_context(event)
+
             # We store these one at a time since each event depends on the
             # previous to work out the state.
             # TODO: We can probably do something more clever here.
-            await self._handle_new_event(dest, event, backfilled=True)
+            await self._auth_and_persist_event(dest, event, context, backfilled=True)
 
         return events
 
@@ -1361,7 +1367,7 @@ class FederationHandler(BaseHandler):
 
             event_infos.append(_NewEventInfo(event, None, auth))
 
-        await self._handle_new_events(
+        await self._auth_and_persist_events(
             destination,
             room_id,
             event_infos,
@@ -1667,10 +1673,11 @@ class FederationHandler(BaseHandler):
         # would introduce the danger of backwards-compatibility problems.
         event.internal_metadata.send_on_behalf_of = origin
 
-        context = await self._handle_new_event(origin, event)
+        context = await self.state_handler.compute_event_context(event)
+        context = await self._auth_and_persist_event(origin, event, context)
 
         logger.debug(
-            "on_send_join_request: After _handle_new_event: %s, sigs: %s",
+            "on_send_join_request: After _auth_and_persist_event: %s, sigs: %s",
             event.event_id,
             event.signatures,
         )
@@ -1879,10 +1886,11 @@ class FederationHandler(BaseHandler):
 
         event.internal_metadata.outlier = False
 
-        await self._handle_new_event(origin, event)
+        context = await self.state_handler.compute_event_context(event)
+        await self._auth_and_persist_event(origin, event, context)
 
         logger.debug(
-            "on_send_leave_request: After _handle_new_event: %s, sigs: %s",
+            "on_send_leave_request: After _auth_and_persist_event: %s, sigs: %s",
             event.event_id,
             event.signatures,
         )
@@ -1990,16 +1998,47 @@ class FederationHandler(BaseHandler):
     async def get_min_depth_for_context(self, context: str) -> int:
         return await self.store.get_min_depth(context)
 
-    async def _handle_new_event(
+    async def _auth_and_persist_event(
         self,
         origin: str,
         event: EventBase,
+        context: EventContext,
         state: Optional[Iterable[EventBase]] = None,
         auth_events: Optional[MutableStateMap[EventBase]] = None,
         backfilled: bool = False,
     ) -> EventContext:
-        context = await self._prep_event(
-            origin, event, state=state, auth_events=auth_events, backfilled=backfilled
+        """
+        Process an event by performing auth checks and then persisting to the database.
+
+        Args:
+            origin: The host the event originates from.
+            event: The event itself.
+            context:
+                The event context.
+
+                NB that this function potentially modifies it.
+            state:
+                The state events used to check the event for soft-fail. If this is
+                not provided the current state events will be used.
+            auth_events:
+                Map from (event_type, state_key) to event
+
+                Normally this is our calculated auth_events based on the state
+                of the room at the event's position in the DAG, though
+                occasionally (eg if the event is an outlier) it may be the auth
+                events claimed by the remote server.
+            backfilled: True if the event was backfilled.
+
+        Returns:
+             The event context.
+        """
+        context = await self._check_event_auth(
+            origin,
+            event,
+            context,
+            state=state,
+            auth_events=auth_events,
+            backfilled=backfilled,
         )
 
         try:
@@ -2023,7 +2062,7 @@ class FederationHandler(BaseHandler):
 
         return context
 
-    async def _handle_new_events(
+    async def _auth_and_persist_events(
         self,
         origin: str,
         room_id: str,
@@ -2041,9 +2080,13 @@ class FederationHandler(BaseHandler):
         async def prep(ev_info: _NewEventInfo):
             event = ev_info.event
             with nested_logging_context(suffix=event.event_id):
-                res = await self._prep_event(
+                res = await self.state_handler.compute_event_context(
+                    event, old_state=ev_info.state
+                )
+                res = await self._check_event_auth(
                     origin,
                     event,
+                    res,
                     state=ev_info.state,
                     auth_events=ev_info.auth_events,
                     backfilled=backfilled,
@@ -2178,49 +2221,6 @@ class FederationHandler(BaseHandler):
             room_id, [(event, new_event_context)]
         )
 
-    async def _prep_event(
-        self,
-        origin: str,
-        event: EventBase,
-        state: Optional[Iterable[EventBase]],
-        auth_events: Optional[MutableStateMap[EventBase]],
-        backfilled: bool,
-    ) -> EventContext:
-        context = await self.state_handler.compute_event_context(event, old_state=state)
-
-        if not auth_events:
-            prev_state_ids = await context.get_prev_state_ids()
-            auth_events_ids = self.auth.compute_auth_events(
-                event, prev_state_ids, for_verification=True
-            )
-            auth_events_x = await self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
-
-        # This is a hack to fix some old rooms where the initial join event
-        # didn't reference the create event in its auth events.
-        if event.type == EventTypes.Member and not event.auth_event_ids():
-            if len(event.prev_event_ids()) == 1 and event.depth < 5:
-                c = await self.store.get_event(
-                    event.prev_event_ids()[0], allow_none=True
-                )
-                if c and c.type == EventTypes.Create:
-                    auth_events[(c.type, c.state_key)] = c
-
-        context = await self.do_auth(origin, event, context, auth_events=auth_events)
-
-        if not context.rejected:
-            await self._check_for_soft_fail(event, state, backfilled)
-
-        if event.type == EventTypes.GuestAccess and not context.rejected:
-            await self.maybe_kick_guest_users(event)
-
-        # If we are going to send this event over federation we precaclculate
-        # the joined hosts.
-        if event.internal_metadata.get_send_on_behalf_of():
-            await self.event_creation_handler.cache_joined_hosts_for_event(event)
-
-        return context
-
     async def _check_for_soft_fail(
         self, event: EventBase, state: Optional[Iterable[EventBase]], backfilled: bool
     ) -> None:
@@ -2331,19 +2331,28 @@ class FederationHandler(BaseHandler):
 
         return missing_events
 
-    async def do_auth(
+    async def _check_event_auth(
         self,
         origin: str,
         event: EventBase,
         context: EventContext,
-        auth_events: MutableStateMap[EventBase],
+        state: Optional[Iterable[EventBase]],
+        auth_events: Optional[MutableStateMap[EventBase]],
+        backfilled: bool,
     ) -> EventContext:
         """
+        Checks whether an event should be rejected (for failing auth checks).
 
         Args:
-            origin:
-            event:
+            origin: The host the event originates from.
+            event: The event itself.
             context:
+                The event context.
+
+                NB that this function potentially modifies it.
+            state:
+                The state events used to check the event for soft-fail. If this is
+                not provided the current state events will be used.
             auth_events:
                 Map from (event_type, state_key) to event
 
@@ -2353,12 +2362,34 @@ class FederationHandler(BaseHandler):
                 server.
 
                 Also NB that this function adds entries to it.
+
+                If this is not provided, it is calculated from the previous state IDs.
+            backfilled: True if the event was backfilled.
+
         Returns:
-            updated context object
+            The updated context object.
         """
         room_version = await self.store.get_room_version_id(event.room_id)
         room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
+        if not auth_events:
+            prev_state_ids = await context.get_prev_state_ids()
+            auth_events_ids = self.auth.compute_auth_events(
+                event, prev_state_ids, for_verification=True
+            )
+            auth_events_x = await self.store.get_events(auth_events_ids)
+            auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
+
+        # This is a hack to fix some old rooms where the initial join event
+        # didn't reference the create event in its auth events.
+        if event.type == EventTypes.Member and not event.auth_event_ids():
+            if len(event.prev_event_ids()) == 1 and event.depth < 5:
+                c = await self.store.get_event(
+                    event.prev_event_ids()[0], allow_none=True
+                )
+                if c and c.type == EventTypes.Create:
+                    auth_events[(c.type, c.state_key)] = c
+
         try:
             context = await self._update_auth_events_and_context_for_auth(
                 origin, event, context, auth_events
@@ -2380,6 +2411,17 @@ class FederationHandler(BaseHandler):
             logger.warning("Failed auth resolution for %r because %s", event, e)
             context.rejected = RejectedReason.AUTH_ERROR
 
+        if not context.rejected:
+            await self._check_for_soft_fail(event, state, backfilled)
+
+        if event.type == EventTypes.GuestAccess and not context.rejected:
+            await self.maybe_kick_guest_users(event)
+
+        # If we are going to send this event over federation we precalculate
+        # the joined hosts.
+        if event.internal_metadata.get_send_on_behalf_of():
+            await self.event_creation_handler.cache_joined_hosts_for_event(event)
+
         return context
 
     async def _update_auth_events_and_context_for_auth(
@@ -2389,7 +2431,7 @@ class FederationHandler(BaseHandler):
         context: EventContext,
         auth_events: MutableStateMap[EventBase],
     ) -> EventContext:
-        """Helper for do_auth. See there for docs.
+        """Helper for _check_event_auth. See there for docs.
 
         Checks whether a given event has the expected auth events. If it
         doesn't then we talk to the remote server to compare state to see if
@@ -2469,9 +2511,14 @@ class FederationHandler(BaseHandler):
                         e.internal_metadata.outlier = True
 
                         logger.debug(
-                            "do_auth %s missing_auth: %s", event.event_id, e.event_id
+                            "_check_event_auth %s missing_auth: %s",
+                            event.event_id,
+                            e.event_id,
+                        )
+                        context = await self.state_handler.compute_event_context(e)
+                        await self._auth_and_persist_event(
+                            origin, e, context, auth_events=auth
                         )
-                        await self._handle_new_event(origin, e, auth_events=auth)
 
                         if e.event_id in event_auth_events:
                             auth_events[(e.type, e.state_key)] = e
@@ -2909,7 +2956,7 @@ class FederationHandler(BaseHandler):
             try:
                 # for each sig on the third_party_invite block of the actual invite
                 for server, signature_block in signed["signatures"].items():
-                    for key_name, encoded_signature in signature_block.items():
+                    for key_name in signature_block.keys():
                         if not key_name.startswith("ed25519:"):
                             continue
 
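
Taken together, the federation.py changes split the old _handle_new_event/_prep_event flow so that callers compute the event context themselves before handing off to auth and persistence. A condensed sketch of the new calling convention, mirroring the call sites above:

    # Before: context handling was buried inside _handle_new_event():
    #     context = await self._handle_new_event(origin, event, state=state)
    # After: the caller owns context computation.
    context = await self.state_handler.compute_event_context(event, old_state=state)
    context = await self._auth_and_persist_event(origin, event, context, state=state)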
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index a41ca5df9c..157f2ff218 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
 # Copyright 2018 New Vector Ltd
 #
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index d89fa5fb30..0b3b1fadb5 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
 # Copyright 2018 New Vector Ltd
@@ -16,7 +15,6 @@
 # limitations under the License.
 
 """Utilities for interacting with Identity Servers"""
-
 import logging
 import urllib.parse
 from typing import Awaitable, Callable, Dict, List, Optional, Tuple
@@ -35,7 +33,11 @@ from synapse.http.site import SynapseRequest
 from synapse.types import JsonDict, Requester
 from synapse.util import json_decoder
 from synapse.util.hash import sha256_and_url_safe_base64
-from synapse.util.stringutils import assert_valid_client_secret, random_string
+from synapse.util.stringutils import (
+    assert_valid_client_secret,
+    random_string,
+    valid_id_server_location,
+)
 
 from ._base import BaseHandler
 
@@ -173,6 +175,11 @@ class IdentityHandler(BaseHandler):
                 server with, if necessary. Required if use_v2 is true
             use_v2: Whether to use v2 Identity Service API endpoints. Defaults to True
 
+        Raises:
+            SynapseError: On any of the following conditions
+                - the supplied id_server is not a valid identity server name
+                - we failed to contact the supplied identity server
+
         Returns:
             The response from the identity server
         """
@@ -182,6 +189,12 @@ class IdentityHandler(BaseHandler):
         if id_access_token is None:
             use_v2 = False
 
+        if not valid_id_server_location(id_server):
+            raise SynapseError(
+                400,
+                "id_server must be a valid hostname with optional port and path components",
+            )
+
         # Decide which API endpoint URLs to use
         headers = {}
         bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
@@ -270,12 +283,21 @@ class IdentityHandler(BaseHandler):
             id_server: Identity server to unbind from
 
         Raises:
-            SynapseError: If we failed to contact the identity server
+            SynapseError: On any of the following conditions
+                - the supplied id_server is not a valid identity server name
+                - we failed to contact the supplied identity server
 
         Returns:
             True on success, otherwise False if the identity
             server doesn't support unbinding
         """
+
+        if not valid_id_server_location(id_server):
+            raise SynapseError(
+                400,
+                "id_server must be a valid hostname with optional port and path components",
+            )
+
         url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
         url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
 
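
valid_id_server_location is defined in synapse.util.stringutils and is not shown in this diff. As a rough, illustrative approximation of the kind of check it performs (the real validator may differ):

    from urllib.parse import urlsplit

    def looks_like_id_server_location(id_server: str) -> bool:
        # Accept a bare hostname with optional port and path components;
        # reject anything carrying a scheme, query, fragment or userinfo.
        if "://" in id_server:
            return False
        parts = urlsplit("//" + id_server)
        return bool(parts.hostname) and not (
            parts.query or parts.fragment or parts.username
        )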
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 13f8152283..76242865ae 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 125dae6d25..ec8eb21674 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 # Copyright 2017-2018 New Vector Ltd
 # Copyright 2019 The Matrix.org Foundation C.I.C.
diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 6624212d6f..b156196a70 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc_handler.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2020 Quentin Gliech
 # Copyright 2021 The Matrix.org Foundation C.I.C.
 #
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 66dc886c81..1e1186c29e 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
 # Copyright 2017 - 2018 New Vector Ltd
 #
diff --git a/synapse/handlers/password_policy.py b/synapse/handlers/password_policy.py
index 92cefa11aa..cd21efdcc6 100644
--- a/synapse/handlers/password_policy.py
+++ b/synapse/handlers/password_policy.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2019 New Vector Ltd
 # Copyright 2019 The Matrix.org Foundation C.I.C.
 #
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 0047907cd9..598466c9bd 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 # Copyright 2020 The Matrix.org Foundation C.I.C.
 #
@@ -23,7 +22,9 @@ The methods that define policy are:
     - should_notify
 """
 import abc
+import contextlib
 import logging
+from bisect import bisect
 from contextlib import contextmanager
 from typing import (
     TYPE_CHECKING,
@@ -49,6 +50,13 @@ from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.http.presence import (
+    ReplicationBumpPresenceActiveTime,
+    ReplicationPresenceSetState,
+)
+from synapse.replication.http.streams import ReplicationGetStreamUpdates
+from synapse.replication.tcp.commands import ClearUserSyncsCommand
+from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
 from synapse.state import StateHandler
 from synapse.storage.databases.main import DataStore
 from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
@@ -105,6 +113,10 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000
 # are dead.
 EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
 
+# Delay before a worker tells the presence handler that a user has stopped
+# syncing.
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
@@ -114,6 +126,15 @@ class BasePresenceHandler(abc.ABC):
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
+        self.presence_router = hs.get_presence_router()
+        self.state = hs.get_state_handler()
+        self.is_mine_id = hs.is_mine_id
+
+        self._federation = None
+        if hs.should_send_federation():
+            self._federation = hs.get_federation_sender()
+
+        self._federation_queue = PresenceFederationQueue(hs, self)
 
         self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
 
@@ -209,18 +230,292 @@ class BasePresenceHandler(abc.ABC):
         with the app.
         """
 
+    async def update_external_syncs_row(
+        self, process_id, user_id, is_syncing, sync_time_msec
+    ):
+        """Update the syncing users for an external process as a delta.
+
+        This is a no-op when presence is handled by a different worker.
+
+        Args:
+            process_id (str): An identifier for the process the users are
+                syncing against. This allows synapse to process updates
+                as users start and stop syncing against a given process.
+            user_id (str): The user who has started or stopped syncing.
+            is_syncing (bool): Whether or not the user is now syncing.
+            sync_time_msec (int): Time in ms when the user was last syncing.
+        """
+        pass
+
+    async def update_external_syncs_clear(self, process_id):
+        """Marks all users that had been marked as syncing by a given process
+        as offline.
+
+        Used when the process has stopped/disappeared.
+
+        This is a no-op when presence is handled by a different worker.
+        """
+        pass
+
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
+        """Process streams received over replication."""
+        await self._federation_queue.process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
+
+    def get_federation_queue(self) -> "PresenceFederationQueue":
+        """Get the presence federation queue."""
+        return self._federation_queue
+
+    async def maybe_send_presence_to_interested_destinations(
+        self, states: List[UserPresenceState]
+    ):
+        """If this instance is a federation sender, send the states to all
+        destinations that are interested. Filters out any states for remote
+        users.
+        """
+
+        if not self._federation:
+            return
+
+        states = [s for s in states if self.is_mine_id(s.user_id)]
+
+        if not states:
+            return
+
+        hosts_and_states = await get_interested_remotes(
+            self.store,
+            self.presence_router,
+            states,
+            self.state,
+        )
+
+        for destinations, states in hosts_and_states:
+            self._federation.send_presence_to_destinations(states, destinations)
+
+
+class _NullContextManager(ContextManager[None]):
+    """A context manager which does nothing."""
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+
+class WorkerPresenceHandler(BasePresenceHandler):
+    def __init__(self, hs):
+        super().__init__(hs)
+        self.hs = hs
+
+        self._presence_enabled = hs.config.use_presence
+
+        # The number of ongoing syncs on this process, by user id.
+        # Empty if _presence_enabled is false.
+        self._user_to_num_current_syncs = {}  # type: Dict[str, int]
+
+        self.notifier = hs.get_notifier()
+        self.instance_id = hs.get_instance_id()
+
+        # user_id -> last_sync_ms. Lists the users that have stopped syncing
+        # but we haven't notified the master of that yet
+        self.users_going_offline = {}
+
+        self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
+        self._set_state_client = ReplicationPresenceSetState.make_client(hs)
+
+        self._send_stop_syncing_loop = self.clock.looping_call(
+            self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
+        )
+
+        self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
+
+        hs.get_reactor().addSystemEventTrigger(
+            "before",
+            "shutdown",
+            run_as_background_process,
+            "generic_presence.on_shutdown",
+            self._on_shutdown,
+        )
+
+    def _on_shutdown(self):
+        if self._presence_enabled:
+            self.hs.get_tcp_replication().send_command(
+                ClearUserSyncsCommand(self.instance_id)
+            )
+
+    def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+        if self._presence_enabled:
+            self.hs.get_tcp_replication().send_user_sync(
+                self.instance_id, user_id, is_syncing, last_sync_ms
+            )
+
+    def mark_as_coming_online(self, user_id):
+        """A user has started syncing. Send a UserSync to the master, unless they
+        had recently stopped syncing.
+
+        Args:
+            user_id (str)
+        """
+        going_offline = self.users_going_offline.pop(user_id, None)
+        if not going_offline:
+            # Safe to skip because we haven't yet told the master they were offline
+            self.send_user_sync(user_id, True, self.clock.time_msec())
+
+    def mark_as_going_offline(self, user_id):
+        """A user has stopped syncing. We wait before notifying the master as
+        its likely they'll come back soon. This allows us to avoid sending
+        a stopped syncing immediately followed by a started syncing notification
+        to the master
+
+        Args:
+            user_id (str)
+        """
+        self.users_going_offline[user_id] = self.clock.time_msec()
+
+    def send_stop_syncing(self):
+        """Check if there are any users who have stopped syncing a while ago
+        and haven't come back yet. If there are, poke the master about them.
+        """
+        now = self.clock.time_msec()
+        for user_id, last_sync_ms in list(self.users_going_offline.items()):
+            if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
+                self.users_going_offline.pop(user_id, None)
+                self.send_user_sync(user_id, False, last_sync_ms)
+
+    async def user_syncing(
+        self, user_id: str, affect_presence: bool
+    ) -> ContextManager[None]:
+        """Record that a user is syncing.
+
+        Called by the sync and events servlets to record that a user has connected to
+        this worker and is waiting for some events.
+        """
+        if not affect_presence or not self._presence_enabled:
+            return _NullContextManager()
+
+        curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
+        self._user_to_num_current_syncs[user_id] = curr_sync + 1
+
+        # If we went from no in-flight syncs to some, notify replication
+        if self._user_to_num_current_syncs[user_id] == 1:
+            self.mark_as_coming_online(user_id)
+
+        def _end():
+            # We check that the user_id is in user_to_num_current_syncs because
+            # user_to_num_current_syncs may have been cleared if we are
+            # shutting down.
+            if user_id in self._user_to_num_current_syncs:
+                self._user_to_num_current_syncs[user_id] -= 1
+
+                # If we went from one in-flight sync to none, notify replication
+                if self._user_to_num_current_syncs[user_id] == 0:
+                    self.mark_as_going_offline(user_id)
+
+        @contextlib.contextmanager
+        def _user_syncing():
+            try:
+                yield
+            finally:
+                _end()
+
+        return _user_syncing()
+
+    async def notify_from_replication(self, states, stream_id):
+        parties = await get_interested_parties(self.store, self.presence_router, states)
+        room_ids_to_states, users_to_states = parties
+
+        self.notifier.on_new_event(
+            "presence_key",
+            stream_id,
+            rooms=room_ids_to_states.keys(),
+            users=users_to_states.keys(),
+        )
+
+        # If this is a federation sender, notify about presence updates.
+        await self.maybe_send_presence_to_interested_destinations(states)
+
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
+        await super().process_replication_rows(stream_name, instance_name, token, rows)
+
+        if stream_name != PresenceStream.NAME:
+            return
+
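+        # Reconstruct full UserPresenceState objects from the raw rows that
+        # came down the presence replication stream.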
+        states = [
+            UserPresenceState(
+                row.user_id,
+                row.state,
+                row.last_active_ts,
+                row.last_federation_update_ts,
+                row.last_user_sync_ts,
+                row.status_msg,
+                row.currently_active,
+            )
+            for row in rows
+        ]
+
+        for state in states:
+            self.user_to_current_state[state.user_id] = state
+
+        stream_id = token
+        await self.notify_from_replication(states, stream_id)
+
+    def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
+        return [
+            user_id
+            for user_id, count in self._user_to_num_current_syncs.items()
+            if count > 0
+        ]
+
+    async def set_state(self, target_user, state, ignore_status_msg=False):
+        """Set the presence state of the user."""
+        presence = state["presence"]
+
+        valid_presence = (
+            PresenceState.ONLINE,
+            PresenceState.UNAVAILABLE,
+            PresenceState.OFFLINE,
+            PresenceState.BUSY,
+        )
+
+        if presence not in valid_presence or (
+            presence == PresenceState.BUSY and not self._busy_presence_enabled
+        ):
+            raise SynapseError(400, "Invalid presence state")
+
+        user_id = target_user.to_string()
+
+        # If presence is disabled, no-op
+        if not self.hs.config.use_presence:
+            return
+
+        # Proxy request to master
+        await self._set_state_client(
+            user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
+        )
+
+    async def bump_presence_active_time(self, user):
+        """We've seen the user do something that indicates they're interacting
+        with the app.
+        """
+        # If presence is disabled, no-op
+        if not self.hs.config.use_presence:
+            return
+
+        # Proxy request to master
+        user_id = user.to_string()
+        await self._bump_active_client(user_id=user_id)
+
 
 class PresenceHandler(BasePresenceHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.hs = hs
-        self.is_mine_id = hs.is_mine_id
         self.server_name = hs.hostname
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.federation = hs.get_federation_sender()
-        self.state = hs.get_state_handler()
-        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         federation_registry = hs.get_federation_registry()
@@ -427,6 +722,13 @@ class PresenceHandler(BasePresenceHandler):
             self.unpersisted_users_changes |= {s.user_id for s in new_states}
             self.unpersisted_users_changes -= set(to_notify.keys())
 
+            # Check if we need to resend any presence states to remote hosts. We
+            # only do this for states that haven't been updated in a while to
+            # ensure that the remote host doesn't time the presence state out.
+            #
+            # Note that since these are states that have *not* been updated,
+            # they won't get sent down the normal presence replication stream,
+            # and so we have to explicitly send them via the federation stream.
             to_federation_ping = {
                 user_id: state
                 for user_id, state in to_federation_ping.items()
@@ -435,7 +737,17 @@ class PresenceHandler(BasePresenceHandler):
             if to_federation_ping:
                 federation_presence_out_counter.inc(len(to_federation_ping))
 
-                self._push_to_remotes(to_federation_ping.values())
+                hosts_and_states = await get_interested_remotes(
+                    self.store,
+                    self.presence_router,
+                    list(to_federation_ping.values()),
+                    self.state,
+                )
+
+                for destinations, states in hosts_and_states:
+                    self._federation_queue.send_presence_to_destinations(
+                        states, destinations
+                    )
 
     async def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -675,15 +987,10 @@ class PresenceHandler(BasePresenceHandler):
             users=[UserID.from_string(u) for u in users_to_states],
         )
 
-        self._push_to_remotes(states)
-
-    def _push_to_remotes(self, states):
-        """Sends state updates to remote servers.
-
-        Args:
-            states (list(UserPresenceState))
-        """
-        self.federation.send_presence(states)
+        # We only want to poke the local federation sender, if any, as other
+        # workers will receive the presence updates via the presence replication
+        # stream (which is updated by `store.update_presence`).
+        await self.maybe_send_presence_to_interested_destinations(states)
 
     async def incoming_presence(self, origin, content):
         """Called when we receive a `m.presence` EDU from a remote server."""
@@ -921,7 +1228,7 @@ class PresenceHandler(BasePresenceHandler):
 
         # Send out user presence updates for each destination
         for destination, user_state_set in presence_destinations.items():
-            self.federation.send_presence_to_destinations(
+            self._federation_queue.send_presence_to_destinations(
                 destinations=[destination], states=user_state_set
             )
 
@@ -1566,3 +1873,197 @@ async def get_interested_remotes(
         hosts_and_states.append(([host], states))
 
     return hosts_and_states
+
+
+class PresenceFederationQueue:
+    """Handles sending ad hoc presence updates over federation, which are *not*
+    due to state updates (that get handled via the presence stream), e.g.
+    federation pings and sending existing present states to newly joined hosts.
+
+    Only the last N minutes will be queued, so if a federation sender instance
+    is down for longer then some updates will be dropped. This is OK as presence
+    is ephemeral, and so it will self correct eventually.
+
+    On workers the class tracks the last received position of the stream from
+    replication, and handles querying for missed updates over HTTP replication,
+    c.f. `get_current_token` and `get_replication_rows`.
+    """
+
+    # How long to keep entries in the queue for. Workers that are down for
+    # longer than this duration will miss out on older updates.
+    _KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000
+
+    # How often to check if we can expire entries from the queue.
+    _CLEAR_ITEMS_EVERY_MS = 60 * 1000
+
+    def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
+        self._clock = hs.get_clock()
+        self._notifier = hs.get_notifier()
+        self._instance_name = hs.get_instance_name()
+        self._presence_handler = presence_handler
+        self._repl_client = ReplicationGetStreamUpdates.make_client(hs)
+
+        # Should we keep a queue of recent presence updates? We only bother if
+        # another process may be handling federation sending.
+        self._queue_presence_updates = True
+
+        # Whether this instance is a presence writer.
+        self._presence_writer = hs.config.worker.worker_app is None
+
+        # The FederationSender instance, if this process sends federation traffic directly.
+        self._federation = None
+
+        if hs.should_send_federation():
+            self._federation = hs.get_federation_sender()
+
+            # We don't bother queuing up presence states if only this instance
+            # is sending federation.
+            if hs.config.worker.federation_shard_config.instances == [
+                self._instance_name
+            ]:
+                self._queue_presence_updates = False
+
+        # The queue of recently queued updates as tuples of: `(timestamp,
+        # stream_id, destinations, user_ids)`. We don't store the full states
+        # for efficiency, and remote workers will already have the full states
+        # cached.
+        self._queue = []  # type: List[Tuple[int, int, Collection[str], Set[str]]]
+
+        self._next_id = 1
+
+        # Map from instance name to current token
+        self._current_tokens = {}  # type: Dict[str, int]
+
+        if self._queue_presence_updates:
+            self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)
+
+    def _clear_queue(self):
+        """Clear out older entries from the queue."""
+        clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS
+
+        # The queue is sorted by timestamp, so we can bisect to find the right
+        # place to purge before. Note that we are searching using a 1-tuple with
+        # the time, which does The Right Thing since the queue is a tuple where
+        # the first item is a timestamp.
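+        #
+        # For example, bisect(self._queue, (clear_before,)) returns the index
+        # of the first entry whose timestamp is >= clear_before, since a
+        # 1-tuple always sorts before a longer tuple with the same first
+        # element.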
+        index = bisect(self._queue, (clear_before,))
+        self._queue = self._queue[index:]
+
+    def send_presence_to_destinations(
+        self, states: Collection[UserPresenceState], destinations: Collection[str]
+    ) -> None:
+        """Send the presence states to the given destinations.
+
+        Will forward to the local federation sender (if there is one) and queue
+        the update to send over replication (if there are other federation
+        sender instances).
+
+        Must only be called on the presence writer process.
+        """
+
+        # This should only be called on a presence writer.
+        assert self._presence_writer
+
+        if self._federation:
+            self._federation.send_presence_to_destinations(
+                states=states,
+                destinations=destinations,
+            )
+
+        if not self._queue_presence_updates:
+            return
+
+        now = self._clock.time_msec()
+
+        stream_id = self._next_id
+        self._next_id += 1
+
+        self._queue.append((now, stream_id, destinations, {s.user_id for s in states}))
+
+        self._notifier.notify_replication()
+
+    def get_current_token(self, instance_name: str) -> int:
+        """Get the current position of the stream.
+
+        On workers this returns the last stream ID received from replication.
+        """
+        if instance_name == self._instance_name:
+            return self._next_id - 1
+        else:
+            return self._current_tokens.get(instance_name, 0)
+
+    async def get_replication_rows(
+        self,
+        instance_name: str,
+        from_token: int,
+        upto_token: int,
+        target_row_count: int,
+    ) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]:
+        """Get all the updates between the two tokens.
+
+        We return rows in the form of `(destination, user_id)` to keep the size
+        of each row bounded (rather than returning one large row per queue entry
+        containing the full destination and user ID sets).
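+        For example, a single queue entry covering two destinations and two
+        user IDs expands into four `(stream_id, (destination, user_id))` rows.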
+
+        On workers this will query the master process via HTTP replication.
+        """
+        if instance_name != self._instance_name:
+            # If not local we query the given instance over HTTP replication.
+            result = await self._repl_client(
+                instance_name=instance_name,
+                stream_name=PresenceFederationStream.NAME,
+                from_token=from_token,
+                upto_token=upto_token,
+            )
+            return result["updates"], result["upto_token"], result["limited"]
+
+        # If the from_token is the current token then there's nothing to return
+        # and we can trivially no-op (otherwise the negative-index arithmetic
+        # below would wrap around to the start of the queue).
+        if from_token == self._next_id - 1:
+            return [], upto_token, False
+
+        # We can find the correct position in the queue by noting that there is
+        # exactly one entry per stream ID, and that the last entry has an ID of
+        # `self._next_id - 1`, so we can count backwards from the end.
+        #
+        # Since the start of the queue is periodically truncated we need to
+        # handle the case where `from_token` stream ID has already been dropped.
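+        #
+        # For example (illustrative values): if the queue holds stream IDs
+        # [3, 4, 5] then `self._next_id` is 6, and a `from_token` of 3 gives
+        # `start_idx = 3 + 1 - 6 = -2`, i.e. the entries with stream IDs 4
+        # and 5.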
+        start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
+
+        to_send = []  # type: List[Tuple[int, Tuple[str, str]]]
+        limited = False
+        new_id = upto_token
+        for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+            if stream_id > upto_token:
+                break
+
+            new_id = stream_id
+
+            to_send.extend(
+                (stream_id, (destination, user_id))
+                for destination in destinations
+                for user_id in user_ids
+            )
+
+            if len(to_send) > target_row_count:
+                limited = True
+                break
+
+        return to_send, new_id, limited
+
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ) -> None:
+        if stream_name != PresenceFederationStream.NAME:
+            return
+
+        # We keep track of the current tokens (so that we can catch up with
+        # anything we missed after a disconnect).
+        self._current_tokens[instance_name] = token
+
+        # If we're a federation sender we pull out the presence states to send
+        # and forward them on.
+        if not self._federation:
+            return
+
+        hosts_to_users = {}  # type: Dict[str, Set[str]]
+        for row in rows:
+            hosts_to_users.setdefault(row.destination, set()).add(row.user_id)
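+        # e.g. the rows ("example.com", "@alice:test") and ("example.com",
+        # "@bob:test") (illustrative values) collapse into
+        # {"example.com": {"@alice:test", "@bob:test"}}.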
+
+        for host, user_ids in hosts_to_users.items():
+            states = await self._presence_handler.current_state_for_users(user_ids)
+            self._federation.send_presence_to_destinations(
+                states=states.values(),
+                destinations=[host],
+            )
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a755363c3f..05b4a97b59 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index a54fe1968e..c679a8303e 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index dbfe9bfaca..f782d9db32 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 3b6660c873..007fb12840 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 4b3d0d72e3..5a888b7941 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
 # Copyright 2018-2019 New Vector Ltd
 # Copyright 2019 The Matrix.org Foundation C.I.C.
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 924b81db7c..141c9c0444 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 894ef859f4..2bbfac6471 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2016-2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 3a90fc0c16..3e89dd2315 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index ec2ba11c75..80ba65b9e0 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml_handler.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index d742dfbd53..4e718d3f63 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index f98a338ec5..a63fac8283 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index 5d9418969d..01e3e050f9 100644
--- a/synapse/handlers/space_summary.py
+++ b/synapse/handlers/space_summary.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 415b1c2d17..8d00ffdc73 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py
index ee8f87e59a..077c7c0649 100644
--- a/synapse/handlers/state_deltas.py
+++ b/synapse/handlers/state_deltas.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 8730f99d03..383e34026e 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f8d88ef77b..dc8ee8cd17 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2018, 2019 New Vector Ltd
 #
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index bb35af099d..e22393adc4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
index a68d5e790e..4c3b669fae 100644
--- a/synapse/handlers/ui_auth/__init__.py
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 3d66bf305e..0eeb7c03f2 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index b121286d95..dacc4f3076 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -45,7 +44,6 @@ class UserDirectoryHandler(StateDeltasHandler):
         super().__init__(hs)
 
         self.store = hs.get_datastore()
-        self.state = hs.get_state_handler()
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
@@ -303,10 +301,12 @@ class UserDirectoryHandler(StateDeltasHandler):
             # ignore the change
             return
 
-        users_with_profile = await self.state.get_current_users_in_room(room_id)
+        other_users_in_room_with_profiles = (
+            await self.store.get_users_in_room_with_profiles(room_id)
+        )
 
         # Remove every user from the sharing tables for that room.
-        for user_id in users_with_profile.keys():
+        for user_id in other_users_in_room_with_profiles.keys():
             await self.store.remove_user_who_share_room(user_id, room_id)
 
         # Then, re-add them to the tables.
@@ -315,7 +315,7 @@ class UserDirectoryHandler(StateDeltasHandler):
         # which when run over an entire room, will result in the same values
         # being added multiple times. The batching upserts shouldn't make this
         # too bad, though.
-        for user_id, profile in users_with_profile.items():
+        for user_id, profile in other_users_in_room_with_profiles.items():
             await self._handle_new_user(room_id, user_id, profile)
 
     async def _handle_new_user(
@@ -337,7 +337,7 @@ class UserDirectoryHandler(StateDeltasHandler):
             room_id
         )
         # Now we update users who share rooms with users.
-        users_with_profile = await self.state.get_current_users_in_room(room_id)
+        other_users_in_room = await self.store.get_users_in_room(room_id)
 
         if is_public:
             await self.store.add_users_in_public_rooms(room_id, (user_id,))
@@ -353,14 +353,14 @@ class UserDirectoryHandler(StateDeltasHandler):
 
                 # We don't care about appservice users.
                 if not is_appservice:
-                    for other_user_id in users_with_profile:
+                    for other_user_id in other_users_in_room:
                         if user_id == other_user_id:
                             continue
 
                         to_insert.add((user_id, other_user_id))
 
             # Next we need to update for every local user in the room
-            for other_user_id in users_with_profile:
+            for other_user_id in other_users_in_room:
                 if user_id == other_user_id:
                     continue