diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 47 |
1 files changed, 38 insertions, 9 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 062ca06fb3..2ea5e1a021 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -181,6 +181,13 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + for event, row in zip(ret, rows): + stream = row["stream_ordering"] + topo = event.depth + internal = event.internal_metadata + internal.before = str(_StreamToken(topo, stream - 1)) + internal.after = str(_StreamToken(topo, stream)) + if rows: key = "s%d" % max([r["stream_ordering"] for r in rows]) else: @@ -265,17 +272,37 @@ class StreamStore(SQLBaseStore): return self.runInteraction("paginate_room_events", f) def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False): + with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback - sql = ( - "SELECT stream_ordering, topological_ordering, event_id FROM events " - "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " - "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) + end_token = _StreamToken.parse_stream_token(end_token) - def f(txn): - txn.execute(sql, (room_id, end_token, limit,)) + if from_token is None: + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + else: + from_token = _StreamToken.parse_stream_token(from_token) + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering > ?" + " AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + + def get_recent_events_for_room_txn(txn): + if from_token is None: + txn.execute(sql, (room_id, end_token.stream, limit,)) + else: + txn.execute(sql, ( + room_id, from_token.stream, end_token.stream, limit + )) rows = self.cursor_to_dict(txn) @@ -303,7 +330,9 @@ class StreamStore(SQLBaseStore): return events, token - return self.runInteraction("get_recent_events_for_room", f) + return self.runInteraction( + "get_recent_events_for_room", get_recent_events_for_room_txn + ) def get_room_events_max_id(self): return self.runInteraction( |