diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 07ea969d4d..888b1cb35d 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -521,13 +521,20 @@ class StreamStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_room_events_max_id(self, direction='f'):
+ def get_room_events_max_id(self, room_id=None):
+ """Returns the current token for rooms stream.
+
+ By default, it returns the current global stream token. Specifying a
+ `room_id` causes it to return the current room specific topological
+ token.
+ """
token = yield self._stream_id_gen.get_current_token()
- if direction != 'b':
+ if room_id is None:
defer.returnValue("s%d" % (token,))
else:
topo = yield self.runInteraction(
- "_get_max_topological_txn", self._get_max_topological_txn
+ "_get_max_topological_txn", self._get_max_topological_txn,
+ room_id,
)
defer.returnValue("t%d-%d" % (topo, token))
@@ -579,11 +586,11 @@ class StreamStore(SQLBaseStore):
lambda r: r[0][0] if r else 0
)
- def _get_max_topological_txn(self, txn):
+ def _get_max_topological_txn(self, txn, room_id):
txn.execute(
"SELECT MAX(topological_ordering) FROM events"
- " WHERE outlier = ?",
- (False,)
+ " WHERE room_id = ?",
+ (room_id,)
)
rows = txn.fetchall()
|