summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2015-01-30 15:54:25 +0000
committerMark Haines <mjark@negativecurvature.net>2015-01-30 15:54:25 +0000
commit6dc92d3427c80c1b4ea3182ac5c36575f50e8366 (patch)
treed3121fece4d3abfe8b17a6b5cda9c595b0cce480 /synapse/storage
parentMerge pull request #42 from matrix-org/replication_split (diff)
parentAdd doc string for __nonzero__ overrides for sync results, raise not implemen... (diff)
downloadsynapse-6dc92d3427c80c1b4ea3182ac5c36575f50e8366.tar.xz
Merge pull request #41 from matrix-org/client_v2_sync
Client v2 sync
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/stream.py60
1 files changed, 49 insertions, 11 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 062ca06fb3..3ccb6f8a61 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -181,8 +181,11 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(ret, rows)
+
             if rows:
                 key = "s%d" % max([r["stream_ordering"] for r in rows])
+
             else:
                 # Assume we didn't get anything because there was nothing to
                 # get.
@@ -260,22 +263,44 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, next_token,
 
         return self.runInteraction("paginate_room_events", f)
 
     def get_recent_events_for_room(self, room_id, limit, end_token,
-                                   with_feedback=False):
+                                   with_feedback=False, from_token=None):
         # TODO (erikj): Handle compressed feedback
 
-        sql = (
-            "SELECT stream_ordering, topological_ordering, event_id FROM events "
-            "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
-            "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        )
+        end_token = _StreamToken.parse_stream_token(end_token)
 
-        def f(txn):
-            txn.execute(sql, (room_id, end_token, limit,))
+        if from_token is None:
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+        else:
+            from_token = _StreamToken.parse_stream_token(from_token)
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering > ?"
+                " AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+        def get_recent_events_for_room_txn(txn):
+            if from_token is None:
+                txn.execute(sql, (room_id, end_token.stream, limit,))
+            else:
+                txn.execute(sql, (
+                    room_id, from_token.stream, end_token.stream, limit
+                ))
 
             rows = self.cursor_to_dict(txn)
 
@@ -291,9 +316,9 @@ class StreamStore(SQLBaseStore):
                 toke = rows[0]["stream_ordering"] - 1
                 start_token = str(_StreamToken(topo, toke))
 
-                token = (start_token, end_token)
+                token = (start_token, str(end_token))
             else:
-                token = (end_token, end_token)
+                token = (str(end_token), str(end_token))
 
             events = self._get_events_txn(
                 txn,
@@ -301,9 +326,13 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, token
 
-        return self.runInteraction("get_recent_events_for_room", f)
+        return self.runInteraction(
+            "get_recent_events_for_room", get_recent_events_for_room_txn
+        )
 
     def get_room_events_max_id(self):
         return self.runInteraction(
@@ -325,3 +354,12 @@ class StreamStore(SQLBaseStore):
 
         key = res[0]["m"]
         return "s%d" % (key,)
+
+    @staticmethod
+    def _set_before_and_after(events, rows):
+        for event, row in zip(events, rows):
+            stream = row["stream_ordering"]
+            topo = event.depth
+            internal = event.internal_metadata
+            internal.before = str(_StreamToken(topo, stream - 1))
+            internal.after = str(_StreamToken(topo, stream))