diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 949b69cb41..68c07f0265 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -23,7 +23,7 @@ from synapse.events.utils import SerializeEventConfig
from synapse.handlers.presence import format_user_presence_state
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, Requester, UserID
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -46,13 +46,12 @@ class EventStreamHandler:
async def get_stream(
self,
- auth_user_id: str,
+ requester: Requester,
pagin_config: PaginationConfig,
timeout: int = 0,
as_client_event: bool = True,
affect_presence: bool = True,
room_id: Optional[str] = None,
- is_guest: bool = False,
) -> JsonDict:
"""Fetches the events stream for a given user."""
@@ -62,13 +61,12 @@ class EventStreamHandler:
raise SynapseError(403, "This room has been blocked on this server")
# send any outstanding server notices to the user.
- await self._server_notices_sender.on_user_syncing(auth_user_id)
+ await self._server_notices_sender.on_user_syncing(requester.user.to_string())
- auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_presence_handler()
context = await presence_handler.user_syncing(
- auth_user_id,
+ requester.user.to_string(),
affect_presence=affect_presence,
presence_state=PresenceState.ONLINE,
)
@@ -82,10 +80,10 @@ class EventStreamHandler:
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
stream_result = await self.notifier.get_events_for(
- auth_user,
+ requester.user,
pagin_config,
timeout,
- is_guest=is_guest,
+ is_guest=requester.is_guest,
explicit_room_id=room_id,
)
events = stream_result.events
@@ -102,7 +100,7 @@ class EventStreamHandler:
if event.membership != Membership.JOIN:
continue
# Send down presence.
- if event.state_key == auth_user_id:
+ if event.state_key == requester.user.to_string():
# Send down presence for everyone in the room.
users: Iterable[str] = await self.store.get_users_in_room(
event.room_id
@@ -124,7 +122,9 @@ class EventStreamHandler:
chunks = self._event_serializer.serialize_events(
events,
time_now,
- config=SerializeEventConfig(as_client_event=as_client_event),
+ config=SerializeEventConfig(
+ as_client_event=as_client_event, requester=requester
+ ),
)
chunk = {
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index aead0b44b9..b3be7a86f0 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -318,11 +318,9 @@ class InitialSyncHandler:
)
is_peeking = member_event_id is None
- user_id = requester.user.to_string()
-
if membership == Membership.JOIN:
result = await self._room_initial_sync_joined(
- user_id, room_id, pagin_config, membership, is_peeking
+ requester, room_id, pagin_config, membership, is_peeking
)
elif membership == Membership.LEAVE:
# The member_event_id will always be available if membership is set
@@ -330,10 +328,16 @@ class InitialSyncHandler:
assert member_event_id
result = await self._room_initial_sync_parted(
- user_id, room_id, pagin_config, membership, member_event_id, is_peeking
+ requester,
+ room_id,
+ pagin_config,
+ membership,
+ member_event_id,
+ is_peeking,
)
account_data_events = []
+ user_id = requester.user.to_string()
tags = await self.store.get_tags_for_room(user_id, room_id)
if tags:
account_data_events.append(
@@ -350,7 +354,7 @@ class InitialSyncHandler:
async def _room_initial_sync_parted(
self,
- user_id: str,
+ requester: Requester,
room_id: str,
pagin_config: PaginationConfig,
membership: str,
@@ -369,13 +373,17 @@ class InitialSyncHandler:
)
messages = await filter_events_for_client(
- self._storage_controllers, user_id, messages, is_peeking=is_peeking
+ self._storage_controllers,
+ requester.user.to_string(),
+ messages,
+ is_peeking=is_peeking,
)
start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)
time_now = self.clock.time_msec()
+ serialize_options = SerializeEventConfig(requester=requester)
return {
"membership": membership,
@@ -383,14 +391,18 @@ class InitialSyncHandler:
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
- self._event_serializer.serialize_events(messages, time_now)
+ self._event_serializer.serialize_events(
+ messages, time_now, config=serialize_options
+ )
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": (
# Don't bundle aggregations as this is a deprecated API.
- self._event_serializer.serialize_events(room_state.values(), time_now)
+ self._event_serializer.serialize_events(
+ room_state.values(), time_now, config=serialize_options
+ )
),
"presence": [],
"receipts": [],
@@ -398,7 +410,7 @@ class InitialSyncHandler:
async def _room_initial_sync_joined(
self,
- user_id: str,
+ requester: Requester,
room_id: str,
pagin_config: PaginationConfig,
membership: str,
@@ -410,9 +422,12 @@ class InitialSyncHandler:
# TODO: These concurrently
time_now = self.clock.time_msec()
+ serialize_options = SerializeEventConfig(requester=requester)
# Don't bundle aggregations as this is a deprecated API.
state = self._event_serializer.serialize_events(
- current_state.values(), time_now
+ current_state.values(),
+ time_now,
+ config=serialize_options,
)
now_token = self.hs.get_event_sources().get_current_token()
@@ -450,7 +465,10 @@ class InitialSyncHandler:
if not receipts:
return []
- return ReceiptEventSource.filter_out_private_receipts(receipts, user_id)
+ return ReceiptEventSource.filter_out_private_receipts(
+ receipts,
+ requester.user.to_string(),
+ )
presence, receipts, (messages, token) = await make_deferred_yieldable(
gather_results(
@@ -469,20 +487,23 @@ class InitialSyncHandler:
)
messages = await filter_events_for_client(
- self._storage_controllers, user_id, messages, is_peeking=is_peeking
+ self._storage_controllers,
+ requester.user.to_string(),
+ messages,
+ is_peeking=is_peeking,
)
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
end_token = now_token
- time_now = self.clock.time_msec()
-
ret = {
"room_id": room_id,
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
- self._event_serializer.serialize_events(messages, time_now)
+ self._event_serializer.serialize_events(
+ messages, time_now, config=serialize_options
+ )
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e433d6b01f..da129ec16a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -50,7 +50,7 @@ from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase, relation_from_event
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
-from synapse.events.utils import maybe_upsert_event_field
+from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
from synapse.logging import opentracing
@@ -245,8 +245,11 @@ class MessageHandler:
)
room_state = room_state_events[membership_event_id]
- now = self.clock.time_msec()
- events = self._event_serializer.serialize_events(room_state.values(), now)
+ events = self._event_serializer.serialize_events(
+ room_state.values(),
+ self.clock.time_msec(),
+ config=SerializeEventConfig(requester=requester),
+ )
return events
async def _user_can_see_state_at_event(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index ceefa16b49..8c79c055ba 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -579,7 +579,9 @@ class PaginationHandler:
time_now = self.clock.time_msec()
- serialize_options = SerializeEventConfig(as_client_event=as_client_event)
+ serialize_options = SerializeEventConfig(
+ as_client_event=as_client_event, requester=requester
+ )
chunk = {
"chunk": (
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 553053b694..1d09fdf135 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -20,6 +20,7 @@ import attr
from synapse.api.constants import Direction, EventTypes, RelationTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase, relation_from_event
+from synapse.events.utils import SerializeEventConfig
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import trace
from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
@@ -151,16 +152,23 @@ class RelationsHandler:
)
now = self._clock.time_msec()
+ serialize_options = SerializeEventConfig(requester=requester)
return_value: JsonDict = {
"chunk": self._event_serializer.serialize_events(
- events, now, bundle_aggregations=aggregations
+ events,
+ now,
+ bundle_aggregations=aggregations,
+ config=serialize_options,
),
}
if include_original_event:
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
return_value["original_event"] = self._event_serializer.serialize_event(
- event, now, bundle_aggregations=None
+ event,
+ now,
+ bundle_aggregations=None,
+ config=serialize_options,
)
if next_token:
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9bbf83047d..aad4706f14 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -23,7 +23,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.events import EventBase
-from synapse.types import JsonDict, StrCollection, StreamKeyType, UserID
+from synapse.events.utils import SerializeEventConfig
+from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
from synapse.types.state import StateFilter
from synapse.visibility import filter_events_for_client
@@ -109,12 +110,12 @@ class SearchHandler:
return historical_room_ids
async def search(
- self, user: UserID, content: JsonDict, batch: Optional[str] = None
+ self, requester: Requester, content: JsonDict, batch: Optional[str] = None
) -> JsonDict:
"""Performs a full text search for a user.
Args:
- user: The user performing the search.
+ requester: The user performing the search.
content: Search parameters
batch: The next_batch parameter. Used for pagination.
@@ -199,7 +200,7 @@ class SearchHandler:
)
return await self._search(
- user,
+ requester,
batch_group,
batch_group_key,
batch_token,
@@ -217,7 +218,7 @@ class SearchHandler:
async def _search(
self,
- user: UserID,
+ requester: Requester,
batch_group: Optional[str],
batch_group_key: Optional[str],
batch_token: Optional[str],
@@ -235,7 +236,7 @@ class SearchHandler:
"""Performs a full text search for a user.
Args:
- user: The user performing the search.
+ requester: The user performing the search.
batch_group: Pagination information.
batch_group_key: Pagination information.
batch_token: Pagination information.
@@ -269,7 +270,7 @@ class SearchHandler:
# TODO: Search through left rooms too
rooms = await self.store.get_rooms_for_local_user_where_membership_is(
- user.to_string(),
+ requester.user.to_string(),
membership_list=[Membership.JOIN],
# membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban],
)
@@ -303,13 +304,13 @@ class SearchHandler:
if order_by == "rank":
search_result, sender_group = await self._search_by_rank(
- user, room_ids, search_term, keys, search_filter
+ requester.user, room_ids, search_term, keys, search_filter
)
# Unused return values for rank search.
global_next_batch = None
elif order_by == "recent":
search_result, global_next_batch = await self._search_by_recent(
- user,
+ requester.user,
room_ids,
search_term,
keys,
@@ -334,7 +335,7 @@ class SearchHandler:
assert after_limit is not None
contexts = await self._calculate_event_contexts(
- user,
+ requester.user,
search_result.allowed_events,
before_limit,
after_limit,
@@ -363,27 +364,37 @@ class SearchHandler:
# The returned events.
search_result.allowed_events,
),
- user.to_string(),
+ requester.user.to_string(),
)
# We're now about to serialize the events. We should not make any
# blocking calls after this. Otherwise, the 'age' will be wrong.
time_now = self.clock.time_msec()
+ serialize_options = SerializeEventConfig(requester=requester)
for context in contexts.values():
context["events_before"] = self._event_serializer.serialize_events(
- context["events_before"], time_now, bundle_aggregations=aggregations
+ context["events_before"],
+ time_now,
+ bundle_aggregations=aggregations,
+ config=serialize_options,
)
context["events_after"] = self._event_serializer.serialize_events(
- context["events_after"], time_now, bundle_aggregations=aggregations
+ context["events_after"],
+ time_now,
+ bundle_aggregations=aggregations,
+ config=serialize_options,
)
results = [
{
"rank": search_result.rank_map[e.event_id],
"result": self._event_serializer.serialize_event(
- e, time_now, bundle_aggregations=aggregations
+ e,
+ time_now,
+ bundle_aggregations=aggregations,
+ config=serialize_options,
),
"context": contexts.get(e.event_id, {}),
}
@@ -398,7 +409,9 @@ class SearchHandler:
if state_results:
rooms_cat_res["state"] = {
- room_id: self._event_serializer.serialize_events(state_events, time_now)
+ room_id: self._event_serializer.serialize_events(
+ state_events, time_now, config=serialize_options
+ )
for room_id, state_events in state_results.items()
}
|