summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-06-16 17:10:28 +0100
committerGitHub <noreply@github.com>2020-06-16 17:10:28 +0100
commitf6f7511a4c0548b17bd1cdabebd0ffad9ea73bc7 (patch)
tree5c8f96ecc110b9bfa8cf2d735359a03e397bff83 /synapse/handlers
parentFix "argument of type 'ObservableDeferred' is not iterable" error (#7708) (diff)
downloadsynapse-f6f7511a4c0548b17bd1cdabebd0ffad9ea73bc7.tar.xz
Refactor getting replication updates from database. (#7636)
The aim here is to make it easier to reason about when streams are limited and when they're not, by moving the logic into the database functions themselves. This should mean we can kill of `db_query_to_update_function` function.
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/presence.py29
-rw-r--r--synapse/handlers/typing.py40
2 files changed, 58 insertions, 11 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2e8914be14..d2f25ae12a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,7 +25,7 @@ The methods that define policy are:
 import abc
 import logging
 from contextlib import contextmanager
-from typing import Dict, Iterable, List, Set
+from typing import Dict, Iterable, List, Set, Tuple
 
 from prometheus_client import Counter
 from typing_extensions import ContextManager
@@ -773,7 +773,9 @@ class PresenceHandler(BasePresenceHandler):
 
         return False
 
-    async def get_all_presence_updates(self, last_id, current_id, limit):
+    async def get_all_presence_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
         """
         Gets a list of presence update rows from between the given stream ids.
         Each row has:
@@ -785,10 +787,31 @@ class PresenceHandler(BasePresenceHandler):
         - last_user_sync_ts(int)
         - status_msg(int)
         - currently_active(int)
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
+
         # TODO(markjh): replicate the unpersisted changes.
         # This could use the in-memory stores for recent changes.
-        rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
+        rows = await self.store.get_all_presence_updates(
+            instance_name, last_id, current_id, limit
+        )
         return rows
 
     def notify_new_event(self):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..4330abb9f7 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,7 +15,7 @@
 
 import logging
 from collections import namedtuple
-from typing import List
+from typing import List, Tuple
 
 from twisted.internet import defer
 
@@ -259,14 +259,31 @@ class TypingHandler(object):
         )
 
     async def get_all_typing_updates(
-        self, last_id: int, current_id: int, limit: int
-    ) -> List[dict]:
-        """Get up to `limit` typing updates between the given tokens, earliest
-        updates first.
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for typing replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
             last_id
@@ -280,9 +297,16 @@ class TypingHandler(object):
             serial = self._room_serials[room_id]
             if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
-                rows.append((serial, room_id, list(typing)))
+                rows.append((serial, [room_id, list(typing)]))
         rows.sort()
-        return rows[:limit]
+
+        limited = False
+        if len(rows) > limit:
+            rows = rows[:limit]
+            current_id = rows[-1][0]
+            limited = True
+
+        return rows, current_id, limited
 
     def get_current_token(self):
         return self._latest_room_serial