summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py45
-rw-r--r--synapse/storage/_base.py33
-rw-r--r--synapse/storage/event_federation.py49
-rw-r--r--synapse/storage/schema/event_edges.sql8
4 files changed, 89 insertions, 46 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f89e518690..d75c366834 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -71,6 +71,7 @@ SCHEMAS = [
     "state",
     "signatures",
     "event_edges",
+    "event_signatures",
 ]
 
 
@@ -134,7 +135,8 @@ class DataStore(RoomMemberStore, RoomStore,
                 "type",
                 "room_id",
                 "content",
-                "unrecognized_keys"
+                "unrecognized_keys",
+                "depth",
             ],
             allow_none=allow_none,
         )
@@ -263,7 +265,12 @@ class DataStore(RoomMemberStore, RoomStore,
         vals["unrecognized_keys"] = json.dumps(unrec)
 
         try:
-            self._simple_insert_txn(txn, "events", vals)
+            self._simple_insert_txn(
+                txn,
+                "events",
+                vals,
+                or_replace=(not outlier),
+            )
         except:
             logger.warn(
                 "Failed to persist, probably duplicate: %s",
@@ -307,13 +314,14 @@ class DataStore(RoomMemberStore, RoomStore,
                 }
             )
 
-        signatures = event.signatures.get(event.origin, {})
+        if hasattr(event, "signatures"):
+            signatures = event.signatures.get(event.origin, {})
 
-        for key_id, signature_base64 in signatures.items():
-            signature_bytes = decode_base64(signature_base64)
-            self._store_event_origin_signature_txn(
-                txn, event.event_id, key_id, signature_bytes,
-            )
+            for key_id, signature_base64 in signatures.items():
+                signature_bytes = decode_base64(signature_base64)
+                self._store_event_origin_signature_txn(
+                    txn, event.event_id, event.origin, key_id, signature_bytes,
+                )
 
         for prev_event_id, prev_hashes in event.prev_events:
             for alg, hash_base64 in prev_hashes.items():
@@ -323,10 +331,10 @@ class DataStore(RoomMemberStore, RoomStore,
                 )
 
         # TODO
-        (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
-        )
+        # (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
+        # self._store_event_reference_hash_txn(
+        #    txn, event.event_id, ref_alg, ref_hash_bytes
+        # )
 
         self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
 
@@ -412,9 +420,7 @@ class DataStore(RoomMemberStore, RoomStore,
         """
         def _snapshot(txn):
             membership_state = self._get_room_member(txn, user_id, room_id)
-            prev_events = self._get_latest_events_in_room(
-                txn, room_id
-            )
+            prev_events = self._get_latest_events_in_room(txn, room_id)
 
             if state_type is not None and state_key is not None:
                 prev_state_pdu = self._get_current_state_pdu(
@@ -469,12 +475,12 @@ class Snapshot(object):
             return
 
         event.prev_events = [
-            (p_id, origin, hashes)
-            for p_id, origin, hashes, _ in self.prev_events
+            (event_id, hashes)
+            for event_id, hashes, _ in self.prev_events
         ]
 
         if self.prev_events:
-            event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1
+            event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
         else:
             event.depth = 0
 
@@ -533,9 +539,10 @@ def prepare_database(db_conn):
             db_conn.commit()
 
     else:
-        sql_script = "BEGIN TRANSACTION;"
+        sql_script = "BEGIN TRANSACTION;\n"
         for sql_loc in SCHEMAS:
             sql_script += read_schema(sql_loc)
+            sql_script += "\n"
         sql_script += "COMMIT TRANSACTION;"
         c.executescript(sql_script)
         db_conn.commit()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 30732caa83..464b12f032 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,10 +19,12 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.api.events.utils import prune_event
 from synapse.util.logutils import log_function
+from syutil.base64util import encode_base64
 
 import collections
 import copy
 import json
+import sys
 import time
 
 
@@ -67,6 +69,9 @@ class LoggingTransaction(object):
             return self.txn.execute(
                 sql, *args, **kwargs
             )
+        except:
+                logger.exception("[SQL FAIL] {%s}", self.name)
+                raise
         finally:
             end = time.clock() * 1000
             sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
@@ -85,14 +90,20 @@ class SQLBaseStore(object):
         """Wraps the .runInteraction() method on the underlying db_pool."""
         def inner_func(txn, *args, **kwargs):
             start = time.clock() * 1000
-            txn_id = str(SQLBaseStore._TXN_ID)
-            SQLBaseStore._TXN_ID += 1
+            txn_id = SQLBaseStore._TXN_ID
+
+            # We don't really need these to be unique, so lets stop it from
+            # growing really large.
+            self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
 
-            name = "%s-%s" % (desc, txn_id, )
+            name = "%s-%x" % (desc, txn_id, )
 
             transaction_logger.debug("[TXN START] {%s}", name)
             try:
                 return func(LoggingTransaction(txn, name), *args, **kwargs)
+            except:
+                logger.exception("[TXN FAIL] {%s}", name)
+                raise
             finally:
                 end = time.clock() * 1000
                 transaction_logger.debug(
@@ -189,7 +200,6 @@ class SQLBaseStore(object):
               statement returns no rows
         """
         return self._simple_selectupdate_one(
-            "_simple_select_one",
             table, keyvalues, retcols=retcols, allow_none=allow_none
         )
 
@@ -215,11 +225,11 @@ class SQLBaseStore(object):
             txn,
             table=table,
             keyvalues=keyvalues,
-            retcols=retcol,
+            retcol=retcol,
         )
 
         if ret:
-            return ret[retcol]
+            return ret[0]
         else:
             if allow_none:
                 return None
@@ -434,6 +444,17 @@ class SQLBaseStore(object):
         sql = "SELECT * FROM events WHERE event_id = ?"
 
         for ev in events:
+            signatures = self._get_event_origin_signatures_txn(
+                txn, ev.event_id,
+            )
+
+            ev.signatures = {
+                k: encode_base64(v) for k, v in signatures.items()
+            }
+
+            prev_events = self._get_latest_events_in_room(txn, ev.room_id)
+            ev.prev_events = [(e_id, s,) for e_id, s, _ in prev_events]
+
             if hasattr(ev, "prev_state"):
                 # Load previous state_content.
                 # TODO: Should we be pulling this out above?
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 7688fc550f..5f94c31818 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -24,6 +24,13 @@ logger = logging.getLogger(__name__)
 
 class EventFederationStore(SQLBaseStore):
 
+    def get_latest_events_in_room(self, room_id):
+        return self.runInteraction(
+            "get_latest_events_in_room",
+            self._get_latest_events_in_room,
+            room_id,
+        )
+
     def _get_latest_events_in_room(self, txn, room_id):
         self._simple_select_onecol_txn(
             txn,
@@ -34,12 +41,25 @@ class EventFederationStore(SQLBaseStore):
             retcol="event_id",
         )
 
+        sql = (
+            "SELECT e.event_id, e.depth FROM events as e "
+            "INNER JOIN event_forward_extremities as f "
+            "ON e.event_id = f.event_id "
+            "WHERE f.room_id = ?"
+        )
+
+        txn.execute(sql, (room_id, ))
+
         results = []
-        for pdu_id, origin, depth in txn.fetchall():
-            hashes = self._get_prev_event_hashes_txn(txn, pdu_id, origin)
-            sha256_bytes = hashes["sha256"]
-            prev_hashes = {"sha256": encode_base64(sha256_bytes)}
-            results.append((pdu_id, origin, prev_hashes, depth))
+        for event_id, depth in txn.fetchall():
+            hashes = self._get_prev_event_hashes_txn(txn, event_id)
+            prev_hashes = {
+                k: encode_base64(v) for k, v in hashes.items()
+                if k == "sha256"
+            }
+            results.append((event_id, prev_hashes, depth))
+
+        return results
 
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
@@ -70,21 +90,21 @@ class EventFederationStore(SQLBaseStore):
 
     def _handle_prev_events(self, txn, outlier, event_id, prev_events,
                             room_id):
-        for e_id in prev_events:
+        for e_id, _ in prev_events:
             # TODO (erikj): This could be done as a bulk insert
             self._simple_insert_txn(
                 txn,
                 table="event_edges",
                 values={
                     "event_id": event_id,
-                    "prev_event": e_id,
+                    "prev_event_id": e_id,
                     "room_id": room_id,
                 }
             )
 
         # Update the extremities table if this is not an outlier.
         if not outlier:
-            for e_id in prev_events:
+            for e_id, _ in prev_events:
                 # TODO (erikj): This could be done as a bulk insert
                 self._simple_delete_txn(
                     txn,
@@ -116,7 +136,7 @@ class EventFederationStore(SQLBaseStore):
 
             # Insert all the prev_pdus as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
-            for e_id in prev_events:
+            for e_id, _ in prev_events:
                 # TODO (erikj): This could be done as a bulk insert
                 self._simple_insert_txn(
                     txn,
@@ -130,14 +150,11 @@ class EventFederationStore(SQLBaseStore):
             # Also delete from the backwards extremities table all ones that
             # reference pdus that we have already seen
             query = (
-                "DELETE FROM %(event_back)s as b WHERE EXISTS ("
-                "SELECT 1 FROM %(events)s AS events "
+                "DELETE FROM event_backward_extremities WHERE EXISTS ("
+                "SELECT 1 FROM events "
                 "WHERE "
-                "b.event_id = events.event_id "
+                "event_backward_extremities.event_id = events.event_id "
                 "AND not events.outlier "
                 ")"
-            ) % {
-                "event_back": "event_backward_extremities",
-                "events": "events",
-            }
+            )
             txn.execute(query)
\ No newline at end of file
diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql
index 6a28314ece..e5f768c705 100644
--- a/synapse/storage/schema/event_edges.sql
+++ b/synapse/storage/schema/event_edges.sql
@@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities(
 
 CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
---
+
 
 CREATE TABLE IF NOT EXISTS event_backward_extremities(
     event_id TEXT,
@@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities(
 
 CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
---
+
 
 CREATE TABLE IF NOT EXISTS event_edges(
     event_id TEXT,
@@ -28,7 +28,6 @@ CREATE TABLE IF NOT EXISTS event_edges(
 
 CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
 CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
---
 
 
 CREATE TABLE IF NOT EXISTS room_depth(
@@ -38,7 +37,7 @@ CREATE TABLE IF NOT EXISTS room_depth(
 );
 
 CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
---
+
 
 create TABLE IF NOT EXISTS event_destinations(
     event_id TEXT,
@@ -48,4 +47,3 @@ create TABLE IF NOT EXISTS event_destinations(
 );
 
 CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
---
\ No newline at end of file