summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py19
-rw-r--r--synapse/handlers/initial_sync.py33
-rw-r--r--synapse/handlers/message.py13
3 files changed, 36 insertions, 29 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1ea837d082..26b8e3f43c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -360,31 +360,34 @@ class FederationHandler:
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
         resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
-        states = await make_deferred_yieldable(
+        states_list = await make_deferred_yieldable(
             defer.gatherResults(
                 [resolve(room_id, [e]) for e in event_ids], consumeErrors=True
             )
         )
 
-        # dict[str, dict[tuple, str]], a map from event_id to state map of
-        # event_ids.
-        states = dict(zip(event_ids, [s.state for s in states]))
+        # A map from event_id to state map of event_ids.
+        state_ids: Dict[str, StateMap[str]] = dict(
+            zip(event_ids, [s.state for s in states_list])
+        )
 
         state_map = await self.store.get_events(
-            [e_id for ids in states.values() for e_id in ids.values()],
+            [e_id for ids in state_ids.values() for e_id in ids.values()],
             get_prev_content=False,
         )
-        states = {
+
+        # A map from event_id to state map of events.
+        state_events: Dict[str, StateMap[EventBase]] = {
             key: {
                 k: state_map[e_id]
                 for k, e_id in state_dict.items()
                 if e_id in state_map
             }
-            for key, state_dict in states.items()
+            for key, state_dict in state_ids.items()
         }
 
         for e_id in event_ids:
-            likely_extremeties_domains = get_domains_from_state(states[e_id])
+            likely_extremeties_domains = get_domains_from_state(state_events[e_id])
 
             success = await try_backfill(
                 [
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 9cd21e7f2b..9ab723ff97 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,21 +13,27 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple
-
-from twisted.internet import defer
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 
 from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import SynapseError
+from synapse.events import EventBase
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
 from synapse.handlers.receipts import ReceiptEventSource
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.storage.roommember import RoomsForUser
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+    JsonDict,
+    Requester,
+    RoomStreamToken,
+    StateMap,
+    StreamToken,
+    UserID,
+)
 from synapse.util import unwrapFirstError
-from synapse.util.async_helpers import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute, gather_results
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.visibility import filter_events_for_client
 
@@ -190,14 +196,13 @@ class InitialSyncHandler:
                     )
                     deferred_room_state = run_in_background(
                         self.state_store.get_state_for_events, [event.event_id]
-                    )
-                    deferred_room_state.addCallback(
-                        lambda states: states[event.event_id]
+                    ).addCallback(
+                        lambda states: cast(StateMap[EventBase], states[event.event_id])
                     )
 
                 (messages, token), current_state = await make_deferred_yieldable(
-                    defer.gatherResults(
-                        [
+                    gather_results(
+                        (
                             run_in_background(
                                 self.store.get_recent_events_for_room,
                                 event.room_id,
@@ -205,7 +210,7 @@ class InitialSyncHandler:
                                 end_token=room_end_token,
                             ),
                             deferred_room_state,
-                        ]
+                        )
                     )
                 ).addErrback(unwrapFirstError)
 
@@ -454,8 +459,8 @@ class InitialSyncHandler:
             return receipts
 
         presence, receipts, (messages, token) = await make_deferred_yieldable(
-            defer.gatherResults(
-                [
+            gather_results(
+                (
                     run_in_background(get_presence),
                     run_in_background(get_receipts),
                     run_in_background(
@@ -464,7 +469,7 @@ class InitialSyncHandler:
                         limit=limit,
                         end_token=now_token.room_key,
                     ),
-                ],
+                ),
                 consumeErrors=True,
             ).addErrback(unwrapFirstError)
         )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 38409fef38..5e3d3886eb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,7 +21,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
-from twisted.internet import defer
 from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
@@ -57,7 +56,7 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
 from synapse.util import json_decoder, json_encoder, log_failure
-from synapse.util.async_helpers import Linearizer, unwrapFirstError
+from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
@@ -1168,9 +1167,9 @@ class EventCreationHandler:
 
         # We now persist the event (and update the cache in parallel, since we
         # don't want to block on it).
-        result = await make_deferred_yieldable(
-            defer.gatherResults(
-                [
+        result, _ = await make_deferred_yieldable(
+            gather_results(
+                (
                     run_in_background(
                         self._persist_event,
                         requester=requester,
@@ -1182,12 +1181,12 @@ class EventCreationHandler:
                     run_in_background(
                         self.cache_joined_hosts_for_event, event, context
                     ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
-                ],
+                ),
                 consumeErrors=True,
             )
         ).addErrback(unwrapFirstError)
 
-        return result[0]
+        return result
 
     async def _persist_event(
         self,