summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-10-29 16:59:24 +0000
committerErik Johnston <erik@matrix.org>2014-10-29 16:59:24 +0000
commite7858b6d7ef37849a3d2d5004743cdd21ec330a8 (patch)
treebe917c0b61075a287f169ec5210a0b2dab563603
parentDon't reference PDU when persisting event (diff)
downloadsynapse-e7858b6d7ef37849a3d2d5004743cdd21ec330a8.tar.xz
Start filling out and using new events tables
-rw-r--r--synapse/federation/pdu_codec.py12
-rw-r--r--synapse/handlers/_base.py4
-rw-r--r--synapse/handlers/federation.py90
-rw-r--r--synapse/state.py11
-rw-r--r--synapse/storage/__init__.py45
-rw-r--r--synapse/storage/_base.py33
-rw-r--r--synapse/storage/event_federation.py49
-rw-r--r--synapse/storage/schema/event_edges.sql8
8 files changed, 159 insertions, 93 deletions
diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py
index 2cd591410b..dccbccb85b 100644
--- a/synapse/federation/pdu_codec.py
+++ b/synapse/federation/pdu_codec.py
@@ -48,8 +48,8 @@ class PduCodec(object):
         kwargs["room_id"] = pdu.context
         kwargs["etype"] = pdu.pdu_type
         kwargs["prev_events"] = [
-            encode_event_id(i, o)
-            for i, o in pdu.prev_pdus
+            (encode_event_id(i, o), s)
+            for i, o, s in pdu.prev_pdus
         ]
 
         if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"):
@@ -82,7 +82,13 @@ class PduCodec(object):
         d["pdu_type"] = event.type
 
         if hasattr(event, "prev_events"):
-            d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events]
+            def f(e, s):
+                i, o = decode_event_id(e, self.server_name)
+                return i, o, s
+            d["prev_pdus"] = [
+                f(e, s)
+                for e, s in event.prev_events
+            ]
 
         if hasattr(event, "prev_state"):
             d["prev_state_id"], d["prev_state_origin"] = (
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index cd6c35f194..787a01efc5 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -16,6 +16,8 @@
 from twisted.internet import defer
 from synapse.api.errors import LimitExceededError
 
+from synapse.util.async import run_on_reactor
+
 class BaseHandler(object):
 
     def __init__(self, hs):
@@ -45,6 +47,8 @@ class BaseHandler(object):
     @defer.inlineCallbacks
     def _on_new_room_event(self, event, snapshot, extra_destinations=[],
                            extra_users=[], suppress_auth=False):
+        yield run_on_reactor()
+
         snapshot.fill_out_prev_events(event)
 
         yield self.state_handler.annotate_state_groups(event)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b575986fc3..5f86ed03fa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -22,6 +22,7 @@ from synapse.api.constants import Membership
 from synapse.util.logutils import log_function
 from synapse.federation.pdu_codec import PduCodec, encode_event_id
 from synapse.api.errors import SynapseError
+from synapse.util.async import run_on_reactor
 
 from twisted.internet import defer, reactor
 
@@ -81,6 +82,8 @@ class FederationHandler(BaseHandler):
             processing.
         """
 
+        yield run_on_reactor()
+
         pdu = self.pdu_codec.pdu_from_event(event)
 
         if not hasattr(pdu, "destinations") or not pdu.destinations:
@@ -102,6 +105,8 @@ class FederationHandler(BaseHandler):
             self.room_queues[event.room_id].append(pdu)
             return
 
+        logger.debug("Processing event: %s", event.event_id)
+
         if state:
             state = [self.pdu_codec.event_from_pdu(p) for p in state]
 
@@ -216,58 +221,65 @@ class FederationHandler(BaseHandler):
         assert(event.state_key == joinee)
         assert(event.room_id == room_id)
 
-        self.room_queues[room_id] = []
-
-        event.event_id = self.event_factory.create_event_id()
-        event.content = content
+        event.outlier = False
 
-        state = yield self.replication_layer.send_join(
-            target_host,
-            self.pdu_codec.pdu_from_event(event)
-        )
+        self.room_queues[room_id] = []
 
-        state = [self.pdu_codec.event_from_pdu(p) for p in state]
+        try:
+            event.event_id = self.event_factory.create_event_id()
+            event.content = content
 
-        logger.debug("do_invite_join state: %s", state)
+            state = yield self.replication_layer.send_join(
+                target_host,
+                self.pdu_codec.pdu_from_event(event)
+            )
 
-        is_new_state = yield self.state_handler.annotate_state_groups(
-            event,
-            state=state
-        )
+            state = [self.pdu_codec.event_from_pdu(p) for p in state]
 
-        try:
-            yield self.store.store_room(
-                room_id=room_id,
-                room_creator_user_id="",
-                is_public=False
-            )
-        except:
-            # FIXME
-            pass
+            logger.debug("do_invite_join state: %s", state)
 
-        for e in state:
-            # FIXME: Auth these.
             is_new_state = yield self.state_handler.annotate_state_groups(
-                e,
+                event,
+                state=state
             )
 
+            logger.debug("do_invite_join event: %s", event)
+
+            try:
+                yield self.store.store_room(
+                    room_id=room_id,
+                    room_creator_user_id="",
+                    is_public=False
+                )
+            except:
+                # FIXME
+                pass
+
+            for e in state:
+                # FIXME: Auth these.
+                e.outlier = True
+
+                yield self.state_handler.annotate_state_groups(
+                    e,
+                )
+
+                yield self.store.persist_event(
+                    e,
+                    backfilled=False,
+                    is_new_state=False
+                )
+
             yield self.store.persist_event(
-                e,
+                event,
                 backfilled=False,
-                is_new_state=False
+                is_new_state=is_new_state
             )
+        finally:
+            room_queue = self.room_queues[room_id]
+            del self.room_queues[room_id]
 
-        yield self.store.persist_event(
-            event,
-            backfilled=False,
-            is_new_state=is_new_state
-        )
-
-        room_queue = self.room_queues[room_id]
-        del self.room_queues[room_id]
-
-        for p in room_queue:
-            yield self.on_receive_pdu(p, backfilled=False)
+            for p in room_queue:
+                yield self.on_receive_pdu(p, backfilled=False)
 
         defer.returnValue(True)
 
diff --git a/synapse/state.py b/synapse/state.py
index cc6a7db96b..993c4f18d3 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -143,7 +143,9 @@ class StateHandler(object):
             defer.returnValue(False)
             return
 
-        new_state = yield self.resolve_state_groups(event.prev_events)
+        new_state = yield self.resolve_state_groups(
+            [e for e, _ in event.prev_events]
+        )
 
         event.old_state_events = copy.deepcopy(new_state)
 
@@ -157,12 +159,11 @@ class StateHandler(object):
 
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
-        # FIXME: HACK!
-        pdus = yield self.store.get_latest_pdus_in_context(room_id)
+        events = yield self.store.get_latest_events_in_room(room_id)
 
         event_ids = [
-            encode_event_id(pdu_id, origin)
-            for pdu_id, origin, _ in pdus
+            e_id
+            for e_id, _ in events
         ]
 
         res = yield self.resolve_state_groups(event_ids)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f89e518690..d75c366834 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -71,6 +71,7 @@ SCHEMAS = [
     "state",
     "signatures",
     "event_edges",
+    "event_signatures",
 ]
 
 
@@ -134,7 +135,8 @@ class DataStore(RoomMemberStore, RoomStore,
                 "type",
                 "room_id",
                 "content",
-                "unrecognized_keys"
+                "unrecognized_keys",
+                "depth",
             ],
             allow_none=allow_none,
         )
@@ -263,7 +265,12 @@ class DataStore(RoomMemberStore, RoomStore,
         vals["unrecognized_keys"] = json.dumps(unrec)
 
         try:
-            self._simple_insert_txn(txn, "events", vals)
+            self._simple_insert_txn(
+                txn,
+                "events",
+                vals,
+                or_replace=(not outlier),
+            )
         except:
             logger.warn(
                 "Failed to persist, probably duplicate: %s",
@@ -307,13 +314,14 @@ class DataStore(RoomMemberStore, RoomStore,
                 }
             )
 
-        signatures = event.signatures.get(event.origin, {})
+        if hasattr(event, "signatures"):
+            signatures = event.signatures.get(event.origin, {})
 
-        for key_id, signature_base64 in signatures.items():
-            signature_bytes = decode_base64(signature_base64)
-            self._store_event_origin_signature_txn(
-                txn, event.event_id, key_id, signature_bytes,
-            )
+            for key_id, signature_base64 in signatures.items():
+                signature_bytes = decode_base64(signature_base64)
+                self._store_event_origin_signature_txn(
+                    txn, event.event_id, event.origin, key_id, signature_bytes,
+                )
 
         for prev_event_id, prev_hashes in event.prev_events:
             for alg, hash_base64 in prev_hashes.items():
@@ -323,10 +331,10 @@ class DataStore(RoomMemberStore, RoomStore,
                 )
 
         # TODO
-        (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
-        )
+        # (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
+        # self._store_event_reference_hash_txn(
+        #    txn, event.event_id, ref_alg, ref_hash_bytes
+        # )
 
         self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
 
@@ -412,9 +420,7 @@ class DataStore(RoomMemberStore, RoomStore,
         """
         def _snapshot(txn):
             membership_state = self._get_room_member(txn, user_id, room_id)
-            prev_events = self._get_latest_events_in_room(
-                txn, room_id
-            )
+            prev_events = self._get_latest_events_in_room(txn, room_id)
 
             if state_type is not None and state_key is not None:
                 prev_state_pdu = self._get_current_state_pdu(
@@ -469,12 +475,12 @@ class Snapshot(object):
             return
 
         event.prev_events = [
-            (p_id, origin, hashes)
-            for p_id, origin, hashes, _ in self.prev_events
+            (event_id, hashes)
+            for event_id, hashes, _ in self.prev_events
         ]
 
         if self.prev_events:
-            event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1
+            event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
         else:
             event.depth = 0
 
@@ -533,9 +539,10 @@ def prepare_database(db_conn):
             db_conn.commit()
 
     else:
-        sql_script = "BEGIN TRANSACTION;"
+        sql_script = "BEGIN TRANSACTION;\n"
         for sql_loc in SCHEMAS:
             sql_script += read_schema(sql_loc)
+            sql_script += "\n"
         sql_script += "COMMIT TRANSACTION;"
         c.executescript(sql_script)
         db_conn.commit()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 30732caa83..464b12f032 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,10 +19,12 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.api.events.utils import prune_event
 from synapse.util.logutils import log_function
+from syutil.base64util import encode_base64
 
 import collections
 import copy
 import json
+import sys
 import time
 
 
@@ -67,6 +69,9 @@ class LoggingTransaction(object):
             return self.txn.execute(
                 sql, *args, **kwargs
             )
+        except:
+                logger.exception("[SQL FAIL] {%s}", self.name)
+                raise
         finally:
             end = time.clock() * 1000
             sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
@@ -85,14 +90,20 @@ class SQLBaseStore(object):
         """Wraps the .runInteraction() method on the underlying db_pool."""
         def inner_func(txn, *args, **kwargs):
             start = time.clock() * 1000
-            txn_id = str(SQLBaseStore._TXN_ID)
-            SQLBaseStore._TXN_ID += 1
+            txn_id = SQLBaseStore._TXN_ID
+
+            # We don't really need these to be unique, so lets stop it from
+            # growing really large.
+            self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
 
-            name = "%s-%s" % (desc, txn_id, )
+            name = "%s-%x" % (desc, txn_id, )
 
             transaction_logger.debug("[TXN START] {%s}", name)
             try:
                 return func(LoggingTransaction(txn, name), *args, **kwargs)
+            except:
+                logger.exception("[TXN FAIL] {%s}", name)
+                raise
             finally:
                 end = time.clock() * 1000
                 transaction_logger.debug(
@@ -189,7 +200,6 @@ class SQLBaseStore(object):
               statement returns no rows
         """
         return self._simple_selectupdate_one(
-            "_simple_select_one",
             table, keyvalues, retcols=retcols, allow_none=allow_none
         )
 
@@ -215,11 +225,11 @@ class SQLBaseStore(object):
             txn,
             table=table,
             keyvalues=keyvalues,
-            retcols=retcol,
+            retcol=retcol,
         )
 
         if ret:
-            return ret[retcol]
+            return ret[0]
         else:
             if allow_none:
                 return None
@@ -434,6 +444,17 @@ class SQLBaseStore(object):
         sql = "SELECT * FROM events WHERE event_id = ?"
 
         for ev in events:
+            signatures = self._get_event_origin_signatures_txn(
+                txn, ev.event_id,
+            )
+
+            ev.signatures = {
+                k: encode_base64(v) for k, v in signatures.items()
+            }
+
+            prev_events = self._get_latest_events_in_room(txn, ev.room_id)
+            ev.prev_events = [(e_id, s,) for e_id, s, _ in prev_events]
+
             if hasattr(ev, "prev_state"):
                 # Load previous state_content.
                 # TODO: Should we be pulling this out above?
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 7688fc550f..5f94c31818 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -24,6 +24,13 @@ logger = logging.getLogger(__name__)
 
 class EventFederationStore(SQLBaseStore):
 
+    def get_latest_events_in_room(self, room_id):
+        return self.runInteraction(
+            "get_latest_events_in_room",
+            self._get_latest_events_in_room,
+            room_id,
+        )
+
     def _get_latest_events_in_room(self, txn, room_id):
         self._simple_select_onecol_txn(
             txn,
@@ -34,12 +41,25 @@ class EventFederationStore(SQLBaseStore):
             retcol="event_id",
         )
 
+        sql = (
+            "SELECT e.event_id, e.depth FROM events as e "
+            "INNER JOIN event_forward_extremities as f "
+            "ON e.event_id = f.event_id "
+            "WHERE f.room_id = ?"
+        )
+
+        txn.execute(sql, (room_id, ))
+
         results = []
-        for pdu_id, origin, depth in txn.fetchall():
-            hashes = self._get_prev_event_hashes_txn(txn, pdu_id, origin)
-            sha256_bytes = hashes["sha256"]
-            prev_hashes = {"sha256": encode_base64(sha256_bytes)}
-            results.append((pdu_id, origin, prev_hashes, depth))
+        for event_id, depth in txn.fetchall():
+            hashes = self._get_prev_event_hashes_txn(txn, event_id)
+            prev_hashes = {
+                k: encode_base64(v) for k, v in hashes.items()
+                if k == "sha256"
+            }
+            results.append((event_id, prev_hashes, depth))
+
+        return results
 
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
@@ -70,21 +90,21 @@ class EventFederationStore(SQLBaseStore):
 
     def _handle_prev_events(self, txn, outlier, event_id, prev_events,
                             room_id):
-        for e_id in prev_events:
+        for e_id, _ in prev_events:
             # TODO (erikj): This could be done as a bulk insert
             self._simple_insert_txn(
                 txn,
                 table="event_edges",
                 values={
                     "event_id": event_id,
-                    "prev_event": e_id,
+                    "prev_event_id": e_id,
                     "room_id": room_id,
                 }
             )
 
         # Update the extremities table if this is not an outlier.
         if not outlier:
-            for e_id in prev_events:
+            for e_id, _ in prev_events:
                 # TODO (erikj): This could be done as a bulk insert
                 self._simple_delete_txn(
                     txn,
@@ -116,7 +136,7 @@ class EventFederationStore(SQLBaseStore):
 
             # Insert all the prev_pdus as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
-            for e_id in prev_events:
+            for e_id, _ in prev_events:
                 # TODO (erikj): This could be done as a bulk insert
                 self._simple_insert_txn(
                     txn,
@@ -130,14 +150,11 @@ class EventFederationStore(SQLBaseStore):
             # Also delete from the backwards extremities table all ones that
             # reference pdus that we have already seen
             query = (
-                "DELETE FROM %(event_back)s as b WHERE EXISTS ("
-                "SELECT 1 FROM %(events)s AS events "
+                "DELETE FROM event_backward_extremities WHERE EXISTS ("
+                "SELECT 1 FROM events "
                 "WHERE "
-                "b.event_id = events.event_id "
+                "event_backward_extremities.event_id = events.event_id "
                 "AND not events.outlier "
                 ")"
-            ) % {
-                "event_back": "event_backward_extremities",
-                "events": "events",
-            }
+            )
             txn.execute(query)
\ No newline at end of file
diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql
index 6a28314ece..e5f768c705 100644
--- a/synapse/storage/schema/event_edges.sql
+++ b/synapse/storage/schema/event_edges.sql
@@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities(
 
 CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
---
+
 
 CREATE TABLE IF NOT EXISTS event_backward_extremities(
     event_id TEXT,
@@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities(
 
 CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
---
+
 
 CREATE TABLE IF NOT EXISTS event_edges(
     event_id TEXT,
@@ -28,7 +28,6 @@ CREATE TABLE IF NOT EXISTS event_edges(
 
 CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
 CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
---
 
 
 CREATE TABLE IF NOT EXISTS room_depth(
@@ -38,7 +37,7 @@ CREATE TABLE IF NOT EXISTS room_depth(
 );
 
 CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
---
+
 
 create TABLE IF NOT EXISTS event_destinations(
     event_id TEXT,
@@ -48,4 +47,3 @@ create TABLE IF NOT EXISTS event_destinations(
 );
 
 CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
---
\ No newline at end of file