diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 488058fe68..8cafcfdab0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.crypto.event_signing import compute_event_signature
from synapse.event_auth import auth_types_for_event
+from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.logging.context import (
make_deferred_yieldable,
@@ -109,6 +110,8 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
@@ -180,7 +183,7 @@ class FederationHandler(BaseHandler):
try:
self._sanity_check_event(pdu)
except SynapseError as err:
- logger.warn(
+ logger.warning(
"[%s %s] Received event failed sanity checks", room_id, event_id
)
raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
@@ -301,7 +304,7 @@ class FederationHandler(BaseHandler):
# following.
if sent_to_us_directly:
- logger.warn(
+ logger.warning(
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
room_id,
event_id,
@@ -324,7 +327,7 @@ class FederationHandler(BaseHandler):
event_map = {event_id: pdu}
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups_ids(room_id, seen)
+ ours = yield self.state_store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
state_maps = list(
@@ -350,10 +353,11 @@ class FederationHandler(BaseHandler):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
- remote_state, got_auth_chain = (
- yield self.federation_client.get_state_for_room(
- origin, room_id, p
- )
+ (
+ remote_state,
+ got_auth_chain,
+ ) = yield self.federation_client.get_state_for_room(
+ origin, room_id, p
)
# we want the state *after* p; get_state_for_room returns the
@@ -405,7 +409,7 @@ class FederationHandler(BaseHandler):
state = [event_map[e] for e in six.itervalues(state_map)]
auth_chain = list(auth_chains)
except Exception:
- logger.warn(
+ logger.warning(
"[%s %s] Error attempting to resolve state at missing "
"prev_events",
room_id,
@@ -518,7 +522,9 @@ class FederationHandler(BaseHandler):
# We failed to get the missing events, but since we need to handle
# the case of `get_missing_events` not returning the necessary
# events anyway, it is safe to simply log the error and continue.
- logger.warn("[%s %s]: Failed to get prev_events: %s", room_id, event_id, e)
+ logger.warning(
+ "[%s %s]: Failed to get prev_events: %s", room_id, event_id, e
+ )
return
logger.info(
@@ -545,7 +551,7 @@ class FederationHandler(BaseHandler):
yield self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
except FederationError as e:
if e.code == 403:
- logger.warn(
+ logger.warning(
"[%s %s] Received prev_event %s failed history check.",
room_id,
event_id,
@@ -888,7 +894,7 @@ class FederationHandler(BaseHandler):
# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = yield filter_events_for_server(
- self.store,
+ self.storage,
self.server_name,
list(extremities_events.values()),
redact=False,
@@ -1059,7 +1065,7 @@ class FederationHandler(BaseHandler):
SynapseError if the event does not pass muster
"""
if len(ev.prev_event_ids()) > 20:
- logger.warn(
+ logger.warning(
"Rejecting event %s which has %i prev_events",
ev.event_id,
len(ev.prev_event_ids()),
@@ -1067,7 +1073,7 @@ class FederationHandler(BaseHandler):
raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
if len(ev.auth_event_ids()) > 10:
- logger.warn(
+ logger.warning(
"Rejecting event %s which has %i auth_events",
ev.event_id,
len(ev.auth_event_ids()),
@@ -1101,7 +1107,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def do_invite_join(self, target_hosts, room_id, joinee, content):
""" Attempts to join the `joinee` to the room `room_id` via the
- server `target_host`.
+ servers contained in `target_hosts`.
This first triggers a /make_join/ request that returns a partial
event that we can fill out and sign. This is then sent to the
@@ -1110,6 +1116,15 @@ class FederationHandler(BaseHandler):
We suspend processing of any received events from this room until we
have finished processing the join.
+
+ Args:
+ target_hosts (Iterable[str]): List of servers to attempt to join the room with.
+
+ room_id (str): The ID of the room to join.
+
+ joinee (str): The User ID of the joining user.
+
+ content (dict): The event content to use for the join event.
"""
logger.debug("Joining %s to %s", joinee, room_id)
@@ -1169,6 +1184,22 @@ class FederationHandler(BaseHandler):
yield self._persist_auth_tree(origin, auth_chain, state, event)
+ # Check whether this room is the result of an upgrade of a room we already know
+ # about. If so, migrate over user information
+ predecessor = yield self.store.get_room_predecessor(room_id)
+ if not predecessor:
+ return
+ old_room_id = predecessor["room_id"]
+ logger.debug(
+ "Found predecessor for %s during remote join: %s", room_id, old_room_id
+ )
+
+ # We retrieve the room member handler here as to not cause a cyclic dependency
+ member_handler = self.hs.get_room_member_handler()
+ yield member_handler.transfer_room_state_on_room_upgrade(
+ old_room_id, room_id
+ )
+
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -1203,7 +1234,7 @@ class FederationHandler(BaseHandler):
with nested_logging_context(p.event_id):
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
- logger.warn(
+ logger.warning(
"Error handling queued PDU %s from %s: %s", p.event_id, origin, e
)
@@ -1250,7 +1281,7 @@ class FederationHandler(BaseHandler):
builder=builder
)
except AuthError as e:
- logger.warn("Failed to create join %r because %s", event, e)
+ logger.warning("Failed to create join to %s because %s", room_id, e)
raise e
event_allowed = yield self.third_party_event_rules.check_event_allowed(
@@ -1494,7 +1525,7 @@ class FederationHandler(BaseHandler):
room_version, event, context, do_sig_check=False
)
except AuthError as e:
- logger.warn("Failed to create new leave %r because %s", event, e)
+ logger.warning("Failed to create new leave %r because %s", event, e)
raise e
return event
@@ -1549,7 +1580,7 @@ class FederationHandler(BaseHandler):
event_id, allow_none=False, check_room_id=room_id
)
- state_groups = yield self.store.get_state_groups(room_id, [event_id])
+ state_groups = yield self.state_store.get_state_groups(room_id, [event_id])
if state_groups:
_, state = list(iteritems(state_groups)).pop()
@@ -1578,7 +1609,7 @@ class FederationHandler(BaseHandler):
event_id, allow_none=False, check_room_id=room_id
)
- state_groups = yield self.store.get_state_groups_ids(room_id, [event_id])
+ state_groups = yield self.state_store.get_state_groups_ids(room_id, [event_id])
if state_groups:
_, state = list(state_groups.items()).pop()
@@ -1606,7 +1637,7 @@ class FederationHandler(BaseHandler):
events = yield self.store.get_backfill_events(room_id, pdu_list, limit)
- events = yield filter_events_for_server(self.store, origin, events)
+ events = yield filter_events_for_server(self.storage, origin, events)
return events
@@ -1636,7 +1667,7 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
- events = yield filter_events_for_server(self.store, origin, [event])
+ events = yield filter_events_for_server(self.storage, origin, [event])
event = events[0]
return event
else:
@@ -1788,7 +1819,7 @@ class FederationHandler(BaseHandler):
# cause SynapseErrors in auth.check. We don't want to give up
# the attempt to federate altogether in such cases.
- logger.warn("Rejecting %s because %s", e.event_id, err.msg)
+ logger.warning("Rejecting %s because %s", e.event_id, err.msg)
if e == event:
raise
@@ -1841,12 +1872,7 @@ class FederationHandler(BaseHandler):
if c and c.type == EventTypes.Create:
auth_events[(c.type, c.state_key)] = c
- try:
- yield self.do_auth(origin, event, context, auth_events=auth_events)
- except AuthError as e:
- logger.warn("[%s %s] Rejecting: %s", event.room_id, event.event_id, e.msg)
-
- context.rejected = RejectedReason.AUTH_ERROR
+ context = yield self.do_auth(origin, event, context, auth_events=auth_events)
if not context.rejected:
yield self._check_for_soft_fail(event, state, backfilled)
@@ -1902,7 +1928,7 @@ class FederationHandler(BaseHandler):
# given state at the event. This should correctly handle cases
# like bans, especially with state res v2.
- state_sets = yield self.store.get_state_groups(
+ state_sets = yield self.state_store.get_state_groups(
event.room_id, extrem_ids
)
state_sets = list(state_sets.values())
@@ -1938,7 +1964,7 @@ class FederationHandler(BaseHandler):
try:
event_auth.check(room_version, event, auth_events=current_auth_events)
except AuthError as e:
- logger.warn("Soft-failing %r because %s", event, e)
+ logger.warning("Soft-failing %r because %s", event, e)
event.internal_metadata.soft_failed = True
@defer.inlineCallbacks
@@ -1993,7 +2019,7 @@ class FederationHandler(BaseHandler):
)
missing_events = yield filter_events_for_server(
- self.store, origin, missing_events
+ self.storage, origin, missing_events
)
return missing_events
@@ -2015,12 +2041,12 @@ class FederationHandler(BaseHandler):
Also NB that this function adds entries to it.
Returns:
- defer.Deferred[None]
+ defer.Deferred[EventContext]: updated context object
"""
room_version = yield self.store.get_room_version(event.room_id)
try:
- yield self._update_auth_events_and_context_for_auth(
+ context = yield self._update_auth_events_and_context_for_auth(
origin, event, context, auth_events
)
except Exception:
@@ -2037,8 +2063,10 @@ class FederationHandler(BaseHandler):
try:
event_auth.check(room_version, event, auth_events=auth_events)
except AuthError as e:
- logger.warn("Failed auth resolution for %r because %s", event, e)
- raise e
+ logger.warning("Failed auth resolution for %r because %s", event, e)
+ context.rejected = RejectedReason.AUTH_ERROR
+
+ return context
@defer.inlineCallbacks
def _update_auth_events_and_context_for_auth(
@@ -2062,7 +2090,7 @@ class FederationHandler(BaseHandler):
auth_events (dict[(str, str)->synapse.events.EventBase]):
Returns:
- defer.Deferred[None]
+ defer.Deferred[EventContext]: updated context
"""
event_auth_events = set(event.auth_event_ids())
@@ -2101,7 +2129,7 @@ class FederationHandler(BaseHandler):
# The other side isn't around or doesn't implement the
# endpoint, so lets just bail out.
logger.info("Failed to get event auth from remote: %s", e)
- return
+ return context
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
@@ -2142,7 +2170,7 @@ class FederationHandler(BaseHandler):
if event.internal_metadata.is_outlier():
logger.info("Skipping auth_event fetch for outlier")
- return
+ return context
# FIXME: Assumes we have and stored all the state for all the
# prev_events
@@ -2151,7 +2179,7 @@ class FederationHandler(BaseHandler):
)
if not different_auth:
- return
+ return context
logger.info(
"auth_events refers to events which are not in our calculated auth "
@@ -2198,10 +2226,12 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state)
- yield self._update_context_for_auth_events(
+ context = yield self._update_context_for_auth_events(
event, context, auth_events, event_key
)
+ return context
+
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events, event_key):
"""Update the state_ids in an event context after auth event resolution,
@@ -2210,14 +2240,16 @@ class FederationHandler(BaseHandler):
Args:
event (Event): The event we're handling the context for
- context (synapse.events.snapshot.EventContext): event context
- to be updated
+ context (synapse.events.snapshot.EventContext): initial event context
auth_events (dict[(str, str)->str]): Events to update in the event
context.
event_key ((str, str)): (type, state_key) for the current event.
this will not be included in the current_state in the context.
+
+ Returns:
+ Deferred[EventContext]: new event context
"""
state_updates = {
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
@@ -2234,7 +2266,7 @@ class FederationHandler(BaseHandler):
# create a new state group as a delta from the existing one.
prev_group = context.state_group
- state_group = yield self.store.store_state_group(
+ state_group = yield self.state_store.store_state_group(
event.event_id,
event.room_id,
prev_group=prev_group,
@@ -2242,7 +2274,7 @@ class FederationHandler(BaseHandler):
current_state_ids=current_state_ids,
)
- yield context.update_state(
+ return EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
@@ -2431,10 +2463,12 @@ class FederationHandler(BaseHandler):
try:
yield self.auth.check_from_context(room_version, event, context)
except AuthError as e:
- logger.warn("Denying new third party invite %r because %s", event, e)
+ logger.warning("Denying new third party invite %r because %s", event, e)
raise e
yield self._check_signature(event, context)
+
+ # We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
else:
@@ -2487,7 +2521,7 @@ class FederationHandler(BaseHandler):
try:
yield self.auth.check_from_context(room_version, event, context)
except AuthError as e:
- logger.warn("Denying third party invite %r because %s", event, e)
+ logger.warning("Denying third party invite %r because %s", event, e)
raise e
yield self._check_signature(event, context)
@@ -2495,6 +2529,7 @@ class FederationHandler(BaseHandler):
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
+ # We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
@@ -2664,7 +2699,7 @@ class FederationHandler(BaseHandler):
backfilled=backfilled,
)
else:
- max_stream_id = yield self.store.persist_events(
+ max_stream_id = yield self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)
|