diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b30f41dc4b..f8b234cee2 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -19,11 +19,9 @@
import itertools
import logging
-from typing import Dict, Iterable, List, Optional, Sequence, Tuple
-
-import six
-from six import iteritems, itervalues
-from six.moves import http_client, zip
+from collections.abc import Container
+from http import HTTPStatus
+from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
import attr
from signedjson.key import decode_verify_key_bytes
@@ -46,6 +44,7 @@ from synapse.api.errors import (
FederationDeniedError,
FederationError,
HttpResponseException,
+ NotFoundError,
RequestSendFailed,
SynapseError,
)
@@ -63,6 +62,7 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.utils import log_function
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -71,7 +71,7 @@ from synapse.replication.http.federation import (
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
-from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
@@ -243,7 +243,7 @@ class FederationHandler(BaseHandler):
logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth)
prevs = set(pdu.prev_event_ids())
- seen = await self.store.have_seen_events(prevs)
+ seen = await self.store.have_events_in_timeline(prevs)
if min_depth is not None and pdu.depth < min_depth:
# This is so that we don't notify the user about this
@@ -283,7 +283,7 @@ class FederationHandler(BaseHandler):
# Update the set of things we've seen after trying to
# fetch the missing stuff
- seen = await self.store.have_seen_events(prevs)
+ seen = await self.store.have_events_in_timeline(prevs)
if not prevs - seen:
logger.info(
@@ -379,6 +379,7 @@ class FederationHandler(BaseHandler):
room_version = await self.store.get_room_version_id(room_id)
state_map = await resolve_events_with_store(
+ self.clock,
room_id,
room_version,
state_maps,
@@ -398,7 +399,7 @@ class FederationHandler(BaseHandler):
)
event_map.update(evs)
- state = [event_map[e] for e in six.itervalues(state_map)]
+ state = [event_map[e] for e in state_map.values()]
except Exception:
logger.warning(
"[%s %s] Error attempting to resolve state at missing "
@@ -428,7 +429,7 @@ class FederationHandler(BaseHandler):
room_id = pdu.room_id
event_id = pdu.event_id
- seen = await self.store.have_seen_events(prevs)
+ seen = await self.store.have_events_in_timeline(prevs)
if not prevs - seen:
return
@@ -619,6 +620,11 @@ class FederationHandler(BaseHandler):
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
+ This function *does not* automatically get missing auth events of the
+ newly fetched events. Callers must include the full auth chain of
the missing events in the `event_ids` argument, to ensure that any
+ missing auth events are correctly fetched.
+
Returns:
map from event_id to event
"""
@@ -744,6 +750,9 @@ class FederationHandler(BaseHandler):
# device and recognize the algorithm then we can work out the
# exact key to expect. Otherwise check it matches any key we
# have for that device.
+
+ current_keys = [] # type: Container[str]
+
if device:
keys = device.get("keys", {}).get("keys", {})
@@ -760,15 +769,15 @@ class FederationHandler(BaseHandler):
current_keys = keys.values()
elif device_id:
# We don't have any keys for the device ID.
- current_keys = []
+ pass
else:
# The event didn't include a device ID, so we just look for
# keys across all devices.
- current_keys = (
+ current_keys = [
key
for device in cached_devices
for key in device.get("keys", {}).get("keys", {}).values()
- )
+ ]
# We now check that the sender key matches (one of) the expected
# keys.
@@ -782,15 +791,25 @@ class FederationHandler(BaseHandler):
resync = True
if resync:
- await self.store.mark_remote_user_device_cache_as_stale(event.sender)
+ run_as_background_process(
+ "resync_device_due_to_pdu", self._resync_device, event.sender
+ )
- # Immediately attempt a resync in the background
- if self.config.worker_app:
- return run_in_background(self._user_device_resync, event.sender)
- else:
- return run_in_background(
- self._device_list_updater.user_device_resync, event.sender
- )
+ async def _resync_device(self, sender: str) -> None:
+ """We have detected that the device list for the given user may be out
+ of sync, so we try and resync them.
+ """
+
+ try:
+ await self.store.mark_remote_user_device_cache_as_stale(sender)
+
+ # Immediately attempt a resync in the background
+ if self.config.worker_app:
+ await self._user_device_resync(user_id=sender)
+ else:
+ await self._device_list_updater.user_device_resync(sender)
+ except Exception:
+ logger.exception("Failed to resync device for %s", sender)
@log_function
async def backfill(self, dest, room_id, limit, extremities):
@@ -1009,11 +1028,11 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in iteritems(state)
+ for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member and event.membership == Membership.JOIN
]
- joined_domains = {}
+ joined_domains = {} # type: Dict[str, int]
for u, d in joined_users:
try:
dom = get_domain_from_id(u)
@@ -1099,16 +1118,16 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = await self.store.get_events(
- [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
+ [e_id for ids in states.values() for e_id in ids.values()],
get_prev_content=False,
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in iteritems(state_dict)
+ for k, e_id in state_dict.items()
if e_id in state_map
}
- for key, state_dict in iteritems(states)
+ for key, state_dict in states.items()
}
for e_id, _ in sorted_extremeties_tuple:
@@ -1129,12 +1148,16 @@ class FederationHandler(BaseHandler):
):
"""Fetch the given events from a server, and persist them as outliers.
+ This function *does not* recursively get missing auth events of the
+ newly fetched events. Callers must include in the `events` argument
+ any missing events from the auth chain.
+
Logs a warning if we can't find the given event.
"""
room_version = await self.store.get_room_version(room_id)
- event_infos = []
+ event_map = {} # type: Dict[str, EventBase]
async def get_event(event_id: str):
with nested_logging_context(event_id):
@@ -1148,17 +1171,7 @@ class FederationHandler(BaseHandler):
)
return
- # recursively fetch the auth events for this event
- auth_events = await self._get_events_from_store_or_dest(
- destination, room_id, event.auth_event_ids()
- )
- auth = {}
- for auth_event_id in event.auth_event_ids():
- ae = auth_events.get(auth_event_id)
- if ae:
- auth[(ae.type, ae.state_key)] = ae
-
- event_infos.append(_NewEventInfo(event, None, auth))
+ event_map[event.event_id] = event
except Exception as e:
logger.warning(
@@ -1170,6 +1183,32 @@ class FederationHandler(BaseHandler):
await concurrently_execute(get_event, events, 5)
+ # Make a map of auth events for each event. We do this after fetching
+ # all the events as some of the events' auth events will be in the list
+ # of requested events.
+
+ auth_events = [
+ aid
+ for event in event_map.values()
+ for aid in event.auth_event_ids()
+ if aid not in event_map
+ ]
+ persisted_events = await self.store.get_events(
+ auth_events, allow_rejected=True,
+ )
+
+ event_infos = []
+ for event in event_map.values():
+ auth = {}
+ for auth_event_id in event.auth_event_ids():
+ ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
+ if ae:
+ auth[(ae.type, ae.state_key)] = ae
+ else:
+ logger.info("Missing auth event %s", auth_event_id)
+
+ event_infos.append(_NewEventInfo(event, None, auth))
+
await self._handle_new_events(
destination, event_infos,
)
@@ -1196,7 +1235,7 @@ class FederationHandler(BaseHandler):
ev.event_id,
len(ev.prev_event_ids()),
)
- raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
+ raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
if len(ev.auth_event_ids()) > 10:
logger.warning(
@@ -1204,7 +1243,7 @@ class FederationHandler(BaseHandler):
ev.event_id,
len(ev.auth_event_ids()),
)
- raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
+ raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
async def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -1279,14 +1318,15 @@ class FederationHandler(BaseHandler):
try:
# Try the host we successfully got a response to /make_join/
# request first.
+ host_list = list(target_hosts)
try:
- target_hosts.remove(origin)
- target_hosts.insert(0, origin)
+ host_list.remove(origin)
+ host_list.insert(0, origin)
except ValueError:
pass
ret = await self.federation_client.send_join(
- target_hosts, event, room_version_obj
+ host_list, event, room_version_obj
)
origin = ret["origin"]
@@ -1354,7 +1394,7 @@ class FederationHandler(BaseHandler):
# it's just a best-effort thing at this point. We do want to do
# them roughly in order, though, otherwise we'll end up making
# lots of requests for missing prev_events which we do actually
- # have. Hence we fire off the deferred, but don't wait for it.
+ # have. Hence we fire off the background task, but don't wait for it.
run_in_background(self._handle_queued_pdus, room_queue)
@@ -1400,10 +1440,20 @@ class FederationHandler(BaseHandler):
)
raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
- event_content = {"membership": Membership.JOIN}
-
+ # checking the room version will check that we've actually heard of the room
+ # (and return a 404 otherwise)
room_version = await self.store.get_room_version_id(room_id)
+ # now check that we are *still* in the room
+ is_in_room = await self.auth.check_host_in_room(room_id, self.server_name)
+ if not is_in_room:
+ logger.info(
+ "Got /make_join request for room %s we are no longer in", room_id,
+ )
+ raise NotFoundError("Not an active room on this server")
+
+ event_content = {"membership": Membership.JOIN}
+
builder = self.event_builder_factory.new(
room_version,
{
@@ -1547,7 +1597,7 @@ class FederationHandler(BaseHandler):
# block any attempts to invite the server notices mxid
if event.state_key == self._server_notices_mxid:
- raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+ raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
# keep a record of the room version, if we don't yet know it.
# (this may get overwritten if we later get a different room version in a
@@ -1564,7 +1614,7 @@ class FederationHandler(BaseHandler):
room_version,
event.get_pdu_json(),
self.hs.hostname,
- self.hs.config.signing_key[0],
+ self.hs.signing_key,
)
)
@@ -1586,13 +1636,14 @@ class FederationHandler(BaseHandler):
# Try the host that we succesfully called /make_leave/ on first for
# the /send_leave/ request.
+ host_list = list(target_hosts)
try:
- target_hosts.remove(origin)
- target_hosts.insert(0, origin)
+ host_list.remove(origin)
+ host_list.insert(0, origin)
except ValueError:
pass
- await self.federation_client.send_leave(target_hosts, event)
+ await self.federation_client.send_leave(host_list, event)
context = await self.state_handler.compute_event_context(event)
stream_id = await self.persist_events_and_notify([(event, context)])
@@ -1606,7 +1657,7 @@ class FederationHandler(BaseHandler):
user_id: str,
membership: str,
content: JsonDict = {},
- params: Optional[Dict[str, str]] = None,
+ params: Optional[Dict[str, Union[str, Iterable[str]]]] = None,
) -> Tuple[str, EventBase, RoomVersion]:
(
origin,
@@ -1726,14 +1777,12 @@ class FederationHandler(BaseHandler):
"""Returns the state at the event. i.e. not including said event.
"""
- event = await self.store.get_event(
- event_id, allow_none=False, check_room_id=room_id
- )
+ event = await self.store.get_event(event_id, check_room_id=room_id)
state_groups = await self.state_store.get_state_groups(room_id, [event_id])
if state_groups:
- _, state = list(iteritems(state_groups)).pop()
+ _, state = list(state_groups.items()).pop()
results = {(e.type, e.state_key): e for e in state}
if event.is_state():
@@ -1754,9 +1803,7 @@ class FederationHandler(BaseHandler):
async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
"""Returns the state at the event. i.e. not including said event.
"""
- event = await self.store.get_event(
- event_id, allow_none=False, check_room_id=room_id
- )
+ event = await self.store.get_event(event_id, check_room_id=room_id)
state_groups = await self.state_store.get_state_groups_ids(room_id, [event_id])
@@ -1836,9 +1883,6 @@ class FederationHandler(BaseHandler):
origin, event, state=state, auth_events=auth_events, backfilled=backfilled
)
- # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
- # hack around with a try/finally instead.
- success = False
try:
if (
not event.internal_metadata.is_outlier()
@@ -1852,12 +1896,11 @@ class FederationHandler(BaseHandler):
await self.persist_events_and_notify(
[(event, context)], backfilled=backfilled
)
- success = True
- finally:
- if not success:
- run_in_background(
- self.store.remove_push_actions_from_staging, event.event_id
- )
+ except Exception:
+ run_in_background(
+ self.store.remove_push_actions_from_staging, event.event_id
+ )
+ raise
return context
@@ -2017,11 +2060,11 @@ class FederationHandler(BaseHandler):
if not auth_events:
prev_state_ids = await context.get_prev_state_ids()
- auth_events_ids = await self.auth.compute_auth_events(
+ auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
- auth_events = await self.store.get_events(auth_events_ids)
- auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
+ auth_events_x = await self.store.get_events(auth_events_ids)
+ auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
@@ -2057,76 +2100,69 @@ class FederationHandler(BaseHandler):
# For new (non-backfilled and non-outlier) events we check if the event
# passes auth based on the current state. If it doesn't then we
# "soft-fail" the event.
- do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier()
- if do_soft_fail_check:
- extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id)
-
- extrem_ids = set(extrem_ids)
- prev_event_ids = set(event.prev_event_ids())
-
- if extrem_ids == prev_event_ids:
- # If they're the same then the current state is the same as the
- # state at the event, so no point rechecking auth for soft fail.
- do_soft_fail_check = False
-
- if do_soft_fail_check:
- room_version = await self.store.get_room_version_id(event.room_id)
- room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
- # Calculate the "current state".
- if state is not None:
- # If we're explicitly given the state then we won't have all the
- # prev events, and so we have a gap in the graph. In this case
- # we want to be a little careful as we might have been down for
- # a while and have an incorrect view of the current state,
- # however we still want to do checks as gaps are easy to
- # maliciously manufacture.
- #
- # So we use a "current state" that is actually a state
- # resolution across the current forward extremities and the
- # given state at the event. This should correctly handle cases
- # like bans, especially with state res v2.
+ if backfilled or event.internal_metadata.is_outlier():
+ return
- state_sets = await self.state_store.get_state_groups(
- event.room_id, extrem_ids
- )
- state_sets = list(state_sets.values())
- state_sets.append(state)
- current_state_ids = await self.state_handler.resolve_events(
- room_version, state_sets, event
- )
- current_state_ids = {
- k: e.event_id for k, e in iteritems(current_state_ids)
- }
- else:
- current_state_ids = await self.state_handler.get_current_state_ids(
- event.room_id, latest_event_ids=extrem_ids
- )
+ extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id)
+ extrem_ids = set(extrem_ids)
+ prev_event_ids = set(event.prev_event_ids())
- logger.debug(
- "Doing soft-fail check for %s: state %s",
- event.event_id,
- current_state_ids,
+ if extrem_ids == prev_event_ids:
+ # If they're the same then the current state is the same as the
+ # state at the event, so no point rechecking auth for soft fail.
+ return
+
+ room_version = await self.store.get_room_version_id(event.room_id)
+ room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+ # Calculate the "current state".
+ if state is not None:
+ # If we're explicitly given the state then we won't have all the
+ # prev events, and so we have a gap in the graph. In this case
+ # we want to be a little careful as we might have been down for
+ # a while and have an incorrect view of the current state,
+ # however we still want to do checks as gaps are easy to
+ # maliciously manufacture.
+ #
+ # So we use a "current state" that is actually a state
+ # resolution across the current forward extremities and the
+ # given state at the event. This should correctly handle cases
+ # like bans, especially with state res v2.
+
+ state_sets = await self.state_store.get_state_groups(
+ event.room_id, extrem_ids
+ )
+ state_sets = list(state_sets.values())
+ state_sets.append(state)
+ current_states = await self.state_handler.resolve_events(
+ room_version, state_sets, event
+ )
+ current_state_ids = {k: e.event_id for k, e in current_states.items()}
+ else:
+ current_state_ids = await self.state_handler.get_current_state_ids(
+ event.room_id, latest_event_ids=extrem_ids
)
- # Now check if event pass auth against said current state
- auth_types = auth_types_for_event(event)
- current_state_ids = [
- e for k, e in iteritems(current_state_ids) if k in auth_types
- ]
+ logger.debug(
+ "Doing soft-fail check for %s: state %s", event.event_id, current_state_ids,
+ )
- current_auth_events = await self.store.get_events(current_state_ids)
- current_auth_events = {
- (e.type, e.state_key): e for e in current_auth_events.values()
- }
+ # Now check if event pass auth against said current state
+ auth_types = auth_types_for_event(event)
+ current_state_ids_list = [
+ e for k, e in current_state_ids.items() if k in auth_types
+ ]
- try:
- event_auth.check(
- room_version_obj, event, auth_events=current_auth_events
- )
- except AuthError as e:
- logger.warning("Soft-failing %r because %s", event, e)
- event.internal_metadata.soft_failed = True
+ auth_events_map = await self.store.get_events(current_state_ids_list)
+ current_auth_events = {
+ (e.type, e.state_key): e for e in auth_events_map.values()
+ }
+
+ try:
+ event_auth.check(room_version_obj, event, auth_events=current_auth_events)
+ except AuthError as e:
+ logger.warning("Soft-failing %r because %s", event, e)
+ event.internal_metadata.soft_failed = True
async def on_query_auth(
self, origin, event_id, room_id, remote_auth_chain, rejects, missing
@@ -2135,9 +2171,7 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
- event = await self.store.get_event(
- event_id, allow_none=False, check_room_id=room_id
- )
+ event = await self.store.get_event(event_id, check_room_id=room_id)
# Just go through and process each event in `remote_auth_chain`. We
# don't want to fall into the trap of `missing` being wrong.
@@ -2295,10 +2329,10 @@ class FederationHandler(BaseHandler):
remote_auth_chain = await self.federation_client.get_event_auth(
origin, event.room_id, event.event_id
)
- except RequestSendFailed as e:
+ except RequestSendFailed as e1:
# The other side isn't around or doesn't implement the
# endpoint, so lets just bail out.
- logger.info("Failed to get event auth from remote: %s", e)
+ logger.info("Failed to get event auth from remote: %s", e1)
return context
seen_remotes = await self.store.have_seen_events(
@@ -2428,18 +2462,18 @@ class FederationHandler(BaseHandler):
else:
event_key = None
state_updates = {
- k: a.event_id for k, a in iteritems(auth_events) if k != event_key
+ k: a.event_id for k, a in auth_events.items() if k != event_key
}
current_state_ids = await context.get_current_state_ids()
- current_state_ids = dict(current_state_ids)
+ current_state_ids = dict(current_state_ids) # type: ignore
current_state_ids.update(state_updates)
prev_state_ids = await context.get_prev_state_ids()
prev_state_ids = dict(prev_state_ids)
- prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
+ prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
# create a new state group as a delta from the existing one.
prev_group = context.state_group
@@ -2776,7 +2810,8 @@ class FederationHandler(BaseHandler):
logger.debug("Checking auth on event %r", event.content)
- last_exception = None
+ last_exception = None # type: Optional[Exception]
+
# for each public key in the 3pid invite event
for public_key_object in self.hs.get_auth().get_public_keys(invite_event):
try:
@@ -2830,6 +2865,12 @@ class FederationHandler(BaseHandler):
return
except Exception as e:
last_exception = e
+
+ if last_exception is None:
+ # we can only get here if get_public_keys() returned an empty list
+ # TODO: make this better
+ raise RuntimeError("no public key in invite event")
+
raise last_exception
async def _check_key_revocation(self, public_key, url):
@@ -2945,7 +2986,9 @@ class FederationHandler(BaseHandler):
else:
user_joined_room(self.distributor, user, room_id)
- async def get_room_complexity(self, remote_room_hosts, room_id):
+ async def get_room_complexity(
+ self, remote_room_hosts: List[str], room_id: str
+ ) -> Optional[dict]:
"""
Fetch the complexity of a remote room over federation.
@@ -2954,7 +2997,7 @@ class FederationHandler(BaseHandler):
room_id (str): The room ID to ask about.
Returns:
- Deferred[dict] or Deferred[None]: Dict contains the complexity
+ Dict contains the complexity
metric versions, while None means we could not fetch the complexity.
"""
|