summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/storage/stream.py20
-rw-r--r--synapse/streams/events.py6
4 files changed, 26 insertions, 8 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 38e375f86a..1809a44a99 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -89,7 +89,9 @@ class MessageHandler(BaseHandler):
 
         if not pagin_config.from_token:
             pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token()
+                yield self.hs.get_event_sources().get_current_token(
+                    direction='b'
+                )
             )
 
         room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index cfa2e38ed2..29b6d52757 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -577,8 +577,8 @@ class RoomEventSource(object):
 
         defer.returnValue((events, end_key))
 
-    def get_current_key(self):
-        return self.store.get_room_events_max_id()
+    def get_current_key(self, direction='f'):
+        return self.store.get_room_events_max_id(direction)
 
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b03fc67f71..8045e17fd7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -364,9 +364,25 @@ class StreamStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_room_events_max_id(self):
+    def get_room_events_max_id(self, direction='f'):
         token = yield self._stream_id_gen.get_max_token(self)
-        defer.returnValue("s%d" % (token,))
+        if direction != 'b':
+            defer.returnValue("s%d" % (token,))
+        else:
+            topo = yield self.runInteraction(
+                "_get_max_topological_txn", self._get_max_topological_txn
+            )
+            defer.returnValue("t%d-%d" % (topo, token))
+
+    def _get_max_topological_txn(self, txn):
+        txn.execute(
+            "SELECT MAX(topological_ordering) FROM events"
+            " WHERE outlier = ?",
+            (False,)
+        )
+
+        rows = txn.fetchall()
+        return rows[0][0] if rows else 0
 
     @defer.inlineCallbacks
     def _get_min_token(self):
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 5c8e54b78b..dff7970bea 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -31,7 +31,7 @@ class NullSource(object):
     def get_new_events_for_user(self, user, from_key, limit):
         return defer.succeed(([], from_key))
 
-    def get_current_key(self):
+    def get_current_key(self, direction='f'):
         return defer.succeed(0)
 
     def get_pagination_rows(self, user, pagination_config, key):
@@ -52,10 +52,10 @@ class EventSources(object):
         }
 
     @defer.inlineCallbacks
-    def get_current_token(self):
+    def get_current_token(self, direction='f'):
         token = StreamToken(
             room_key=(
-                yield self.sources["room"].get_current_key()
+                yield self.sources["room"].get_current_key(direction)
             ),
             presence_key=(
                 yield self.sources["presence"].get_current_key()