summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2021-12-02 20:58:32 +0000
committerGitHub <noreply@github.com>2021-12-02 20:58:32 +0000
commitd26808dd854006bd26a2366c675428ce0737238c (patch)
treefe22756f8eef75a3393200bceca52632187a7a6d /synapse
parentClean up tests.storage.test_appservice (#11492) (diff)
downloadsynapse-d26808dd854006bd26a2366c675428ce0737238c.tar.xz
Comments on the /sync tentacles (#11494)
This mainly consists of docstrings and inline comments. There are one or two type annotations and variable renames thrown in while I was here.

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sync.py156
-rw-r--r--synapse/storage/databases/main/stream.py15
2 files changed, 128 insertions, 43 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 891435c14d..53d4627147 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -334,6 +334,19 @@ class SyncHandler:
         full_state: bool,
         cache_context: ResponseCacheContext[SyncRequestKey],
     ) -> SyncResult:
+        """The start of the machinery that produces a /sync response.
+
+        See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
+
+        This method does high-level bookkeeping:
+        - tracking the kind of sync in the logging context
+        - deleting any to_device messages whose delivery has been acknowledged.
+        - deciding if we should dispatch an instant or delayed response
+        - marking the sync as being lazily loaded, if appropriate
+
+        Computing the body of the response begins in the next method,
+        `current_sync_for_user`.
+        """
         if since_token is None:
             sync_type = "initial_sync"
         elif full_state:
@@ -363,7 +376,7 @@ class SyncHandler:
                 sync_config, since_token, full_state=full_state
             )
         else:
-
+            # Otherwise, we wait for something to happen and report it to the user.
             async def current_sync_callback(
                 before_token: StreamToken, after_token: StreamToken
             ) -> SyncResult:
@@ -402,7 +415,12 @@ class SyncHandler:
         since_token: Optional[StreamToken] = None,
         full_state: bool = False,
     ) -> SyncResult:
-        """Get the sync for client needed to match what the server has now."""
+        """Generates the response body of a sync result, represented as a SyncResult.
+
+        This is a wrapper around `generate_sync_result` which starts an open tracing
+        span to track the sync. See `generate_sync_result` for the next part of your
+        indoctrination.
+        """
         with start_active_span("current_sync_for_user"):
             log_kv({"since_token": since_token})
             sync_result = await self.generate_sync_result(
@@ -560,7 +578,7 @@ class SyncHandler:
                 # that have happened since `since_key` up to `end_key`, so we
                 # can just use `get_room_events_stream_for_room`.
                 # Otherwise, we want to return the last N events in the room
-                # in toplogical ordering.
+                # in topological ordering.
                 if since_key:
                     events, end_key = await self.store.get_room_events_stream_for_room(
                         room_id,
@@ -1042,7 +1060,18 @@ class SyncHandler:
         since_token: Optional[StreamToken] = None,
         full_state: bool = False,
     ) -> SyncResult:
-        """Generates a sync result."""
+        """Generates the response body of a sync result.
+
+        This is represented by a `SyncResult` struct, which is built from small pieces
+        using a `SyncResultBuilder`. See also
+            https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
+        the `sync_result_builder` is passed as a mutable ("inout") parameter to various
+        helper functions. These retrieve and process the data which forms the sync body,
+        often writing to the `sync_result_builder` to store their output.
+
+        At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
+        instance to signify that the sync calculation is complete.
+        """
         # NB: The now_token gets changed by some of the generate_sync_* methods,
         # this is due to some of the underlying streams not supporting the ability
         # to query up to a given point.
@@ -1344,14 +1373,22 @@ class SyncHandler:
     async def _generate_sync_entry_for_account_data(
         self, sync_result_builder: "SyncResultBuilder"
     ) -> Dict[str, Dict[str, JsonDict]]:
-        """Generates the account data portion of the sync response. Populates
-        `sync_result_builder` with the result.
+        """Generates the account data portion of the sync response.
+
+        Account data (called "Client Config" in the spec) can be set either globally
+        or for a specific room. Account data consists of a list of events which
+        accumulate state, much like a room.
+
+        This function retrieves global and per-room account data. The former is written
+        to the given `sync_result_builder`. The latter is returned directly, to be
+        later written to the `sync_result_builder` on a room-by-room basis.
 
         Args:
             sync_result_builder
 
         Returns:
-            A dictionary containing the per room account data.
+            A dictionary whose keys (room ids) map to the per room account data for that
+            room.
         """
         sync_config = sync_result_builder.sync_config
         user_id = sync_result_builder.sync_config.user.to_string()
@@ -1359,7 +1396,7 @@ class SyncHandler:
 
         if since_token and not sync_result_builder.full_state:
             (
-                account_data,
+                global_account_data,
                 account_data_by_room,
             ) = await self.store.get_updated_account_data_for_user(
                 user_id, since_token.account_data_key
@@ -1370,23 +1407,23 @@ class SyncHandler:
             )
 
             if push_rules_changed:
-                account_data["m.push_rules"] = await self.push_rules_for_user(
+                global_account_data["m.push_rules"] = await self.push_rules_for_user(
                     sync_config.user
                 )
         else:
             (
-                account_data,
+                global_account_data,
                 account_data_by_room,
             ) = await self.store.get_account_data_for_user(sync_config.user.to_string())
 
-            account_data["m.push_rules"] = await self.push_rules_for_user(
+            global_account_data["m.push_rules"] = await self.push_rules_for_user(
                 sync_config.user
             )
 
         account_data_for_user = await sync_config.filter_collection.filter_account_data(
             [
                 {"type": account_data_type, "content": content}
-                for account_data_type, content in account_data.items()
+                for account_data_type, content in global_account_data.items()
             ]
         )
 
@@ -1460,15 +1497,22 @@ class SyncHandler:
         """Generates the rooms portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
+        In the response that reaches the client, rooms are divided into four categories:
+        `invite`, `join`, `knock`, `leave`. These aren't the same as the four sets of
+        room ids returned by this function.
+
         Args:
             sync_result_builder
             account_data_by_room: Dictionary of per room account data
 
         Returns:
-            Returns a 4-tuple of
-            `(newly_joined_rooms, newly_joined_or_invited_users,
-            newly_left_rooms, newly_left_users)`
+            Returns a 4-tuple whose entries are:
+            - newly_joined_rooms
+            - newly_joined_or_invited_or_knocked_users
+            - newly_left_rooms
+            - newly_left_users
         """
+        # Start by fetching all ephemeral events in rooms we've joined (if required).
         user_id = sync_result_builder.sync_config.user.to_string()
         block_all_room_ephemeral = (
             sync_result_builder.since_token is None
@@ -1590,6 +1634,8 @@ class SyncHandler:
     ) -> bool:
         """Returns whether there may be any new events that should be sent down
         the sync. Returns True if there are.
+
+        Does not modify the `sync_result_builder`.
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
@@ -1597,12 +1643,13 @@ class SyncHandler:
 
         assert since_token
 
-        # Get a list of membership change events that have happened.
-        rooms_changed = await self.store.get_membership_changes_for_user(
+        # Get a list of membership change events that have happened to the user
+        # requesting the sync.
+        membership_changes = await self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
         )
 
-        if rooms_changed:
+        if membership_changes:
             return True
 
         stream_id = since_token.room_key.stream
@@ -1614,7 +1661,25 @@ class SyncHandler:
     async def _get_rooms_changed(
         self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
     ) -> _RoomChanges:
-        """Gets the the changes that have happened since the last sync."""
+        """Determine the changes in rooms to report to the user.
+
+        Ideally, we want to report all events whose stream ordering `s` lies in the
+        range `since_token < s <= now_token`, where the two tokens are read from the
+        sync_result_builder.
+
+        If there are too many events in that range to report, things get complicated.
+        In this situation we return a truncated list of the most recent events, and
+        indicate in the response that there is a "gap" of omitted events. Additionally:
+
+        - we include a "state_delta", to describe the changes in state over the gap,
+        - we include all membership events applying to the user making the request,
+          even those in the gap.
+
+        See the spec for the rationale:
+            https://spec.matrix.org/v1.1/client-server-api/#syncing
+
+        The sync_result_builder is not modified by this function.
+        """
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
@@ -1622,21 +1687,36 @@ class SyncHandler:
 
         assert since_token
 
-        # Get a list of membership change events that have happened.
-        rooms_changed = await self.store.get_membership_changes_for_user(
+        # The spec
+        #     https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
+        # notes that membership events need special consideration:
+        #
+        # > When a sync is limited, the server MUST return membership events for events
+        # > in the gap (between since and the start of the returned timeline), regardless
+        # > as to whether or not they are redundant.
+        #
+        # We fetch such events here, but we only seem to use them for categorising rooms
+        # as newly joined, newly left, invited or knocked.
+        # TODO: we've already called this function and ran this query in
+        #       _have_rooms_changed. We could keep the results in memory to avoid a
+        #       second query, at the cost of more complicated source code.
+        membership_change_events = await self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
         )
 
         mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
-        for event in rooms_changed:
+        for event in membership_change_events:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
-        newly_joined_rooms = []
-        newly_left_rooms = []
-        room_entries = []
-        invited = []
-        knocked = []
+        newly_joined_rooms: List[str] = []
+        newly_left_rooms: List[str] = []
+        room_entries: List[RoomSyncResultBuilder] = []
+        invited: List[InvitedSyncResult] = []
+        knocked: List[KnockedSyncResult] = []
         for room_id, events in mem_change_events_by_room_id.items():
+            # The body of this loop will add this room to at least one of the five lists
+            # above. Things get messy if you've e.g. joined, left, joined then left the
+            # room all in the same sync period.
             logger.debug(
                 "Membership changes in %s: [%s]",
                 room_id,
@@ -1781,7 +1861,9 @@ class SyncHandler:
 
         timeline_limit = sync_config.filter_collection.timeline_limit()
 
-        # Get all events for rooms we're currently joined to.
+        # Get all events since the `from_key` in rooms we're currently joined to.
+        # If there are too many, we get the most recent events only. This leaves
+        # a "gap" in the timeline, as described by the spec for /sync.
         room_to_events = await self.store.get_room_events_stream_for_rooms(
             room_ids=sync_result_builder.joined_room_ids,
             from_key=since_token.room_key,
@@ -1842,6 +1924,10 @@ class SyncHandler:
     ) -> _RoomChanges:
         """Returns entries for all rooms for the user.
 
+        Like `_get_rooms_changed`, but assumes the `since_token` is `None`.
+
+        This function does not modify the sync_result_builder.
+
         Args:
             sync_result_builder
             ignored_users: Set of users ignored by user.
@@ -1853,16 +1939,9 @@ class SyncHandler:
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
 
-        membership_list = (
-            Membership.INVITE,
-            Membership.KNOCK,
-            Membership.JOIN,
-            Membership.LEAVE,
-            Membership.BAN,
-        )
-
         room_list = await self.store.get_rooms_for_local_user_where_membership_is(
-            user_id=user_id, membership_list=membership_list
+            user_id=user_id,
+            membership_list=Membership.LIST,
         )
 
         room_entries = []
@@ -2212,8 +2291,7 @@ def _calculate_state(
     # to only include membership events for the senders in the timeline.
     # In practice, we can do this by removing them from the p_ids list,
     # which is the list of relevant state we know we have already sent to the client.
-    # see https://github.com/matrix-org/synapse/pull/2970
-    #            /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+    # see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
 
     if lazy_load_members:
         p_ids.difference_update(
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 42dc807d17..57aab55259 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -497,7 +497,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
                 oldest `limit` events.
 
         Returns:
-            The list of events (in ascending order) and the token from the start
+            The list of events (in ascending stream order) and the token from the start
             of the chunk of events returned.
         """
         if from_key == to_key:
@@ -510,7 +510,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
         if not has_changed:
             return [], from_key
 
-        def f(txn):
+        def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
             # To handle tokens with a non-empty instance_map we fetch more
             # results than necessary and then filter down
             min_from_id = from_key.stream
@@ -565,6 +565,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
     async def get_membership_changes_for_user(
         self, user_id: str, from_key: RoomStreamToken, to_key: RoomStreamToken
     ) -> List[EventBase]:
+        """Fetch membership events for a given user.
+
+        All such events whose stream ordering `s` lies in the range
+        `from_key < s <= to_key` are returned. Events are ordered by ascending stream
+        order.
+        """
+        # Start by ruling out cases where a DB query is not necessary.
         if from_key == to_key:
             return []
 
@@ -575,7 +582,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             if not has_changed:
                 return []
 
-        def f(txn):
+        def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
             # To handle tokens with a non-empty instance_map we fetch more
             # results than necessary and then filter down
             min_from_id = from_key.stream
@@ -634,7 +641,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
         Returns:
             A list of events and a token pointing to the start of the returned
-            events. The events returned are in ascending order.
+            events. The events returned are in ascending topological order.
         """
 
         rows, token = await self.get_recent_event_ids_for_room(