diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 93472d0117..1af6d77545 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -396,15 +396,17 @@ class DeviceWorkerHandler:
up_to_stream_id = task.params["up_to_stream_id"]
# Delete the messages in batches to avoid too much DB load.
+ from_stream_id = None
while True:
- res = await self.store.delete_messages_for_device(
+ from_stream_id, _ = await self.store.delete_messages_for_device_between(
user_id=user_id,
device_id=device_id,
- up_to_stream_id=up_to_stream_id,
+ from_stream_id=from_stream_id,
+ to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)
- if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+ if from_stream_id is None:
return TaskStatus.COMPLETE, None, None
await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
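
The loop above switches from counting deleted rows to threading a resume cursor through each batch. As an illustration of that keyset-style deletion pattern outside Synapse, here is a minimal self-contained sketch against SQLite; the `device_messages` table, `delete_batch_between` helper, and `BATCH_LIMIT` constant are stand-ins invented for the example, not Synapse's storage API:

import sqlite3
from typing import Optional, Tuple

BATCH_LIMIT = 100  # stand-in for DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT

def delete_batch_between(
    conn: sqlite3.Connection,
    from_stream_id: Optional[int],
    to_stream_id: int,
    limit: int,
) -> Tuple[Optional[int], int]:
    """Delete up to `limit` rows with stream_id in (from_stream_id, to_stream_id].

    Returns (resume_cursor, rows_deleted); the cursor comes back None once
    nothing is left, which is the loop's termination condition above.
    """
    rows = conn.execute(
        "SELECT stream_id FROM device_messages"
        " WHERE stream_id > ? AND stream_id <= ?"
        " ORDER BY stream_id LIMIT ?",
        (from_stream_id or 0, to_stream_id, limit),
    ).fetchall()
    if not rows:
        return None, 0
    batch_max = rows[-1][0]
    conn.execute(
        "DELETE FROM device_messages WHERE stream_id > ? AND stream_id <= ?",
        (from_stream_id or 0, batch_max),
    )
    conn.commit()
    return batch_max, len(rows)

def delete_all(conn: sqlite3.Connection, up_to_stream_id: int) -> None:
    # Same shape as the handler's loop: feed the cursor back in until None.
    # (The real background task also sleeps between batches to spread DB load.)
    cursor: Optional[int] = None
    while True:
        cursor, _ = delete_batch_between(conn, cursor, up_to_stream_id, BATCH_LIMIT)
        if cursor is None:
            return

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE device_messages (stream_id INTEGER PRIMARY KEY)")
conn.executemany(
    "INSERT INTO device_messages VALUES (?)", [(i,) for i in range(1, 251)]
)
delete_all(conn, up_to_stream_id=200)
assert conn.execute("SELECT count(*) FROM device_messages").fetchone()[0] == 50
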
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d06524495f..70fa931d17 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1450,19 +1450,25 @@ class E2eKeysHandler:
return desired_key_data
- async def is_cross_signing_set_up_for_user(self, user_id: str) -> bool:
+ async def check_cross_signing_setup(self, user_id: str) -> Tuple[bool, bool]:
"""Checks if the user has cross-signing set up
Args:
user_id: The user to check
- Returns:
- True if the user has cross-signing set up, False otherwise
+ Returns: a 2-tuple of booleans
+ - whether the user has cross-signing set up, and
+ - whether the user's master cross-signing key may be replaced without UIA.
"""
- existing_master_key = await self.store.get_e2e_cross_signing_key(
- user_id, "master"
- )
- return existing_master_key is not None
+ (
+ exists,
+ ts_replaceable_without_uia_before,
+ ) = await self.store.get_master_cross_signing_key_updatable_before(user_id)
+
+ if ts_replaceable_without_uia_before is None:
+ return exists, False
+ else:
+ return exists, self.clock.time_msec() < ts_replaceable_without_uia_before
def _check_cross_signing_key(
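
The second boolean in the new return value reduces to comparing the current clock against a stored deadline: the timestamp marks the end of a window during which the master key may be swapped without user-interactive auth. A minimal sketch of that check in isolation, with invented names (`check_setup` here is not a Synapse method):

import time
from typing import Optional, Tuple

def check_setup(
    key_exists: bool,
    replaceable_without_uia_before_ms: Optional[int],
    now_ms: Optional[int] = None,
) -> Tuple[bool, bool]:
    """(has cross-signing set up, still inside the UIA-free replacement window)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    if replaceable_without_uia_before_ms is None:
        return key_exists, False
    return key_exists, now_ms < replaceable_without_uia_before_ms

now = int(time.time() * 1000)
assert check_setup(True, None) == (True, False)          # no window set
assert check_setup(True, now + 600_000) == (True, True)  # 10-minute window open
assert check_setup(True, now - 1) == (True, False)       # window already closed
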
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 0cc8e990d9..f4c17894aa 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -88,7 +88,7 @@ from synapse.types import (
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter, partition
+from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@@ -748,7 +748,7 @@ class FederationEventHandler:
# fetching fresh state for the room if the missing event
# can't be found, which slightly reduces our security.
# it may also increase our DAG extremity count for the room,
- # causing additional state resolution? See #1760.
+ # causing additional state resolution? See https://github.com/matrix-org/synapse/issues/1760.
# However, fetching state doesn't hold the linearizer lock
# apparently.
#
@@ -1669,14 +1669,13 @@ class FederationEventHandler:
# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
- while event_map:
- # build a list of events whose auth events are not in the queue.
- roots = tuple(
- ev
- for ev in event_map.values()
- if not any(aid in event_map for aid in ev.auth_event_ids())
- )
+ # We need to persist an event's auth events before the event.
+ auth_graph = {
+ ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
+ for ev in event_map.values()
+ }
+ for roots in sorted_topologically_batched(event_map.values(), auth_graph):
if not roots:
# if *none* of the remaining events are ready, that means
# we have a loop. This either means a bug in our logic, or that
@@ -1698,9 +1697,6 @@ class FederationEventHandler:
await self._auth_and_persist_outliers_inner(room_id, roots)
- for ev in roots:
- del event_map[ev.event_id]
-
async def _auth_and_persist_outliers_inner(
self, room_id: str, fetched_events: Collection[EventBase]
) -> None:
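
`sorted_topologically_batched` lets the handler persist events in dependency order while still working in batches: each yielded batch contains only events whose auth events have already been handled. A sketch of what such a batched topological sort does, Kahn-style; this is an illustration of the technique, not the implementation in `synapse.util.iterutils`:

from typing import Dict, Iterator, List, Sequence, Set, TypeVar

T = TypeVar("T")

def topo_batches(graph: Dict[T, Sequence[T]]) -> Iterator[List[T]]:
    """Yield batches of nodes such that every node's dependencies appear in
    an earlier batch. `graph[n]` lists the nodes n depends on."""
    remaining = dict(graph)
    emitted: Set[T] = set()
    while remaining:
        batch = [
            n for n, deps in remaining.items() if all(d in emitted for d in deps)
        ]
        if not batch:
            # Every remaining node depends on another remaining node: a cycle.
            raise ValueError("cycle in dependency graph")
        for n in batch:
            del remaining[n]
        emitted.update(batch)
        yield batch

# Auth-chain-shaped example: B and C both cite A; D cites B and C.
assert list(topo_batches({"A": [], "B": ["A"], "C": ["A"], "D": ["B", "C"]})) == [
    ["A"],
    ["B", "C"],
    ["D"],
]
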
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 202beee738..4137fd50b1 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1816,7 +1816,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# the same token repeatedly.
#
# Hence this guard where we just return nothing so that the sync
- # doesn't return. C.f. #5503.
+ # doesn't return. C.f. https://github.com/matrix-org/synapse/issues/5503.
return [], max_token
# Figure out which other users this user should explicitly receive
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2f1bc5a015..bf0106c6e7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -399,7 +399,7 @@ class SyncHandler:
#
# If that happens, we mustn't cache it, so that when the client comes back
# with the same cache token, we don't immediately return the same empty
- # result, causing a tightloop. (#8518)
+ # result, causing a tight loop. (https://github.com/matrix-org/synapse/issues/8518)
if result.next_batch == since_token:
cache_context.should_cache = False
@@ -1003,7 +1003,7 @@ class SyncHandler:
# always make sure we LL ourselves so we know we're in the room
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
- # LL for incr syncs in #3840.
+ # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840.
# We don't insert ourselves into `members_to_fetch`, because in some
# rare cases (an empty event batch with a now_token after the user's
# leave in a partial state room which another local user has
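
The guard described in the first sync.py hunk is a general caching rule: a response that tells the client to poll again from the very token it supplied must never be cached, or every retry is served the same no-op instantly. A toy sketch of the rule with invented names (`cached_sync` and `compute_sync` are not Synapse's API):

from typing import Dict, List, Tuple

stream: List[str] = []                      # toy event stream
cache: Dict[int, Tuple[int, List[str]]] = {}

def compute_sync(since: int) -> Tuple[int, List[str]]:
    """Toy stand-in for the real sync computation."""
    events = stream[since:]
    return since + len(events), events

def cached_sync(since: int) -> Tuple[int, List[str]]:
    if since in cache:
        return cache[since]
    next_batch, events = compute_sync(since)
    # An empty result whose next_batch equals the request token may be
    # returned once, but caching it would hand every retry the same no-op
    # response instantly: the tight loop the comment above warns about.
    if next_batch != since:
        cache[since] = (next_batch, events)
    return next_batch, events

stream.extend(["m1", "m2"])
assert cached_sync(0) == (2, ["m1", "m2"])  # progressed, so cached
assert cached_sync(2) == (2, [])            # no progress: served but never cached
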
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 75717ba4f9..3c19ea56f8 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -184,8 +184,8 @@ class UserDirectoryHandler(StateDeltasHandler):
"""Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in.
"""
- # FIXME(#3714): We should probably do this in the same worker as all
- # the other changes.
+ # FIXME(https://github.com/matrix-org/synapse/issues/3714): We should
+ # probably do this in the same worker as all the other changes.
if await self.store.should_include_local_user_in_dir(user_id):
await self.store.update_profile_in_user_dir(
@@ -194,8 +194,8 @@ class UserDirectoryHandler(StateDeltasHandler):
async def handle_local_user_deactivated(self, user_id: str) -> None:
"""Called when a user ID is deactivated"""
- # FIXME(#3714): We should probably do this in the same worker as all
- # the other changes.
+ # FIXME(https://github.com/matrix-org/synapse/issues/3714): We should
+ # probably do this in the same worker as all the other changes.
await self.store.remove_from_user_dir(user_id)
async def _unsafe_process(self) -> None: