diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5afb7fc261..9f365eb5ad 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
from canonicaljson import encode_canonical_json
+from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall
from synapse import event_auth
@@ -43,14 +44,14 @@ from synapse.events import EventBase
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
-from synapse.logging.context import run_in_background
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
-from synapse.util import json_decoder, json_encoder
-from synapse.util.async_helpers import Linearizer
+from synapse.util import json_decoder, json_encoder, log_failure
+from synapse.util.async_helpers import Linearizer, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@@ -979,9 +980,43 @@ class EventCreationHandler:
logger.exception("Failed to encode content: %r", event.content)
raise
- await self.action_generator.handle_push_actions_for_event(event, context)
+ # We now persist the event (and update the cache in parallel, since we
+ # don't want to block on it).
+ result = await make_deferred_yieldable(
+ defer.gatherResults(
+ [
+ run_in_background(
+ self._persist_event,
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ ),
+ run_in_background(
+ self.cache_joined_hosts_for_event, event, context
+ ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
+ ],
+ consumeErrors=True,
+ )
+ ).addErrback(unwrapFirstError)
+
+ return result[0]
+
+ async def _persist_event(
+ self,
+ requester: Requester,
+ event: EventBase,
+ context: EventContext,
+ ratelimit: bool = True,
+ extra_users: Optional[List[UserID]] = None,
+ ) -> EventBase:
+ """Actually persists the event. Should only be called by
+ `handle_new_client_event`, and see its docstring for documentation of
+ the arguments.
+ """
- await self.cache_joined_hosts_for_event(event, context)
+ await self.action_generator.handle_push_actions_for_event(event, context)
try:
# If we're a worker we need to hit out to the master.
|