summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-08-18 15:50:41 +0100
committerErik Johnston <erik@matrix.org>2014-08-18 15:50:41 +0100
commitfc26275bb34206f48d70c7effcbc6f5d0bf86ecb (patch)
treebb092c3bf3ed25a0e0a3f11f5966e94487e9c008
parentMerge branch 'master' of github.com:matrix-org/synapse into sql_refactor (diff)
downloadsynapse-fc26275bb34206f48d70c7effcbc6f5d0bf86ecb.tar.xz
Add two different columns for ordering the events table, one which can be used for pagination and one which can be as tokens for notifying clients. Also add a 'processed' field which is currently always set to True
Diffstat (limited to '')
-rw-r--r--synapse/federation/handler.py4
-rw-r--r--synapse/storage/__init__.py18
-rw-r--r--synapse/storage/schema/im.sql13
-rw-r--r--synapse/storage/stream.py17
4 files changed, 33 insertions, 19 deletions
diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py
index 580e591aca..68243d31d0 100644
--- a/synapse/federation/handler.py
+++ b/synapse/federation/handler.py
@@ -63,7 +63,7 @@ class FederationEventHandler(object):
             Deferred: Resolved when it has successfully been queued for
             processing.
         """
-        yield self._fill_out_prev_events(event)
+        yield self.fill_out_prev_events(event)
 
         pdu = self.pdu_codec.pdu_from_event(event)
 
@@ -129,7 +129,7 @@ class FederationEventHandler(object):
         yield self.event_handler.on_receive(new_state_event)
 
     @defer.inlineCallbacks
-    def _fill_out_prev_events(self, event):
+    def fill_out_prev_events(self, event):
         if hasattr(event, "prev_events"):
             return
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 841ad8f132..9f78f3f9bd 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -43,9 +43,10 @@ class DataStore(RoomMemberStore, RoomStore,
     def __init__(self, hs):
         super(DataStore, self).__init__(hs)
         self.event_factory = hs.get_event_factory()
+        self.hs = hs
 
     @defer.inlineCallbacks
-    def persist_event(self, event):
+    def persist_event(self, event, backfilled=False):
         if event.type == RoomMemberEvent.TYPE:
             yield self._store_room_member(event)
         elif event.type == FeedbackEvent.TYPE:
@@ -57,7 +58,7 @@ class DataStore(RoomMemberStore, RoomStore,
         elif event.type == RoomTopicEvent.TYPE:
             yield self._store_room_topic(event)
 
-        ret = yield self._store_event(event)
+        ret = yield self._store_event(event, backfilled)
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
@@ -79,14 +80,23 @@ class DataStore(RoomMemberStore, RoomStore,
         defer.returnValue(event)
 
     @defer.inlineCallbacks
-    def _store_event(self, event):
+    def _store_event(self, event, backfilled):
+        # FIXME (erikj): This should be removed when we start amalgamating
+        # event and pdu storage.
+        yield self.hs.get_federation().fill_out_prev_events(event)
+
         vals = {
+            "topological_ordering": event.depth,
             "event_id": event.event_id,
             "type": event.type,
             "room_id": event.room_id,
             "content": json.dumps(event.content),
+            "processed": True,
         }
 
+        if backfilled:
+            vals["token_ordering"] = "-1"
+
         unrec = {
             k: v
             for k, v in event.get_full_dict().items()
@@ -96,7 +106,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
         yield self._simple_insert("events", vals)
 
-        if hasattr(event, "state_key"):
+        if not backfilled and hasattr(event, "state_key"):
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 2452890ea4..b0240e39af 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -14,12 +14,15 @@
  */
 
 CREATE TABLE IF NOT EXISTS events(
-    ordering INTEGER PRIMARY KEY AUTOINCREMENT,
+    token_ordering INTEGER AUTOINCREMENT,
+    topological_ordering INTEGER NOT NULL,
     event_id TEXT NOT NULL,
     type TEXT NOT NULL,
-    room_id TEXT,
-    content TEXT,
-    unrecognized_keys TEXT
+    room_id TEXT NOT NULL,
+    content TEXT NOT NULL,
+    unrecognized_keys TEXT,
+    processed BOOL NOT NULL,
+    CONSTRAINT ev_uniq UNIQUE (event_id)
 );
 
 CREATE TABLE IF NOT EXISTS state_events(
@@ -35,7 +38,7 @@ CREATE TABLE IF NOT EXISTS current_state_events(
     room_id TEXT NOT NULL,
     type TEXT NOT NULL,
     state_key TEXT NOT NULL,
-    CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+    CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
 );
 
 CREATE TABLE IF NOT EXISTS room_memberships(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index cf4b1682b6..f7968f576f 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -73,13 +73,14 @@ class StreamStore(SQLBaseStore):
         # Constraints and ordering depend on direction.
         if from_key < to_key:
             sql += (
-                "AND e.ordering > ? AND e.ordering < ? "
-                "ORDER BY ordering ASC LIMIT %(limit)d "
+                "AND e.token_ordering > ? AND e.token_ordering < ? "
+                "ORDER BY token_ordering, rowid ASC LIMIT %(limit)d "
             ) % {"limit": limit}
         else:
             sql += (
-                "AND e.ordering < ? AND e.ordering > ? "
-                "ORDER BY ordering DESC LIMIT %(limit)d "
+                "AND e.token_ordering < ? "
+                "AND e.token_ordering > ? "
+                "ORDER BY e.token_ordering, rowid DESC LIMIT %(limit)d "
             ) % {"limit": int(limit)}
 
         rows = yield self._execute_and_decode(
@@ -91,9 +92,9 @@ class StreamStore(SQLBaseStore):
 
         if rows:
             if from_key < to_key:
-                key = max([r["ordering"] for r in rows])
+                key = max([r["token_ordering"] for r in rows])
             else:
-                key = min([r["ordering"] for r in rows])
+                key = min([r["token_ordering"] for r in rows])
         else:
             key = to_key
 
@@ -105,7 +106,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT * FROM events WHERE room_id = ? "
-            "ORDER BY ordering DESC LIMIT ? "
+            "ORDER BY token_ordering, rowid DESC LIMIT ? "
         )
 
         rows = yield self._execute_and_decode(
@@ -120,7 +121,7 @@ class StreamStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_room_events_max_id(self):
         res = yield self._execute_and_decode(
-            "SELECT MAX(ordering) as m FROM events"
+            "SELECT MAX(token_ordering) as m FROM events"
         )
 
         if not res: