summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py132
1 files changed, 26 insertions, 106 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6389c51b1c..e2563428d2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -844,7 +844,6 @@ class SyncHandler:
                     sync_config.user.to_string(),
                     recents,
                     always_include_ids=current_state_ids,
-                    msc4115_membership_on_events=self.hs_config.experimental.msc4115_membership_on_events,
                 )
                 log_kv({"recents_after_visibility_filtering": len(recents)})
             else:
@@ -930,7 +929,6 @@ class SyncHandler:
                     sync_config.user.to_string(),
                     loaded_recents,
                     always_include_ids=current_state_ids,
-                    msc4115_membership_on_events=self.hs_config.experimental.msc4115_membership_on_events,
                 )
 
                 loaded_recents = []
@@ -981,89 +979,6 @@ class SyncHandler:
             bundled_aggregations=bundled_aggregations,
         )
 
-    async def get_state_after_event(
-        self,
-        event_id: str,
-        state_filter: Optional[StateFilter] = None,
-        await_full_state: bool = True,
-    ) -> StateMap[str]:
-        """
-        Get the room state after the given event
-
-        Args:
-            event_id: event of interest
-            state_filter: The state filter used to fetch state from the database.
-            await_full_state: if `True`, will block if we do not yet have complete state
-                at the event and `state_filter` is not satisfied by partial state.
-                Defaults to `True`.
-        """
-        state_ids = await self._state_storage_controller.get_state_ids_for_event(
-            event_id,
-            state_filter=state_filter or StateFilter.all(),
-            await_full_state=await_full_state,
-        )
-
-        # using get_metadata_for_events here (instead of get_event) sidesteps an issue
-        # with redactions: if `event_id` is a redaction event, and we don't have the
-        # original (possibly because it got purged), get_event will refuse to return
-        # the redaction event, which isn't terribly helpful here.
-        #
-        # (To be fair, in that case we could assume it's *not* a state event, and
-        # therefore we don't need to worry about it. But still, it seems cleaner just
-        # to pull the metadata.)
-        m = (await self.store.get_metadata_for_events([event_id]))[event_id]
-        if m.state_key is not None and m.rejection_reason is None:
-            state_ids = dict(state_ids)
-            state_ids[(m.event_type, m.state_key)] = event_id
-
-        return state_ids
-
-    async def get_state_at(
-        self,
-        room_id: str,
-        stream_position: StreamToken,
-        state_filter: Optional[StateFilter] = None,
-        await_full_state: bool = True,
-    ) -> StateMap[str]:
-        """Get the room state at a particular stream position
-
-        Args:
-            room_id: room for which to get state
-            stream_position: point at which to get state
-            state_filter: The state filter used to fetch state from the database.
-            await_full_state: if `True`, will block if we do not yet have complete state
-                at the last event in the room before `stream_position` and
-                `state_filter` is not satisfied by partial state. Defaults to `True`.
-        """
-        # FIXME: This gets the state at the latest event before the stream ordering,
-        # which might not be the same as the "current state" of the room at the time
-        # of the stream token if there were multiple forward extremities at the time.
-        last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
-            room_id,
-            end_token=stream_position.room_key,
-        )
-
-        if last_event_id:
-            state = await self.get_state_after_event(
-                last_event_id,
-                state_filter=state_filter or StateFilter.all(),
-                await_full_state=await_full_state,
-            )
-
-        else:
-            # no events in this room - so presumably no state
-            state = {}
-
-            # (erikj) This should be rarely hit, but we've had some reports that
-            # we get more state down gappy syncs than we should, so let's add
-            # some logging.
-            logger.info(
-                "Failed to find any events in room %s at %s",
-                room_id,
-                stream_position.room_key,
-            )
-        return state
-
     async def compute_summary(
         self,
         room_id: str,
@@ -1437,7 +1352,7 @@ class SyncHandler:
             await_full_state = True
             lazy_load_members = False
 
-        state_at_timeline_end = await self.get_state_at(
+        state_at_timeline_end = await self._state_storage_controller.get_state_at(
             room_id,
             stream_position=end_token,
             state_filter=state_filter,
@@ -1521,7 +1436,7 @@ class SyncHandler:
             # We need to make sure the first event in our batch points to the
             # last event in the previous batch.
             last_event_id_prev_batch = (
-                await self.store.get_last_event_in_room_before_stream_ordering(
+                await self.store.get_last_event_id_in_room_before_stream_ordering(
                     room_id,
                     end_token=since_token.room_key,
                 )
@@ -1565,7 +1480,7 @@ class SyncHandler:
         else:
             # We can get here if the user has ignored the senders of all
             # the recent events.
-            state_at_timeline_start = await self.get_state_at(
+            state_at_timeline_start = await self._state_storage_controller.get_state_at(
                 room_id,
                 stream_position=end_token,
                 state_filter=state_filter,
@@ -1587,14 +1502,14 @@ class SyncHandler:
             # about them).
             state_filter = StateFilter.all()
 
-        state_at_previous_sync = await self.get_state_at(
+        state_at_previous_sync = await self._state_storage_controller.get_state_at(
             room_id,
             stream_position=since_token,
             state_filter=state_filter,
             await_full_state=await_full_state,
         )
 
-        state_at_timeline_end = await self.get_state_at(
+        state_at_timeline_end = await self._state_storage_controller.get_state_at(
             room_id,
             stream_position=end_token,
             state_filter=state_filter,
@@ -2002,7 +1917,7 @@ class SyncHandler:
         """
         user_id = sync_config.user.to_string()
 
-        # Note: we get the users room list *before* we get the current token, this
+        # Note: we get the users room list *before* we get the `now_token`, this
         # avoids checking back in history if rooms are joined after the token is fetched.
         token_before_rooms = self.event_sources.get_current_token()
         mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
@@ -2014,10 +1929,10 @@ class SyncHandler:
         now_token = self.event_sources.get_current_token()
         log_kv({"now_token": now_token})
 
-        # Since we fetched the users room list before the token, there's a small window
-        # during which membership events may have been persisted, so we fetch these now
-        # and modify the joined room list for any changes between the get_rooms_for_user
-        # call and the get_current_token call.
+        # Since we fetched the users room list before calculating the `now_token` (see
+        # above), there's a small window during which membership events may have been
+        # persisted, so we fetch these now and modify the joined room list for any
+        # changes between the get_rooms_for_user call and the get_current_token call.
         membership_change_events = []
         if since_token:
             membership_change_events = await self.store.get_membership_changes_for_user(
@@ -2027,16 +1942,19 @@ class SyncHandler:
                 self.rooms_to_exclude_globally,
             )
 
-            mem_last_change_by_room_id: Dict[str, EventBase] = {}
+            last_membership_change_by_room_id: Dict[str, EventBase] = {}
             for event in membership_change_events:
-                mem_last_change_by_room_id[event.room_id] = event
+                last_membership_change_by_room_id[event.room_id] = event
 
             # For the latest membership event in each room found, add/remove the room ID
             # from the joined room list accordingly. In this case we only care if the
             # latest change is JOIN.
 
-            for room_id, event in mem_last_change_by_room_id.items():
+            for room_id, event in last_membership_change_by_room_id.items():
                 assert event.internal_metadata.stream_ordering
+                # As a shortcut, skip any events that happened before we got our
+                # `get_rooms_for_user()` snapshot (any changes are already represented
+                # in that list).
                 if (
                     event.internal_metadata.stream_ordering
                     < token_before_rooms.room_key.stream
@@ -2590,7 +2508,7 @@ class SyncHandler:
                 continue
 
             if room_id in sync_result_builder.joined_room_ids or has_join:
-                old_state_ids = await self.get_state_at(
+                old_state_ids = await self._state_storage_controller.get_state_at(
                     room_id,
                     since_token,
                     state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
@@ -2620,12 +2538,14 @@ class SyncHandler:
                     newly_left_rooms.append(room_id)
                 else:
                     if not old_state_ids:
-                        old_state_ids = await self.get_state_at(
-                            room_id,
-                            since_token,
-                            state_filter=StateFilter.from_types(
-                                [(EventTypes.Member, user_id)]
-                            ),
+                        old_state_ids = (
+                            await self._state_storage_controller.get_state_at(
+                                room_id,
+                                since_token,
+                                state_filter=StateFilter.from_types(
+                                    [(EventTypes.Member, user_id)]
+                                ),
+                            )
                         )
                         old_mem_ev_id = old_state_ids.get(
                             (EventTypes.Member, user_id), None
@@ -2830,7 +2750,7 @@ class SyncHandler:
                             continue
 
                 leave_token = now_token.copy_and_replace(
-                    StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
+                    StreamKeyType.ROOM, RoomStreamToken(stream=event.event_pos.stream)
                 )
                 room_entries.append(
                     RoomSyncResultBuilder(