diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 9cd21e7f2b..601bab67f9 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,21 +13,27 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple
-
-from twisted.internet import defer
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import SynapseError
+from synapse.events import EventBase
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.receipts import ReceiptEventSource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+ JsonDict,
+ Requester,
+ RoomStreamToken,
+ StateMap,
+ StreamToken,
+ UserID,
+)
from synapse.util import unwrapFirstError
-from synapse.util.async_helpers import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.visibility import filter_events_for_client
@@ -167,8 +173,6 @@ class InitialSyncHandler:
d["invite"] = await self._event_serializer.serialize_event(
invite_event,
time_now,
- # Don't bundle aggregations as this is a deprecated API.
- bundle_aggregations=False,
as_client_event=as_client_event,
)
@@ -190,14 +194,13 @@ class InitialSyncHandler:
)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
- )
- deferred_room_state.addCallback(
- lambda states: states[event.event_id]
+ ).addCallback(
+ lambda states: cast(StateMap[EventBase], states[event.event_id])
)
(messages, token), current_state = await make_deferred_yieldable(
- defer.gatherResults(
- [
+ gather_results(
+ (
run_in_background(
self.store.get_recent_events_for_room,
event.room_id,
@@ -205,7 +208,7 @@ class InitialSyncHandler:
end_token=room_end_token,
),
deferred_room_state,
- ]
+ )
)
).addErrback(unwrapFirstError)
@@ -222,8 +225,6 @@ class InitialSyncHandler:
await self._event_serializer.serialize_events(
messages,
time_now=time_now,
- # Don't bundle aggregations as this is a deprecated API.
- bundle_aggregations=False,
as_client_event=as_client_event,
)
),
@@ -234,8 +235,6 @@ class InitialSyncHandler:
d["state"] = await self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
- # Don't bundle aggregations as this is a deprecated API.
- bundle_aggregations=False,
as_client_event=as_client_event,
)
@@ -377,9 +376,7 @@ class InitialSyncHandler:
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
- await self._event_serializer.serialize_events(
- messages, time_now, bundle_aggregations=False
- )
+ await self._event_serializer.serialize_events(messages, time_now)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
@@ -387,7 +384,7 @@ class InitialSyncHandler:
"state": (
# Don't bundle aggregations as this is a deprecated API.
await self._event_serializer.serialize_events(
- room_state.values(), time_now, bundle_aggregations=False
+ room_state.values(), time_now
)
),
"presence": [],
@@ -408,7 +405,7 @@ class InitialSyncHandler:
time_now = self.clock.time_msec()
# Don't bundle aggregations as this is a deprecated API.
state = await self._event_serializer.serialize_events(
- current_state.values(), time_now, bundle_aggregations=False
+ current_state.values(), time_now
)
now_token = self.hs.get_event_sources().get_current_token()
@@ -454,8 +451,8 @@ class InitialSyncHandler:
return receipts
presence, receipts, (messages, token) = await make_deferred_yieldable(
- defer.gatherResults(
- [
+ gather_results(
+ (
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
@@ -464,7 +461,7 @@ class InitialSyncHandler:
limit=limit,
end_token=now_token.room_key,
),
- ],
+ ),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
@@ -483,9 +480,7 @@ class InitialSyncHandler:
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
- await self._event_serializer.serialize_events(
- messages, time_now, bundle_aggregations=False
- )
+ await self._event_serializer.serialize_events(messages, time_now)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
|