diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 38e375f86a..1809a44a99 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -89,7 +89,9 @@ class MessageHandler(BaseHandler):
if not pagin_config.from_token:
pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token()
+ yield self.hs.get_event_sources().get_current_token(
+ direction='b'
+ )
)
room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index cfa2e38ed2..29b6d52757 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -577,8 +577,8 @@ class RoomEventSource(object):
defer.returnValue((events, end_key))
- def get_current_key(self):
- return self.store.get_room_events_max_id()
+ def get_current_key(self, direction='f'):
+ return self.store.get_room_events_max_id(direction)
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b03fc67f71..8045e17fd7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -364,9 +364,25 @@ class StreamStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_room_events_max_id(self):
+ def get_room_events_max_id(self, direction='f'):
token = yield self._stream_id_gen.get_max_token(self)
- defer.returnValue("s%d" % (token,))
+ if direction != 'b':
+ defer.returnValue("s%d" % (token,))
+ else:
+ topo = yield self.runInteraction(
+ "_get_max_topological_txn", self._get_max_topological_txn
+ )
+ defer.returnValue("t%d-%d" % (topo, token))
+
+ def _get_max_topological_txn(self, txn):
+ txn.execute(
+ "SELECT MAX(topological_ordering) FROM events"
+ " WHERE outlier = ?",
+ (False,)
+ )
+
+ rows = txn.fetchall()
+ return rows[0][0] if rows else 0
@defer.inlineCallbacks
def _get_min_token(self):
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 5c8e54b78b..dff7970bea 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -31,7 +31,7 @@ class NullSource(object):
def get_new_events_for_user(self, user, from_key, limit):
return defer.succeed(([], from_key))
- def get_current_key(self):
+ def get_current_key(self, direction='f'):
return defer.succeed(0)
def get_pagination_rows(self, user, pagination_config, key):
@@ -52,10 +52,10 @@ class EventSources(object):
}
@defer.inlineCallbacks
- def get_current_token(self):
+ def get_current_token(self, direction='f'):
token = StreamToken(
room_key=(
- yield self.sources["room"].get_current_key()
+ yield self.sources["room"].get_current_key(direction)
),
presence_key=(
yield self.sources["presence"].get_current_key()
|