summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2014-11-13 11:58:54 +0200
committerMatthew Hodgson <matthew@matrix.org>2014-11-13 11:58:54 +0200
commit28408a9f64dc889078711b0e2bae5cff4c90c91e (patch)
tree35ea17e51de82925f171bb15d4b9876d67b3bf9c /synapse
parentvarious fixes based on truphone feedback (diff)
parentDetect OpenWebRTC and add workarounds, but comment out the turn server remova... (diff)
downloadsynapse-28408a9f64dc889078711b0e2bae5cff4c90c91e.tar.xz
Merge branch 'develop' of git+ssh://github.com/matrix-org/synapse into develop
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation.py32
-rw-r--r--synapse/storage/__init__.py15
-rw-r--r--synapse/storage/_base.py7
-rw-r--r--synapse/storage/event_federation.py31
-rw-r--r--synapse/storage/schema/event_signatures.sql6
-rw-r--r--synapse/storage/schema/state.sql13
-rw-r--r--synapse/storage/signatures.py24
-rw-r--r--synapse/storage/state.py84
8 files changed, 152 insertions, 60 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 99655c8bb0..5e096f4652 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -40,6 +40,8 @@ class FederationHandler(BaseHandler):
         of the home server (including auth and state conflict resoultion)
         b) converting events that were produced by local clients that may need
         to be sent to remote home servers.
+        c) doing the necessary dances to invite remote users and join remote
+        rooms.
     """
 
     def __init__(self, hs):
@@ -102,6 +104,8 @@ class FederationHandler(BaseHandler):
 
         logger.debug("Got event: %s", event.event_id)
 
+        # If we are currently in the process of joining this room, then we
+        # queue up events for later processing.
         if event.room_id in self.room_queues:
             self.room_queues[event.room_id].append(pdu)
             return
@@ -187,6 +191,8 @@ class FederationHandler(BaseHandler):
     @log_function
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit):
+        """ Trigger a backfill request to `dest` for the given `room_id`
+        """
         extremities = yield self.store.get_oldest_events_in_room(room_id)
 
         pdus = yield self.replication_layer.backfill(
@@ -212,6 +218,10 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def send_invite(self, target_host, event):
+        """ Sends the invite to the remote server for signing.
+
+        Invites must be signed by the invitee's server before distribution.
+        """
         pdu = yield self.replication_layer.send_invite(
             destination=target_host,
             context=event.room_id,
@@ -229,6 +239,17 @@ class FederationHandler(BaseHandler):
     @log_function
     @defer.inlineCallbacks
     def do_invite_join(self, target_host, room_id, joinee, content, snapshot):
+        """ Attempts to join the `joinee` to the room `room_id` via the
+        server `target_host`.
+
+        This first triggers a /make_join/ request that returns a partial
+        event that we can fill out and sign. This is then sent to the
+        remote server via /send_join/ which responds with the state at that
+        event and the auth_chains.
+
+        We suspend processing of any received events from this room until we
+        have finished processing the join.
+        """
         pdu = yield self.replication_layer.make_join(
             target_host,
             room_id,
@@ -313,6 +334,10 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     @log_function
     def on_make_join_request(self, context, user_id):
+        """ We've received a /make_join/ request, so we create a partial
+        join event for the room and return that. We don *not* persist or
+        process it until the other server has signed it and sent it back.
+        """
         event = self.event_factory.create_event(
             etype=RoomMemberEvent.TYPE,
             content={"membership": Membership.JOIN},
@@ -335,6 +360,9 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     @log_function
     def on_send_join_request(self, origin, pdu):
+        """ We have received a join event for a room. Fully process it and
+        respond with the current state and auth chains.
+        """
         event = self.pdu_codec.event_from_pdu(pdu)
 
         event.outlier = False
@@ -403,6 +431,10 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_invite_request(self, origin, pdu):
+        """ We've got an invite event. Process and persist it. Sign it.
+
+        Respond with the now signed event.
+        """
         event = self.pdu_codec.event_from_pdu(pdu)
 
         event.outlier = True
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 72290eb5a0..d8f351a675 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,13 +279,14 @@ class DataStore(RoomMemberStore, RoomStore,
             )
 
         if hasattr(event, "signatures"):
-            signatures = event.signatures.get(event.origin, {})
-
-            for key_id, signature_base64 in signatures.items():
-                signature_bytes = decode_base64(signature_base64)
-                self._store_event_origin_signature_txn(
-                    txn, event.event_id, event.origin, key_id, signature_bytes,
-                )
+            logger.debug("sigs: %s", event.signatures)
+            for name, sigs in event.signatures.items():
+                for key_id, signature_base64 in sigs.items():
+                    signature_bytes = decode_base64(signature_base64)
+                    self._store_event_signature_txn(
+                        txn, event.event_id, name, key_id,
+                        signature_bytes,
+                    )
 
         for prev_event_id, prev_hashes in event.prev_events:
             for alg, hash_base64 in prev_hashes.items():
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a1ee0318f6..670387b04a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -470,12 +470,15 @@ class SQLBaseStore(object):
         select_event_sql = "SELECT * FROM events WHERE event_id = ?"
 
         for i, ev in enumerate(events):
-            signatures = self._get_event_origin_signatures_txn(
+            signatures = self._get_event_signatures_txn(
                 txn, ev.event_id,
             )
 
             ev.signatures = {
-                k: encode_base64(v) for k, v in signatures.items()
+                n: {
+                    k: encode_base64(v) for k, v in s.items()
+                }
+                for n, s in signatures.items()
             }
 
             prevs = self._get_prev_events_and_state(txn, ev.event_id)
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index a027db3868..6c559f8f63 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -23,6 +23,14 @@ logger = logging.getLogger(__name__)
 
 
 class EventFederationStore(SQLBaseStore):
+    """ Responsible for storing and serving up the various graphs associated
+    with an event. Including the main event graph and the auth chains for an
+    event.
+
+    Also has methods for getting the front (latest) and back (oldest) edges
+    of the event graphs. These are used to generate the parents for new events
+    and backfilling from another server respectively.
+    """
 
     def get_auth_chain(self, event_id):
         return self.runInteraction(
@@ -205,6 +213,8 @@ class EventFederationStore(SQLBaseStore):
         return results
 
     def get_min_depth(self, room_id):
+        """ For hte given room, get the minimum depth we have seen for it.
+        """
         return self.runInteraction(
             "get_min_depth",
             self._get_min_depth_interaction,
@@ -240,6 +250,10 @@ class EventFederationStore(SQLBaseStore):
 
     def _handle_prev_events(self, txn, outlier, event_id, prev_events,
                             room_id):
+        """
+        For the given event, update the event edges table and forward and
+        backward extremities tables.
+        """
         for e_id, _ in prev_events:
             # TODO (erikj): This could be done as a bulk insert
             self._simple_insert_txn(
@@ -267,8 +281,8 @@ class EventFederationStore(SQLBaseStore):
                     }
                 )
 
-            # We only insert as a forward extremity the new pdu if there are
-            # no other pdus that reference it as a prev pdu
+            # We only insert as a forward extremity the new event if there are
+            # no other events that reference it as a prev event
             query = (
                 "INSERT OR IGNORE INTO %(table)s (event_id, room_id) "
                 "SELECT ?, ? WHERE NOT EXISTS ("
@@ -284,7 +298,7 @@ class EventFederationStore(SQLBaseStore):
 
             txn.execute(query, (event_id, room_id, event_id))
 
-            # Insert all the prev_pdus as a backwards thing, they'll get
+            # Insert all the prev_events as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
             for e_id, _ in prev_events:
                 # TODO (erikj): This could be done as a bulk insert
@@ -299,7 +313,7 @@ class EventFederationStore(SQLBaseStore):
                 )
 
             # Also delete from the backwards extremities table all ones that
-            # reference pdus that we have already seen
+            # reference events that we have already seen
             query = (
                 "DELETE FROM event_backward_extremities WHERE EXISTS ("
                 "SELECT 1 FROM events "
@@ -311,17 +325,14 @@ class EventFederationStore(SQLBaseStore):
             txn.execute(query)
 
     def get_backfill_events(self, room_id, event_list, limit):
-        """Get a list of Events for a given topic that occured before (and
-        including) the pdus in pdu_list. Return a list of max size `limit`.
+        """Get a list of Events for a given topic that occurred before (and
+        including) the events in event_list. Return a list of max size `limit`
 
         Args:
             txn
             room_id (str)
             event_list (list)
             limit (int)
-
-        Return:
-            list: A list of PduTuples
         """
         return self.runInteraction(
             "get_backfill_events",
@@ -334,7 +345,6 @@ class EventFederationStore(SQLBaseStore):
             room_id, repr(event_list), limit
         )
 
-        # We seed the pdu_results with the things from the pdu_list.
         event_results = event_list
 
         front = event_list
@@ -373,5 +383,4 @@ class EventFederationStore(SQLBaseStore):
             front = new_front
             event_results += new_front
 
-        # We also want to update the `prev_pdus` attributes before returning.
         return self._get_events_txn(txn, event_results)
diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql
index 5491c7ecec..4efa8a3e63 100644
--- a/synapse/storage/schema/event_signatures.sql
+++ b/synapse/storage/schema/event_signatures.sql
@@ -37,15 +37,15 @@ CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
 );
 
 
-CREATE TABLE IF NOT EXISTS event_origin_signatures (
+CREATE TABLE IF NOT EXISTS event_signatures (
     event_id TEXT,
-    origin TEXT,
+    signature_name TEXT,
     key_id TEXT,
     signature BLOB,
     CONSTRAINT uniqueness UNIQUE (event_id, key_id)
 );
 
-CREATE INDEX IF NOT EXISTS event_origin_signatures_id ON event_origin_signatures (
+CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
     event_id
 );
 
diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/state.sql
index b44c56b519..44f7aafb27 100644
--- a/synapse/storage/schema/state.sql
+++ b/synapse/storage/schema/state.sql
@@ -30,4 +30,17 @@ CREATE TABLE IF NOT EXISTS state_groups_state(
 CREATE TABLE IF NOT EXISTS event_to_state_groups(
     event_id TEXT NOT NULL,
     state_group INTEGER NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
+
+CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(
+    state_group
+);
+CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(
+    room_id, type, state_key
+);
+
+CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(
+    event_id
 );
\ No newline at end of file
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 84a49088a2..d90e08fff1 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -103,24 +103,30 @@ class SignatureStore(SQLBaseStore):
             or_ignore=True,
         )
 
-
-    def _get_event_origin_signatures_txn(self, txn, event_id):
+    def _get_event_signatures_txn(self, txn, event_id):
         """Get all the signatures for a given PDU.
         Args:
             txn (cursor):
             event_id (str): Id for the Event.
         Returns:
-            A dict of key_id -> signature_bytes.
+            A dict of sig name -> dict(key_id -> signature_bytes)
         """
         query = (
-            "SELECT key_id, signature"
-            " FROM event_origin_signatures"
+            "SELECT signature_name, key_id, signature"
+            " FROM event_signatures"
             " WHERE event_id = ? "
         )
         txn.execute(query, (event_id, ))
-        return dict(txn.fetchall())
+        rows = txn.fetchall()
+
+        res = {}
+
+        for name, key, sig in rows:
+            res.setdefault(name, {})[key] = sig
+
+        return res
 
-    def _store_event_origin_signature_txn(self, txn, event_id, origin, key_id,
+    def _store_event_signature_txn(self, txn, event_id, signature_name, key_id,
                                           signature_bytes):
         """Store a signature from the origin server for a PDU.
         Args:
@@ -132,10 +138,10 @@ class SignatureStore(SQLBaseStore):
         """
         self._simple_insert_txn(
             txn,
-            "event_origin_signatures",
+            "event_signatures",
             {
                 "event_id": event_id,
-                "origin": origin,
+                "signature_name": signature_name,
                 "key_id": key_id,
                 "signature": buffer(signature_bytes),
             },
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 2f3a70b4e5..55ea567793 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,43 +14,71 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from twisted.internet import defer
 
 
 class StateStore(SQLBaseStore):
+    """ Keeps track of the state at a given event.
+
+    This is done by the concept of `state groups`. Every event is a assigned
+    a state group (identified by an arbitrary string), which references a
+    collection of state events. The current state of an event is then the
+    collection of state events referenced by the event's state group.
+
+    Hence, every change in the current state causes a new state group to be
+    generated. However, if no change happens (e.g., if we get a message event
+    with only one parent it inherits the state group from its parent.)
+
+    There are three tables:
+      * `state_groups`: Stores group name, first event with in the group and
+        room id.
+      * `event_to_state_groups`: Maps events to state groups.
+      * `state_groups_state`: Maps state group to state events.
+    """
 
-    @defer.inlineCallbacks
     def get_state_groups(self, event_ids):
-        groups = set()
-        for event_id in event_ids:
-            group = yield self._simple_select_one_onecol(
-                table="event_to_state_groups",
-                keyvalues={"event_id": event_id},
-                retcol="state_group",
-                allow_none=True,
-            )
-            if group:
-                groups.add(group)
-
-        res = {}
-        for group in groups:
-            state_ids = yield self._simple_select_onecol(
-                table="state_groups_state",
-                keyvalues={"state_group": group},
-                retcol="event_id",
-            )
-            state = []
-            for state_id in state_ids:
-                s = yield self.get_event(
-                    state_id,
+        """ Get the state groups for the given list of event_ids
+
+        The return value is a dict mapping group names to lists of events.
+        """
+
+        def f(txn):
+            groups = set()
+            for event_id in event_ids:
+                group = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="event_to_state_groups",
+                    keyvalues={"event_id": event_id},
+                    retcol="state_group",
                     allow_none=True,
                 )
-                if s:
-                    state.append(s)
+                if group:
+                    groups.add(group)
 
-            res[group] = state
+            res = {}
+            for group in groups:
+                state_ids = self._simple_select_onecol_txn(
+                    txn,
+                    table="state_groups_state",
+                    keyvalues={"state_group": group},
+                    retcol="event_id",
+                )
+                state = []
+                for state_id in state_ids:
+                    s = self._get_events_txn(
+                        txn,
+                        [state_id],
+                    )
+                    if s:
+                        state.extend(s)
+
+                res[group] = state
 
-        defer.returnValue(res)
+            return res
+
+        return self.runInteraction(
+            "get_state_groups",
+            f,
+        )
 
     def store_state_groups(self, event):
         return self.runInteraction(