diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 882be905db..12837429b9 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -94,7 +94,7 @@ from synapse.types import (
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
+from synapse.util.iterutils import batch_iter, partition, sorted_topologically
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@@ -1141,16 +1141,8 @@ class FederationEventHandler:
partial_state_flags = await self._store.get_partial_state_events(seen)
partial_state = any(partial_state_flags.values())
- # Get the state of the events we know about
- ours = await self._state_storage_controller.get_state_groups_ids(
- room_id, seen, await_full_state=False
- )
-
# state_maps is a list of mappings from (type, state_key) to event_id
- state_maps: List[StateMap[str]] = list(ours.values())
-
- # we don't need this any more, let's delete it.
- del ours
+ state_maps: List[StateMap[str]] = []
# Ask the remote server for the states we don't
# know about
@@ -1169,6 +1161,17 @@ class FederationEventHandler:
state_maps.append(remote_state_map)
+ # Get the state of the events we know about. We do this *after*
+ # trying to fetch missing state over federation as that might fail
+ # and then we can skip loading the local state.
+ ours = await self._state_storage_controller.get_state_groups_ids(
+ room_id, seen, await_full_state=False
+ )
+ state_maps.extend(ours.values())
+
+ # we don't need this any more, let's delete it.
+ del ours
+
room_version = await self._store.get_room_version_id(room_id)
state_map = await self._state_resolution_handler.resolve_events_with_store(
room_id,
@@ -1678,57 +1681,36 @@ class FederationEventHandler:
# We need to persist an event's auth events before the event.
auth_graph = {
- ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
+ ev.event_id: [e_id for e_id in ev.auth_event_ids() if e_id in event_map]
for ev in event_map.values()
}
- for roots in sorted_topologically_batched(event_map.values(), auth_graph):
- if not roots:
- # if *none* of the remaining events are ready, that means
- # we have a loop. This either means a bug in our logic, or that
- # somebody has managed to create a loop (which requires finding a
- # hash collision in room v2 and later).
- logger.warning(
- "Loop found in auth events while fetching missing state/auth "
- "events: %s",
- shortstr(event_map.keys()),
- )
- return
-
- logger.info(
- "Persisting %i of %i remaining outliers: %s",
- len(roots),
- len(event_map),
- shortstr(e.event_id for e in roots),
- )
-
- await self._auth_and_persist_outliers_inner(room_id, roots)
-
- async def _auth_and_persist_outliers_inner(
- self, room_id: str, fetched_events: Collection[EventBase]
- ) -> None:
- """Helper for _auth_and_persist_outliers
-
- Persists a batch of events where we have (theoretically) already persisted all
- of their auth events.
-
- Marks the events as outliers, auths them, persists them to the database, and,
- where appropriate (eg, an invite), awakes the notifier.
+ sorted_auth_event_ids = sorted_topologically(event_map.keys(), auth_graph)
+ sorted_auth_events = [event_map[e_id] for e_id in sorted_auth_event_ids]
+ logger.info(
+ "Persisting %i remaining outliers: %s",
+ len(sorted_auth_events),
+ shortstr(e.event_id for e in sorted_auth_events),
+ )
- Params:
- origin: where the events came from
- room_id: the room that the events are meant to be in (though this has
- not yet been checked)
- fetched_events: the events to persist
- """
# get all the auth events for all the events in this batch. By now, they should
# have been persisted.
- auth_events = {
- aid for event in fetched_events for aid in event.auth_event_ids()
+ auth_event_ids = {
+ aid for event in sorted_auth_events for aid in event.auth_event_ids()
}
- persisted_events = await self._store.get_events(
- auth_events,
- allow_rejected=True,
- )
+ auth_map = {
+ ev.event_id: ev
+ for ev in sorted_auth_events
+ if ev.event_id in auth_event_ids
+ }
+
+ missing_events = auth_event_ids.difference(auth_map)
+ if missing_events:
+ persisted_events = await self._store.get_events(
+ missing_events,
+ allow_rejected=True,
+ redact_behaviour=EventRedactBehaviour.as_is,
+ )
+ auth_map.update(persisted_events)
events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []
@@ -1736,7 +1718,7 @@ class FederationEventHandler:
with nested_logging_context(suffix=event.event_id):
auth = []
for auth_event_id in event.auth_event_ids():
- ae = persisted_events.get(auth_event_id)
+ ae = auth_map.get(auth_event_id)
if not ae:
# the fact we can't find the auth event doesn't mean it doesn't
# exist, which means it is premature to reject `event`. Instead we
@@ -1755,7 +1737,9 @@ class FederationEventHandler:
context = EventContext.for_outlier(self._storage_controllers)
try:
validate_event_for_room_version(event)
- await check_state_independent_auth_rules(self._store, event)
+ await check_state_independent_auth_rules(
+ self._store, event, batched_auth_events=auth_map
+ )
check_state_dependent_auth_rules(event, auth)
except AuthError as e:
logger.warning("Rejecting %r because %s", event, e)
@@ -1772,7 +1756,7 @@ class FederationEventHandler:
events_and_contexts_to_persist.append((event, context))
- for event in fetched_events:
+ for event in sorted_auth_events:
await prep(event)
await self.persist_events_and_notify(
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index a534f5f280..78bcac1429 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -44,6 +44,7 @@ from synapse.api.ratelimiting import Ratelimiter
from synapse.config.ratelimiting import RatelimitSettings
from synapse.events import EventBase
from synapse.types import JsonDict, Requester, StrCollection
+from synapse.types.state import StateFilter
from synapse.util.caches.response_cache import ResponseCache
if TYPE_CHECKING:
@@ -546,7 +547,16 @@ class RoomSummaryHandler:
Returns:
True if the room is accessible to the requesting user or server.
"""
- state_ids = await self._storage_controllers.state.get_current_state_ids(room_id)
+ event_types = [
+ (EventTypes.JoinRules, ""),
+ (EventTypes.RoomHistoryVisibility, ""),
+ ]
+ if requester:
+ event_types.append((EventTypes.Member, requester))
+
+ state_ids = await self._storage_controllers.state.get_current_state_ids(
+ room_id, state_filter=StateFilter.from_types(event_types)
+ )
# If there's no state for the room, it isn't known.
if not state_ids:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0385c04bc2..2e10035772 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -583,10 +583,11 @@ class SyncHandler:
# `recents`, so partial state is only a problem when a membership
# event turns up in `recents` but has not made it into the current
# state.
- current_state_ids_map = (
- await self.store.get_partial_current_state_ids(room_id)
+ current_state_ids = (
+ await self.store.check_if_events_in_current_state(
+ {e.event_id for e in recents if e.is_state()}
+ )
)
- current_state_ids = frozenset(current_state_ids_map.values())
recents = await filter_events_for_client(
self._storage_controllers,
@@ -667,10 +668,11 @@ class SyncHandler:
# `loaded_recents`, so partial state is only a problem when a
# membership event turns up in `loaded_recents` but has not made it
# into the current state.
- current_state_ids_map = (
- await self.store.get_partial_current_state_ids(room_id)
+ current_state_ids = (
+ await self.store.check_if_events_in_current_state(
+ {e.event_id for e in loaded_recents if e.is_state()}
+ )
)
- current_state_ids = frozenset(current_state_ids_map.values())
loaded_recents = await filter_events_for_client(
self._storage_controllers,
|