summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py116
1 files changed, 59 insertions, 57 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 00e7645ba5..4e55ebba0b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1390,7 +1390,7 @@ class EventCreationHandler:
                             extra_users=extra_users,
                         ),
                         run_in_background(
-                            self.cache_joined_hosts_for_event, event, context
+                            self.cache_joined_hosts_for_events, events_and_context
                         ).addErrback(
                             log_failure, "cache_joined_hosts_for_event failed"
                         ),
@@ -1491,62 +1491,65 @@ class EventCreationHandler:
                 await self.store.remove_push_actions_from_staging(event.event_id)
             raise
 
-    async def cache_joined_hosts_for_event(
-        self, event: EventBase, context: EventContext
+    async def cache_joined_hosts_for_events(
+        self, events_and_context: List[Tuple[EventBase, EventContext]]
     ) -> None:
-        """Precalculate the joined hosts at the event, when using Redis, so that
+        """Precalculate the joined hosts at each of the given events, when using Redis, so that
         external federation senders don't have to recalculate it themselves.
         """
 
-        if not self._external_cache.is_enabled():
-            return
+        for event, _ in events_and_context:
+            if not self._external_cache.is_enabled():
+                return
 
-        # If external cache is enabled we should always have this.
-        assert self._external_cache_joined_hosts_updates is not None
+            # If external cache is enabled we should always have this.
+            assert self._external_cache_joined_hosts_updates is not None
 
-        # We actually store two mappings, event ID -> prev state group,
-        # state group -> joined hosts, which is much more space efficient
-        # than event ID -> joined hosts.
-        #
-        # Note: We have to cache event ID -> prev state group, as we don't
-        # store that in the DB.
-        #
-        # Note: We set the state group -> joined hosts cache if it hasn't been
-        # set for a while, so that the expiry time is reset.
-
-        state_entry = await self.state.resolve_state_groups_for_events(
-            event.room_id, event_ids=event.prev_event_ids()
-        )
+            # We actually store two mappings, event ID -> prev state group,
+            # state group -> joined hosts, which is much more space efficient
+            # than event ID -> joined hosts.
+            #
+            # Note: We have to cache event ID -> prev state group, as we don't
+            # store that in the DB.
+            #
+            # Note: We set the state group -> joined hosts cache if it hasn't been
+            # set for a while, so that the expiry time is reset.
 
-        if state_entry.state_group:
-            await self._external_cache.set(
-                "event_to_prev_state_group",
-                event.event_id,
-                state_entry.state_group,
-                expiry_ms=60 * 60 * 1000,
+            state_entry = await self.state.resolve_state_groups_for_events(
+                event.room_id, event_ids=event.prev_event_ids()
             )
 
-            if state_entry.state_group in self._external_cache_joined_hosts_updates:
-                return
+            if state_entry.state_group:
+                await self._external_cache.set(
+                    "event_to_prev_state_group",
+                    event.event_id,
+                    state_entry.state_group,
+                    expiry_ms=60 * 60 * 1000,
+                )
 
-            state = await state_entry.get_state(
-                self._storage_controllers.state, StateFilter.all()
-            )
-            with opentracing.start_active_span("get_joined_hosts"):
-                joined_hosts = await self.store.get_joined_hosts(
-                    event.room_id, state, state_entry
+                if state_entry.state_group in self._external_cache_joined_hosts_updates:
+                    return
+
+                state = await state_entry.get_state(
+                    self._storage_controllers.state, StateFilter.all()
                 )
+                with opentracing.start_active_span("get_joined_hosts"):
+                    joined_hosts = await self.store.get_joined_hosts(
+                        event.room_id, state, state_entry
+                    )
 
-            # Note that the expiry times must be larger than the expiry time in
-            # _external_cache_joined_hosts_updates.
-            await self._external_cache.set(
-                "get_joined_hosts",
-                str(state_entry.state_group),
-                list(joined_hosts),
-                expiry_ms=60 * 60 * 1000,
-            )
+                # Note that the expiry times must be larger than the expiry time in
+                # _external_cache_joined_hosts_updates.
+                await self._external_cache.set(
+                    "get_joined_hosts",
+                    str(state_entry.state_group),
+                    list(joined_hosts),
+                    expiry_ms=60 * 60 * 1000,
+                )
 
-            self._external_cache_joined_hosts_updates[state_entry.state_group] = None
+                self._external_cache_joined_hosts_updates[
+                    state_entry.state_group
+                ] = None
 
     async def _validate_canonical_alias(
         self,
@@ -1872,6 +1875,7 @@ class EventCreationHandler:
             events_and_context, backfilled=backfilled
         )
 
+        events_and_pos = []
         for event in persisted_events:
             if self._ephemeral_events_enabled:
                 # If there's an expiry timestamp on the event, schedule its expiry.
@@ -1880,25 +1884,23 @@ class EventCreationHandler:
             stream_ordering = event.internal_metadata.stream_ordering
             assert stream_ordering is not None
             pos = PersistedEventPosition(self._instance_name, stream_ordering)
-
-            async def _notify() -> None:
-                try:
-                    await self.notifier.on_new_room_event(
-                        event, pos, max_stream_token, extra_users=extra_users
-                    )
-                except Exception:
-                    logger.exception(
-                        "Error notifying about new room event %s",
-                        event.event_id,
-                    )
-
-            run_in_background(_notify)
+            events_and_pos.append((event, pos))
 
             if event.type == EventTypes.Message:
                 # We don't want to block sending messages on any presence code. This
                 # matters as sometimes presence code can take a while.
                 run_in_background(self._bump_active_time, requester.user)
 
+        async def _notify() -> None:
+            try:
+                await self.notifier.on_new_room_events(
+                    events_and_pos, max_stream_token, extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room events")
+
+        run_in_background(_notify)
+
         return persisted_events[-1]
 
     async def _maybe_kick_guest_users(