summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py137
1 files changed, 136 insertions, 1 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3cab06fdef..15d4c2bf68 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -23,7 +23,7 @@ paginate bacwards.
 
 This is implemented by keeping two ordering columns: stream_ordering and
 topological_ordering. Stream ordering is basically insertion/received order
-(except for events from backfill requests). The topolgical_ordering is a
+(except for events from backfill requests). The topological_ordering is a
 weak ordering of events based on the pdu graph.
 
 This means that we have to have two different types of tokens, depending on
@@ -436,3 +436,138 @@ class StreamStore(SQLBaseStore):
             internal = event.internal_metadata
             internal.before = str(RoomStreamToken(topo, stream - 1))
             internal.after = str(RoomStreamToken(topo, stream))
+
+    @defer.inlineCallbacks
+    def get_events_around(self, room_id, event_id, before_limit, after_limit):
+        """Retrieve events and pagination tokens around a given event in a
+        room.
+
+        Args:
+            room_id (str)
+            event_id (str)
+            before_limit (int)
+            after_limit (int)
+
+        Returns:
+            dict
+        """
+
+        results = yield self.runInteraction(
+            "get_events_around", self._get_events_around_txn,
+            room_id, event_id, before_limit, after_limit
+        )
+
+        events_before = yield self._get_events(
+            [e for e in results["before"]["event_ids"]],
+            get_prev_content=True
+        )
+
+        events_after = yield self._get_events(
+            [e for e in results["after"]["event_ids"]],
+            get_prev_content=True
+        )
+
+        defer.returnValue({
+            "events_before": events_before,
+            "events_after": events_after,
+            "start": results["before"]["token"],
+            "end": results["after"]["token"],
+        })
+
+    def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+        """Retrieves event_ids and pagination tokens around a given event in a
+        room.
+
+        Args:
+            room_id (str)
+            event_id (str)
+            before_limit (int)
+            after_limit (int)
+
+        Returns:
+            dict
+        """
+
+        results = self._simple_select_one_txn(
+            txn,
+            "events",
+            keyvalues={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            retcols=["stream_ordering", "topological_ordering"],
+        )
+
+        stream_ordering = results["stream_ordering"]
+        topological_ordering = results["topological_ordering"]
+
+        query_before = (
+            "SELECT topological_ordering, stream_ordering, event_id FROM events"
+            " WHERE room_id = ? AND (topological_ordering < ?"
+            " OR (topological_ordering = ? AND stream_ordering < ?))"
+            " ORDER BY topological_ordering DESC, stream_ordering DESC"
+            " LIMIT ?"
+        )
+
+        query_after = (
+            "SELECT topological_ordering, stream_ordering, event_id FROM events"
+            " WHERE room_id = ? AND (topological_ordering > ?"
+            " OR (topological_ordering = ? AND stream_ordering > ?))"
+            " ORDER BY topological_ordering ASC, stream_ordering ASC"
+            " LIMIT ?"
+        )
+
+        txn.execute(
+            query_before,
+            (
+                room_id, topological_ordering, topological_ordering,
+                stream_ordering, before_limit,
+            )
+        )
+
+        rows = self.cursor_to_dict(txn)
+        events_before = [r["event_id"] for r in rows]
+
+        if rows:
+            start_token = str(RoomStreamToken(
+                rows[0]["topological_ordering"],
+                rows[0]["stream_ordering"] - 1,
+            ))
+        else:
+            start_token = str(RoomStreamToken(
+                topological_ordering,
+                stream_ordering - 1,
+            ))
+
+        txn.execute(
+            query_after,
+            (
+                room_id, topological_ordering, topological_ordering,
+                stream_ordering, after_limit,
+            )
+        )
+
+        rows = self.cursor_to_dict(txn)
+        events_after = [r["event_id"] for r in rows]
+
+        if rows:
+            end_token = str(RoomStreamToken(
+                rows[-1]["topological_ordering"],
+                rows[-1]["stream_ordering"],
+            ))
+        else:
+            end_token = str(RoomStreamToken(
+                topological_ordering,
+                stream_ordering,
+            ))
+
+        return {
+            "before": {
+                "event_ids": events_before,
+                "token": start_token,
+            },
+            "after": {
+                "event_ids": events_after,
+                "token": end_token,
+            },
+        }