summary refs log tree commit diff
path: root/synapse/handlers/typing.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-06-16 17:10:28 +0100
committerGitHub <noreply@github.com>2020-06-16 17:10:28 +0100
commitf6f7511a4c0548b17bd1cdabebd0ffad9ea73bc7 (patch)
tree5c8f96ecc110b9bfa8cf2d735359a03e397bff83 /synapse/handlers/typing.py
parentFix "argument of type 'ObservableDeferred' is not iterable" error (#7708) (diff)
downloadsynapse-f6f7511a4c0548b17bd1cdabebd0ffad9ea73bc7.tar.xz
Refactor getting replication updates from database. (#7636)
The aim here is to make it easier to reason about when streams are limited and when they're not, by moving the logic into the database functions themselves. This should mean we can kill of `db_query_to_update_function` function.
Diffstat (limited to 'synapse/handlers/typing.py')
-rw-r--r--synapse/handlers/typing.py40
1 files changed, 32 insertions, 8 deletions
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..4330abb9f7 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,7 +15,7 @@
 
 import logging
 from collections import namedtuple
-from typing import List
+from typing import List, Tuple
 
 from twisted.internet import defer
 
@@ -259,14 +259,31 @@ class TypingHandler(object):
         )
 
     async def get_all_typing_updates(
-        self, last_id: int, current_id: int, limit: int
-    ) -> List[dict]:
-        """Get up to `limit` typing updates between the given tokens, earliest
-        updates first.
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for typing replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
             last_id
@@ -280,9 +297,16 @@ class TypingHandler(object):
             serial = self._room_serials[room_id]
             if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
-                rows.append((serial, room_id, list(typing)))
+                rows.append((serial, [room_id, list(typing)]))
         rows.sort()
-        return rows[:limit]
+
+        limited = False
+        if len(rows) > limit:
+            rows = rows[:limit]
+            current_id = rows[-1][0]
+            limited = True
+
+        return rows, current_id, limited
 
     def get_current_token(self):
         return self._latest_room_serial