summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13222.misc1
-rw-r--r--synapse/app/admin_cmd.py13
-rw-r--r--synapse/visibility.py524
3 files changed, 400 insertions, 138 deletions
diff --git a/changelog.d/13222.misc b/changelog.d/13222.misc
new file mode 100644
index 0000000000..0bab1aed70
--- /dev/null
+++ b/changelog.d/13222.misc
@@ -0,0 +1 @@
+Improve memory usage of calculating push actions for events in large rooms.
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 561621a285..87f82bd9a5 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -39,6 +39,7 @@ from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.server import HomeServer
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.room import RoomWorkerStore
 from synapse.types import StateMap
 from synapse.util import SYNAPSE_VERSION
@@ -60,7 +61,17 @@ class AdminCmdSlavedStore(
     BaseSlavedStore,
     RoomWorkerStore,
 ):
-    pass
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        # Annoyingly `filter_events_for_client` assumes that this exists. We
+        # should refactor it to take a `Clock` directly.
+        self.clock = hs.get_clock()
 
 
 class AdminCmdServer(HomeServer):
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 8aaa8c709f..9abbaa5a64 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -13,16 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from enum import Enum, auto
 from typing import Collection, Dict, FrozenSet, List, Optional, Tuple
 
+import attr
 from typing_extensions import Final
 
 from synapse.api.constants import EventTypes, HistoryVisibility, Membership
 from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
 from synapse.events.utils import prune_event
 from synapse.storage.controllers import StorageControllers
+from synapse.storage.databases.main import DataStore
 from synapse.storage.state import StateFilter
 from synapse.types import RetentionPolicy, StateMap, get_domain_from_id
+from synapse.util import Clock
 
 logger = logging.getLogger(__name__)
 
@@ -102,165 +107,410 @@ async def filter_events_for_client(
             ] = await storage.main.get_retention_policy_for_room(room_id)
 
     def allowed(event: EventBase) -> Optional[EventBase]:
-        """
-        Args:
-            event: event to check
-
-        Returns:
-           None if the user cannot see this event at all
-
-           a redacted copy of the event if they can only see a redacted
-           version
-
-           the original event if they can see it as normal.
-        """
-        # Only run some checks if these events aren't about to be sent to clients. This is
-        # because, if this is not the case, we're probably only checking if the users can
-        # see events in the room at that point in the DAG, and that shouldn't be decided
-        # on those checks.
-        if filter_send_to_client:
-            if event.type == EventTypes.Dummy:
-                return None
-
-            if not event.is_state() and event.sender in ignore_list:
-                return None
-
-            # Until MSC2261 has landed we can't redact malicious alias events, so for
-            # now we temporarily filter out m.room.aliases entirely to mitigate
-            # abuse, while we spec a better solution to advertising aliases
-            # on rooms.
-            if event.type == EventTypes.Aliases:
-                return None
-
-            # Don't try to apply the room's retention policy if the event is a state
-            # event, as MSC1763 states that retention is only considered for non-state
-            # events.
-            if not event.is_state():
-                retention_policy = retention_policies[event.room_id]
-                max_lifetime = retention_policy.max_lifetime
-
-                if max_lifetime is not None:
-                    oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime
-
-                    if event.origin_server_ts < oldest_allowed_ts:
-                        return None
-
-        if event.event_id in always_include_ids:
-            return event
+        return _check_client_allowed_to_see_event(
+            user_id=user_id,
+            event=event,
+            clock=storage.main.clock,
+            filter_send_to_client=filter_send_to_client,
+            sender_ignored=event.sender in ignore_list,
+            always_include_ids=always_include_ids,
+            retention_policy=retention_policies[room_id],
+            state=event_id_to_state.get(event.event_id),
+            is_peeking=is_peeking,
+            sender_erased=erased_senders.get(event.sender, False),
+        )
 
-        # we need to handle outliers separately, since we don't have the room state.
-        if event.internal_metadata.outlier:
-            # Normally these can't be seen by clients, but we make an exception for
-            # for out-of-band membership events (eg, incoming invites, or rejections of
-            # said invite) for the user themselves.
-            if event.type == EventTypes.Member and event.state_key == user_id:
-                logger.debug("Returning out-of-band-membership event %s", event)
-                return event
+    # Check each event: gives an iterable of None or (a potentially modified)
+    # EventBase.
+    filtered_events = map(allowed, events)
+
+    # Turn it into a list and remove None entries before returning.
+    return [ev for ev in filtered_events if ev]
 
-            return None
 
-        state = event_id_to_state[event.event_id]
+async def filter_event_for_clients_with_state(
+    store: DataStore,
+    user_ids: Collection[str],
+    event: EventBase,
+    context: EventContext,
+    is_peeking: bool = False,
+    filter_send_to_client: bool = True,
+) -> Collection[str]:
+    """
+    Checks to see if an event is visible to the users in the list at the time of
+    the event.
 
-        # get the room_visibility at the time of the event.
-        visibility = get_effective_room_visibility_from_state(state)
+    Note: This does *not* check if the sender of the event was erased.
 
-        # Always allow history visibility events on boundaries. This is done
-        # by setting the effective visibility to the least restrictive
-        # of the old vs new.
-        if event.type == EventTypes.RoomHistoryVisibility:
-            prev_content = event.unsigned.get("prev_content", {})
-            prev_visibility = prev_content.get("history_visibility", None)
+    Args:
+        store: databases
+        user_ids: user_ids to be checked
+        event: the event to be checked
+        context: EventContext for the event to be checked
+        is_peeking: Whether the users are peeking into the room, ie not
+            currently joined
+        filter_send_to_client: Whether we're checking an event that's going to be
+            sent to a client. This might not always be the case since this function can
+            also be called to check whether a user can see the state at a given point.
 
-            if prev_visibility not in VISIBILITY_PRIORITY:
-                prev_visibility = HistoryVisibility.SHARED
+    Returns:
+        Collection of user IDs for whom the event is visible
+    """
+    # None of the users should see the event if it is soft_failed
+    if event.internal_metadata.is_soft_failed():
+        return []
 
-            new_priority = VISIBILITY_PRIORITY.index(visibility)
-            old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
-            if old_priority < new_priority:
-                visibility = prev_visibility
+    # Make a set for all user IDs that haven't been filtered out by a check.
+    allowed_user_ids = set(user_ids)
 
-        # likewise, if the event is the user's own membership event, use
-        # the 'most joined' membership
-        membership = None
-        if event.type == EventTypes.Member and event.state_key == user_id:
-            membership = event.content.get("membership", None)
-            if membership not in MEMBERSHIP_PRIORITY:
-                membership = "leave"
-
-            prev_content = event.unsigned.get("prev_content", {})
-            prev_membership = prev_content.get("membership", None)
-            if prev_membership not in MEMBERSHIP_PRIORITY:
-                prev_membership = "leave"
-
-            # Always allow the user to see their own leave events, otherwise
-            # they won't see the room disappear if they reject the invite
-            #
-            # (Note this doesn't work for out-of-band invite rejections, which don't
-            # have prev_state populated. They are handled above in the outlier code.)
-            if membership == "leave" and (
-                prev_membership == "join" or prev_membership == "invite"
+    # Only run some checks if these events aren't about to be sent to clients. This is
+    # because, if this is not the case, we're probably only checking if the users can
+    # see events in the room at that point in the DAG, and that shouldn't be decided
+    # on those checks.
+    if filter_send_to_client:
+        ignored_by = await store.ignored_by(event.sender)
+        retention_policy = await store.get_retention_policy_for_room(event.room_id)
+
+        for user_id in user_ids:
+            if (
+                _check_filter_send_to_client(
+                    event,
+                    store.clock,
+                    retention_policy,
+                    sender_ignored=user_id in ignored_by,
+                )
+                == _CheckFilter.DENIED
             ):
-                return event
-
-            new_priority = MEMBERSHIP_PRIORITY.index(membership)
-            old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
-            if old_priority < new_priority:
-                membership = prev_membership
-
-        # otherwise, get the user's membership at the time of the event.
-        if membership is None:
-            membership_event = state.get((EventTypes.Member, user_id), None)
-            if membership_event:
-                membership = membership_event.membership
-
-        # if the user was a member of the room at the time of the event,
-        # they can see it.
-        if membership == Membership.JOIN:
-            return event
+                allowed_user_ids.discard(user_id)
+
+    if event.internal_metadata.outlier:
+        # Normally these can't be seen by clients, but we make an exception for
+        # for out-of-band membership events (eg, incoming invites, or rejections of
+        # said invite) for the user themselves.
+        if event.type == EventTypes.Member and event.state_key in allowed_user_ids:
+            logger.debug("Returning out-of-band-membership event %s", event)
+            return {event.state_key}
+
+        return set()
+
+    # First we get just the history visibility in case its shared/world-readable
+    # room.
+    visibility_state_map = await _get_state_map(
+        store, event, context, StateFilter.from_types([_HISTORY_VIS_KEY])
+    )
 
-        # otherwise, it depends on the room visibility.
+    visibility = get_effective_room_visibility_from_state(visibility_state_map)
+    if (
+        _check_history_visibility(event, visibility, is_peeking=is_peeking)
+        == _CheckVisibility.ALLOWED
+    ):
+        return allowed_user_ids
 
-        if visibility == HistoryVisibility.JOINED:
-            # we weren't a member at the time of the event, so we can't
-            # see this event.
-            return None
+    # The history visibility isn't lax, so we now need to fetch the membership
+    # events of all the users.
+
+    filter_list = []
+    for user_id in allowed_user_ids:
+        filter_list.append((EventTypes.Member, user_id))
+    filter_list.append((EventTypes.RoomHistoryVisibility, ""))
+
+    state_filter = StateFilter.from_types(filter_list)
+    state_map = await _get_state_map(store, event, context, state_filter)
+
+    # Now we check whether the membership allows each user to see the event.
+    return {
+        user_id
+        for user_id in allowed_user_ids
+        if _check_membership(user_id, event, visibility, state_map, is_peeking).allowed
+    }
+
+
+async def _get_state_map(
+    store: DataStore, event: EventBase, context: EventContext, state_filter: StateFilter
+) -> StateMap[EventBase]:
+    """Helper function for getting a `StateMap[EventBase]` from an `EventContext`"""
+    state_map = await context.get_prev_state_ids(state_filter)
+
+    # Use events rather than event ids as content from the events are needed in
+    # _check_visibility
+    event_map = await store.get_events(state_map.values(), get_prev_content=False)
+
+    updated_state_map = {}
+    for state_key, event_id in state_map.items():
+        state_event = event_map.get(event_id)
+        if state_event:
+            updated_state_map[state_key] = state_event
+
+    if event.is_state():
+        current_state_key = (event.type, event.state_key)
+        # Add current event to updated_state_map, we need to do this here as it
+        # may not have been persisted to the db yet
+        updated_state_map[current_state_key] = event
 
-        elif visibility == HistoryVisibility.INVITED:
-            # user can also see the event if they were *invited* at the time
-            # of the event.
-            return event if membership == Membership.INVITE else None
-
-        elif visibility == HistoryVisibility.SHARED and is_peeking:
-            # if the visibility is shared, users cannot see the event unless
-            # they have *subsequently* joined the room (or were members at the
-            # time, of course)
-            #
-            # XXX: if the user has subsequently joined and then left again,
-            # ideally we would share history up to the point they left. But
-            # we don't know when they left. We just treat it as though they
-            # never joined, and restrict access.
+    return updated_state_map
+
+
+def _check_client_allowed_to_see_event(
+    user_id: str,
+    event: EventBase,
+    clock: Clock,
+    filter_send_to_client: bool,
+    is_peeking: bool,
+    always_include_ids: FrozenSet[str],
+    sender_ignored: bool,
+    retention_policy: RetentionPolicy,
+    state: Optional[StateMap[EventBase]],
+    sender_erased: bool,
+) -> Optional[EventBase]:
+    """Check with the given user is allowed to see the given event
+
+    See `filter_events_for_client` for details about args
+
+    Args:
+        user_id
+        event
+        clock
+        filter_send_to_client
+        is_peeking
+        always_include_ids
+        sender_ignored: Whether the user is ignoring the event sender
+        retention_policy: The retention policy of the room
+        state: The state at the event, unless its an outlier
+        sender_erased: Whether the event sender has been marked as "erased"
+
+    Returns:
+        None if the user cannot see this event at all
+
+        a redacted copy of the event if they can only see a redacted
+        version
+
+        the original event if they can see it as normal.
+    """
+    # Only run some checks if these events aren't about to be sent to clients. This is
+    # because, if this is not the case, we're probably only checking if the users can
+    # see events in the room at that point in the DAG, and that shouldn't be decided
+    # on those checks.
+    if filter_send_to_client:
+        if (
+            _check_filter_send_to_client(event, clock, retention_policy, sender_ignored)
+            == _CheckFilter.DENIED
+        ):
             return None
 
-        # the visibility is either shared or world_readable, and the user was
-        # not a member at the time. We allow it, provided the original sender
-        # has not requested their data to be erased, in which case, we return
-        # a redacted version.
-        if erased_senders[event.sender]:
-            return prune_event(event)
+    if event.event_id in always_include_ids:
+        return event
+
+    # we need to handle outliers separately, since we don't have the room state.
+    if event.internal_metadata.outlier:
+        # Normally these can't be seen by clients, but we make an exception for
+        # for out-of-band membership events (eg, incoming invites, or rejections of
+        # said invite) for the user themselves.
+        if event.type == EventTypes.Member and event.state_key == user_id:
+            logger.debug("Returning out-of-band-membership event %s", event)
+            return event
+
+        return None
+
+    if state is None:
+        raise Exception("Missing state for non-outlier event")
 
+    # get the room_visibility at the time of the event.
+    visibility = get_effective_room_visibility_from_state(state)
+
+    # Check if the room has lax history visibility, allowing us to skip
+    # membership checks.
+    #
+    # We can only do this check if the sender has *not* been erased, as if they
+    # have we need to check the user's membership.
+    if (
+        not sender_erased
+        and _check_history_visibility(event, visibility, is_peeking)
+        == _CheckVisibility.ALLOWED
+    ):
         return event
 
-    # Check each event: gives an iterable of None or (a potentially modified)
-    # EventBase.
-    filtered_events = map(allowed, events)
+    membership_result = _check_membership(user_id, event, visibility, state, is_peeking)
+    if not membership_result.allowed:
+        return None
 
-    # Turn it into a list and remove None entries before returning.
-    return [ev for ev in filtered_events if ev]
+    # If the sender has been erased and the user was not joined at the time, we
+    # must only return the redacted form.
+    if sender_erased and not membership_result.joined:
+        event = prune_event(event)
+
+    return event
+
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class _CheckMembershipReturn:
+    "Return value of _check_membership"
+    allowed: bool
+    joined: bool
+
+
+def _check_membership(
+    user_id: str,
+    event: EventBase,
+    visibility: str,
+    state: StateMap[EventBase],
+    is_peeking: bool,
+) -> _CheckMembershipReturn:
+    """Check whether the user can see the event due to their membership
+
+    Returns:
+        True if they can, False if they can't, plus the membership of the user
+        at the event.
+    """
+    # If the event is the user's own membership event, use the 'most joined'
+    # membership
+    membership = None
+    if event.type == EventTypes.Member and event.state_key == user_id:
+        membership = event.content.get("membership", None)
+        if membership not in MEMBERSHIP_PRIORITY:
+            membership = "leave"
+
+        prev_content = event.unsigned.get("prev_content", {})
+        prev_membership = prev_content.get("membership", None)
+        if prev_membership not in MEMBERSHIP_PRIORITY:
+            prev_membership = "leave"
+
+        # Always allow the user to see their own leave events, otherwise
+        # they won't see the room disappear if they reject the invite
+        #
+        # (Note this doesn't work for out-of-band invite rejections, which don't
+        # have prev_state populated. They are handled above in the outlier code.)
+        if membership == "leave" and (
+            prev_membership == "join" or prev_membership == "invite"
+        ):
+            return _CheckMembershipReturn(True, membership == Membership.JOIN)
+
+        new_priority = MEMBERSHIP_PRIORITY.index(membership)
+        old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
+        if old_priority < new_priority:
+            membership = prev_membership
+
+    # otherwise, get the user's membership at the time of the event.
+    if membership is None:
+        membership_event = state.get((EventTypes.Member, user_id), None)
+        if membership_event:
+            membership = membership_event.membership
+
+    # if the user was a member of the room at the time of the event,
+    # they can see it.
+    if membership == Membership.JOIN:
+        return _CheckMembershipReturn(True, True)
+
+    # otherwise, it depends on the room visibility.
+
+    if visibility == HistoryVisibility.JOINED:
+        # we weren't a member at the time of the event, so we can't
+        # see this event.
+        return _CheckMembershipReturn(False, False)
+
+    elif visibility == HistoryVisibility.INVITED:
+        # user can also see the event if they were *invited* at the time
+        # of the event.
+        return _CheckMembershipReturn(membership == Membership.INVITE, False)
+
+    elif visibility == HistoryVisibility.SHARED and is_peeking:
+        # if the visibility is shared, users cannot see the event unless
+        # they have *subsequently* joined the room (or were members at the
+        # time, of course)
+        #
+        # XXX: if the user has subsequently joined and then left again,
+        # ideally we would share history up to the point they left. But
+        # we don't know when they left. We just treat it as though they
+        # never joined, and restrict access.
+        return _CheckMembershipReturn(False, False)
+
+    # The visibility is either shared or world_readable, and the user was
+    # not a member at the time. We allow it.
+    return _CheckMembershipReturn(True, False)
+
+
+class _CheckFilter(Enum):
+    MAYBE_ALLOWED = auto()
+    DENIED = auto()
+
+
+def _check_filter_send_to_client(
+    event: EventBase,
+    clock: Clock,
+    retention_policy: RetentionPolicy,
+    sender_ignored: bool,
+) -> _CheckFilter:
+    """Apply checks for sending events to client
+
+    Returns:
+        True if might be allowed to be sent to clients, False if definitely not.
+    """
+
+    if event.type == EventTypes.Dummy:
+        return _CheckFilter.DENIED
+
+    if not event.is_state() and sender_ignored:
+        return _CheckFilter.DENIED
+
+    # Until MSC2261 has landed we can't redact malicious alias events, so for
+    # now we temporarily filter out m.room.aliases entirely to mitigate
+    # abuse, while we spec a better solution to advertising aliases
+    # on rooms.
+    if event.type == EventTypes.Aliases:
+        return _CheckFilter.DENIED
+
+    # Don't try to apply the room's retention policy if the event is a state
+    # event, as MSC1763 states that retention is only considered for non-state
+    # events.
+    if not event.is_state():
+        max_lifetime = retention_policy.max_lifetime
+
+        if max_lifetime is not None:
+            oldest_allowed_ts = clock.time_msec() - max_lifetime
+
+            if event.origin_server_ts < oldest_allowed_ts:
+                return _CheckFilter.DENIED
+
+    return _CheckFilter.MAYBE_ALLOWED
+
+
+class _CheckVisibility(Enum):
+    ALLOWED = auto()
+    MAYBE_DENIED = auto()
+
+
+def _check_history_visibility(
+    event: EventBase, visibility: str, is_peeking: bool
+) -> _CheckVisibility:
+    """Check if event is allowed to be seen due to lax history visibility.
+
+    Returns:
+        True if user can definitely see the event, False if maybe not.
+    """
+    # Always allow history visibility events on boundaries. This is done
+    # by setting the effective visibility to the least restrictive
+    # of the old vs new.
+    if event.type == EventTypes.RoomHistoryVisibility:
+        prev_content = event.unsigned.get("prev_content", {})
+        prev_visibility = prev_content.get("history_visibility", None)
+
+        if prev_visibility not in VISIBILITY_PRIORITY:
+            prev_visibility = HistoryVisibility.SHARED
+
+        new_priority = VISIBILITY_PRIORITY.index(visibility)
+        old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
+        if old_priority < new_priority:
+            visibility = prev_visibility
+
+    if visibility == HistoryVisibility.SHARED and not is_peeking:
+        return _CheckVisibility.ALLOWED
+    elif visibility == HistoryVisibility.WORLD_READABLE:
+        return _CheckVisibility.ALLOWED
+
+    return _CheckVisibility.MAYBE_DENIED
 
 
 def get_effective_room_visibility_from_state(state: StateMap[EventBase]) -> str:
     """Get the actual history vis, from a state map including the history_visibility event
-
     Handles missing and invalid history visibility events.
     """
     visibility_event = state.get(_HISTORY_VIS_KEY, None)