diff --git a/changelog.d/9634.misc b/changelog.d/9634.misc
new file mode 100644
index 0000000000..59ac42cb83
--- /dev/null
+++ b/changelog.d/9634.misc
@@ -0,0 +1 @@
+Only save remote cross-signing and device keys if they're different from the current ones.
diff --git a/changelog.d/9638.misc b/changelog.d/9638.misc
new file mode 100644
index 0000000000..35338cd332
--- /dev/null
+++ b/changelog.d/9638.misc
@@ -0,0 +1 @@
+Add additional type hints to the Homeserver object.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 6aa3f73eee..2fc4951df4 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -907,6 +907,7 @@ class DeviceListUpdater:
master_key = result.get("master_key")
self_signing_key = result.get("self_signing_key")
+ ignore_devices = False
# If the remote server has more than ~1000 devices for this user
# we assume that something is going horribly wrong (e.g. a bot
# that logs in and creates a new device every time it tries to
@@ -925,6 +926,12 @@ class DeviceListUpdater:
len(devices),
)
devices = []
+ ignore_devices = True
+ else:
+ cached_devices = await self.store.get_cached_devices_for_user(user_id)
+ if cached_devices == {d["device_id"]: d for d in devices}:
+ devices = []
+ ignore_devices = True
for device in devices:
logger.debug(
@@ -934,7 +941,10 @@ class DeviceListUpdater:
stream_id,
)
- await self.store.update_remote_device_list_cache(user_id, devices, stream_id)
+ if not ignore_devices:
+ await self.store.update_remote_device_list_cache(
+ user_id, devices, stream_id
+ )
device_ids = [device["device_id"] for device in devices]
# Handle cross-signing keys.
@@ -945,7 +955,8 @@ class DeviceListUpdater:
)
device_ids = device_ids + cross_signing_device_ids
- await self.device_handler.notify_device_update(user_id, device_ids)
+ if device_ids:
+ await self.device_handler.notify_device_update(user_id, device_ids)
# We clobber the seen updates since we've re-synced from a given
# point.
@@ -973,14 +984,17 @@ class DeviceListUpdater:
"""
device_ids = []
- if master_key:
+ current_keys_map = await self.store.get_e2e_cross_signing_keys_bulk([user_id])
+ current_keys = current_keys_map.get(user_id) or {}
+
+ if master_key and master_key != current_keys.get("master"):
await self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
_, verify_key = get_verify_key_from_cross_signing_key(master_key)
# verify_key is a VerifyKey from signedjson, which uses
# .version to denote the portion of the key ID after the
# algorithm and colon, which is the device ID
device_ids.append(verify_key.version)
- if self_signing_key:
+ if self_signing_key and self_signing_key != current_keys.get("self_signing"):
await self.store.set_e2e_cross_signing_key(
user_id, "self_signing", self_signing_key
)
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index f45e7a8c89..7e8e64d61c 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -33,7 +33,7 @@ import attr
from synapse.replication.http.streams import ReplicationGetStreamUpdates
if TYPE_CHECKING:
- import synapse.server
+ from synapse.app.homeserver import HomeServer
logger = logging.getLogger(__name__)
@@ -299,20 +299,23 @@ class TypingStream(Stream):
NAME = "typing"
ROW_TYPE = TypingStreamRow
- def __init__(self, hs):
- typing_handler = hs.get_typing_handler()
-
+ def __init__(self, hs: "HomeServer"):
writer_instance = hs.config.worker.writers.typing
if writer_instance == hs.get_instance_name():
# On the writer, query the typing handler
- update_function = typing_handler.get_all_typing_updates
+ typing_writer_handler = hs.get_typing_writer_handler()
+ update_function = (
+ typing_writer_handler.get_all_typing_updates
+ ) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
+ current_token_function = typing_writer_handler.get_current_token
else:
# Query the typing writer process
update_function = make_http_update_function(hs, self.NAME)
+ current_token_function = hs.get_typing_handler().get_current_token
super().__init__(
hs.get_instance_name(),
- current_token_without_instance(typing_handler.get_current_token),
+ current_token_without_instance(current_token_function),
update_function,
)
@@ -509,7 +512,7 @@ class AccountDataStream(Stream):
NAME = "account_data"
ROW_TYPE = AccountDataStreamRow
- def __init__(self, hs: "synapse.server.HomeServer"):
+ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 95aaf51d23..4ed82a5036 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -49,7 +49,7 @@ from synapse.util import json_decoder
from synapse.util.stringutils import parse_and_validate_server_name, random_string
if TYPE_CHECKING:
- import synapse.server
+ from synapse.app.homeserver import HomeServer
logger = logging.getLogger(__name__)
@@ -845,10 +845,10 @@ class RoomTypingRestServlet(RestServlet):
"/rooms/(?P<room_id>[^/]*)/typing/(?P<user_id>[^/]*)$", v1=True
)
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__()
+ self.hs = hs
self.presence_handler = hs.get_presence_handler()
- self.typing_handler = hs.get_typing_handler()
self.auth = hs.get_auth()
# If we're not on the typing writer instance we should scream if we get
@@ -873,16 +873,19 @@ class RoomTypingRestServlet(RestServlet):
# Limit timeout to stop people from setting silly typing timeouts.
timeout = min(content.get("timeout", 30000), 120000)
+ # Defer getting the typing handler since it will raise on workers.
+ typing_handler = self.hs.get_typing_writer_handler()
+
try:
if content["typing"]:
- await self.typing_handler.started_typing(
+ await typing_handler.started_typing(
target_user=target_user,
requester=requester,
room_id=room_id,
timeout=timeout,
)
else:
- await self.typing_handler.stopped_typing(
+ await typing_handler.stopped_typing(
target_user=target_user, requester=requester, room_id=room_id
)
except ShadowBanError:
@@ -900,7 +903,7 @@ class RoomAliasListServlet(RestServlet):
),
]
- def __init__(self, hs: "synapse.server.HomeServer"):
+ def __init__(self, hs: "HomeServer"):
super().__init__()
self.auth = hs.get_auth()
self.directory_handler = hs.get_directory_handler()
diff --git a/synapse/server.py b/synapse/server.py
index dd4ee7dd3c..d11d08c573 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -417,10 +417,19 @@ class HomeServer(metaclass=abc.ABCMeta):
return PresenceHandler(self)
@cache_in_self
- def get_typing_handler(self):
+ def get_typing_writer_handler(self) -> TypingWriterHandler:
if self.config.worker.writers.typing == self.get_instance_name():
return TypingWriterHandler(self)
else:
+ raise Exception("Workers cannot write typing")
+
+ @cache_in_self
+ def get_typing_handler(self) -> FollowerTypingHandler:
+ if self.config.worker.writers.typing == self.get_instance_name():
+ # Use get_typing_writer_handler to ensure that we use the same
+ # cached version.
+ return self.get_typing_writer_handler()
+ else:
return FollowerTypingHandler(self)
@cache_in_self
|