diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 665 |
1 files changed, 342 insertions, 323 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 495ac4c648..0ebf0fd188 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,37 +20,51 @@ import itertools import logging import sys +import six +from six import iteritems, itervalues +from six.moves import http_client, zip + from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json -import six -from six.moves import http_client -from six import iteritems -from twisted.internet import defer from unpaddedbase64 import decode_base64 -from ._base import BaseHandler +from twisted.internet import defer +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + Membership, + RejectedReason, +) from synapse.api.errors import ( - AuthError, FederationError, StoreError, CodeMessageException, SynapseError, + AuthError, + CodeMessageException, FederationDeniedError, + FederationError, + StoreError, + SynapseError, ) -from synapse.api.constants import EventTypes, Membership, RejectedReason -from synapse.events.validator import EventValidator -from synapse.util import unwrapFirstError, logcontext -from synapse.util.metrics import measure_func -from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor, Linearizer -from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( - compute_event_signature, add_hashes_and_signatures, + add_hashes_and_signatures, + compute_event_signature, ) +from synapse.events.validator import EventValidator +from synapse.replication.http.federation import ( + ReplicationCleanRoomRestServlet, + ReplicationFederationSendEventsRestServlet, +) +from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet +from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id - -from synapse.events.utils import prune_event - +from synapse.util import logcontext, unwrapFirstError +from synapse.util.async_helpers import Linearizer +from synapse.util.distributor import user_joined_room +from synapse.util.frozenutils import unfreeze +from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination +from synapse.visibility import filter_events_for_server -from synapse.util.distributor import user_joined_room +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -72,7 +86,7 @@ class FederationHandler(BaseHandler): self.hs = hs self.store = hs.get_datastore() - self.replication_layer = hs.get_federation_client() + self.federation_client = hs.get_federation_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() @@ -82,6 +96,18 @@ class FederationHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self._server_notices_mxid = hs.config.server_notices_mxid + self.config = hs.config + self.http_client = hs.get_simple_http_client() + + self._send_events_to_master = ( + ReplicationFederationSendEventsRestServlet.make_client(hs) + ) + self._notify_user_membership_change = ( + ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) + ) + self._clean_room_for_join_client = ( + ReplicationCleanRoomRestServlet.make_client(hs) + ) # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -89,7 +115,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_receive_pdu(self, origin, pdu, get_missing=True): + def on_receive_pdu( + self, origin, pdu, get_missing=True, sent_to_us_directly=False, + ): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -103,8 +131,10 @@ class FederationHandler(BaseHandler): """ # We reprocess pdus when we have seen them only as outliers - existing = yield self.get_persisted_pdu( - origin, pdu.event_id, do_auth=False + existing = yield self.store.get_event( + pdu.event_id, + allow_none=True, + allow_rejected=True, ) # FIXME: Currently we fetch an event again when we already have it @@ -161,14 +191,11 @@ class FederationHandler(BaseHandler): "Ignoring PDU %s for room %s from %s as we've left the room!", pdu.event_id, pdu.room_id, origin, ) - return + defer.returnValue(None) state = None - auth_chain = [] - fetch_state = False - # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. @@ -223,26 +250,61 @@ class FederationHandler(BaseHandler): list(prevs - seen)[:5], ) - if prevs - seen: - logger.info( - "Still missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + if sent_to_us_directly and prevs - seen: + # If they have sent it to us directly, and the server + # isn't telling us about the auth events that it's + # made a message referencing, we explode + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, ) - fetch_state = True + elif prevs - seen: + # Calculate the state of the previous events, and + # de-conflict them to find the current state. + state_groups = [] + auth_chains = set() + try: + # Get the state of the events we know about + ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) + state_groups.append(ours) + + # Ask the remote server for the states we don't + # know about + for p in prevs - seen: + state, got_auth_chain = ( + yield self.federation_client.get_state_for_room( + origin, pdu.room_id, p + ) + ) + auth_chains.update(got_auth_chain) + state_group = {(x.type, x.state_key): x.event_id for x in state} + state_groups.append(state_group) + + # Resolve any conflicting state + def fetch(ev_ids): + return self.store.get_events( + ev_ids, get_prev_content=False, check_redacted=False + ) - if fetch_state: - # We need to get the state at this event, since we haven't - # processed all the prev events. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - try: - state, auth_chain = yield self.replication_layer.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) - except Exception: - logger.exception("Failed to get state for event: %s", pdu.event_id) + room_version = yield self.store.get_room_version(pdu.room_id) + state_map = yield resolve_events_with_factory( + room_version, state_groups, {pdu.event_id: pdu}, fetch + ) + + state = (yield self.store.get_events(state_map.values())).values() + auth_chain = list(auth_chains) + except Exception: + raise FederationError( + "ERROR", + 403, + "We can't get valid state history.", + affected=pdu.event_id, + ) yield self._process_received_pdu( origin, @@ -299,7 +361,7 @@ class FederationHandler(BaseHandler): # # see https://github.com/matrix-org/synapse/pull/1744 - missing_events = yield self.replication_layer.get_missing_events( + missing_events = yield self.federation_client.get_missing_events( origin, pdu.room_id, earliest_events_ids=list(latest), @@ -320,11 +382,17 @@ class FederationHandler(BaseHandler): for e in missing_events: logger.info("Handling found event %s", e.event_id) - yield self.on_receive_pdu( - origin, - e, - get_missing=False - ) + try: + yield self.on_receive_pdu( + origin, + e, + get_missing=False + ) + except FederationError as e: + if e.code == 403: + logger.warn("Event %s failed history check.") + else: + raise @log_function @defer.inlineCallbacks @@ -355,7 +423,7 @@ class FederationHandler(BaseHandler): ) try: - event_stream_id, max_stream_id = yield self._persist_auth_tree( + yield self._persist_auth_tree( origin, auth_chain, state, event ) except AuthError as e: @@ -399,7 +467,7 @@ class FederationHandler(BaseHandler): yield self._handle_new_events(origin, event_infos) try: - context, event_stream_id, max_stream_id = yield self._handle_new_event( + context = yield self._handle_new_event( origin, event, state=state, @@ -424,24 +492,16 @@ class FederationHandler(BaseHandler): except StoreError: logger.exception("Failed to store room.") - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - if event.type == EventTypes.Member: if event.membership == Membership.JOIN: # Only fire user_joined_room if the user has acutally # joined the room. Don't bother if the user is just # changing their profile info. newly_joined = True - prev_state_id = context.prev_state_ids.get( + + prev_state_ids = yield context.get_prev_state_ids(self.store) + + prev_state_id = prev_state_ids.get( (event.type, event.state_key) ) if prev_state_id: @@ -453,84 +513,7 @@ class FederationHandler(BaseHandler): if newly_joined: user = UserID.from_string(event.state_key) - yield user_joined_room(self.distributor, user, event.room_id) - - @measure_func("_filter_events_for_server") - @defer.inlineCallbacks - def _filter_events_for_server(self, server_name, room_id, events): - event_to_state_ids = yield self.store.get_state_ids_for_events( - frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, None), - ) - ) - - # We only want to pull out member events that correspond to the - # server's domain. - - def check_match(id): - try: - return server_name == get_domain_from_id(id) - except Exception: - return False - - # Parses mapping `event_id -> (type, state_key) -> state event_id` - # to get all state ids that we're interested in. - event_map = yield self.store.get_events([ - e_id - for key_to_eid in list(event_to_state_ids.values()) - for key, e_id in key_to_eid.items() - if key[0] != EventTypes.Member or check_match(key[1]) - ]) - - event_to_state = { - e_id: { - key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.iteritems() - if inner_e_id in event_map - } - for e_id, key_to_eid in event_to_state_ids.iteritems() - } - - def redact_disallowed(event, state): - if not state: - return event - - history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history: - visibility = history.content.get("history_visibility", "shared") - if visibility in ["invited", "joined"]: - # We now loop through all state events looking for - # membership states for the requesting server to determine - # if the server is either in the room or has been invited - # into the room. - for ev in state.itervalues(): - if ev.type != EventTypes.Member: - continue - try: - domain = get_domain_from_id(ev.state_key) - except Exception: - continue - - if domain != server_name: - continue - - memtype = ev.membership - if memtype == Membership.JOIN: - return event - elif memtype == Membership.INVITE: - if visibility == "invited": - return event - else: - return prune_event(event) - - return event - - defer.returnValue([ - redact_disallowed(e, event_to_state[e.event_id]) - for e in events - ]) + yield self.user_joined_room(user, event.room_id) @log_function @defer.inlineCallbacks @@ -551,7 +534,7 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") - events = yield self.replication_layer.backfill( + events = yield self.federation_client.backfill( dest, room_id, limit=limit, @@ -599,7 +582,7 @@ class FederationHandler(BaseHandler): state_events = {} events_to_state = {} for e_id in edges: - state, auth = yield self.replication_layer.get_state_for_room( + state, auth = yield self.federation_client.get_state_for_room( destination=dest, room_id=room_id, event_id=e_id @@ -641,7 +624,7 @@ class FederationHandler(BaseHandler): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ logcontext.run_in_background( - self.replication_layer.get_pdu, + self.federation_client.get_pdu, [dest], event_id, outlier=True, @@ -763,7 +746,7 @@ class FederationHandler(BaseHandler): """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.iteritems() + for (e_type, state_key), event in iteritems(state) if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -780,7 +763,7 @@ class FederationHandler(BaseHandler): except Exception: pass - return sorted(joined_domains.iteritems(), key=lambda d: d[1]) + return sorted(joined_domains.items(), key=lambda d: d[1]) curr_domains = get_domains_from_state(curr_state) @@ -843,7 +826,7 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) tried_domains.add(self.server_name) - event_ids = list(extremities.iterkeys()) + event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = logcontext.preserve_fn( @@ -859,15 +842,15 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.itervalues() for e_id in ids.itervalues()], + [e_id for ids in itervalues(states) for e_id in itervalues(ids)], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.iteritems() + for k, e_id in iteritems(state_dict) if e_id in state_map - } for key, state_dict in states.iteritems() + } for key, state_dict in iteritems(states) } for e_id, _ in sorted_extremeties_tuple: @@ -922,7 +905,7 @@ class FederationHandler(BaseHandler): Invites must be signed by the invitee's server before distribution. """ - pdu = yield self.replication_layer.send_invite( + pdu = yield self.federation_client.send_invite( destination=target_host, room_id=event.room_id, event_id=event.event_id, @@ -938,16 +921,6 @@ class FederationHandler(BaseHandler): [auth_id for auth_id, _ in event.auth_events], include_given=True ) - - for event in auth: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue([e for e in auth]) @log_function @@ -972,6 +945,9 @@ class FederationHandler(BaseHandler): joinee, "join", content, + params={ + "ver": KNOWN_ROOM_VERSIONS, + }, ) # This shouldn't happen, because the RoomMemberHandler has a @@ -981,7 +957,7 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] - yield self.store.clean_room_for_join(room_id) + yield self._clean_room_for_join(room_id) handled_events = set() @@ -994,7 +970,7 @@ class FederationHandler(BaseHandler): target_hosts.insert(0, origin) except ValueError: pass - ret = yield self.replication_layer.send_join(target_hosts, event) + ret = yield self.federation_client.send_join(target_hosts, event) origin = ret["origin"] state = ret["state"] @@ -1020,15 +996,10 @@ class FederationHandler(BaseHandler): # FIXME pass - event_stream_id, max_stream_id = yield self._persist_auth_tree( + yield self._persist_auth_tree( origin, auth_chain, state, event ) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[joinee] - ) - logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -1123,7 +1094,7 @@ class FederationHandler(BaseHandler): # would introduce the danger of backwards-compatibility problems. event.internal_metadata.send_on_behalf_of = origin - context, event_stream_id, max_stream_id = yield self._handle_new_event( + context = yield self._handle_new_event( origin, event ) @@ -1133,25 +1104,17 @@ class FederationHandler(BaseHandler): event.signatures, ) - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) - if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield user_joined_room(self.distributor, user, event.room_id) + yield self.user_joined_room(user, event.room_id) + + prev_state_ids = yield context.get_prev_state_ids(self.store) - state_ids = list(context.prev_state_ids.values()) + state_ids = list(prev_state_ids.values()) auth_chain = yield self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(list(context.prev_state_ids.values())) + state = yield self.store.get_events(list(prev_state_ids.values())) defer.returnValue({ "state": list(state.values()), @@ -1213,17 +1176,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - ) - - target_user = UserID.from_string(event.state_key) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + yield self.persist_events_and_notify([(event, context)]) defer.returnValue(event) @@ -1248,35 +1201,26 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.replication_layer.send_leave( + yield self.federation_client.send_leave( target_hosts, event ) context = yield self.state_handler.compute_event_context(event) - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - ) - - target_user = UserID.from_string(event.state_key) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + yield self.persist_events_and_notify([(event, context)]) defer.returnValue(event) @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, - content={},): - origin, pdu = yield self.replication_layer.make_membership_event( + content={}, params=None): + origin, pdu = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, membership, content, + params=params, ) logger.debug("Got response to make_%s: %s", membership, pdu) @@ -1316,7 +1260,7 @@ class FederationHandler(BaseHandler): @log_function def on_make_leave_request(self, room_id, user_id): """ We've received a /make_leave/ request, so we create a partial - join event for the room and return that. We do *not* persist or + leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. """ builder = self.event_builder_factory.new({ @@ -1355,7 +1299,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context, event_stream_id, max_stream_id = yield self._handle_new_event( + yield self._handle_new_event( origin, event ) @@ -1365,23 +1309,16 @@ class FederationHandler(BaseHandler): event.signatures, ) - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) - defer.returnValue(None) @defer.inlineCallbacks def get_state_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() + + event = yield self.store.get_event( + event_id, allow_none=False, check_room_id=room_id, + ) state_groups = yield self.store.get_state_groups( room_id, [event_id] @@ -1393,8 +1330,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in state } - event = yield self.store.get_event(event_id) - if event and event.is_state(): + if event.is_state(): # Get previous state if "replaces_state" in event.unsigned: prev_id = event.unsigned["replaces_state"] @@ -1405,18 +1341,6 @@ class FederationHandler(BaseHandler): del results[(event.type, event.state_key)] res = list(results.values()) - for event in res: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue(res) else: defer.returnValue([]) @@ -1425,7 +1349,9 @@ class FederationHandler(BaseHandler): def get_state_ids_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() + event = yield self.store.get_event( + event_id, allow_none=False, check_room_id=room_id, + ) state_groups = yield self.store.get_state_groups_ids( room_id, [event_id] @@ -1435,8 +1361,7 @@ class FederationHandler(BaseHandler): _, state = state_groups.items().pop() results = state - event = yield self.store.get_event(event_id) - if event and event.is_state(): + if event.is_state(): # Get previous state if "replaces_state" in event.unsigned: prev_id = event.unsigned["replaces_state"] @@ -1462,17 +1387,26 @@ class FederationHandler(BaseHandler): limit ) - events = yield self._filter_events_for_server(origin, room_id, events) + events = yield filter_events_for_server(self.store, origin, events) defer.returnValue(events) @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, origin, event_id, do_auth=True): - """ Get a PDU from the database with given origin and id. + def get_persisted_pdu(self, origin, event_id): + """Get an event from the database for the given server. + + Args: + origin [str]: hostname of server which is requesting the event; we + will check that the server is allowed to see it. + event_id [str]: id of the event being requested Returns: - Deferred: Results in a `Pdu`. + Deferred[EventBase|None]: None if we know nothing about the event; + otherwise the (possibly-redacted) event. + + Raises: + AuthError if the server is not currently in the room """ event = yield self.store.get_event( event_id, @@ -1481,32 +1415,17 @@ class FederationHandler(BaseHandler): ) if event: - if self.is_mine_id(event.event_id): - # FIXME: This is a temporary work around where we occasionally - # return events slightly differently than when they were - # originally signed - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - - if do_auth: - in_room = yield self.auth.check_host_in_room( - event.room_id, - origin - ) - if not in_room: - raise AuthError(403, "Host not in room.") - - events = yield self._filter_events_for_server( - origin, event.room_id, [event] - ) - - event = events[0] + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + events = yield filter_events_for_server( + self.store, origin, [event], + ) + event = events[0] defer.returnValue(event) else: defer.returnValue(None) @@ -1531,9 +1450,8 @@ class FederationHandler(BaseHandler): event, context ) - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, + yield self.persist_events_and_notify( + [(event, context)], backfilled=backfilled, ) except: # noqa: E722, as we reraise the exception this is fine. @@ -1546,15 +1464,7 @@ class FederationHandler(BaseHandler): six.reraise(tp, value, tb) - if not backfilled: - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - logcontext.run_in_background( - self.pusher_pool.on_new_notifications, - event_stream_id, max_stream_id, - ) - - defer.returnValue((context, event_stream_id, max_stream_id)) + defer.returnValue(context) @defer.inlineCallbacks def _handle_new_events(self, origin, event_infos, backfilled=False): @@ -1562,6 +1472,8 @@ class FederationHandler(BaseHandler): should not depend on one another, e.g. this should be used to persist a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. + + Notifies about the events where appropriate. """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ @@ -1576,10 +1488,10 @@ class FederationHandler(BaseHandler): ], consumeErrors=True, )) - yield self.store.persist_events( + yield self.persist_events_and_notify( [ (ev_info["event"], context) - for ev_info, context in itertools.izip(event_infos, contexts) + for ev_info, context in zip(event_infos, contexts) ], backfilled=backfilled, ) @@ -1588,7 +1500,8 @@ class FederationHandler(BaseHandler): def _persist_auth_tree(self, origin, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. - Persists the event seperately. + Persists the event separately. Notifies about the persisted events + where appropriate. Will attempt to fetch missing auth events. @@ -1599,8 +1512,7 @@ class FederationHandler(BaseHandler): event (Event) Returns: - 2-tuple of (event_stream_id, max_stream_id) from the persist_event - call for `event` + Deferred """ events_to_context = {} for e in itertools.chain(auth_events, state): @@ -1626,7 +1538,7 @@ class FederationHandler(BaseHandler): missing_auth_events.add(e_id) for e_id in missing_auth_events: - m_ev = yield self.replication_layer.get_pdu( + m_ev = yield self.federation_client.get_pdu( [origin], e_id, outlier=True, @@ -1664,7 +1576,7 @@ class FederationHandler(BaseHandler): raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - yield self.store.persist_events( + yield self.persist_events_and_notify( [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) @@ -1675,12 +1587,10 @@ class FederationHandler(BaseHandler): event, old_state=state ) - event_stream_id, max_stream_id = yield self.store.persist_event( - event, new_event_context, + yield self.persist_events_and_notify( + [(event, new_event_context)], ) - defer.returnValue((event_stream_id, max_stream_id)) - @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): """ @@ -1699,8 +1609,9 @@ class FederationHandler(BaseHandler): ) if not auth_events: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -1736,8 +1647,19 @@ class FederationHandler(BaseHandler): defer.returnValue(context) @defer.inlineCallbacks - def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, + def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects, missing): + in_room = yield self.auth.check_host_in_room( + room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + + event = yield self.store.get_event( + event_id, allow_none=False, check_room_id=room_id + ) + # Just go through and process each event in `remote_auth_chain`. We # don't want to fall into the trap of `missing` being wrong. for e in remote_auth_chain: @@ -1747,7 +1669,6 @@ class FederationHandler(BaseHandler): pass # Now get the current auth_chain for the event. - event = yield self.store.get_event(event_id) local_auth_chain = yield self.store.get_auth_chain( [auth_id for auth_id, _ in event.auth_events], include_given=True @@ -1760,15 +1681,6 @@ class FederationHandler(BaseHandler): local_auth_chain, remote_auth_chain ) - for event in ret["auth_chain"]: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) @@ -1794,8 +1706,8 @@ class FederationHandler(BaseHandler): min_depth=min_depth, ) - missing_events = yield self._filter_events_for_server( - origin, room_id, missing_events, + missing_events = yield filter_events_for_server( + self.store, origin, missing_events, ) defer.returnValue(missing_events) @@ -1844,7 +1756,7 @@ class FederationHandler(BaseHandler): logger.info("Missing auth: %s", missing_auth) # If we don't have all the auth events, we need to get them. try: - remote_auth_chain = yield self.replication_layer.get_event_auth( + remote_auth_chain = yield self.federation_client.get_event_auth( origin, event.room_id, event.event_id ) @@ -1917,7 +1829,10 @@ class FederationHandler(BaseHandler): (d.type, d.state_key): d for d in different_events if d }) + room_version = yield self.store.get_room_version(event.room_id) + new_state = self.state_handler.resolve_events( + room_version, [list(local_view.values()), list(remote_view.values())], event ) @@ -1949,9 +1864,10 @@ class FederationHandler(BaseHandler): break if do_resolution: + prev_state_ids = yield context.get_prev_state_ids(self.store) # 1. Get what we think is the auth chain. auth_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids + event, prev_state_ids ) local_auth_chain = yield self.store.get_auth_chain( auth_ids, include_given=True @@ -1959,7 +1875,7 @@ class FederationHandler(BaseHandler): try: # 2. Get remote difference. - result = yield self.replication_layer.query_auth( + result = yield self.federation_client.query_auth( origin, event.room_id, event.event_id, @@ -2041,21 +1957,34 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) if k != event_key } - context.current_state_ids = dict(context.current_state_ids) - context.current_state_ids.update(state_updates) - if context.delta_ids is not None: - context.delta_ids = dict(context.delta_ids) - context.delta_ids.update(state_updates) - context.prev_state_ids = dict(context.prev_state_ids) - context.prev_state_ids.update({ + current_state_ids = yield context.get_current_state_ids(self.store) + current_state_ids = dict(current_state_ids) + + current_state_ids.update(state_updates) + + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = dict(prev_state_ids) + + prev_state_ids.update({ k: a.event_id for k, a in iteritems(auth_events) }) - context.state_group = yield self.store.store_state_group( + + # create a new state group as a delta from the existing one. + prev_group = context.state_group + state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + prev_group=prev_group, + delta_ids=state_updates, + current_state_ids=current_state_ids, + ) + + yield context.update_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=state_updates, ) @defer.inlineCallbacks @@ -2245,7 +2174,7 @@ class FederationHandler(BaseHandler): yield member_handler.send_membership_event(None, event, context) else: destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) - yield self.replication_layer.forward_third_party_invite( + yield self.federation_client.forward_third_party_invite( destinations, room_id, event_dict, @@ -2295,7 +2224,8 @@ class FederationHandler(BaseHandler): event.content["third_party_invite"]["signed"]["token"] ) original_invite = None - original_invite_id = context.prev_state_ids.get(key) + prev_state_ids = yield context.get_prev_state_ids(self.store) + original_invite_id = prev_state_ids.get(key) if original_invite_id: original_invite = yield self.store.get_event( original_invite_id, allow_none=True @@ -2337,7 +2267,8 @@ class FederationHandler(BaseHandler): signed = event.content["third_party_invite"]["signed"] token = signed["token"] - invite_event_id = context.prev_state_ids.get( + prev_state_ids = yield context.get_prev_state_ids(self.store) + invite_event_id = prev_state_ids.get( (EventTypes.ThirdPartyInvite, token,) ) @@ -2387,7 +2318,7 @@ class FederationHandler(BaseHandler): for revocation. """ try: - response = yield self.hs.get_simple_http_client().get_json( + response = yield self.http_client.get_json( url, {"public_key": public_key} ) @@ -2398,3 +2329,91 @@ class FederationHandler(BaseHandler): ) if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") + + @defer.inlineCallbacks + def persist_events_and_notify(self, event_and_contexts, backfilled=False): + """Persists events and tells the notifier/pushers about them, if + necessary. + + Args: + event_and_contexts(list[tuple[FrozenEvent, EventContext]]) + backfilled (bool): Whether these events are a result of + backfilling or not + + Returns: + Deferred + """ + if self.config.worker_app: + yield self._send_events_to_master( + store=self.store, + event_and_contexts=event_and_contexts, + backfilled=backfilled + ) + else: + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled, + ) + + if not backfilled: # Never notify for backfilled events + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) + + def _notify_persisted_event(self, event, max_stream_id): + """Checks to see if notifier/pushers should be notified about the + event or not. + + Args: + event (FrozenEvent) + max_stream_id (int): The max_stream_id returned by persist_events + """ + + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + + # We notify for memberships if its an invite for one of our + # users + if event.internal_metadata.is_outlier(): + if event.membership != Membership.INVITE: + if not self.is_mine_id(target_user_id): + return + + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) + elif event.internal_metadata.is_outlier(): + return + + event_stream_id = event.internal_metadata.stream_ordering + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + self.pusher_pool.on_new_notifications( + event_stream_id, max_stream_id, + ) + + def _clean_room_for_join(self, room_id): + """Called to clean up any data in DB for a given room, ready for the + server to join the room. + + Args: + room_id (str) + """ + if self.config.worker_app: + return self._clean_room_for_join_client(room_id) + else: + return self.store.clean_room_for_join(room_id) + + def user_joined_room(self, user, room_id): + """Called when a new user has joined the room + """ + if self.config.worker_app: + return self._notify_user_membership_change( + room_id=room_id, + user_id=user.to_string(), + change="joined", + ) + else: + return user_joined_room(self.distributor, user, room_id) |