summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2014-12-16 13:53:43 +0000
committerMark Haines <mjark@negativecurvature.net>2014-12-16 13:53:43 +0000
commit2af40cfa14ba579c156349ae3457a55bc93f8548 (patch)
tree2b94b53c72d51fc170549213ed8cb88556c2b501 /synapse/handlers/federation.py
parentAdd a script for talking matrix federation adding X-Matrix Authorization (diff)
parentFix pyflakes (diff)
downloadsynapse-2af40cfa14ba579c156349ae3457a55bc93f8548.tar.xz
Merge pull request #25 from matrix-org/events_refactor
Event refactor
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py265
1 files changed, 153 insertions, 112 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cfb5029774..16a104c0e2 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -17,12 +17,12 @@
 
 from ._base import BaseHandler
 
-from synapse.api.events.utils import prune_event
+from synapse.events.snapshot import EventContext
+from synapse.events.utils import prune_event
 from synapse.api.errors import (
     AuthError, FederationError, SynapseError, StoreError,
 )
-from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent
-from synapse.api.constants import Membership
+from synapse.api.constants import EventTypes, Membership
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import (
@@ -76,7 +76,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def handle_new_event(self, event, snapshot):
+    def handle_new_event(self, event, snapshot, destinations):
         """ Takes in an event from the client to server side, that has already
         been authed and handled by the state module, and sends it to any
         remote home servers that may be interested.
@@ -92,12 +92,7 @@ class FederationHandler(BaseHandler):
 
         yield run_on_reactor()
 
-        pdu = event
-
-        if not hasattr(pdu, "destinations") or not pdu.destinations:
-            pdu.destinations = []
-
-        yield self.replication_layer.send_pdu(pdu)
+        yield self.replication_layer.send_pdu(event, destinations)
 
     @log_function
     @defer.inlineCallbacks
@@ -140,7 +135,7 @@ class FederationHandler(BaseHandler):
         if not check_event_content_hash(event):
             logger.warn(
                 "Event content has been tampered, redacting %s, %s",
-                event.event_id, encode_canonical_json(event.get_full_dict())
+                event.event_id, encode_canonical_json(event.get_dict())
             )
             event = redacted_event
 
@@ -153,7 +148,7 @@ class FederationHandler(BaseHandler):
             event.room_id,
             self.server_name
         )
-        if not is_in_room and not event.outlier:
+        if not is_in_room and not event.internal_metadata.outlier:
             logger.debug("Got event for room we're not in.")
 
             replication_layer = self.replication_layer
@@ -164,7 +159,7 @@ class FederationHandler(BaseHandler):
             )
 
             for e in auth_chain:
-                e.outlier = True
+                e.internal_metadata.outlier = True
                 try:
                     yield self._handle_new_event(e, fetch_missing=False)
                 except:
@@ -184,7 +179,7 @@ class FederationHandler(BaseHandler):
 
         if state:
             for e in state:
-                e.outlier = True
+                e.internal_metadata.outlier = True
                 try:
                     yield self._handle_new_event(e)
                 except:
@@ -229,7 +224,7 @@ class FederationHandler(BaseHandler):
 
         if not backfilled:
             extra_users = []
-            if event.type == RoomMemberEvent.TYPE:
+            if event.type == EventTypes.Member:
                 target_user_id = event.state_key
                 target_user = self.hs.parse_userid(target_user_id)
                 extra_users.append(target_user)
@@ -238,7 +233,7 @@ class FederationHandler(BaseHandler):
                 event, extra_users=extra_users
             )
 
-        if event.type == RoomMemberEvent.TYPE:
+        if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
                 yield self.distributor.fire(
@@ -265,11 +260,18 @@ class FederationHandler(BaseHandler):
             event = pdu
 
             # FIXME (erikj): Not sure this actually works :/
-            yield self.state_handler.annotate_event_with_state(event)
+            context = EventContext()
+            yield self.state_handler.annotate_context_with_state(event, context)
 
-            events.append(event)
+            events.append(
+                (event, context)
+            )
 
-            yield self.store.persist_event(event, backfilled=True)
+            yield self.store.persist_event(
+                event,
+                context=context,
+                backfilled=True
+            )
 
         defer.returnValue(events)
 
@@ -286,8 +288,6 @@ class FederationHandler(BaseHandler):
             pdu=event
         )
 
-
-
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
@@ -332,42 +332,55 @@ class FederationHandler(BaseHandler):
         event = pdu
 
         # We should assert some things.
-        assert(event.type == RoomMemberEvent.TYPE)
+        # FIXME: Do this in a nicer way
+        assert(event.type == EventTypes.Member)
         assert(event.user_id == joinee)
         assert(event.state_key == joinee)
         assert(event.room_id == room_id)
 
-        event.outlier = False
+        event.internal_metadata.outlier = False
 
         self.room_queues[room_id] = []
 
+        builder = self.event_builder_factory.new(
+            event.get_pdu_json()
+        )
+
+        handled_events = set()
+
         try:
-            event.event_id = self.event_factory.create_event_id()
-            event.origin = self.hs.hostname
-            event.content = content
+            builder.event_id = self.event_builder_factory.create_event_id()
+            builder.origin = self.hs.hostname
+            builder.content = content
 
             if not hasattr(event, "signatures"):
-                event.signatures = {}
+                builder.signatures = {}
 
             add_hashes_and_signatures(
-                event,
+                builder,
                 self.hs.hostname,
                 self.hs.config.signing_key[0],
             )
 
+            new_event = builder.build()
+
             ret = yield self.replication_layer.send_join(
                 target_host,
-                event
+                new_event
             )
 
             state = ret["state"]
             auth_chain = ret["auth_chain"]
             auth_chain.sort(key=lambda e: e.depth)
 
+            handled_events.update([s.event_id for s in state])
+            handled_events.update([a.event_id for a in auth_chain])
+            handled_events.add(new_event.event_id)
+
             logger.debug("do_invite_join auth_chain: %s", auth_chain)
             logger.debug("do_invite_join state: %s", state)
 
-            logger.debug("do_invite_join event: %s", event)
+            logger.debug("do_invite_join event: %s", new_event)
 
             try:
                 yield self.store.store_room(
@@ -380,7 +393,7 @@ class FederationHandler(BaseHandler):
                 pass
 
             for e in auth_chain:
-                e.outlier = True
+                e.internal_metadata.outlier = True
                 try:
                     yield self._handle_new_event(e, fetch_missing=False)
                 except:
@@ -391,7 +404,7 @@ class FederationHandler(BaseHandler):
 
             for e in state:
                 # FIXME: Auth these.
-                e.outlier = True
+                e.internal_metadata.outlier = True
                 try:
                     yield self._handle_new_event(
                         e,
@@ -404,13 +417,13 @@ class FederationHandler(BaseHandler):
                     )
 
             yield self._handle_new_event(
-                event,
+                new_event,
                 state=state,
                 current_state=state,
             )
 
             yield self.notifier.on_new_room_event(
-                event, extra_users=[joinee]
+                new_event, extra_users=[joinee]
             )
 
             logger.debug("Finished joining %s to %s", joinee, room_id)
@@ -419,6 +432,9 @@ class FederationHandler(BaseHandler):
             del self.room_queues[room_id]
 
             for p, origin in room_queue:
+                if p.event_id in handled_events:
+                    continue
+
                 try:
                     self.on_receive_pdu(origin, p, backfilled=False)
                 except:
@@ -428,25 +444,24 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_make_join_request(self, context, user_id):
+    def on_make_join_request(self, room_id, user_id):
         """ We've received a /make_join/ request, so we create a partial
         join event for the room and return that. We don *not* persist or
         process it until the other server has signed it and sent it back.
         """
-        event = self.event_factory.create_event(
-            etype=RoomMemberEvent.TYPE,
-            content={"membership": Membership.JOIN},
-            room_id=context,
-            user_id=user_id,
-            state_key=user_id,
-        )
+        builder = self.event_builder_factory.new({
+            "type": EventTypes.Member,
+            "content": {"membership": Membership.JOIN},
+            "room_id": room_id,
+            "sender": user_id,
+            "state_key": user_id,
+        })
 
-        snapshot = yield self.store.snapshot_room(event)
-        snapshot.fill_out_prev_events(event)
+        event, context = yield self._create_new_client_event(
+            builder=builder,
+        )
 
-        yield self.state_handler.annotate_event_with_state(event)
-        yield self.auth.add_auth_events(event)
-        self.auth.check(event, auth_events=event.old_state_events)
+        self.auth.check(event, auth_events=context.auth_events)
 
         pdu = event
 
@@ -460,12 +475,24 @@ class FederationHandler(BaseHandler):
         """
         event = pdu
 
-        event.outlier = False
+        logger.debug(
+            "on_send_join_request: Got event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        event.internal_metadata.outlier = False
 
-        yield self._handle_new_event(event)
+        context = yield self._handle_new_event(event)
+
+        logger.debug(
+            "on_send_join_request: After _handle_new_event: %s, sigs: %s",
+            event.event_id,
+            event.signatures,
+        )
 
         extra_users = []
-        if event.type == RoomMemberEvent.TYPE:
+        if event.type == EventTypes.Member:
             target_user_id = event.state_key
             target_user = self.hs.parse_userid(target_user_id)
             extra_users.append(target_user)
@@ -474,7 +501,7 @@ class FederationHandler(BaseHandler):
             event, extra_users=extra_users
         )
 
-        if event.type == RoomMemberEvent.TYPE:
+        if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
                 yield self.distributor.fire(
@@ -485,9 +512,9 @@ class FederationHandler(BaseHandler):
 
         destinations = set()
 
-        for k, s in event.state_events.items():
+        for k, s in context.current_state.items():
             try:
-                if k[0] == RoomMemberEvent.TYPE:
+                if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
                         destinations.add(
                             self.hs.parse_userid(s.state_key).domain
@@ -497,14 +524,18 @@ class FederationHandler(BaseHandler):
                     "Failed to get destination from event %s", s.event_id
                 )
 
-        new_pdu.destinations = list(destinations)
+        logger.debug(
+            "on_send_join_request: Sending event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
 
-        yield self.replication_layer.send_pdu(new_pdu)
+        yield self.replication_layer.send_pdu(new_pdu, destinations)
 
         auth_chain = yield self.store.get_auth_chain(event.event_id)
 
         defer.returnValue({
-            "state": event.state_events.values(),
+            "state": context.current_state.values(),
             "auth_chain": auth_chain,
         })
 
@@ -516,7 +547,9 @@ class FederationHandler(BaseHandler):
         """
         event = pdu
 
-        event.outlier = True
+        context = EventContext()
+
+        event.internal_metadata.outlier = True
 
         event.signatures.update(
             compute_event_signature(
@@ -526,10 +559,11 @@ class FederationHandler(BaseHandler):
             )
         )
 
-        yield self.state_handler.annotate_event_with_state(event)
+        yield self.state_handler.annotate_context_with_state(event, context)
 
         yield self.store.persist_event(
             event,
+            context=context,
             backfilled=False,
         )
 
@@ -559,13 +593,13 @@ class FederationHandler(BaseHandler):
             }
 
             event = yield self.store.get_event(event_id)
-            if hasattr(event, "state_key"):
+            if event and event.is_state():
                 # Get previous state
-                if hasattr(event, "replaces_state") and event.replaces_state:
-                    prev_event = yield self.store.get_event(
-                        event.replaces_state
-                    )
-                    results[(event.type, event.state_key)] = prev_event
+                if "replaces_state" in event.unsigned:
+                    prev_id = event.unsigned["replaces_state"]
+                    if prev_id != event.event_id:
+                        prev_event = yield self.store.get_event(prev_id)
+                        results[(event.type, event.state_key)] = prev_event
                 else:
                     del results[(event.type, event.state_key)]
 
@@ -651,74 +685,81 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _handle_new_event(self, event, state=None, backfilled=False,
                           current_state=None, fetch_missing=True):
-        is_new_state = yield self.state_handler.annotate_event_with_state(
+        context = EventContext()
+
+        logger.debug(
+            "_handle_new_event: Before annotate: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
+
+        yield self.state_handler.annotate_context_with_state(
             event,
+            context,
             old_state=state
         )
 
-        if event.old_state_events:
-            known_ids = set(
-                [s.event_id for s in event.old_state_events.values()]
-            )
-            for e_id, _ in event.auth_events:
-                if e_id not in known_ids:
-                    e = yield self.store.get_event(
-                        e_id,
-                        allow_none=True,
-                    )
-
-                    if not e:
-                        # TODO: Do some conflict res to make sure that we're
-                        # not the ones who are wrong.
-                        logger.info(
-                            "Rejecting %s as %s not in %s",
-                            event.event_id, e_id, known_ids,
-                        )
-                        raise AuthError(403, "Auth events are stale")
-
-            auth_events = event.old_state_events
-        else:
-            # We need to get the auth events from somewhere.
+        logger.debug(
+            "_handle_new_event: Before auth fetch: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
 
-            # TODO: Don't just hit the DBs?
+        is_new_state = not event.internal_metadata.is_outlier()
 
-            auth_events = {}
-            for e_id, _ in event.auth_events:
+        known_ids = set(
+            [s.event_id for s in context.auth_events.values()]
+        )
+        for e_id, _ in event.auth_events:
+            if e_id not in known_ids:
                 e = yield self.store.get_event(
-                    e_id,
-                    allow_none=True,
+                    e_id, allow_none=True,
                 )
 
                 if not e:
-                    e = yield self.replication_layer.get_pdu(
-                        event.origin, e_id, outlier=True
+                    # TODO: Do some conflict res to make sure that we're
+                    # not the ones who are wrong.
+                    logger.info(
+                        "Rejecting %s as %s not in db or %s",
+                        event.event_id, e_id, known_ids,
                     )
+                    # FIXME: How does raising AuthError work with federation?
+                    raise AuthError(403, "Auth events are stale")
 
-                    if e and fetch_missing:
-                        try:
-                            yield self.on_receive_pdu(event.origin, e, False)
-                        except:
-                            logger.exception(
-                                "Failed to parse auth event %s",
-                                e_id,
-                            )
+                context.auth_events[(e.type, e.state_key)] = e
 
-                if not e:
-                    logger.warn("Can't find auth event %s.", e_id)
+        logger.debug(
+            "_handle_new_event: Before hack: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
 
-                auth_events[(e.type, e.state_key)] = e
+        if event.type == EventTypes.Member and not event.auth_events:
+            if len(event.prev_events) == 1:
+                c = yield self.store.get_event(event.prev_events[0][0])
+                if c.type == EventTypes.Create:
+                    context.auth_events[(c.type, c.state_key)] = c
 
-            if event.type == RoomMemberEvent.TYPE and not event.auth_events:
-                if len(event.prev_events) == 1:
-                    c = yield self.store.get_event(event.prev_events[0][0])
-                    if c.type == RoomCreateEvent.TYPE:
-                        auth_events[(c.type, c.state_key)] = c
+        logger.debug(
+            "_handle_new_event: Before auth check: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
 
-        self.auth.check(event, auth_events=auth_events)
+        self.auth.check(event, auth_events=context.auth_events)
+
+        logger.debug(
+            "_handle_new_event: Before persist_event: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
 
         yield self.store.persist_event(
             event,
+            context=context,
             backfilled=backfilled,
             is_new_state=(is_new_state and not backfilled),
             current_state=current_state,
         )
+
+        logger.debug(
+            "_handle_new_event: After persist_event: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
+
+        defer.returnValue(context)