summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/9951.feature1
-rw-r--r--synapse/handlers/message.py45
2 files changed, 41 insertions, 5 deletions
diff --git a/changelog.d/9951.feature b/changelog.d/9951.feature
new file mode 100644
index 0000000000..96a0e7f09f
--- /dev/null
+++ b/changelog.d/9951.feature
@@ -0,0 +1 @@
+Improve performance of sending events for worker-based deployments using Redis.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5afb7fc261..9f365eb5ad 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
+from twisted.internet import defer
 from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
@@ -43,14 +44,14 @@ from synapse.events import EventBase
 from synapse.events.builder import EventBuilder
 from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
-from synapse.logging.context import run_in_background
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
-from synapse.util import json_decoder, json_encoder
-from synapse.util.async_helpers import Linearizer
+from synapse.util import json_decoder, json_encoder, log_failure
+from synapse.util.async_helpers import Linearizer, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
@@ -979,9 +980,43 @@ class EventCreationHandler:
             logger.exception("Failed to encode content: %r", event.content)
             raise
 
-        await self.action_generator.handle_push_actions_for_event(event, context)
+        # We now persist the event (and update the cache in parallel, since we
+        # don't want to block on it).
+        result = await make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(
+                        self._persist_event,
+                        requester=requester,
+                        event=event,
+                        context=context,
+                        ratelimit=ratelimit,
+                        extra_users=extra_users,
+                    ),
+                    run_in_background(
+                        self.cache_joined_hosts_for_event, event, context
+                    ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
+                ],
+                consumeErrors=True,
+            )
+        ).addErrback(unwrapFirstError)
+
+        return result[0]
+
+    async def _persist_event(
+        self,
+        requester: Requester,
+        event: EventBase,
+        context: EventContext,
+        ratelimit: bool = True,
+        extra_users: Optional[List[UserID]] = None,
+    ) -> EventBase:
+        """Actually persists the event. Should only be called by
+        `handle_new_client_event`, and see its docstring for documentation of
+        the arguments.
+        """
 
-        await self.cache_joined_hosts_for_event(event, context)
+        await self.action_generator.handle_push_actions_for_event(event, context)
 
         try:
             # If we're a worker we need to hit out to the master.