summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-01-27 16:24:22 +0000
committerMark Haines <mark.haines@matrix.org>2015-01-27 16:24:22 +0000
commita56008842b43089433768f569f35b2d14523ac39 (patch)
tree275d2f405d9da059b8deba68180308ac6f07bd17 /synapse/storage/stream.py
parentStart implementing the non-incremental sync portion of the v2 /sync API (diff)
downloadsynapse-a56008842b43089433768f569f35b2d14523ac39.tar.xz
Start implementing incremental initial sync
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py41
1 files changed, 32 insertions, 9 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 8ac2adab05..06aca1a4e5 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -265,17 +265,38 @@ class StreamStore(SQLBaseStore):
         return self.runInteraction("paginate_room_events", f)
 
     def get_recent_events_for_room(self, room_id, limit, end_token,
-                                   with_feedback=False):
+                                   with_feedback=False, from_token=None):
         # TODO (erikj): Handle compressed feedback
 
-        sql = (
-            "SELECT stream_ordering, topological_ordering, event_id FROM events "
-            "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
-            "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        )
+        end_token = _StreamToken.parse_stream_token(end_token)
 
-        def f(txn):
-            txn.execute(sql, (room_id, end_token, limit,))
+        if from_token is None:
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+        else:
+            from_token = _StreamToken.parse_stream_token(from_token)
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering > ?"
+                " AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+
+        def get_recent_events_for_room_txn(txn):
+            if from_token is None:
+                txn.execute(sql, (room_id, end_token.stream, limit,))
+            else:
+                txn.execute(sql, (
+                    room_id, from_token.stream, end_token.stream, limit
+                ))
 
             rows = self.cursor_to_dict(txn)
 
@@ -303,7 +324,9 @@ class StreamStore(SQLBaseStore):
 
             return events, token
 
-        return self.runInteraction("get_recent_events_for_room", f)
+        return self.runInteraction(
+            "get_recent_events_for_room", get_recent_events_for_room_txn
+        )
 
     def get_room_events_max_id(self):
         return self.runInteraction(