summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorJason Little <realtyem@gmail.com>2023-03-08 06:22:27 -0600
committerGitHub <noreply@github.com>2023-03-08 06:22:27 -0600
commit0cf5e80870de37cc5115447f5a06d785f5cf8718 (patch)
tree8c705ee2208b5842d99e3828b41c1d8ff1cdbb32 /synapse/handlers
parentRemove condition based on number of ports (diff)
parentMore speedups/fixes to creating batched events (#15195) (diff)
downloadsynapse-0cf5e80870de37cc5115447f5a06d785f5cf8718.tar.xz
Merge branch 'develop' into comp-worker-shorthand
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/event_auth.py13
-rw-r--r--synapse/handlers/events.py20
-rw-r--r--synapse/handlers/initial_sync.py51
-rw-r--r--synapse/handlers/message.py9
-rw-r--r--synapse/handlers/pagination.py4
-rw-r--r--synapse/handlers/relations.py12
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/search.py43
8 files changed, 107 insertions, 49 deletions
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index c508861b6a..0db0bd7304 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -63,9 +63,18 @@ class EventAuthHandler:
             self._store, event, batched_auth_events
         )
         auth_event_ids = event.auth_event_ids()
-        auth_events_by_id = await self._store.get_events(auth_event_ids)
+
         if batched_auth_events:
-            auth_events_by_id.update(batched_auth_events)
+            # Copy the batched auth events to avoid mutating them.
+            auth_events_by_id = dict(batched_auth_events)
+            needed_auth_event_ids = set(auth_event_ids) - set(batched_auth_events)
+            if needed_auth_event_ids:
+                auth_events_by_id.update(
+                    await self._store.get_events(needed_auth_event_ids)
+                )
+        else:
+            auth_events_by_id = await self._store.get_events(auth_event_ids)
+
         check_state_dependent_auth_rules(event, auth_events_by_id.values())
 
     def compute_auth_events(
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 949b69cb41..68c07f0265 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -23,7 +23,7 @@ from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.presence import format_user_presence_state
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, Requester, UserID
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -46,13 +46,12 @@ class EventStreamHandler:
 
     async def get_stream(
         self,
-        auth_user_id: str,
+        requester: Requester,
         pagin_config: PaginationConfig,
         timeout: int = 0,
         as_client_event: bool = True,
         affect_presence: bool = True,
         room_id: Optional[str] = None,
-        is_guest: bool = False,
     ) -> JsonDict:
         """Fetches the events stream for a given user."""
 
@@ -62,13 +61,12 @@ class EventStreamHandler:
                 raise SynapseError(403, "This room has been blocked on this server")
 
         # send any outstanding server notices to the user.
-        await self._server_notices_sender.on_user_syncing(auth_user_id)
+        await self._server_notices_sender.on_user_syncing(requester.user.to_string())
 
-        auth_user = UserID.from_string(auth_user_id)
         presence_handler = self.hs.get_presence_handler()
 
         context = await presence_handler.user_syncing(
-            auth_user_id,
+            requester.user.to_string(),
             affect_presence=affect_presence,
             presence_state=PresenceState.ONLINE,
         )
@@ -82,10 +80,10 @@ class EventStreamHandler:
                 timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
 
             stream_result = await self.notifier.get_events_for(
-                auth_user,
+                requester.user,
                 pagin_config,
                 timeout,
-                is_guest=is_guest,
+                is_guest=requester.is_guest,
                 explicit_room_id=room_id,
             )
             events = stream_result.events
@@ -102,7 +100,7 @@ class EventStreamHandler:
                     if event.membership != Membership.JOIN:
                         continue
                     # Send down presence.
-                    if event.state_key == auth_user_id:
+                    if event.state_key == requester.user.to_string():
                         # Send down presence for everyone in the room.
                         users: Iterable[str] = await self.store.get_users_in_room(
                             event.room_id
@@ -124,7 +122,9 @@ class EventStreamHandler:
             chunks = self._event_serializer.serialize_events(
                 events,
                 time_now,
-                config=SerializeEventConfig(as_client_event=as_client_event),
+                config=SerializeEventConfig(
+                    as_client_event=as_client_event, requester=requester
+                ),
             )
 
             chunk = {
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index aead0b44b9..b3be7a86f0 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -318,11 +318,9 @@ class InitialSyncHandler:
         )
         is_peeking = member_event_id is None
 
-        user_id = requester.user.to_string()
-
         if membership == Membership.JOIN:
             result = await self._room_initial_sync_joined(
-                user_id, room_id, pagin_config, membership, is_peeking
+                requester, room_id, pagin_config, membership, is_peeking
             )
         elif membership == Membership.LEAVE:
             # The member_event_id will always be available if membership is set
@@ -330,10 +328,16 @@ class InitialSyncHandler:
             assert member_event_id
 
             result = await self._room_initial_sync_parted(
-                user_id, room_id, pagin_config, membership, member_event_id, is_peeking
+                requester,
+                room_id,
+                pagin_config,
+                membership,
+                member_event_id,
+                is_peeking,
             )
 
         account_data_events = []
+        user_id = requester.user.to_string()
         tags = await self.store.get_tags_for_room(user_id, room_id)
         if tags:
             account_data_events.append(
@@ -350,7 +354,7 @@ class InitialSyncHandler:
 
     async def _room_initial_sync_parted(
         self,
-        user_id: str,
+        requester: Requester,
         room_id: str,
         pagin_config: PaginationConfig,
         membership: str,
@@ -369,13 +373,17 @@ class InitialSyncHandler:
         )
 
         messages = await filter_events_for_client(
-            self._storage_controllers, user_id, messages, is_peeking=is_peeking
+            self._storage_controllers,
+            requester.user.to_string(),
+            messages,
+            is_peeking=is_peeking,
         )
 
         start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
         end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)
 
         time_now = self.clock.time_msec()
+        serialize_options = SerializeEventConfig(requester=requester)
 
         return {
             "membership": membership,
@@ -383,14 +391,18 @@ class InitialSyncHandler:
             "messages": {
                 "chunk": (
                     # Don't bundle aggregations as this is a deprecated API.
-                    self._event_serializer.serialize_events(messages, time_now)
+                    self._event_serializer.serialize_events(
+                        messages, time_now, config=serialize_options
+                    )
                 ),
                 "start": await start_token.to_string(self.store),
                 "end": await end_token.to_string(self.store),
             },
             "state": (
                 # Don't bundle aggregations as this is a deprecated API.
-                self._event_serializer.serialize_events(room_state.values(), time_now)
+                self._event_serializer.serialize_events(
+                    room_state.values(), time_now, config=serialize_options
+                )
             ),
             "presence": [],
             "receipts": [],
@@ -398,7 +410,7 @@ class InitialSyncHandler:
 
     async def _room_initial_sync_joined(
         self,
-        user_id: str,
+        requester: Requester,
         room_id: str,
         pagin_config: PaginationConfig,
         membership: str,
@@ -410,9 +422,12 @@ class InitialSyncHandler:
 
         # TODO: These concurrently
         time_now = self.clock.time_msec()
+        serialize_options = SerializeEventConfig(requester=requester)
         # Don't bundle aggregations as this is a deprecated API.
         state = self._event_serializer.serialize_events(
-            current_state.values(), time_now
+            current_state.values(),
+            time_now,
+            config=serialize_options,
         )
 
         now_token = self.hs.get_event_sources().get_current_token()
@@ -450,7 +465,10 @@ class InitialSyncHandler:
             if not receipts:
                 return []
 
-            return ReceiptEventSource.filter_out_private_receipts(receipts, user_id)
+            return ReceiptEventSource.filter_out_private_receipts(
+                receipts,
+                requester.user.to_string(),
+            )
 
         presence, receipts, (messages, token) = await make_deferred_yieldable(
             gather_results(
@@ -469,20 +487,23 @@ class InitialSyncHandler:
         )
 
         messages = await filter_events_for_client(
-            self._storage_controllers, user_id, messages, is_peeking=is_peeking
+            self._storage_controllers,
+            requester.user.to_string(),
+            messages,
+            is_peeking=is_peeking,
         )
 
         start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
         end_token = now_token
 
-        time_now = self.clock.time_msec()
-
         ret = {
             "room_id": room_id,
             "messages": {
                 "chunk": (
                     # Don't bundle aggregations as this is a deprecated API.
-                    self._event_serializer.serialize_events(messages, time_now)
+                    self._event_serializer.serialize_events(
+                        messages, time_now, config=serialize_options
+                    )
                 ),
                 "start": await start_token.to_string(self.store),
                 "end": await end_token.to_string(self.store),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e433d6b01f..da129ec16a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -50,7 +50,7 @@ from synapse.event_auth import validate_event_for_room_version
 from synapse.events import EventBase, relation_from_event
 from synapse.events.builder import EventBuilder
 from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
-from synapse.events.utils import maybe_upsert_event_field
+from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
 from synapse.events.validator import EventValidator
 from synapse.handlers.directory import DirectoryHandler
 from synapse.logging import opentracing
@@ -245,8 +245,11 @@ class MessageHandler:
                 )
                 room_state = room_state_events[membership_event_id]
 
-        now = self.clock.time_msec()
-        events = self._event_serializer.serialize_events(room_state.values(), now)
+        events = self._event_serializer.serialize_events(
+            room_state.values(),
+            self.clock.time_msec(),
+            config=SerializeEventConfig(requester=requester),
+        )
         return events
 
     async def _user_can_see_state_at_event(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index ceefa16b49..8c79c055ba 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -579,7 +579,9 @@ class PaginationHandler:
 
         time_now = self.clock.time_msec()
 
-        serialize_options = SerializeEventConfig(as_client_event=as_client_event)
+        serialize_options = SerializeEventConfig(
+            as_client_event=as_client_event, requester=requester
+        )
 
         chunk = {
             "chunk": (
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 553053b694..1d09fdf135 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -20,6 +20,7 @@ import attr
 from synapse.api.constants import Direction, EventTypes, RelationTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase, relation_from_event
+from synapse.events.utils import SerializeEventConfig
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import trace
 from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
@@ -151,16 +152,23 @@ class RelationsHandler:
         )
 
         now = self._clock.time_msec()
+        serialize_options = SerializeEventConfig(requester=requester)
         return_value: JsonDict = {
             "chunk": self._event_serializer.serialize_events(
-                events, now, bundle_aggregations=aggregations
+                events,
+                now,
+                bundle_aggregations=aggregations,
+                config=serialize_options,
             ),
         }
         if include_original_event:
             # Do not bundle aggregations when retrieving the original event because
             # we want the content before relations are applied to it.
             return_value["original_event"] = self._event_serializer.serialize_event(
-                event, now, bundle_aggregations=None
+                event,
+                now,
+                bundle_aggregations=None,
+                config=serialize_options,
             )
 
         if next_token:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b1784638f4..32451670f3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1123,7 +1123,9 @@ class RoomCreationHandler:
                 event_dict,
                 prev_event_ids=prev_event,
                 depth=depth,
-                state_map=state_map,
+                # Take a copy to ensure each event gets a unique copy of
+                # state_map since it is modified below.
+                state_map=dict(state_map),
                 for_batch=for_batch,
             )
 
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9bbf83047d..aad4706f14 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -23,7 +23,8 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import NotFoundError, SynapseError
 from synapse.api.filtering import Filter
 from synapse.events import EventBase
-from synapse.types import JsonDict, StrCollection, StreamKeyType, UserID
+from synapse.events.utils import SerializeEventConfig
+from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
 from synapse.types.state import StateFilter
 from synapse.visibility import filter_events_for_client
 
@@ -109,12 +110,12 @@ class SearchHandler:
         return historical_room_ids
 
     async def search(
-        self, user: UserID, content: JsonDict, batch: Optional[str] = None
+        self, requester: Requester, content: JsonDict, batch: Optional[str] = None
     ) -> JsonDict:
         """Performs a full text search for a user.
 
         Args:
-            user: The user performing the search.
+            requester: The user performing the search.
             content: Search parameters
             batch: The next_batch parameter. Used for pagination.
 
@@ -199,7 +200,7 @@ class SearchHandler:
             )
 
         return await self._search(
-            user,
+            requester,
             batch_group,
             batch_group_key,
             batch_token,
@@ -217,7 +218,7 @@ class SearchHandler:
 
     async def _search(
         self,
-        user: UserID,
+        requester: Requester,
         batch_group: Optional[str],
         batch_group_key: Optional[str],
         batch_token: Optional[str],
@@ -235,7 +236,7 @@ class SearchHandler:
         """Performs a full text search for a user.
 
         Args:
-            user: The user performing the search.
+            requester: The user performing the search.
             batch_group: Pagination information.
             batch_group_key: Pagination information.
             batch_token: Pagination information.
@@ -269,7 +270,7 @@ class SearchHandler:
 
         # TODO: Search through left rooms too
         rooms = await self.store.get_rooms_for_local_user_where_membership_is(
-            user.to_string(),
+            requester.user.to_string(),
             membership_list=[Membership.JOIN],
             # membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban],
         )
@@ -303,13 +304,13 @@ class SearchHandler:
 
         if order_by == "rank":
             search_result, sender_group = await self._search_by_rank(
-                user, room_ids, search_term, keys, search_filter
+                requester.user, room_ids, search_term, keys, search_filter
             )
             # Unused return values for rank search.
             global_next_batch = None
         elif order_by == "recent":
             search_result, global_next_batch = await self._search_by_recent(
-                user,
+                requester.user,
                 room_ids,
                 search_term,
                 keys,
@@ -334,7 +335,7 @@ class SearchHandler:
             assert after_limit is not None
 
             contexts = await self._calculate_event_contexts(
-                user,
+                requester.user,
                 search_result.allowed_events,
                 before_limit,
                 after_limit,
@@ -363,27 +364,37 @@ class SearchHandler:
                 # The returned events.
                 search_result.allowed_events,
             ),
-            user.to_string(),
+            requester.user.to_string(),
         )
 
         # We're now about to serialize the events. We should not make any
         # blocking calls after this. Otherwise, the 'age' will be wrong.
 
         time_now = self.clock.time_msec()
+        serialize_options = SerializeEventConfig(requester=requester)
 
         for context in contexts.values():
             context["events_before"] = self._event_serializer.serialize_events(
-                context["events_before"], time_now, bundle_aggregations=aggregations
+                context["events_before"],
+                time_now,
+                bundle_aggregations=aggregations,
+                config=serialize_options,
             )
             context["events_after"] = self._event_serializer.serialize_events(
-                context["events_after"], time_now, bundle_aggregations=aggregations
+                context["events_after"],
+                time_now,
+                bundle_aggregations=aggregations,
+                config=serialize_options,
             )
 
         results = [
             {
                 "rank": search_result.rank_map[e.event_id],
                 "result": self._event_serializer.serialize_event(
-                    e, time_now, bundle_aggregations=aggregations
+                    e,
+                    time_now,
+                    bundle_aggregations=aggregations,
+                    config=serialize_options,
                 ),
                 "context": contexts.get(e.event_id, {}),
             }
@@ -398,7 +409,9 @@ class SearchHandler:
 
         if state_results:
             rooms_cat_res["state"] = {
-                room_id: self._event_serializer.serialize_events(state_events, time_now)
+                room_id: self._event_serializer.serialize_events(
+                    state_events, time_now, config=serialize_options
+                )
                 for room_id, state_events in state_results.items()
             }