diff --git a/changelog.d/9905.feature b/changelog.d/9905.feature
new file mode 100644
index 0000000000..96a0e7f09f
--- /dev/null
+++ b/changelog.d/9905.feature
@@ -0,0 +1 @@
+Improve performance of sending events for worker-based deployments using Redis.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9d867aaf4d..e8330a2b50 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2446,7 +2446,9 @@ class FederationHandler(BaseHandler):
# If we are going to send this event over federation we precaclculate
# the joined hosts.
if event.internal_metadata.get_send_on_behalf_of():
- await self.event_creation_handler.cache_joined_hosts_for_event(event)
+ await self.event_creation_handler.cache_joined_hosts_for_event(
+ event, context
+ )
return context
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 393f17c3a3..8729332d4b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -51,6 +51,7 @@ from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder
from synapse.util.async_helpers import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@@ -457,6 +458,19 @@ class EventCreationHandler:
self._external_cache = hs.get_external_cache()
+ # Stores the state groups we've recently added to the joined hosts
+ # external cache. Note that the timeout must be significantly less than
+ # the TTL on the external cache.
+ self._external_cache_joined_hosts_updates = (
+ None
+ ) # type: Optional[ExpiringCache]
+ if self._external_cache.is_enabled():
+ self._external_cache_joined_hosts_updates = ExpiringCache(
+ "_external_cache_joined_hosts_updates",
+ self.clock,
+ expiry_ms=30 * 60 * 1000,
+ )
+
async def create_event(
self,
requester: Requester,
@@ -967,7 +981,7 @@ class EventCreationHandler:
await self.action_generator.handle_push_actions_for_event(event, context)
- await self.cache_joined_hosts_for_event(event)
+ await self.cache_joined_hosts_for_event(event, context)
try:
# If we're a worker we need to hit out to the master.
@@ -1008,7 +1022,9 @@ class EventCreationHandler:
await self.store.remove_push_actions_from_staging(event.event_id)
raise
- async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
+ async def cache_joined_hosts_for_event(
+ self, event: EventBase, context: EventContext
+ ) -> None:
"""Precalculate the joined hosts at the event, when using Redis, so that
external federation senders don't have to recalculate it themselves.
"""
@@ -1016,6 +1032,9 @@ class EventCreationHandler:
if not self._external_cache.is_enabled():
return
+ # If external cache is enabled we should always have this.
+ assert self._external_cache_joined_hosts_updates is not None
+
# We actually store two mappings, event ID -> prev state group,
# state group -> joined hosts, which is much more space efficient
# than event ID -> joined hosts.
@@ -1023,16 +1042,21 @@ class EventCreationHandler:
# Note: We have to cache event ID -> prev state group, as we don't
# store that in the DB.
#
- # Note: We always set the state group -> joined hosts cache, even if
- # we already set it, so that the expiry time is reset.
+ # Note: We set the state group -> joined hosts cache if it hasn't been
+ # set for a while, so that the expiry time is reset.
state_entry = await self.state.resolve_state_groups_for_events(
event.room_id, event_ids=event.prev_event_ids()
)
if state_entry.state_group:
+ if state_entry.state_group in self._external_cache_joined_hosts_updates:
+ return
+
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
+ # Note that the expiry times must be larger than the expiry time in
+ # _external_cache_joined_hosts_updates.
await self._external_cache.set(
"event_to_prev_state_group",
event.event_id,
@@ -1046,6 +1070,8 @@ class EventCreationHandler:
expiry_ms=60 * 60 * 1000,
)
+ self._external_cache_joined_hosts_updates[state_entry.state_group] = None
+
async def _validate_canonical_alias(
self, directory_handler, room_alias_str: str, expected_room_id: str
) -> None:
|