summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2018-05-09 15:27:17 +0100
committerGitHub <noreply@github.com>2018-05-09 15:27:17 +0100
commit1e5280b7d0836afe13120eae06fa72d8b6b966ec (patch)
tree98cfe35b0c9f4de512480b0b503f6fc27fe4119f /synapse/storage/stream.py
parentMerge pull request #3195 from matrix-org/erikj/pagination_refactor (diff)
parentUpdate comments (diff)
downloadsynapse-1e5280b7d0836afe13120eae06fa72d8b6b966ec.tar.xz
Merge pull request #3196 from matrix-org/erikj/pagination_return
 Refactor pagination DB API to return concrete type
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py77
1 files changed, 49 insertions, 28 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index ecd39074b8..60d2dca154 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -47,6 +47,7 @@ import abc
 import logging
 
 from six.moves import range
+from collections import namedtuple
 
 
 logger = logging.getLogger(__name__)
@@ -59,6 +60,12 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
+# Used as return values for pagination APIs
+_EventDictReturn = namedtuple("_EventDictReturn", (
+    "event_id", "topological_ordering", "stream_ordering",
+))
+
+
 def lower_bound(token, engine, inclusive=False):
     inclusive = "=" if inclusive else ""
     if token.topological is None:
@@ -256,9 +263,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                     " ORDER BY stream_ordering %s LIMIT ?"
                 ) % (order,)
                 txn.execute(sql, (room_id, from_id, to_id, limit))
+
+                rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
             else:
                 sql = (
-                    "SELECT event_id, stream_ordering FROM events WHERE"
+                    "SELECT event_id, topological_ordering, stream_ordering"
+                    " FROM events"
+                    " WHERE"
                     " room_id = ?"
                     " AND not outlier"
                     " AND stream_ordering <= ?"
@@ -266,14 +277,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 ) % (order, order,)
                 txn.execute(sql, (room_id, to_id, limit))
 
-            rows = self.cursor_to_dict(txn)
+                rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
 
             return rows
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
         ret = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
@@ -283,7 +294,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ret.reverse()
 
         if rows:
-            key = "s%d" % min(r["stream_ordering"] for r in rows)
+            key = "s%d" % min(r.stream_ordering for r in rows)
         else:
             # Assume we didn't get anything because there was nothing to
             # get.
@@ -330,14 +341,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                     " ORDER BY stream_ordering ASC"
                 )
                 txn.execute(sql, (user_id, to_id,))
-            rows = self.cursor_to_dict(txn)
+
+            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
 
             return rows
 
         rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
         ret = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
@@ -353,14 +365,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         logger.debug("stream before")
         events = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
         logger.debug("stream after")
 
         self._set_before_and_after(events, rows)
 
-        defer.returnValue((events, token))
+        defer.returnValue((events, (token, end_token)))
 
     @defer.inlineCallbacks
     def get_recent_event_ids_for_room(self, room_id, limit, end_token):
@@ -372,15 +384,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             end_token (str): The stream token representing now.
 
         Returns:
-            Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of
-            dicts (which include event_ids, etc), and a tuple for
-            `(start_token, end_token)` representing the range of rows
-            returned.
-            The returned events are in ascending order.
+            Deferred[tuple[list[_EventDictReturn],  str]]: Returns a list of
+            _EventDictReturn and a token pointing to the start of the returned
+            events.
+            The events returned are in ascending order.
         """
         # Allow a zero limit here, and no-op.
         if limit == 0:
-            defer.returnValue(([], (end_token, end_token)))
+            defer.returnValue(([], end_token))
 
         end_token = RoomStreamToken.parse_stream_token(end_token)
 
@@ -392,7 +403,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         # We want to return the results in ascending order.
         rows.reverse()
 
-        defer.returnValue((rows, (token, str(end_token))))
+        defer.returnValue((rows, token))
 
     def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
         """Gets details of the first event in a room at or after a stream ordering
@@ -496,10 +507,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @staticmethod
     def _set_before_and_after(events, rows, topo_order=True):
+        """Inserts ordering information to events' internal metadata from
+        the DB rows.
+
+        Args:
+            events (list[FrozenEvent])
+            rows (list[_EventDictReturn])
+            topo_order (bool): Whether the events were ordered topologically
+                or by stream ordering. If true then all rows should have a non
+                null topological_ordering.
+        """
         for event, row in zip(events, rows):
-            stream = row["stream_ordering"]
-            if topo_order:
-                topo = event.depth
+            stream = row.stream_ordering
+            if topo_order and row.topological_ordering:
+                topo = row.topological_ordering
             else:
                 topo = None
             internal = event.internal_metadata
@@ -586,12 +607,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         rows, start_token = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
         )
-        events_before = [r["event_id"] for r in rows]
+        events_before = [r.event_id for r in rows]
 
         rows, end_token = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
         )
-        events_after = [r["event_id"] for r in rows]
+        events_after = [r.event_id for r in rows]
 
         return {
             "before": {
@@ -672,9 +693,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 those that match the filter.
 
         Returns:
-            tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
-            the keys "event_id", "toplogical_ordering" and "stream_ordering".
+            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
+            as a list of _EventDictReturn and a token that points to the end
+            of the result set.
         """
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
@@ -725,11 +746,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         txn.execute(sql, args)
 
-        rows = self.cursor_to_dict(txn)
+        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
 
         if rows:
-            topo = rows[-1]["topological_ordering"]
-            toke = rows[-1]["stream_ordering"]
+            topo = rows[-1].topological_ordering
+            toke = rows[-1].stream_ordering
             if direction == 'b':
                 # Tokens are positions between events.
                 # This token points *after* the last event in the chunk.
@@ -764,7 +785,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Returns:
             tuple[list[dict], str]: Returns the results as a list of dicts and
             a token that points to the end of the result set. The dicts have
-            the keys "event_id", "toplogical_ordering" and "stream_orderign".
+            the keys "event_id", "topological_ordering" and "stream_orderign".
         """
 
         from_key = RoomStreamToken.parse(from_key)
@@ -777,7 +798,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
         events = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )