diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3cab06fdef..15d4c2bf68 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -23,7 +23,7 @@ paginate bacwards.
This is implemented by keeping two ordering columns: stream_ordering and
topological_ordering. Stream ordering is basically insertion/received order
-(except for events from backfill requests). The topolgical_ordering is a
+(except for events from backfill requests). The topological_ordering is a
weak ordering of events based on the pdu graph.
This means that we have to have two different types of tokens, depending on
@@ -436,3 +436,138 @@ class StreamStore(SQLBaseStore):
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
+
+ @defer.inlineCallbacks
+ def get_events_around(self, room_id, event_id, before_limit, after_limit):
+ """Retrieve events and pagination tokens around a given event in a
+ room.
+
+ Args:
+ room_id (str)
+ event_id (str)
+ before_limit (int)
+ after_limit (int)
+
+ Returns:
+ dict
+ """
+
+ results = yield self.runInteraction(
+ "get_events_around", self._get_events_around_txn,
+ room_id, event_id, before_limit, after_limit
+ )
+
+ events_before = yield self._get_events(
+ [e for e in results["before"]["event_ids"]],
+ get_prev_content=True
+ )
+
+ events_after = yield self._get_events(
+ [e for e in results["after"]["event_ids"]],
+ get_prev_content=True
+ )
+
+ defer.returnValue({
+ "events_before": events_before,
+ "events_after": events_after,
+ "start": results["before"]["token"],
+ "end": results["after"]["token"],
+ })
+
+ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+ """Retrieves event_ids and pagination tokens around a given event in a
+ room.
+
+ Args:
+ room_id (str)
+ event_id (str)
+ before_limit (int)
+ after_limit (int)
+
+ Returns:
+ dict
+ """
+
+ results = self._simple_select_one_txn(
+ txn,
+ "events",
+ keyvalues={
+ "event_id": event_id,
+ "room_id": room_id,
+ },
+ retcols=["stream_ordering", "topological_ordering"],
+ )
+
+ stream_ordering = results["stream_ordering"]
+ topological_ordering = results["topological_ordering"]
+
+ query_before = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND (topological_ordering < ?"
+ " OR (topological_ordering = ? AND stream_ordering < ?))"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ query_after = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND (topological_ordering > ?"
+ " OR (topological_ordering = ? AND stream_ordering > ?))"
+ " ORDER BY topological_ordering ASC, stream_ordering ASC"
+ " LIMIT ?"
+ )
+
+ txn.execute(
+ query_before,
+ (
+ room_id, topological_ordering, topological_ordering,
+ stream_ordering, before_limit,
+ )
+ )
+
+ rows = self.cursor_to_dict(txn)
+ events_before = [r["event_id"] for r in rows]
+
+ if rows:
+ start_token = str(RoomStreamToken(
+ rows[0]["topological_ordering"],
+ rows[0]["stream_ordering"] - 1,
+ ))
+ else:
+ start_token = str(RoomStreamToken(
+ topological_ordering,
+ stream_ordering - 1,
+ ))
+
+ txn.execute(
+ query_after,
+ (
+ room_id, topological_ordering, topological_ordering,
+ stream_ordering, after_limit,
+ )
+ )
+
+ rows = self.cursor_to_dict(txn)
+ events_after = [r["event_id"] for r in rows]
+
+ if rows:
+ end_token = str(RoomStreamToken(
+ rows[-1]["topological_ordering"],
+ rows[-1]["stream_ordering"],
+ ))
+ else:
+ end_token = str(RoomStreamToken(
+ topological_ordering,
+ stream_ordering,
+ ))
+
+ return {
+ "before": {
+ "event_ids": events_before,
+ "token": start_token,
+ },
+ "after": {
+ "event_ids": events_after,
+ "token": end_token,
+ },
+ }
|