summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2018-05-09 14:12:24 +0100
committerGitHub <noreply@github.com>2018-05-09 14:12:24 +0100
commit0461ef01b70f1fe7e30a930870c021202a83343d (patch)
tree6fec34c136dd33973b674d297e573d146ac4796b /synapse
parentMerge pull request #3194 from rubo77/fix-nuke (diff)
parentRemove unused from_token param (diff)
downloadsynapse-0461ef01b70f1fe7e30a930870c021202a83343d.tar.xz
Merge pull request #3195 from matrix-org/erikj/pagination_refactor
 Refactor recent events func to use pagination func
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/stream.py78
1 files changed, 27 insertions, 51 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 54be025401..ecd39074b8 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -38,7 +38,6 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 
-from synapse.util.caches.descriptors import cached
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -347,9 +346,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+    def get_recent_events_for_room(self, room_id, limit, end_token):
         rows, token = yield self.get_recent_event_ids_for_room(
-            room_id, limit, end_token, from_token
+            room_id, limit, end_token,
         )
 
         logger.debug("stream before")
@@ -363,60 +362,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         defer.returnValue((events, token))
 
-    @cached(num_args=4)
-    def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
-        end_token = RoomStreamToken.parse_stream_token(end_token)
-
-        if from_token is None:
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC"
-                " LIMIT ?"
-            )
-        else:
-            from_token = RoomStreamToken.parse_stream_token(from_token)
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering > ?"
-                " AND stream_ordering <= ? AND outlier = ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-        def get_recent_events_for_room_txn(txn):
-            if from_token is None:
-                txn.execute(sql, (room_id, end_token.stream, False, limit,))
-            else:
-                txn.execute(sql, (
-                    room_id, from_token.stream, end_token.stream, False, limit
-                ))
+    @defer.inlineCallbacks
+    def get_recent_event_ids_for_room(self, room_id, limit, end_token):
+        """Get the most recent events in the room in topological ordering.
 
-            rows = self.cursor_to_dict(txn)
+        Args:
+            room_id (str)
+            limit (int)
+            end_token (str): The stream token representing now.
 
-            rows.reverse()  # As we selected with reverse ordering
+        Returns:
+            Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of
+            dicts (which include event_ids, etc), and a tuple for
+            `(start_token, end_token)` representing the range of rows
+            returned.
+            The returned events are in ascending order.
+        """
+        # Allow a zero limit here, and no-op.
+        if limit == 0:
+            defer.returnValue(([], (end_token, end_token)))
 
-            if rows:
-                # Tokens are positions between events.
-                # This token points *after* the last event in the chunk.
-                # We need it to point to the event before it in the chunk
-                # since we are going backwards so we subtract one from the
-                # stream part.
-                topo = rows[0]["topological_ordering"]
-                toke = rows[0]["stream_ordering"] - 1
-                start_token = str(RoomStreamToken(topo, toke))
+        end_token = RoomStreamToken.parse_stream_token(end_token)
 
-                token = (start_token, str(end_token))
-            else:
-                token = (str(end_token), str(end_token))
+        rows, token = yield self.runInteraction(
+            "get_recent_event_ids_for_room", self._paginate_room_events_txn,
+            room_id, from_token=end_token, limit=limit,
+        )
 
-            return rows, token
+        # We want to return the results in ascending order.
+        rows.reverse()
 
-        return self.runInteraction(
-            "get_recent_events_for_room", get_recent_events_for_room_txn
-        )
+        defer.returnValue((rows, (token, str(end_token))))
 
     def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
         """Gets details of the first event in a room at or after a stream ordering