summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py335
1 files changed, 189 insertions, 146 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b30f41dc4b..f8b234cee2 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -19,11 +19,9 @@
 
 import itertools
 import logging
-from typing import Dict, Iterable, List, Optional, Sequence, Tuple
-
-import six
-from six import iteritems, itervalues
-from six.moves import http_client, zip
+from collections.abc import Container
+from http import HTTPStatus
+from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
 
 import attr
 from signedjson.key import decode_verify_key_bytes
@@ -46,6 +44,7 @@ from synapse.api.errors import (
     FederationDeniedError,
     FederationError,
     HttpResponseException,
+    NotFoundError,
     RequestSendFailed,
     SynapseError,
 )
@@ -63,6 +62,7 @@ from synapse.logging.context import (
     run_in_background,
 )
 from synapse.logging.utils import log_function
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -71,7 +71,7 @@ from synapse.replication.http.federation import (
 )
 from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
 from synapse.state import StateResolutionStore, resolve_events_with_store
-from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.distributor import user_joined_room
@@ -243,7 +243,7 @@ class FederationHandler(BaseHandler):
             logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth)
 
             prevs = set(pdu.prev_event_ids())
-            seen = await self.store.have_seen_events(prevs)
+            seen = await self.store.have_events_in_timeline(prevs)
 
             if min_depth is not None and pdu.depth < min_depth:
                 # This is so that we don't notify the user about this
@@ -283,7 +283,7 @@ class FederationHandler(BaseHandler):
 
                         # Update the set of things we've seen after trying to
                         # fetch the missing stuff
-                        seen = await self.store.have_seen_events(prevs)
+                        seen = await self.store.have_events_in_timeline(prevs)
 
                         if not prevs - seen:
                             logger.info(
@@ -379,6 +379,7 @@ class FederationHandler(BaseHandler):
 
                     room_version = await self.store.get_room_version_id(room_id)
                     state_map = await resolve_events_with_store(
+                        self.clock,
                         room_id,
                         room_version,
                         state_maps,
@@ -398,7 +399,7 @@ class FederationHandler(BaseHandler):
                     )
                     event_map.update(evs)
 
-                    state = [event_map[e] for e in six.itervalues(state_map)]
+                    state = [event_map[e] for e in state_map.values()]
                 except Exception:
                     logger.warning(
                         "[%s %s] Error attempting to resolve state at missing "
@@ -428,7 +429,7 @@ class FederationHandler(BaseHandler):
         room_id = pdu.room_id
         event_id = pdu.event_id
 
-        seen = await self.store.have_seen_events(prevs)
+        seen = await self.store.have_events_in_timeline(prevs)
 
         if not prevs - seen:
             return
@@ -619,6 +620,11 @@ class FederationHandler(BaseHandler):
         will be omitted from the result. Likewise, any events which turn out not to
         be in the given room.
 
+        This function *does not* automatically get missing auth events of the
+        newly fetched events. Callers must include the full auth chain of
+        of the missing events in the `event_ids` argument, to ensure that any
+        missing auth events are correctly fetched.
+
         Returns:
             map from event_id to event
         """
@@ -744,6 +750,9 @@ class FederationHandler(BaseHandler):
                 # device and recognize the algorithm then we can work out the
                 # exact key to expect. Otherwise check it matches any key we
                 # have for that device.
+
+                current_keys = []  # type: Container[str]
+
                 if device:
                     keys = device.get("keys", {}).get("keys", {})
 
@@ -760,15 +769,15 @@ class FederationHandler(BaseHandler):
                         current_keys = keys.values()
                 elif device_id:
                     # We don't have any keys for the device ID.
-                    current_keys = []
+                    pass
                 else:
                     # The event didn't include a device ID, so we just look for
                     # keys across all devices.
-                    current_keys = (
+                    current_keys = [
                         key
                         for device in cached_devices
                         for key in device.get("keys", {}).get("keys", {}).values()
-                    )
+                    ]
 
                 # We now check that the sender key matches (one of) the expected
                 # keys.
@@ -782,15 +791,25 @@ class FederationHandler(BaseHandler):
                     resync = True
 
             if resync:
-                await self.store.mark_remote_user_device_cache_as_stale(event.sender)
+                run_as_background_process(
+                    "resync_device_due_to_pdu", self._resync_device, event.sender
+                )
 
-                # Immediately attempt a resync in the background
-                if self.config.worker_app:
-                    return run_in_background(self._user_device_resync, event.sender)
-                else:
-                    return run_in_background(
-                        self._device_list_updater.user_device_resync, event.sender
-                    )
+    async def _resync_device(self, sender: str) -> None:
+        """We have detected that the device list for the given user may be out
+        of sync, so we try and resync them.
+        """
+
+        try:
+            await self.store.mark_remote_user_device_cache_as_stale(sender)
+
+            # Immediately attempt a resync in the background
+            if self.config.worker_app:
+                await self._user_device_resync(user_id=sender)
+            else:
+                await self._device_list_updater.user_device_resync(sender)
+        except Exception:
+            logger.exception("Failed to resync device for %s", sender)
 
     @log_function
     async def backfill(self, dest, room_id, limit, extremities):
@@ -1009,11 +1028,11 @@ class FederationHandler(BaseHandler):
             """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in iteritems(state)
+                for (e_type, state_key), event in state.items()
                 if e_type == EventTypes.Member and event.membership == Membership.JOIN
             ]
 
-            joined_domains = {}
+            joined_domains = {}  # type: Dict[str, int]
             for u, d in joined_users:
                 try:
                     dom = get_domain_from_id(u)
@@ -1099,16 +1118,16 @@ class FederationHandler(BaseHandler):
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = await self.store.get_events(
-            [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
+            [e_id for ids in states.values() for e_id in ids.values()],
             get_prev_content=False,
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in iteritems(state_dict)
+                for k, e_id in state_dict.items()
                 if e_id in state_map
             }
-            for key, state_dict in iteritems(states)
+            for key, state_dict in states.items()
         }
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -1129,12 +1148,16 @@ class FederationHandler(BaseHandler):
     ):
         """Fetch the given events from a server, and persist them as outliers.
 
+        This function *does not* recursively get missing auth events of the
+        newly fetched events. Callers must include in the `events` argument
+        any missing events from the auth chain.
+
         Logs a warning if we can't find the given event.
         """
 
         room_version = await self.store.get_room_version(room_id)
 
-        event_infos = []
+        event_map = {}  # type: Dict[str, EventBase]
 
         async def get_event(event_id: str):
             with nested_logging_context(event_id):
@@ -1148,17 +1171,7 @@ class FederationHandler(BaseHandler):
                         )
                         return
 
-                    # recursively fetch the auth events for this event
-                    auth_events = await self._get_events_from_store_or_dest(
-                        destination, room_id, event.auth_event_ids()
-                    )
-                    auth = {}
-                    for auth_event_id in event.auth_event_ids():
-                        ae = auth_events.get(auth_event_id)
-                        if ae:
-                            auth[(ae.type, ae.state_key)] = ae
-
-                    event_infos.append(_NewEventInfo(event, None, auth))
+                    event_map[event.event_id] = event
 
                 except Exception as e:
                     logger.warning(
@@ -1170,6 +1183,32 @@ class FederationHandler(BaseHandler):
 
         await concurrently_execute(get_event, events, 5)
 
+        # Make a map of auth events for each event. We do this after fetching
+        # all the events as some of the events' auth events will be in the list
+        # of requested events.
+
+        auth_events = [
+            aid
+            for event in event_map.values()
+            for aid in event.auth_event_ids()
+            if aid not in event_map
+        ]
+        persisted_events = await self.store.get_events(
+            auth_events, allow_rejected=True,
+        )
+
+        event_infos = []
+        for event in event_map.values():
+            auth = {}
+            for auth_event_id in event.auth_event_ids():
+                ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
+                if ae:
+                    auth[(ae.type, ae.state_key)] = ae
+                else:
+                    logger.info("Missing auth event %s", auth_event_id)
+
+            event_infos.append(_NewEventInfo(event, None, auth))
+
         await self._handle_new_events(
             destination, event_infos,
         )
@@ -1196,7 +1235,7 @@ class FederationHandler(BaseHandler):
                 ev.event_id,
                 len(ev.prev_event_ids()),
             )
-            raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
 
         if len(ev.auth_event_ids()) > 10:
             logger.warning(
@@ -1204,7 +1243,7 @@ class FederationHandler(BaseHandler):
                 ev.event_id,
                 len(ev.auth_event_ids()),
             )
-            raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
 
     async def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -1279,14 +1318,15 @@ class FederationHandler(BaseHandler):
         try:
             # Try the host we successfully got a response to /make_join/
             # request first.
+            host_list = list(target_hosts)
             try:
-                target_hosts.remove(origin)
-                target_hosts.insert(0, origin)
+                host_list.remove(origin)
+                host_list.insert(0, origin)
             except ValueError:
                 pass
 
             ret = await self.federation_client.send_join(
-                target_hosts, event, room_version_obj
+                host_list, event, room_version_obj
             )
 
             origin = ret["origin"]
@@ -1354,7 +1394,7 @@ class FederationHandler(BaseHandler):
             # it's just a best-effort thing at this point. We do want to do
             # them roughly in order, though, otherwise we'll end up making
             # lots of requests for missing prev_events which we do actually
-            # have. Hence we fire off the deferred, but don't wait for it.
+            # have. Hence we fire off the background task, but don't wait for it.
 
             run_in_background(self._handle_queued_pdus, room_queue)
 
@@ -1400,10 +1440,20 @@ class FederationHandler(BaseHandler):
             )
             raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
 
-        event_content = {"membership": Membership.JOIN}
-
+        # checking the room version will check that we've actually heard of the room
+        # (and return a 404 otherwise)
         room_version = await self.store.get_room_version_id(room_id)
 
+        # now check that we are *still* in the room
+        is_in_room = await self.auth.check_host_in_room(room_id, self.server_name)
+        if not is_in_room:
+            logger.info(
+                "Got /make_join request for room %s we are no longer in", room_id,
+            )
+            raise NotFoundError("Not an active room on this server")
+
+        event_content = {"membership": Membership.JOIN}
+
         builder = self.event_builder_factory.new(
             room_version,
             {
@@ -1547,7 +1597,7 @@ class FederationHandler(BaseHandler):
 
         # block any attempts to invite the server notices mxid
         if event.state_key == self._server_notices_mxid:
-            raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+            raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
         # keep a record of the room version, if we don't yet know it.
         # (this may get overwritten if we later get a different room version in a
@@ -1564,7 +1614,7 @@ class FederationHandler(BaseHandler):
                 room_version,
                 event.get_pdu_json(),
                 self.hs.hostname,
-                self.hs.config.signing_key[0],
+                self.hs.signing_key,
             )
         )
 
@@ -1586,13 +1636,14 @@ class FederationHandler(BaseHandler):
 
         # Try the host that we succesfully called /make_leave/ on first for
         # the /send_leave/ request.
+        host_list = list(target_hosts)
         try:
-            target_hosts.remove(origin)
-            target_hosts.insert(0, origin)
+            host_list.remove(origin)
+            host_list.insert(0, origin)
         except ValueError:
             pass
 
-        await self.federation_client.send_leave(target_hosts, event)
+        await self.federation_client.send_leave(host_list, event)
 
         context = await self.state_handler.compute_event_context(event)
         stream_id = await self.persist_events_and_notify([(event, context)])
@@ -1606,7 +1657,7 @@ class FederationHandler(BaseHandler):
         user_id: str,
         membership: str,
         content: JsonDict = {},
-        params: Optional[Dict[str, str]] = None,
+        params: Optional[Dict[str, Union[str, Iterable[str]]]] = None,
     ) -> Tuple[str, EventBase, RoomVersion]:
         (
             origin,
@@ -1726,14 +1777,12 @@ class FederationHandler(BaseHandler):
         """Returns the state at the event. i.e. not including said event.
         """
 
-        event = await self.store.get_event(
-            event_id, allow_none=False, check_room_id=room_id
-        )
+        event = await self.store.get_event(event_id, check_room_id=room_id)
 
         state_groups = await self.state_store.get_state_groups(room_id, [event_id])
 
         if state_groups:
-            _, state = list(iteritems(state_groups)).pop()
+            _, state = list(state_groups.items()).pop()
             results = {(e.type, e.state_key): e for e in state}
 
             if event.is_state():
@@ -1754,9 +1803,7 @@ class FederationHandler(BaseHandler):
     async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
         """Returns the state at the event. i.e. not including said event.
         """
-        event = await self.store.get_event(
-            event_id, allow_none=False, check_room_id=room_id
-        )
+        event = await self.store.get_event(event_id, check_room_id=room_id)
 
         state_groups = await self.state_store.get_state_groups_ids(room_id, [event_id])
 
@@ -1836,9 +1883,6 @@ class FederationHandler(BaseHandler):
             origin, event, state=state, auth_events=auth_events, backfilled=backfilled
         )
 
-        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
-        # hack around with a try/finally instead.
-        success = False
         try:
             if (
                 not event.internal_metadata.is_outlier()
@@ -1852,12 +1896,11 @@ class FederationHandler(BaseHandler):
             await self.persist_events_and_notify(
                 [(event, context)], backfilled=backfilled
             )
-            success = True
-        finally:
-            if not success:
-                run_in_background(
-                    self.store.remove_push_actions_from_staging, event.event_id
-                )
+        except Exception:
+            run_in_background(
+                self.store.remove_push_actions_from_staging, event.event_id
+            )
+            raise
 
         return context
 
@@ -2017,11 +2060,11 @@ class FederationHandler(BaseHandler):
 
         if not auth_events:
             prev_state_ids = await context.get_prev_state_ids()
-            auth_events_ids = await self.auth.compute_auth_events(
+            auth_events_ids = self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=True
             )
-            auth_events = await self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
+            auth_events_x = await self.store.get_events(auth_events_ids)
+            auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
 
         # This is a hack to fix some old rooms where the initial join event
         # didn't reference the create event in its auth events.
@@ -2057,76 +2100,69 @@ class FederationHandler(BaseHandler):
         # For new (non-backfilled and non-outlier) events we check if the event
         # passes auth based on the current state. If it doesn't then we
         # "soft-fail" the event.
-        do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier()
-        if do_soft_fail_check:
-            extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id)
-
-            extrem_ids = set(extrem_ids)
-            prev_event_ids = set(event.prev_event_ids())
-
-            if extrem_ids == prev_event_ids:
-                # If they're the same then the current state is the same as the
-                # state at the event, so no point rechecking auth for soft fail.
-                do_soft_fail_check = False
-
-        if do_soft_fail_check:
-            room_version = await self.store.get_room_version_id(event.room_id)
-            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-            # Calculate the "current state".
-            if state is not None:
-                # If we're explicitly given the state then we won't have all the
-                # prev events, and so we have a gap in the graph. In this case
-                # we want to be a little careful as we might have been down for
-                # a while and have an incorrect view of the current state,
-                # however we still want to do checks as gaps are easy to
-                # maliciously manufacture.
-                #
-                # So we use a "current state" that is actually a state
-                # resolution across the current forward extremities and the
-                # given state at the event. This should correctly handle cases
-                # like bans, especially with state res v2.
+        if backfilled or event.internal_metadata.is_outlier():
+            return
 
-                state_sets = await self.state_store.get_state_groups(
-                    event.room_id, extrem_ids
-                )
-                state_sets = list(state_sets.values())
-                state_sets.append(state)
-                current_state_ids = await self.state_handler.resolve_events(
-                    room_version, state_sets, event
-                )
-                current_state_ids = {
-                    k: e.event_id for k, e in iteritems(current_state_ids)
-                }
-            else:
-                current_state_ids = await self.state_handler.get_current_state_ids(
-                    event.room_id, latest_event_ids=extrem_ids
-                )
+        extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id)
+        extrem_ids = set(extrem_ids)
+        prev_event_ids = set(event.prev_event_ids())
 
-            logger.debug(
-                "Doing soft-fail check for %s: state %s",
-                event.event_id,
-                current_state_ids,
+        if extrem_ids == prev_event_ids:
+            # If they're the same then the current state is the same as the
+            # state at the event, so no point rechecking auth for soft fail.
+            return
+
+        room_version = await self.store.get_room_version_id(event.room_id)
+        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+        # Calculate the "current state".
+        if state is not None:
+            # If we're explicitly given the state then we won't have all the
+            # prev events, and so we have a gap in the graph. In this case
+            # we want to be a little careful as we might have been down for
+            # a while and have an incorrect view of the current state,
+            # however we still want to do checks as gaps are easy to
+            # maliciously manufacture.
+            #
+            # So we use a "current state" that is actually a state
+            # resolution across the current forward extremities and the
+            # given state at the event. This should correctly handle cases
+            # like bans, especially with state res v2.
+
+            state_sets = await self.state_store.get_state_groups(
+                event.room_id, extrem_ids
+            )
+            state_sets = list(state_sets.values())
+            state_sets.append(state)
+            current_states = await self.state_handler.resolve_events(
+                room_version, state_sets, event
+            )
+            current_state_ids = {k: e.event_id for k, e in current_states.items()}
+        else:
+            current_state_ids = await self.state_handler.get_current_state_ids(
+                event.room_id, latest_event_ids=extrem_ids
             )
 
-            # Now check if event pass auth against said current state
-            auth_types = auth_types_for_event(event)
-            current_state_ids = [
-                e for k, e in iteritems(current_state_ids) if k in auth_types
-            ]
+        logger.debug(
+            "Doing soft-fail check for %s: state %s", event.event_id, current_state_ids,
+        )
 
-            current_auth_events = await self.store.get_events(current_state_ids)
-            current_auth_events = {
-                (e.type, e.state_key): e for e in current_auth_events.values()
-            }
+        # Now check if event pass auth against said current state
+        auth_types = auth_types_for_event(event)
+        current_state_ids_list = [
+            e for k, e in current_state_ids.items() if k in auth_types
+        ]
 
-            try:
-                event_auth.check(
-                    room_version_obj, event, auth_events=current_auth_events
-                )
-            except AuthError as e:
-                logger.warning("Soft-failing %r because %s", event, e)
-                event.internal_metadata.soft_failed = True
+        auth_events_map = await self.store.get_events(current_state_ids_list)
+        current_auth_events = {
+            (e.type, e.state_key): e for e in auth_events_map.values()
+        }
+
+        try:
+            event_auth.check(room_version_obj, event, auth_events=current_auth_events)
+        except AuthError as e:
+            logger.warning("Soft-failing %r because %s", event, e)
+            event.internal_metadata.soft_failed = True
 
     async def on_query_auth(
         self, origin, event_id, room_id, remote_auth_chain, rejects, missing
@@ -2135,9 +2171,7 @@ class FederationHandler(BaseHandler):
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
-        event = await self.store.get_event(
-            event_id, allow_none=False, check_room_id=room_id
-        )
+        event = await self.store.get_event(event_id, check_room_id=room_id)
 
         # Just go through and process each event in `remote_auth_chain`. We
         # don't want to fall into the trap of `missing` being wrong.
@@ -2295,10 +2329,10 @@ class FederationHandler(BaseHandler):
                     remote_auth_chain = await self.federation_client.get_event_auth(
                         origin, event.room_id, event.event_id
                     )
-                except RequestSendFailed as e:
+                except RequestSendFailed as e1:
                     # The other side isn't around or doesn't implement the
                     # endpoint, so lets just bail out.
-                    logger.info("Failed to get event auth from remote: %s", e)
+                    logger.info("Failed to get event auth from remote: %s", e1)
                     return context
 
                 seen_remotes = await self.store.have_seen_events(
@@ -2428,18 +2462,18 @@ class FederationHandler(BaseHandler):
         else:
             event_key = None
         state_updates = {
-            k: a.event_id for k, a in iteritems(auth_events) if k != event_key
+            k: a.event_id for k, a in auth_events.items() if k != event_key
         }
 
         current_state_ids = await context.get_current_state_ids()
-        current_state_ids = dict(current_state_ids)
+        current_state_ids = dict(current_state_ids)  # type: ignore
 
         current_state_ids.update(state_updates)
 
         prev_state_ids = await context.get_prev_state_ids()
         prev_state_ids = dict(prev_state_ids)
 
-        prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
+        prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
 
         # create a new state group as a delta from the existing one.
         prev_group = context.state_group
@@ -2776,7 +2810,8 @@ class FederationHandler(BaseHandler):
 
         logger.debug("Checking auth on event %r", event.content)
 
-        last_exception = None
+        last_exception = None  # type: Optional[Exception]
+
         # for each public key in the 3pid invite event
         for public_key_object in self.hs.get_auth().get_public_keys(invite_event):
             try:
@@ -2830,6 +2865,12 @@ class FederationHandler(BaseHandler):
                         return
             except Exception as e:
                 last_exception = e
+
+        if last_exception is None:
+            # we can only get here if get_public_keys() returned an empty list
+            # TODO: make this better
+            raise RuntimeError("no public key in invite event")
+
         raise last_exception
 
     async def _check_key_revocation(self, public_key, url):
@@ -2945,7 +2986,9 @@ class FederationHandler(BaseHandler):
         else:
             user_joined_room(self.distributor, user, room_id)
 
-    async def get_room_complexity(self, remote_room_hosts, room_id):
+    async def get_room_complexity(
+        self, remote_room_hosts: List[str], room_id: str
+    ) -> Optional[dict]:
         """
         Fetch the complexity of a remote room over federation.
 
@@ -2954,7 +2997,7 @@ class FederationHandler(BaseHandler):
             room_id (str): The room ID to ask about.
 
         Returns:
-            Deferred[dict] or Deferred[None]: Dict contains the complexity
+            Dict contains the complexity
             metric versions, while None means we could not fetch the complexity.
         """