summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-05-08 15:45:38 +0100
committerErik Johnston <erik@matrix.org>2018-05-08 16:14:26 +0100
commit06c0d0ed081b56716135c4881e600861b2e8cad5 (patch)
treef24ef8cd9ba01a06602eb0676ba8540b37389a76
parentMerge pull request #3007 from matrix-org/rav/warn_on_logcontext_fail (diff)
downloadsynapse-06c0d0ed081b56716135c4881e600861b2e8cad5.tar.xz
Split paginate_room_events storage function
-rw-r--r--synapse/storage/stream.py100
1 files changed, 72 insertions, 28 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index f0784ba137..b57a8a7ef6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -738,17 +738,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def has_room_changed_since(self, room_id, stream_id):
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
 
+    def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None,
+                                 direction='b', limit=-1, event_filter=None):
+        """Returns list of events before or after a given token.
 
-class StreamStore(StreamWorkerStore):
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
+        Args:
+            txn
+            room_id (str)
+            from_key (str): The token used to stream from
+            to_key (str|None): A token which if given limits the results to
+                only those before
+            direction(char): Either 'b' or 'f' to indicate whether we are
+                paginating forwards or backwards from `from_key`.
+            limit (int): The maximum number of events to return. Zero or less
+                means no limit.
+            event_filter (Filter|None): If provided filters the events to
+                those that match the filter.
 
-    @defer.inlineCallbacks
-    def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1, event_filter=None):
+        Returns:
+            tuple[list[dict], str]: Returns the results as a list of dicts and
+            a token that points to the end of the result set. The dicts have
+            the keys "event_id", "toplogical_ordering" and "stream_orderign".
+        """
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -795,29 +806,54 @@ class StreamStore(StreamWorkerStore):
             "limit": limit_str
         }
 
-        def f(txn):
-            txn.execute(sql, args)
+        txn.execute(sql, args)
 
-            rows = self.cursor_to_dict(txn)
+        rows = self.cursor_to_dict(txn)
 
-            if rows:
-                topo = rows[-1]["topological_ordering"]
-                toke = rows[-1]["stream_ordering"]
-                if direction == 'b':
-                    # Tokens are positions between events.
-                    # This token points *after* the last event in the chunk.
-                    # We need it to point to the event before it in the chunk
-                    # when we are going backwards so we subtract one from the
-                    # stream part.
-                    toke -= 1
-                next_token = str(RoomStreamToken(topo, toke))
-            else:
-                # TODO (erikj): We should work out what to do here instead.
-                next_token = to_key if to_key else from_key
+        if rows:
+            topo = rows[-1]["topological_ordering"]
+            toke = rows[-1]["stream_ordering"]
+            if direction == 'b':
+                # Tokens are positions between events.
+                # This token points *after* the last event in the chunk.
+                # We need it to point to the event before it in the chunk
+                # when we are going backwards so we subtract one from the
+                # stream part.
+                toke -= 1
+            next_token = str(RoomStreamToken(topo, toke))
+        else:
+            # TODO (erikj): We should work out what to do here instead.
+            next_token = to_key if to_key else from_key
+
+        return rows, next_token,
 
-            return rows, next_token,
+    @defer.inlineCallbacks
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1, event_filter=None):
+        """Returns list of events before or after a given token.
 
-        rows, token = yield self.runInteraction("paginate_room_events", f)
+        Args:
+            room_id (str)
+            from_key (str): The token used to stream from
+            to_key (str|None): A token which if given limits the results to
+                only those before
+            direction(char): Either 'b' or 'f' to indicate whether we are
+                paginating forwards or backwards from `from_key`.
+            limit (int): The maximum number of events to return. Zero or less
+                means no limit.
+            event_filter (Filter|None): If provided filters the events to
+                those that match the filter.
+
+        Returns:
+            tuple[list[dict], str]: Returns the results as a list of dicts and
+            a token that points to the end of the result set. The dicts have
+            the keys "event_id", "toplogical_ordering" and "stream_orderign".
+        """
+
+        rows, token = yield self.runInteraction(
+            "paginate_room_events", self.paginate_room_events_txn,
+            room_id, from_key, to_key, direction, limit, event_filter,
+        )
 
         events = yield self._get_events(
             [r["event_id"] for r in rows],
@@ -827,3 +863,11 @@ class StreamStore(StreamWorkerStore):
         self._set_before_and_after(events, rows)
 
         defer.returnValue((events, token))
+
+
+class StreamStore(StreamWorkerStore):
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()