diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 75 | ||||
-rw-r--r-- | synapse/handlers/federation_event.py | 39 |
2 files changed, 114 insertions, 0 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 78d149905f..1434e99056 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -466,6 +466,8 @@ class FederationHandler: ) if ret.partial_state: + # TODO(faster_joins): roll this back if we don't manage to start the + # background resync (eg process_remote_join fails) await self.store.store_partial_state_room(room_id, ret.servers_in_room) max_stream_id = await self._federation_event_handler.process_remote_join( @@ -478,6 +480,18 @@ class FederationHandler: partial_state=ret.partial_state, ) + if ret.partial_state: + # Kick off the process of asynchronously fetching the state for this + # room. + # + # TODO(faster_joins): pick this up again on restart + run_as_background_process( + desc="sync_partial_state_room", + func=self._sync_partial_state_room, + destination=origin, + room_id=room_id, + ) + # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. await self._replication.wait_for_stream_position( @@ -1370,3 +1384,64 @@ class FederationHandler: # We fell off the bottom, couldn't get the complexity from anyone. Oh # well. return None + + async def _sync_partial_state_room( + self, + destination: str, + room_id: str, + ) -> None: + """Background process to resync the state of a partial-state room + + Args: + destination: homeserver to pull the state from + room_id: room to be resynced + """ + + # TODO(faster_joins): do we need to lock to avoid races? What happens if other + # worker processes kick off a resync in parallel? Perhaps we should just elect + # a single worker to do the resync. + # + # TODO(faster_joins): what happens if we leave the room during a resync? if we + # really leave, that might mean we have difficulty getting the room state over + # federation. + # + # TODO(faster_joins): try other destinations if the one we have fails + + logger.info("Syncing state for room %s via %s", room_id, destination) + + # we work through the queue in order of increasing stream ordering. + while True: + batch = await self.store.get_partial_state_events_batch(room_id) + if not batch: + # all the events are updated, so we can update current state and + # clear the lazy-loading flag. + logger.info("Updating current state for %s", room_id) + assert ( + self.storage.persistence is not None + ), "TODO(faster_joins): support for workers" + await self.storage.persistence.update_current_state(room_id) + + logger.info("Clearing partial-state flag for %s", room_id) + success = await self.store.clear_partial_state_room(room_id) + if success: + logger.info("State resync complete for %s", room_id) + + # TODO(faster_joins) update room stats and user directory? + return + + # we raced against more events arriving with partial state. Go round + # the loop again. We've already logged a warning, so no need for more. + # TODO(faster_joins): there is still a race here, whereby incoming events which raced + # with us will fail to be persisted after the call to `clear_partial_state_room` due to + # having partial state. + continue + + events = await self.store.get_events_as_list( + batch, + redact_behaviour=EventRedactBehaviour.AS_IS, + allow_rejected=True, + ) + for event in events: + await self._federation_event_handler.update_state_for_partial_state_event( + destination, event + ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 03c1197c99..32bf02818c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -477,6 +477,45 @@ class FederationEventHandler: return await self.persist_events_and_notify(room_id, [(event, context)]) + async def update_state_for_partial_state_event( + self, destination: str, event: EventBase + ) -> None: + """Recalculate the state at an event as part of a de-partial-stating process + + Args: + destination: server to request full state from + event: partial-state event to be de-partial-stated + """ + logger.info("Updating state for %s", event.event_id) + with nested_logging_context(suffix=event.event_id): + # if we have all the event's prev_events, then we can work out the + # state based on their states. Otherwise, we request it from the destination + # server. + # + # This is the same operation as we do when we receive a regular event + # over federation. + state = await self._resolve_state_at_missing_prevs(destination, event) + + # build a new state group for it if need be + context = await self._state_handler.compute_event_context( + event, + old_state=state, + ) + if context.partial_state: + # this can happen if some or all of the event's prev_events still have + # partial state - ie, an event has an earlier stream_ordering than one + # or more of its prev_events, so we de-partial-state it before its + # prev_events. + # + # TODO(faster_joins): we probably need to be more intelligent, and + # exclude partial-state prev_events from consideration + logger.warning( + "%s still has partial state: can't de-partial-state it yet", + event.event_id, + ) + return + await self._store.update_state_for_partial_state_event(event, context) + async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> None: |