summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/9634.misc1
-rw-r--r--changelog.d/9638.misc1
-rw-r--r--synapse/handlers/device.py22
-rw-r--r--synapse/replication/tcp/streams/_base.py17
-rw-r--r--synapse/rest/client/v1/room.py15
-rw-r--r--synapse/server.py11
6 files changed, 49 insertions, 18 deletions
diff --git a/changelog.d/9634.misc b/changelog.d/9634.misc
new file mode 100644

index 0000000000..59ac42cb83 --- /dev/null +++ b/changelog.d/9634.misc
@@ -0,0 +1 @@ +Only save remote cross-signing and device keys if they're different from the current ones. diff --git a/changelog.d/9638.misc b/changelog.d/9638.misc new file mode 100644
index 0000000000..35338cd332 --- /dev/null +++ b/changelog.d/9638.misc
@@ -0,0 +1 @@ +Add additional type hints to the Homeserver object. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 6aa3f73eee..2fc4951df4 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -907,6 +907,7 @@ class DeviceListUpdater: master_key = result.get("master_key") self_signing_key = result.get("self_signing_key") + ignore_devices = False # If the remote server has more than ~1000 devices for this user # we assume that something is going horribly wrong (e.g. a bot # that logs in and creates a new device every time it tries to @@ -925,6 +926,12 @@ class DeviceListUpdater: len(devices), ) devices = [] + ignore_devices = True + else: + cached_devices = await self.store.get_cached_devices_for_user(user_id) + if cached_devices == {d["device_id"]: d for d in devices}: + devices = [] + ignore_devices = True for device in devices: logger.debug( @@ -934,7 +941,10 @@ class DeviceListUpdater: stream_id, ) - await self.store.update_remote_device_list_cache(user_id, devices, stream_id) + if not ignore_devices: + await self.store.update_remote_device_list_cache( + user_id, devices, stream_id + ) device_ids = [device["device_id"] for device in devices] # Handle cross-signing keys. @@ -945,7 +955,8 @@ class DeviceListUpdater: ) device_ids = device_ids + cross_signing_device_ids - await self.device_handler.notify_device_update(user_id, device_ids) + if device_ids: + await self.device_handler.notify_device_update(user_id, device_ids) # We clobber the seen updates since we've re-synced from a given # point. @@ -973,14 +984,17 @@ class DeviceListUpdater: """ device_ids = [] - if master_key: + current_keys_map = await self.store.get_e2e_cross_signing_keys_bulk([user_id]) + current_keys = current_keys_map.get(user_id) or {} + + if master_key and master_key != current_keys.get("master"): await self.store.set_e2e_cross_signing_key(user_id, "master", master_key) _, verify_key = get_verify_key_from_cross_signing_key(master_key) # verify_key is a VerifyKey from signedjson, which uses # .version to denote the portion of the key ID after the # algorithm and colon, which is the device ID device_ids.append(verify_key.version) - if self_signing_key: + if self_signing_key and self_signing_key != current_keys.get("self_signing"): await self.store.set_e2e_cross_signing_key( user_id, "self_signing", self_signing_key ) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index f45e7a8c89..7e8e64d61c 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -33,7 +33,7 @@ import attr from synapse.replication.http.streams import ReplicationGetStreamUpdates if TYPE_CHECKING: - import synapse.server + from synapse.app.homeserver import HomeServer logger = logging.getLogger(__name__) @@ -299,20 +299,23 @@ class TypingStream(Stream): NAME = "typing" ROW_TYPE = TypingStreamRow - def __init__(self, hs): - typing_handler = hs.get_typing_handler() - + def __init__(self, hs: "HomeServer"): writer_instance = hs.config.worker.writers.typing if writer_instance == hs.get_instance_name(): # On the writer, query the typing handler - update_function = typing_handler.get_all_typing_updates + typing_writer_handler = hs.get_typing_writer_handler() + update_function = ( + typing_writer_handler.get_all_typing_updates + ) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]] + current_token_function = typing_writer_handler.get_current_token else: # Query the typing writer process update_function = make_http_update_function(hs, self.NAME) + current_token_function = hs.get_typing_handler().get_current_token super().__init__( hs.get_instance_name(), - current_token_without_instance(typing_handler.get_current_token), + current_token_without_instance(current_token_function), update_function, ) @@ -509,7 +512,7 @@ class AccountDataStream(Stream): NAME = "account_data" ROW_TYPE = AccountDataStreamRow - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() super().__init__( hs.get_instance_name(), diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 95aaf51d23..4ed82a5036 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py
@@ -49,7 +49,7 @@ from synapse.util import json_decoder from synapse.util.stringutils import parse_and_validate_server_name, random_string if TYPE_CHECKING: - import synapse.server + from synapse.app.homeserver import HomeServer logger = logging.getLogger(__name__) @@ -845,10 +845,10 @@ class RoomTypingRestServlet(RestServlet): "/rooms/(?P<room_id>[^/]*)/typing/(?P<user_id>[^/]*)$", v1=True ) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__() + self.hs = hs self.presence_handler = hs.get_presence_handler() - self.typing_handler = hs.get_typing_handler() self.auth = hs.get_auth() # If we're not on the typing writer instance we should scream if we get @@ -873,16 +873,19 @@ class RoomTypingRestServlet(RestServlet): # Limit timeout to stop people from setting silly typing timeouts. timeout = min(content.get("timeout", 30000), 120000) + # Defer getting the typing handler since it will raise on workers. + typing_handler = self.hs.get_typing_writer_handler() + try: if content["typing"]: - await self.typing_handler.started_typing( + await typing_handler.started_typing( target_user=target_user, requester=requester, room_id=room_id, timeout=timeout, ) else: - await self.typing_handler.stopped_typing( + await typing_handler.stopped_typing( target_user=target_user, requester=requester, room_id=room_id ) except ShadowBanError: @@ -900,7 +903,7 @@ class RoomAliasListServlet(RestServlet): ), ] - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): super().__init__() self.auth = hs.get_auth() self.directory_handler = hs.get_directory_handler() diff --git a/synapse/server.py b/synapse/server.py
index dd4ee7dd3c..d11d08c573 100644 --- a/synapse/server.py +++ b/synapse/server.py
@@ -417,10 +417,19 @@ class HomeServer(metaclass=abc.ABCMeta): return PresenceHandler(self) @cache_in_self - def get_typing_handler(self): + def get_typing_writer_handler(self) -> TypingWriterHandler: if self.config.worker.writers.typing == self.get_instance_name(): return TypingWriterHandler(self) else: + raise Exception("Workers cannot write typing") + + @cache_in_self + def get_typing_handler(self) -> FollowerTypingHandler: + if self.config.worker.writers.typing == self.get_instance_name(): + # Use get_typing_writer_handler to ensure that we use the same + # cached version. + return self.get_typing_writer_handler() + else: return FollowerTypingHandler(self) @cache_in_self