From 998f7fe7d4ddb2dddf4d46a8a420a6fd7e37577c Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 30 Oct 2019 17:22:52 -0400 Subject: make user signatures a separate stream --- synapse/storage/data_stores/main/devices.py | 13 +----------- .../storage/data_stores/main/end_to_end_keys.py | 24 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index a96f09ea7b..f7a3542348 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -543,20 +543,9 @@ class DeviceWorkerStore(SQLBaseStore): LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) WHERE ? < stream_id AND stream_id <= ? GROUP BY user_id, destination - UNION - SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id, NULL AS destination - FROM user_signature_stream - WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id """ return self._execute( - "get_all_device_list_changes_for_remotes", - None, - sql, - from_key, - to_key, - from_key, - to_key, + "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key ) @cached(max_entries=10000) diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index a0bc6f2d18..073412a78d 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -315,6 +315,30 @@ class EndToEndKeyWorkerStore(SQLBaseStore): from_user_id, ) + def get_all_user_signature_changes_for_remotes(self, from_key, to_key): + """Return a list of changes from the user signature stream to notify remotes. + Note that the user signature stream represents when a user signs their + device with their user-signing key, which is not published to other + users or servers, so no `destination` is needed in the returned + list. However, this is needed to poke workers. + + Args: + from_key (int): the stream ID to start at (exclusive) + to_key (int): the stream ID to end at (inclusive) + + Returns: + Deferred[list[(int,str)]] a list of `(stream_id, user_id)` + """ + sql = """ + SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id + FROM user_signature_stream + WHERE ? < stream_id AND stream_id <= ? + GROUP BY user_id + """ + return self._execute( + "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key + ) + class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): -- cgit 1.4.1