summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py60
1 files changed, 48 insertions, 12 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ec8eb21674..8729332d4b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 import random
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
@@ -51,6 +51,7 @@ from synapse.storage.state import StateFilter
 from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
 from synapse.util import json_decoder, json_encoder
 from synapse.util.async_helpers import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
 
@@ -66,7 +67,7 @@ logger = logging.getLogger(__name__)
 class MessageHandler:
     """Contains some read only APIs to get state about a room"""
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.auth = hs.get_auth()
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
@@ -91,7 +92,7 @@ class MessageHandler:
         room_id: str,
         event_type: str,
         state_key: str,
-    ) -> dict:
+    ) -> Optional[EventBase]:
         """Get data from a room.
 
         Args:
@@ -115,6 +116,10 @@ class MessageHandler:
             data = await self.state.get_current_state(room_id, event_type, state_key)
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
+            # If the membership is not JOIN, then the event ID should exist.
+            assert (
+                membership_event_id is not None
+            ), "check_user_in_room_or_world_readable returned invalid data"
             room_state = await self.state_store.get_state_for_events(
                 [membership_event_id], StateFilter.from_types([key])
             )
@@ -186,10 +191,12 @@ class MessageHandler:
 
             event = last_events[0]
             if visible_events:
-                room_state = await self.state_store.get_state_for_events(
+                room_state_events = await self.state_store.get_state_for_events(
                     [event.event_id], state_filter=state_filter
                 )
-                room_state = room_state[event.event_id]
+                room_state = room_state_events[
+                    event.event_id
+                ]  # type: Mapping[Any, EventBase]
             else:
                 raise AuthError(
                     403,
@@ -210,10 +217,14 @@ class MessageHandler:
                 )
                 room_state = await self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
-                room_state = await self.state_store.get_state_for_events(
+                # If the membership is not JOIN, then the event ID should exist.
+                assert (
+                    membership_event_id is not None
+                ), "check_user_in_room_or_world_readable returned invalid data"
+                room_state_events = await self.state_store.get_state_for_events(
                     [membership_event_id], state_filter=state_filter
                 )
-                room_state = room_state[membership_event_id]
+                room_state = room_state_events[membership_event_id]
 
         now = self.clock.time_msec()
         events = await self._event_serializer.serialize_events(
@@ -248,7 +259,7 @@ class MessageHandler:
                     "Getting joined members after leaving is not implemented"
                 )
 
-        users_with_profile = await self.state.get_current_users_in_room(room_id)
+        users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
@@ -447,6 +458,19 @@ class EventCreationHandler:
 
         self._external_cache = hs.get_external_cache()
 
+        # Stores the state groups we've recently added to the joined hosts
+        # external cache. Note that the timeout must be significantly less than
+        # the TTL on the external cache.
+        self._external_cache_joined_hosts_updates = (
+            None
+        )  # type: Optional[ExpiringCache]
+        if self._external_cache.is_enabled():
+            self._external_cache_joined_hosts_updates = ExpiringCache(
+                "_external_cache_joined_hosts_updates",
+                self.clock,
+                expiry_ms=30 * 60 * 1000,
+            )
+
     async def create_event(
         self,
         requester: Requester,
@@ -957,7 +981,7 @@ class EventCreationHandler:
 
         await self.action_generator.handle_push_actions_for_event(event, context)
 
-        await self.cache_joined_hosts_for_event(event)
+        await self.cache_joined_hosts_for_event(event, context)
 
         try:
             # If we're a worker we need to hit out to the master.
@@ -998,7 +1022,9 @@ class EventCreationHandler:
             await self.store.remove_push_actions_from_staging(event.event_id)
             raise
 
-    async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
+    async def cache_joined_hosts_for_event(
+        self, event: EventBase, context: EventContext
+    ) -> None:
         """Precalculate the joined hosts at the event, when using Redis, so that
         external federation senders don't have to recalculate it themselves.
         """
@@ -1006,6 +1032,9 @@ class EventCreationHandler:
         if not self._external_cache.is_enabled():
             return
 
+        # If external cache is enabled we should always have this.
+        assert self._external_cache_joined_hosts_updates is not None
+
         # We actually store two mappings, event ID -> prev state group,
         # state group -> joined hosts, which is much more space efficient
         # than event ID -> joined hosts.
@@ -1013,16 +1042,21 @@ class EventCreationHandler:
         # Note: We have to cache event ID -> prev state group, as we don't
         # store that in the DB.
         #
-        # Note: We always set the state group -> joined hosts cache, even if
-        # we already set it, so that the expiry time is reset.
+        # Note: We set the state group -> joined hosts cache if it hasn't been
+        # set for a while, so that the expiry time is reset.
 
         state_entry = await self.state.resolve_state_groups_for_events(
             event.room_id, event_ids=event.prev_event_ids()
         )
 
         if state_entry.state_group:
+            if state_entry.state_group in self._external_cache_joined_hosts_updates:
+                return
+
             joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
 
+            # Note that the expiry times must be larger than the expiry time in
+            # _external_cache_joined_hosts_updates.
             await self._external_cache.set(
                 "event_to_prev_state_group",
                 event.event_id,
@@ -1036,6 +1070,8 @@ class EventCreationHandler:
                 expiry_ms=60 * 60 * 1000,
             )
 
+            self._external_cache_joined_hosts_updates[state_entry.state_group] = None
+
     async def _validate_canonical_alias(
         self, directory_handler, room_alias_str: str, expected_room_id: str
     ) -> None: