summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-07-19 11:26:04 +0100
committerMatthew Hodgson <matthew@matrix.org>2018-07-19 11:26:04 +0100
commitbe3adfc331ef7f19b2e44b17cd06e463bff09f3a (patch)
tree2e565f68ef2154e3882704916eae3b1e8f0c9f3c /synapse/handlers/federation.py
parentMerge branch 'develop' into matthew/filter_members (diff)
parentrevert 00bc979 (diff)
downloadsynapse-be3adfc331ef7f19b2e44b17cd06e463bff09f3a.tar.xz
merge develop pydoc for _get_state_for_groups
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py309
1 files changed, 118 insertions, 191 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 495ac4c648..65f6041b10 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -20,37 +20,41 @@ import itertools
 import logging
 import sys
 
-from signedjson.key import decode_verify_key_bytes
-from signedjson.sign import verify_signed_json
 import six
-from six.moves import http_client
 from six import iteritems
-from twisted.internet import defer
+from six.moves import http_client
+
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
-from ._base import BaseHandler
+from twisted.internet import defer
 
+from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
-    AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+    AuthError,
+    CodeMessageException,
     FederationDeniedError,
+    FederationError,
+    StoreError,
+    SynapseError,
 )
-from synapse.api.constants import EventTypes, Membership, RejectedReason
-from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError, logcontext
-from synapse.util.metrics import measure_func
-from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
-from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
-    compute_event_signature, add_hashes_and_signatures,
+    add_hashes_and_signatures,
+    compute_event_signature,
 )
+from synapse.events.validator import EventValidator
+from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
-
-from synapse.events.utils import prune_event
-
+from synapse.util import logcontext, unwrapFirstError
+from synapse.util.async import Linearizer
+from synapse.util.distributor import user_joined_room
+from synapse.util.frozenutils import unfreeze
+from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
 
-from synapse.util.distributor import user_joined_room
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -89,7 +93,9 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_receive_pdu(self, origin, pdu, get_missing=True):
+    def on_receive_pdu(
+            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+    ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
 
@@ -103,8 +109,10 @@ class FederationHandler(BaseHandler):
         """
 
         # We reprocess pdus when we have seen them only as outliers
-        existing = yield self.get_persisted_pdu(
-            origin, pdu.event_id, do_auth=False
+        existing = yield self.store.get_event(
+            pdu.event_id,
+            allow_none=True,
+            allow_rejected=True,
         )
 
         # FIXME: Currently we fetch an event again when we already have it
@@ -161,14 +169,11 @@ class FederationHandler(BaseHandler):
                     "Ignoring PDU %s for room %s from %s as we've left the room!",
                     pdu.event_id, pdu.room_id, origin,
                 )
-                return
+                defer.returnValue(None)
 
         state = None
-
         auth_chain = []
 
-        fetch_state = False
-
         # Get missing pdus if necessary.
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
@@ -223,26 +228,60 @@ class FederationHandler(BaseHandler):
                         list(prevs - seen)[:5],
                     )
 
-            if prevs - seen:
-                logger.info(
-                    "Still missing %d events for room %r: %r...",
-                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+            if sent_to_us_directly and prevs - seen:
+                # If they have sent it to us directly, and the server
+                # isn't telling us about the auth events that it's
+                # made a message referencing, we explode
+                raise FederationError(
+                    "ERROR",
+                    403,
+                    (
+                        "Your server isn't divulging details about prev_events "
+                        "referenced in this event."
+                    ),
+                    affected=pdu.event_id,
                 )
-                fetch_state = True
+            elif prevs - seen:
+                # Calculate the state of the previous events, and
+                # de-conflict them to find the current state.
+                state_groups = []
+                auth_chains = set()
+                try:
+                    # Get the state of the events we know about
+                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+                    state_groups.append(ours)
+
+                    # Ask the remote server for the states we don't
+                    # know about
+                    for p in prevs - seen:
+                        state, got_auth_chain = (
+                            yield self.replication_layer.get_state_for_room(
+                                origin, pdu.room_id, p
+                            )
+                        )
+                        auth_chains.update(got_auth_chain)
+                        state_group = {(x.type, x.state_key): x.event_id for x in state}
+                        state_groups.append(state_group)
+
+                    # Resolve any conflicting state
+                    def fetch(ev_ids):
+                        return self.store.get_events(
+                            ev_ids, get_prev_content=False, check_redacted=False
+                        )
 
-        if fetch_state:
-            # We need to get the state at this event, since we haven't
-            # processed all the prev events.
-            logger.debug(
-                "_handle_new_pdu getting state for %s",
-                pdu.room_id
-            )
-            try:
-                state, auth_chain = yield self.replication_layer.get_state_for_room(
-                    origin, pdu.room_id, pdu.event_id,
-                )
-            except Exception:
-                logger.exception("Failed to get state for event: %s", pdu.event_id)
+                    state_map = yield resolve_events_with_factory(
+                        state_groups, {pdu.event_id: pdu}, fetch
+                    )
+
+                    state = (yield self.store.get_events(state_map.values())).values()
+                    auth_chain = list(auth_chains)
+                except Exception:
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        "We can't get valid state history.",
+                        affected=pdu.event_id,
+                    )
 
         yield self._process_received_pdu(
             origin,
@@ -320,11 +359,17 @@ class FederationHandler(BaseHandler):
 
         for e in missing_events:
             logger.info("Handling found event %s", e.event_id)
-            yield self.on_receive_pdu(
-                origin,
-                e,
-                get_missing=False
-            )
+            try:
+                yield self.on_receive_pdu(
+                    origin,
+                    e,
+                    get_missing=False
+                )
+            except FederationError as e:
+                if e.code == 403:
+                    logger.warn("Event %s failed history check.")
+                else:
+                    raise
 
     @log_function
     @defer.inlineCallbacks
@@ -455,83 +500,6 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     yield user_joined_room(self.distributor, user, event.room_id)
 
-    @measure_func("_filter_events_for_server")
-    @defer.inlineCallbacks
-    def _filter_events_for_server(self, server_name, room_id, events):
-        event_to_state_ids = yield self.store.get_state_ids_for_events(
-            frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, None),
-            )
-        )
-
-        # We only want to pull out member events that correspond to the
-        # server's domain.
-
-        def check_match(id):
-            try:
-                return server_name == get_domain_from_id(id)
-            except Exception:
-                return False
-
-        # Parses mapping `event_id -> (type, state_key) -> state event_id`
-        # to get all state ids that we're interested in.
-        event_map = yield self.store.get_events([
-            e_id
-            for key_to_eid in list(event_to_state_ids.values())
-            for key, e_id in key_to_eid.items()
-            if key[0] != EventTypes.Member or check_match(key[1])
-        ])
-
-        event_to_state = {
-            e_id: {
-                key: event_map[inner_e_id]
-                for key, inner_e_id in key_to_eid.iteritems()
-                if inner_e_id in event_map
-            }
-            for e_id, key_to_eid in event_to_state_ids.iteritems()
-        }
-
-        def redact_disallowed(event, state):
-            if not state:
-                return event
-
-            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
-            if history:
-                visibility = history.content.get("history_visibility", "shared")
-                if visibility in ["invited", "joined"]:
-                    # We now loop through all state events looking for
-                    # membership states for the requesting server to determine
-                    # if the server is either in the room or has been invited
-                    # into the room.
-                    for ev in state.itervalues():
-                        if ev.type != EventTypes.Member:
-                            continue
-                        try:
-                            domain = get_domain_from_id(ev.state_key)
-                        except Exception:
-                            continue
-
-                        if domain != server_name:
-                            continue
-
-                        memtype = ev.membership
-                        if memtype == Membership.JOIN:
-                            return event
-                        elif memtype == Membership.INVITE:
-                            if visibility == "invited":
-                                return event
-                    else:
-                        return prune_event(event)
-
-            return event
-
-        defer.returnValue([
-            redact_disallowed(e, event_to_state[e.event_id])
-            for e in events
-        ])
-
     @log_function
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit, extremities):
@@ -938,16 +906,6 @@ class FederationHandler(BaseHandler):
             [auth_id for auth_id, _ in event.auth_events],
             include_given=True
         )
-
-        for event in auth:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -1381,8 +1339,6 @@ class FederationHandler(BaseHandler):
     def get_state_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
         )
@@ -1405,18 +1361,6 @@ class FederationHandler(BaseHandler):
                     del results[(event.type, event.state_key)]
 
             res = list(results.values())
-            for event in res:
-                # We sign these again because there was a bug where we
-                # incorrectly signed things the first time round
-                if self.is_mine_id(event.event_id):
-                    event.signatures.update(
-                        compute_event_signature(
-                            event,
-                            self.hs.hostname,
-                            self.hs.config.signing_key[0]
-                        )
-                    )
-
             defer.returnValue(res)
         else:
             defer.returnValue([])
@@ -1425,8 +1369,6 @@ class FederationHandler(BaseHandler):
     def get_state_ids_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups_ids(
             room_id, [event_id]
         )
@@ -1462,17 +1404,26 @@ class FederationHandler(BaseHandler):
             limit
         )
 
-        events = yield self._filter_events_for_server(origin, room_id, events)
+        events = yield filter_events_for_server(self.store, origin, events)
 
         defer.returnValue(events)
 
     @defer.inlineCallbacks
     @log_function
-    def get_persisted_pdu(self, origin, event_id, do_auth=True):
-        """ Get a PDU from the database with given origin and id.
+    def get_persisted_pdu(self, origin, event_id):
+        """Get an event from the database for the given server.
+
+        Args:
+            origin [str]: hostname of server which is requesting the event; we
+               will check that the server is allowed to see it.
+            event_id [str]: id of the event being requested
 
         Returns:
-            Deferred: Results in a `Pdu`.
+            Deferred[EventBase|None]: None if we know nothing about the event;
+                otherwise the (possibly-redacted) event.
+
+        Raises:
+            AuthError if the server is not currently in the room
         """
         event = yield self.store.get_event(
             event_id,
@@ -1481,32 +1432,17 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.is_mine_id(event.event_id):
-                # FIXME: This is a temporary work around where we occasionally
-                # return events slightly differently than when they were
-                # originally signed
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
-                    )
-                )
-
-            if do_auth:
-                in_room = yield self.auth.check_host_in_room(
-                    event.room_id,
-                    origin
-                )
-                if not in_room:
-                    raise AuthError(403, "Host not in room.")
-
-                events = yield self._filter_events_for_server(
-                    origin, event.room_id, [event]
-                )
-
-                event = events[0]
+            in_room = yield self.auth.check_host_in_room(
+                event.room_id,
+                origin
+            )
+            if not in_room:
+                raise AuthError(403, "Host not in room.")
 
+            events = yield filter_events_for_server(
+                self.store, origin, [event],
+            )
+            event = events[0]
             defer.returnValue(event)
         else:
             defer.returnValue(None)
@@ -1760,15 +1696,6 @@ class FederationHandler(BaseHandler):
             local_auth_chain, remote_auth_chain
         )
 
-        for event in ret["auth_chain"]:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         logger.debug("on_query_auth returning: %s", ret)
 
         defer.returnValue(ret)
@@ -1794,8 +1721,8 @@ class FederationHandler(BaseHandler):
             min_depth=min_depth,
         )
 
-        missing_events = yield self._filter_events_for_server(
-            origin, room_id, missing_events,
+        missing_events = yield filter_events_for_server(
+            self.store, origin, missing_events,
         )
 
         defer.returnValue(missing_events)