From fdb13447167da0670dd6ad95fdf4a99cde450eb9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2020 14:40:47 +0000 Subject: Remove concept of a non-limited stream. (#7011) --- synapse/storage/data_stores/main/devices.py | 10 ++++++++-- .../storage/data_stores/main/end_to_end_keys.py | 14 +++++++++---- synapse/storage/data_stores/main/presence.py | 23 ++++++++++++---------- 3 files changed, 31 insertions(+), 16 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 4c19c02bbc..2d47cfd131 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -576,7 +576,7 @@ class DeviceWorkerStore(SQLBaseStore): return set() async def get_all_device_list_changes_for_remotes( - self, from_key: int, to_key: int + self, from_key: int, to_key: int, limit: int, ) -> List[Tuple[int, str]]: """Return a list of `(stream_id, entity)` which is the combined list of changes to devices and which destinations need to be poked. Entity is @@ -592,10 +592,16 @@ class DeviceWorkerStore(SQLBaseStore): SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes ) AS e WHERE ? < stream_id AND stream_id <= ? + LIMIT ? """ return await self.db.execute( - "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key + "get_all_device_list_changes_for_remotes", + None, + sql, + from_key, + to_key, + limit, ) @cached(max_entries=10000) diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 001a53f9b4..bcf746b7ef 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -537,7 +537,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): return result - def get_all_user_signature_changes_for_remotes(self, from_key, to_key): + def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit): """Return a list of changes from the user signature stream to notify remotes. Note that the user signature stream represents when a user signs their device with their user-signing key, which is not published to other @@ -552,13 +552,19 @@ class EndToEndKeyWorkerStore(SQLBaseStore): Deferred[list[(int,str)]] a list of `(stream_id, user_id)` """ sql = """ - SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id + SELECT stream_id, from_user_id AS user_id FROM user_signature_stream WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id + ORDER BY stream_id ASC + LIMIT ? """ return self.db.execute( - "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key + "get_all_user_signature_changes_for_remotes", + None, + sql, + from_key, + to_key, + limit, ) diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py index 604c8b7ddd..dab31e0c2d 100644 --- a/synapse/storage/data_stores/main/presence.py +++ b/synapse/storage/data_stores/main/presence.py @@ -60,7 +60,7 @@ class PresenceStore(SQLBaseStore): "status_msg": state.status_msg, "currently_active": state.currently_active, } - for state in presence_states + for stream_id, state in zip(stream_orderings, presence_states) ], ) @@ -73,19 +73,22 @@ class PresenceStore(SQLBaseStore): ) txn.execute(sql + clause, [stream_id] + list(args)) - def get_all_presence_updates(self, last_id, current_id): + def get_all_presence_updates(self, last_id, current_id, limit): if last_id == current_id: return defer.succeed([]) def get_all_presence_updates_txn(txn): - sql = ( - "SELECT stream_id, user_id, state, last_active_ts," - " last_federation_update_ts, last_user_sync_ts, status_msg," - " currently_active" - " FROM presence_stream" - " WHERE ? < stream_id AND stream_id <= ?" - ) - txn.execute(sql, (last_id, current_id)) + sql = """ + SELECT stream_id, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, + status_msg, + currently_active + FROM presence_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) return txn.fetchall() return self.db.runInteraction( -- cgit 1.4.1