summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-20 14:40:47 +0000
committerGitHub <noreply@github.com>2020-03-20 14:40:47 +0000
commitfdb13447167da0670dd6ad95fdf4a99cde450eb9 (patch)
tree4e65824624e802bdaf04f3e289ec348fd4f3ab55 /synapse/storage
parentConvert some of the media REST code to async/await (#7110) (diff)
downloadsynapse-fdb13447167da0670dd6ad95fdf4a99cde450eb9.tar.xz
Remove concept of a non-limited stream. (#7011)
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/devices.py10
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py14
-rw-r--r--synapse/storage/data_stores/main/presence.py23
3 files changed, 31 insertions, 16 deletions
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 4c19c02bbc..2d47cfd131 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -576,7 +576,7 @@ class DeviceWorkerStore(SQLBaseStore):
             return set()
 
     async def get_all_device_list_changes_for_remotes(
-        self, from_key: int, to_key: int
+        self, from_key: int, to_key: int, limit: int,
     ) -> List[Tuple[int, str]]:
         """Return a list of `(stream_id, entity)` which is the combined list of
         changes to devices and which destinations need to be poked. Entity is
@@ -592,10 +592,16 @@ class DeviceWorkerStore(SQLBaseStore):
                 SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
             ) AS e
             WHERE ? < stream_id AND stream_id <= ?
+            LIMIT ?
         """
 
         return await self.db.execute(
-            "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key
+            "get_all_device_list_changes_for_remotes",
+            None,
+            sql,
+            from_key,
+            to_key,
+            limit,
         )
 
     @cached(max_entries=10000)
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 001a53f9b4..bcf746b7ef 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -537,7 +537,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return result
 
-    def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
+    def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit):
         """Return a list of changes from the user signature stream to notify remotes.
         Note that the user signature stream represents when a user signs their
         device with their user-signing key, which is not published to other
@@ -552,13 +552,19 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             Deferred[list[(int,str)]] a list of `(stream_id, user_id)`
         """
         sql = """
-            SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id
+            SELECT stream_id, from_user_id AS user_id
             FROM user_signature_stream
             WHERE ? < stream_id AND stream_id <= ?
-            GROUP BY user_id
+            ORDER BY stream_id ASC
+            LIMIT ?
         """
         return self.db.execute(
-            "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key
+            "get_all_user_signature_changes_for_remotes",
+            None,
+            sql,
+            from_key,
+            to_key,
+            limit,
         )
 
 
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index 604c8b7ddd..dab31e0c2d 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -60,7 +60,7 @@ class PresenceStore(SQLBaseStore):
                     "status_msg": state.status_msg,
                     "currently_active": state.currently_active,
                 }
-                for state in presence_states
+                for stream_id, state in zip(stream_orderings, presence_states)
             ],
         )
 
@@ -73,19 +73,22 @@ class PresenceStore(SQLBaseStore):
             )
             txn.execute(sql + clause, [stream_id] + list(args))
 
-    def get_all_presence_updates(self, last_id, current_id):
+    def get_all_presence_updates(self, last_id, current_id, limit):
         if last_id == current_id:
             return defer.succeed([])
 
         def get_all_presence_updates_txn(txn):
-            sql = (
-                "SELECT stream_id, user_id, state, last_active_ts,"
-                " last_federation_update_ts, last_user_sync_ts, status_msg,"
-                " currently_active"
-                " FROM presence_stream"
-                " WHERE ? < stream_id AND stream_id <= ?"
-            )
-            txn.execute(sql, (last_id, current_id))
+            sql = """
+                SELECT stream_id, user_id, state, last_active_ts,
+                    last_federation_update_ts, last_user_sync_ts,
+                    status_msg,
+                currently_active
+                FROM presence_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, limit))
             return txn.fetchall()
 
         return self.db.runInteraction(