diff options
author | Eric Eastwood <erice@element.io> | 2022-08-18 16:33:22 -0500 |
---|---|---|
committer | Eric Eastwood <erice@element.io> | 2022-08-18 16:33:22 -0500 |
commit | 8def7e4b4b6d0ecfb7776bf987f621e38946b50c (patch) | |
tree | 92fb7ce5f7008e7b053a36342409e1958fbdbb46 /synapse | |
parent | Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry (diff) | |
parent | Add metrics to track `/messages` response time by room size (#13545) (diff) | |
download | synapse-8def7e4b4b6d0ecfb7776bf987f621e38946b50c.tar.xz |
Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts: poetry.lock synapse/federation/federation_client.py synapse/federation/federation_server.py synapse/handlers/federation.py synapse/handlers/federation_event.py synapse/logging/opentracing.py synapse/rest/client/room.py synapse/storage/controllers/persist_events.py synapse/storage/controllers/state.py
Diffstat (limited to 'synapse')
68 files changed, 1787 insertions, 613 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 42d1f6d219..30e21d9707 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -441,6 +441,13 @@ def start(config_options: List[str]) -> None: "synapse.app.user_dir", ) + if config.experimental.faster_joins_enabled: + raise ConfigError( + "You have enabled the experimental `faster_joins` config option, but it is " + "not compatible with worker deployments yet. Please disable `faster_joins` " + "or run Synapse as a single process deployment instead." + ) + synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 745e704141..d98012adeb 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -220,7 +220,10 @@ class SynapseHomeServer(HomeServer): resources.update({"/_matrix/consent": consent_resource}) if name == "federation": - resources.update({FEDERATION_PREFIX: TransportLayerServer(self)}) + federation_resource: Resource = TransportLayerServer(self) + if compress: + federation_resource = gz_wrap(federation_resource) + resources.update({FEDERATION_PREFIX: federation_resource}) if name == "openid": resources.update( diff --git a/synapse/config/account_validity.py b/synapse/config/account_validity.py index d1335e77cd..b3972ede96 100644 --- a/synapse/config/account_validity.py +++ b/synapse/config/account_validity.py @@ -23,7 +23,7 @@ LEGACY_TEMPLATE_DIR_WARNING = """ This server's configuration file is using the deprecated 'template_dir' setting in the 'account_validity' section. Support for this setting has been deprecated and will be removed in a future version of Synapse. Server admins should instead use the new -'custom_templates_directory' setting documented here: +'custom_template_directory' setting documented here: https://matrix-org.github.io/synapse/latest/templates.html ---------------------------------------------------------------------------------------""" diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 7765c5b454..66a6dbf1fe 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -53,7 +53,7 @@ LEGACY_TEMPLATE_DIR_WARNING = """ This server's configuration file is using the deprecated 'template_dir' setting in the 'email' section. Support for this setting has been deprecated and will be removed in a future version of Synapse. Server admins should instead use the new -'custom_templates_directory' setting documented here: +'custom_template_directory' setting documented here: https://matrix-org.github.io/synapse/latest/templates.html ---------------------------------------------------------------------------------------""" diff --git a/synapse/config/sso.py b/synapse/config/sso.py index 2178cbf983..a452cc3a49 100644 --- a/synapse/config/sso.py +++ b/synapse/config/sso.py @@ -26,7 +26,7 @@ LEGACY_TEMPLATE_DIR_WARNING = """ This server's configuration file is using the deprecated 'template_dir' setting in the 'sso' section. Support for this setting has been deprecated and will be removed in a future version of Synapse. Server admins should instead use the new -'custom_templates_directory' setting documented here: +'custom_template_directory' setting documented here: https://matrix-org.github.io/synapse/latest/templates.html ---------------------------------------------------------------------------------------""" diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index dfd0bc414e..dee5461270 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -61,7 +61,7 @@ from synapse.federation.federation_base import ( ) from synapse.federation.transport.client import SendJoinResponse from synapse.http.types import QueryParams -from synapse.logging.tracing import trace +from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -235,6 +235,7 @@ class FederationClient(FederationBase): ) @trace + @tag_args async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> Optional[List[EventBase]]: @@ -337,6 +338,8 @@ class FederationClient(FederationBase): return None + @trace + @tag_args async def get_pdu( self, destinations: Iterable[str], @@ -448,6 +451,8 @@ class FederationClient(FederationBase): return event_copy + @trace + @tag_args async def get_room_state_ids( self, destination: str, room_id: str, event_id: str ) -> Tuple[List[str], List[str]]: @@ -467,6 +472,23 @@ class FederationClient(FederationBase): state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) + set_attribute( + SynapseTags.RESULT_PREFIX + "state_event_ids", + str(state_event_ids), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "state_event_ids.length", + str(len(state_event_ids)), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "auth_event_ids", + str(auth_event_ids), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "auth_event_ids.length", + str(len(auth_event_ids)), + ) + if not isinstance(state_event_ids, list) or not isinstance( auth_event_ids, list ): @@ -474,6 +496,8 @@ class FederationClient(FederationBase): return state_event_ids, auth_event_ids + @trace + @tag_args async def get_room_state( self, destination: str, @@ -533,6 +557,7 @@ class FederationClient(FederationBase): return valid_state_events, valid_auth_events + @trace async def _check_sigs_and_hash_and_fetch( self, origin: str, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c5b3b3cedf..dd9aeba9c8 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -61,7 +61,7 @@ from synapse.logging.context import ( nested_logging_context, run_in_background, ) -from synapse.logging.tracing import log_kv, start_active_span_from_edu, trace +from synapse.logging.tracing import log_kv, start_active_span_from_edu, tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, @@ -547,6 +547,8 @@ class FederationServer(FederationBase): return 200, resp + @trace + @tag_args async def on_state_ids_request( self, origin: str, room_id: str, event_id: str ) -> Tuple[int, JsonDict]: @@ -569,6 +571,8 @@ class FederationServer(FederationBase): return 200, resp + @trace + @tag_args async def _on_state_ids_request_compute( self, room_id: str, event_id: str ) -> JsonDict: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5e2b1c0456..8dfc424807 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,6 +32,7 @@ from typing import ( ) import attr +from prometheus_client import Histogram from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -59,7 +60,7 @@ from synapse.events.validator import EventValidator from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import trace +from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( @@ -79,6 +80,24 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Added to debug performance and track progress on optimizations +backfill_processing_before_timer = Histogram( + "synapse_federation_backfill_processing_before_time_seconds", + "sec", + [], + buckets=( + 1.0, + 5.0, + 10.0, + 20.0, + 30.0, + 40.0, + 60.0, + 80.0, + "+Inf", + ), +) + def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]: """Get joined domains from state @@ -138,6 +157,7 @@ class FederationHandler: def __init__(self, hs: "HomeServer"): self.hs = hs + self.clock = hs.get_clock() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state @@ -197,12 +217,39 @@ class FederationHandler: return. This is used as part of the heuristic to decide if we should back paginate. """ + # Starting the processing time here so we can include the room backfill + # linearizer lock queue in the timing + processing_start_time = self.clock.time_msec() + async with self._room_backfill.queue(room_id): - return await self._maybe_backfill_inner(room_id, current_depth, limit) + return await self._maybe_backfill_inner( + room_id, + current_depth, + limit, + processing_start_time=processing_start_time, + ) async def _maybe_backfill_inner( - self, room_id: str, current_depth: int, limit: int + self, + room_id: str, + current_depth: int, + limit: int, + *, + processing_start_time: int, ) -> bool: + """ + Checks whether the `current_depth` is at or approaching any backfill + points in the room and if so, will backfill. We only care about + checking backfill points that happened before the `current_depth` + (meaning less than or equal to the `current_depth`). + + Args: + room_id: The room to backfill in. + current_depth: The depth to check at for any upcoming backfill points. + limit: The max number of events to request from the remote federated server. + processing_start_time: The time when `maybe_backfill` started + processing. Only used for timing. + """ backwards_extremities = [ _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY) for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room( @@ -370,6 +417,14 @@ class FederationHandler: logger.debug( "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request ) + set_attribute( + SynapseTags.RESULT_PREFIX + "extremities_to_request", + str(extremities_to_request), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "extremities_to_request.length", + str(len(extremities_to_request)), + ) # Now we need to decide which hosts to hit first. @@ -425,6 +480,11 @@ class FederationHandler: return False + processing_end_time = self.clock.time_msec() + backfill_processing_before_timer.observe( + (processing_start_time - processing_end_time) / 1000 + ) + success = await try_backfill(likely_domains) if success: return True @@ -1081,6 +1141,8 @@ class FederationHandler: return event + @trace + @tag_args async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]: """Returns the state at the event. i.e. not including said event.""" event = await self.store.get_event(event_id, check_room_id=room_id) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 740a04aad6..a3bd04b712 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -29,7 +29,7 @@ from typing import ( Tuple, ) -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from synapse import event_auth from synapse.api.constants import ( @@ -59,7 +59,13 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import trace +from synapse.logging.tracing import ( + SynapseTags, + set_attribute, + start_active_span, + tag_args, + trace, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( @@ -92,6 +98,26 @@ soft_failed_event_counter = Counter( "Events received over federation that we marked as soft_failed", ) +# Added to debug performance and track progress on optimizations +backfill_processing_after_timer = Histogram( + "synapse_federation_backfill_processing_after_time_seconds", + "sec", + [], + buckets=( + 1.0, + 5.0, + 10.0, + 20.0, + 30.0, + 40.0, + 60.0, + 80.0, + 120.0, + 180.0, + "+Inf", + ), +) + class FederationEventHandler: """Handles events that originated from federation. @@ -410,6 +436,7 @@ class FederationEventHandler: prev_member_event, ) + @trace async def process_remote_join( self, origin: str, @@ -597,20 +624,21 @@ class FederationEventHandler: if not events: return - # if there are any events in the wrong room, the remote server is buggy and - # should not be trusted. - for ev in events: - if ev.room_id != room_id: - raise InvalidResponseError( - f"Remote server {dest} returned event {ev.event_id} which is in " - f"room {ev.room_id}, when we were backfilling in {room_id}" - ) + with backfill_processing_after_timer.time(): + # if there are any events in the wrong room, the remote server is buggy and + # should not be trusted. + for ev in events: + if ev.room_id != room_id: + raise InvalidResponseError( + f"Remote server {dest} returned event {ev.event_id} which is in " + f"room {ev.room_id}, when we were backfilling in {room_id}" + ) - await self._process_pulled_events( - dest, - events, - backfilled=True, - ) + await self._process_pulled_events( + dest, + events, + backfilled=True, + ) @trace async def _get_missing_events_for_pdu( @@ -715,7 +743,7 @@ class FederationEventHandler: @trace async def _process_pulled_events( - self, origin: str, events: Iterable[EventBase], backfilled: bool + self, origin: str, events: Collection[EventBase], backfilled: bool ) -> None: """Process a batch of events we have pulled from a remote server @@ -730,6 +758,15 @@ class FederationEventHandler: backfilled: True if this is part of a historical batch of events (inhibits notification to clients, and validation of device keys.) """ + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + str([event.event_id for event in events]), + ) + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", + str(len(events)), + ) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) logger.debug( "processing pulled backfilled=%s events=%s", backfilled, @@ -753,6 +790,7 @@ class FederationEventHandler: await self._process_pulled_event(origin, ev, backfilled=backfilled) @trace + @tag_args async def _process_pulled_event( self, origin: str, event: EventBase, backfilled: bool ) -> None: @@ -854,6 +892,7 @@ class FederationEventHandler: else: raise + @trace async def _compute_event_context_with_maybe_missing_prevs( self, dest: str, event: EventBase ) -> EventContext: @@ -970,6 +1009,8 @@ class FederationEventHandler: event, state_ids_before_event=state_map, partial_state=partial_state ) + @trace + @tag_args async def _get_state_ids_after_missing_prev_event( self, destination: str, @@ -1009,10 +1050,10 @@ class FederationEventHandler: logger.debug("Fetching %i events from cache/store", len(desired_events)) have_events = await self._store.have_seen_events(room_id, desired_events) - missing_desired_events = desired_events - have_events + missing_desired_event_ids = desired_events - have_events logger.debug( "We are missing %i events (got %i)", - len(missing_desired_events), + len(missing_desired_event_ids), len(have_events), ) @@ -1024,13 +1065,30 @@ class FederationEventHandler: # already have a bunch of the state events. It would be nice if the # federation api gave us a way of finding out which we actually need. - missing_auth_events = set(auth_event_ids) - have_events - missing_auth_events.difference_update( - await self._store.have_seen_events(room_id, missing_auth_events) + missing_auth_event_ids = set(auth_event_ids) - have_events + missing_auth_event_ids.difference_update( + await self._store.have_seen_events(room_id, missing_auth_event_ids) ) - logger.debug("We are also missing %i auth events", len(missing_auth_events)) + logger.debug("We are also missing %i auth events", len(missing_auth_event_ids)) - missing_events = missing_desired_events | missing_auth_events + missing_event_ids = missing_desired_event_ids | missing_auth_event_ids + + set_attribute( + SynapseTags.RESULT_PREFIX + "missing_auth_event_ids", + str(missing_auth_event_ids), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length", + str(len(missing_auth_event_ids)), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "missing_desired_event_ids", + str(missing_desired_event_ids), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length", + str(len(missing_desired_event_ids)), + ) # Making an individual request for each of 1000s of events has a lot of # overhead. On the other hand, we don't really want to fetch all of the events @@ -1041,13 +1099,13 @@ class FederationEventHandler: # # TODO: might it be better to have an API which lets us do an aggregate event # request - if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids): + if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids): logger.debug("Requesting complete state from remote") await self._get_state_and_persist(destination, room_id, event_id) else: - logger.debug("Fetching %i events from remote", len(missing_events)) + logger.debug("Fetching %i events from remote", len(missing_event_ids)) await self._get_events_and_persist( - destination=destination, room_id=room_id, event_ids=missing_events + destination=destination, room_id=room_id, event_ids=missing_event_ids ) # We now need to fill out the state map, which involves fetching the @@ -1104,6 +1162,14 @@ class FederationEventHandler: event_id, failed_to_fetch, ) + set_attribute( + SynapseTags.RESULT_PREFIX + "failed_to_fetch", + str(failed_to_fetch), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "failed_to_fetch.length", + str(len(failed_to_fetch)), + ) if remote_event.is_state() and remote_event.rejected_reason is None: state_map[ @@ -1112,6 +1178,8 @@ class FederationEventHandler: return state_map + @trace + @tag_args async def _get_state_and_persist( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1133,6 +1201,7 @@ class FederationEventHandler: destination=destination, room_id=room_id, event_ids=(event_id,) ) + @trace async def _process_received_pdu( self, origin: str, @@ -1283,6 +1352,7 @@ class FederationEventHandler: except Exception: logger.exception("Failed to resync device for %s", sender) + @trace async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None: """Handles backfilling the insertion event when we receive a marker event that points to one. @@ -1414,6 +1484,8 @@ class FederationEventHandler: return event_from_response + @trace + @tag_args async def _get_events_and_persist( self, destination: str, room_id: str, event_ids: Collection[str] ) -> None: @@ -1459,6 +1531,7 @@ class FederationEventHandler: logger.info("Fetched %i events of %i requested", len(events), len(event_ids)) await self._auth_and_persist_outliers(room_id, events) + @trace async def _auth_and_persist_outliers( self, room_id: str, events: Iterable[EventBase] ) -> None: @@ -1477,6 +1550,16 @@ class FederationEventHandler: """ event_map = {event.event_id: event for event in events} + event_ids = event_map.keys() + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + str(event_ids), + ) + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", + str(len(event_ids)), + ) + # filter out any events we have already seen. This might happen because # the events were eagerly pushed to us (eg, during a room join), or because # another thread has raced against us since we decided to request the event. @@ -1593,6 +1676,7 @@ class FederationEventHandler: backfilled=True, ) + @trace async def _check_event_auth( self, origin: Optional[str], event: EventBase, context: EventContext ) -> None: @@ -1631,6 +1715,14 @@ class FederationEventHandler: claimed_auth_events = await self._load_or_fetch_auth_events_for_event( origin, event ) + set_attribute( + SynapseTags.RESULT_PREFIX + "claimed_auth_events", + str([ev.event_id for ev in claimed_auth_events]), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "claimed_auth_events.length", + str(len(claimed_auth_events)), + ) # ... and check that the event passes auth at those auth events. # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu: @@ -1728,6 +1820,7 @@ class FederationEventHandler: ) context.rejected = RejectedReason.AUTH_ERROR + @trace async def _maybe_kick_guest_users(self, event: EventBase) -> None: if event.type != EventTypes.GuestAccess: return @@ -1935,6 +2028,8 @@ class FederationEventHandler: # instead we raise an AuthError, which will make the caller ignore it. raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found") + @trace + @tag_args async def _get_remote_auth_chain_for_event( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1963,6 +2058,7 @@ class FederationEventHandler: await self._auth_and_persist_outliers(room_id, remote_auth_events) + @trace async def _run_push_actions_and_persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> None: @@ -2071,8 +2167,17 @@ class FederationEventHandler: self._message_handler.maybe_schedule_expiry(event) if not backfilled: # Never notify for backfilled events - for event in events: - await self._notify_persisted_event(event, max_stream_token) + with start_active_span("notify_persisted_events"): + set_attribute( + SynapseTags.RESULT_PREFIX + "event_ids", + str([ev.event_id for ev in events]), + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "event_ids.length", + str(len(events)), + ) + for event in events: + await self._notify_persisted_event(event, max_stream_token) return max_stream_token.stream diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8d9754f306..a7af04f6f3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -331,7 +331,11 @@ class MessageHandler: msg="Getting joined members while not being a current member of the room is forbidden.", ) - users_with_profile = await self.store.get_users_in_room_with_profiles(room_id) + users_with_profile = ( + await self._state_storage_controller.get_users_in_room_with_profiles( + room_id + ) + ) # If this is an AS, double check that they are allowed to see the members. # This can either be because the AS user is in the room or because there diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index ebd445adca..732b0310bc 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -453,6 +453,7 @@ class RoomSummaryHandler: "type": e.type, "state_key": e.state_key, "content": e.content, + "room_id": e.room_id, "sender": e.sender, "origin_server_ts": e.origin_server_ts, } diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cddfb4cec7..2991967226 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -13,7 +13,19 @@ # limitations under the License. import itertools import logging -from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Dict, + FrozenSet, + List, + Mapping, + Optional, + Sequence, + Set, + Tuple, +) import attr from prometheus_client import Counter @@ -94,7 +106,7 @@ class SyncConfig: @attr.s(slots=True, frozen=True, auto_attribs=True) class TimelineBatch: prev_batch: StreamToken - events: List[EventBase] + events: Sequence[EventBase] limited: bool # A mapping of event ID to the bundled aggregations for the above events. # This is only calculated if limited is true. @@ -512,10 +524,17 @@ class SyncHandler: # ensure that we always include current state in the timeline current_state_ids: FrozenSet[str] = frozenset() if any(e.is_state() for e in recents): + # FIXME(faster_joins): We use the partial state here as + # we don't want to block `/sync` on finishing a lazy join. + # Which should be fine once + # https://github.com/matrix-org/synapse/issues/12989 is resolved, + # since we shouldn't reach here anymore? + # Note that we use the current state as a whitelist for filtering + # `recents`, so partial state is only a problem when a membership + # event turns up in `recents` but has not made it into the current + # state. current_state_ids_map = ( - await self._state_storage_controller.get_current_state_ids( - room_id - ) + await self.store.get_partial_current_state_ids(room_id) ) current_state_ids = frozenset(current_state_ids_map.values()) @@ -584,7 +603,13 @@ class SyncHandler: if any(e.is_state() for e in loaded_recents): # FIXME(faster_joins): We use the partial state here as # we don't want to block `/sync` on finishing a lazy join. - # Is this the correct way of doing it? + # Which should be fine once + # https://github.com/matrix-org/synapse/issues/12989 is resolved, + # since we shouldn't reach here anymore? + # Note that we use the current state as a whitelist for filtering + # `loaded_recents`, so partial state is only a problem when a + # membership event turns up in `loaded_recents` but has not made it + # into the current state. current_state_ids_map = ( await self.store.get_partial_current_state_ids(room_id) ) @@ -632,7 +657,10 @@ class SyncHandler: ) async def get_state_after_event( - self, event_id: str, state_filter: Optional[StateFilter] = None + self, + event_id: str, + state_filter: Optional[StateFilter] = None, + await_full_state: bool = True, ) -> StateMap[str]: """ Get the room state after the given event @@ -640,9 +668,14 @@ class SyncHandler: Args: event_id: event of interest state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at the event and `state_filter` is not satisfied by partial state. + Defaults to `True`. """ state_ids = await self._state_storage_controller.get_state_ids_for_event( - event_id, state_filter=state_filter or StateFilter.all() + event_id, + state_filter=state_filter or StateFilter.all(), + await_full_state=await_full_state, ) # using get_metadata_for_events here (instead of get_event) sidesteps an issue @@ -665,6 +698,7 @@ class SyncHandler: room_id: str, stream_position: StreamToken, state_filter: Optional[StateFilter] = None, + await_full_state: bool = True, ) -> StateMap[str]: """Get the room state at a particular stream position @@ -672,6 +706,9 @@ class SyncHandler: room_id: room for which to get state stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at the last event in the room before `stream_position` and + `state_filter` is not satisfied by partial state. Defaults to `True`. """ # FIXME: This gets the state at the latest event before the stream ordering, # which might not be the same as the "current state" of the room at the time @@ -683,7 +720,9 @@ class SyncHandler: if last_event_id: state = await self.get_state_after_event( - last_event_id, state_filter=state_filter or StateFilter.all() + last_event_id, + state_filter=state_filter or StateFilter.all(), + await_full_state=await_full_state, ) else: @@ -857,16 +896,26 @@ class SyncHandler: now_token: StreamToken, full_state: bool, ) -> MutableStateMap[EventBase]: - """Works out the difference in state between the start of the timeline - and the previous sync. + """Works out the difference in state between the end of the previous sync and + the start of the timeline. Args: room_id: batch: The timeline batch for the room that will be sent to the user. sync_config: - since_token: Token of the end of the previous batch. May be None. + since_token: Token of the end of the previous batch. May be `None`. now_token: Token of the end of the current batch. full_state: Whether to force returning the full state. + `lazy_load_members` still applies when `full_state` is `True`. + + Returns: + The state to return in the sync response for the room. + + Clients will overlay this onto the state at the end of the previous sync to + arrive at the state at the start of the timeline. + + Clients will then overlay state events in the timeline to arrive at the + state at the end of the timeline, in preparation for the next sync. """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state @@ -874,8 +923,17 @@ class SyncHandler: # TODO(mjark) Check for new redactions in the state events. with Measure(self.clock, "compute_state_delta"): + # The memberships needed for events in the timeline. + # Only calculated when `lazy_load_members` is on. + members_to_fetch: Optional[Set[str]] = None + + # A dictionary mapping user IDs to the first event in the timeline sent by + # them. Only calculated when `lazy_load_members` is on. + first_event_by_sender_map: Optional[Dict[str, EventBase]] = None - members_to_fetch = None + # The contribution to the room state from state events in the timeline. + # Only contains the last event for any given state key. + timeline_state: StateMap[str] lazy_load_members = sync_config.filter_collection.lazy_load_members() include_redundant_members = ( @@ -886,10 +944,23 @@ class SyncHandler: # We only request state for the members needed to display the # timeline: - members_to_fetch = { - event.sender # FIXME: we also care about invite targets etc. - for event in batch.events - } + timeline_state = {} + + members_to_fetch = set() + first_event_by_sender_map = {} + for event in batch.events: + # Build the map from user IDs to the first timeline event they sent. + if event.sender not in first_event_by_sender_map: + first_event_by_sender_map[event.sender] = event + + # We need the event's sender, unless their membership was in a + # previous timeline event. + if (EventTypes.Member, event.sender) not in timeline_state: + members_to_fetch.add(event.sender) + # FIXME: we also care about invite targets etc. + + if event.is_state(): + timeline_state[(event.type, event.state_key)] = event.event_id if full_state: # always make sure we LL ourselves so we know we're in the room @@ -899,55 +970,80 @@ class SyncHandler: members_to_fetch.add(sync_config.user.to_string()) state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch) + + # We are happy to use partial state to compute the `/sync` response. + # Since partial state may not include the lazy-loaded memberships we + # require, we fix up the state response afterwards with memberships from + # auth events. + await_full_state = False else: + timeline_state = { + (event.type, event.state_key): event.event_id + for event in batch.events + if event.is_state() + } + state_filter = StateFilter.all() + await_full_state = True - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events - if event.is_state() - } + # Now calculate the state to return in the sync response for the room. + # This is more or less the change in state between the end of the previous + # sync's timeline and the start of the current sync's timeline. + # See the docstring above for details. + state_ids: StateMap[str] if full_state: if batch: - current_state_ids = ( + state_at_timeline_end = ( await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, state_filter=state_filter + batch.events[-1].event_id, + state_filter=state_filter, + await_full_state=await_full_state, ) ) - state_ids = ( + state_at_timeline_start = ( await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, state_filter=state_filter + batch.events[0].event_id, + state_filter=state_filter, + await_full_state=await_full_state, ) ) else: - current_state_ids = await self.get_state_at( - room_id, stream_position=now_token, state_filter=state_filter + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=await_full_state, ) - state_ids = current_state_ids + state_at_timeline_start = state_at_timeline_end state_ids = _calculate_state( timeline_contains=timeline_state, - timeline_start=state_ids, - previous={}, - current=current_state_ids, + timeline_start=state_at_timeline_start, + timeline_end=state_at_timeline_end, + previous_timeline_end={}, lazy_load_members=lazy_load_members, ) elif batch.limited: if batch: state_at_timeline_start = ( await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, state_filter=state_filter + batch.events[0].event_id, + state_filter=state_filter, + await_full_state=await_full_state, ) ) else: # We can get here if the user has ignored the senders of all # the recent events. state_at_timeline_start = await self.get_state_at( - room_id, stream_position=now_token, state_filter=state_filter + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=await_full_state, ) # for now, we disable LL for gappy syncs - see @@ -969,28 +1065,35 @@ class SyncHandler: # is indeed the case. assert since_token is not None state_at_previous_sync = await self.get_state_at( - room_id, stream_position=since_token, state_filter=state_filter + room_id, + stream_position=since_token, + state_filter=state_filter, + await_full_state=await_full_state, ) if batch: - current_state_ids = ( + state_at_timeline_end = ( await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, state_filter=state_filter + batch.events[-1].event_id, + state_filter=state_filter, + await_full_state=await_full_state, ) ) else: - # Its not clear how we get here, but empirically we do - # (#5407). Logging has been added elsewhere to try and - # figure out where this state comes from. - current_state_ids = await self.get_state_at( - room_id, stream_position=now_token, state_filter=state_filter + # We can get here if the user has ignored the senders of all + # the recent events. + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=await_full_state, ) state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_at_timeline_start, - previous=state_at_previous_sync, - current=current_state_ids, + timeline_end=state_at_timeline_end, + previous_timeline_end=state_at_previous_sync, # we have to include LL members in case LL initial sync missed them lazy_load_members=lazy_load_members, ) @@ -1013,8 +1116,30 @@ class SyncHandler: (EventTypes.Member, member) for member in members_to_fetch ), + await_full_state=False, ) + # If we only have partial state for the room, `state_ids` may be missing the + # memberships we wanted. We attempt to find some by digging through the auth + # events of timeline events. + if lazy_load_members and await self.store.is_partial_state_room(room_id): + assert members_to_fetch is not None + assert first_event_by_sender_map is not None + + additional_state_ids = ( + await self._find_missing_partial_state_memberships( + room_id, members_to_fetch, first_event_by_sender_map, state_ids + ) + ) + state_ids = {**state_ids, **additional_state_ids} + + # At this point, if `lazy_load_members` is enabled, `state_ids` includes + # the memberships of all event senders in the timeline. This is because we + # may not have sent the memberships in a previous sync. + + # When `include_redundant_members` is on, we send all the lazy-loaded + # memberships of event senders. Otherwise we make an effort to limit the set + # of memberships we send to those that we have not already sent to this client. if lazy_load_members and not include_redundant_members: cache_key = (sync_config.user.to_string(), sync_config.device_id) cache = self.get_lazy_loaded_members_cache(cache_key) @@ -1056,6 +1181,99 @@ class SyncHandler: if e.type != EventTypes.Aliases # until MSC2261 or alternative solution } + async def _find_missing_partial_state_memberships( + self, + room_id: str, + members_to_fetch: Collection[str], + events_with_membership_auth: Mapping[str, EventBase], + found_state_ids: StateMap[str], + ) -> StateMap[str]: + """Finds missing memberships from a set of auth events and returns them as a + state map. + + Args: + room_id: The partial state room to find the remaining memberships for. + members_to_fetch: The memberships to find. + events_with_membership_auth: A mapping from user IDs to events whose auth + events are known to contain their membership. + found_state_ids: A dict from (type, state_key) -> state_event_id, containing + memberships that have been previously found. Entries in + `members_to_fetch` that have a membership in `found_state_ids` are + ignored. + + Returns: + A dict from ("m.room.member", state_key) -> state_event_id, containing the + memberships missing from `found_state_ids`. + + Raises: + KeyError: if `events_with_membership_auth` does not have an entry for a + missing membership. Memberships in `found_state_ids` do not need an + entry in `events_with_membership_auth`. + """ + additional_state_ids: MutableStateMap[str] = {} + + # Tracks the missing members for logging purposes. + missing_members = set() + + # Identify memberships missing from `found_state_ids` and pick out the auth + # events in which to look for them. + auth_event_ids: Set[str] = set() + for member in members_to_fetch: + if (EventTypes.Member, member) in found_state_ids: + continue + + missing_members.add(member) + event_with_membership_auth = events_with_membership_auth[member] + auth_event_ids.update(event_with_membership_auth.auth_event_ids()) + + auth_events = await self.store.get_events(auth_event_ids) + + # Run through the missing memberships once more, picking out the memberships + # from the pile of auth events we have just fetched. + for member in members_to_fetch: + if (EventTypes.Member, member) in found_state_ids: + continue + + event_with_membership_auth = events_with_membership_auth[member] + + # Dig through the auth events to find the desired membership. + for auth_event_id in event_with_membership_auth.auth_event_ids(): + # We only store events once we have all their auth events, + # so the auth event must be in the pile we have just + # fetched. + auth_event = auth_events[auth_event_id] + + if ( + auth_event.type == EventTypes.Member + and auth_event.state_key == member + ): + missing_members.remove(member) + additional_state_ids[ + (EventTypes.Member, member) + ] = auth_event.event_id + break + + if missing_members: + # There really shouldn't be any missing memberships now. Either: + # * we couldn't find an auth event, which shouldn't happen because we do + # not persist events with persisting their auth events first, or + # * the set of auth events did not contain a membership we wanted, which + # means our caller didn't compute the events in `members_to_fetch` + # correctly, or we somehow accepted an event whose auth events were + # dodgy. + logger.error( + "Failed to find memberships for %s in partial state room " + "%s in the auth events of %s.", + missing_members, + room_id, + [ + events_with_membership_auth[member].event_id + for member in missing_members + ], + ) + + return additional_state_ids + async def unread_notifs_for_room_id( self, room_id: str, sync_config: SyncConfig ) -> NotifCounts: @@ -1700,7 +1918,11 @@ class SyncHandler: continue if room_id in sync_result_builder.joined_room_ids or has_join: - old_state_ids = await self.get_state_at(room_id, since_token) + old_state_ids = await self.get_state_at( + room_id, + since_token, + state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]), + ) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) old_mem_ev = None if old_mem_ev_id: @@ -1726,7 +1948,13 @@ class SyncHandler: newly_left_rooms.append(room_id) else: if not old_state_ids: - old_state_ids = await self.get_state_at(room_id, since_token) + old_state_ids = await self.get_state_at( + room_id, + since_token, + state_filter=StateFilter.from_types( + [(EventTypes.Member, user_id)] + ), + ) old_mem_ev_id = old_state_ids.get( (EventTypes.Member, user_id), None ) @@ -2221,8 +2449,8 @@ def _action_has_highlight(actions: List[JsonDict]) -> bool: def _calculate_state( timeline_contains: StateMap[str], timeline_start: StateMap[str], - previous: StateMap[str], - current: StateMap[str], + timeline_end: StateMap[str], + previous_timeline_end: StateMap[str], lazy_load_members: bool, ) -> StateMap[str]: """Works out what state to include in a sync response. @@ -2230,45 +2458,50 @@ def _calculate_state( Args: timeline_contains: state in the timeline timeline_start: state at the start of the timeline - previous: state at the end of the previous sync (or empty dict + timeline_end: state at the end of the timeline + previous_timeline_end: state at the end of the previous sync (or empty dict if this is an initial sync) - current: state at the end of the timeline lazy_load_members: whether to return members from timeline_start or not. assumes that timeline_start has already been filtered to include only the members the client needs to know about. """ - event_id_to_key = { - e: key - for key, e in itertools.chain( + event_id_to_state_key = { + event_id: state_key + for state_key, event_id in itertools.chain( timeline_contains.items(), - previous.items(), timeline_start.items(), - current.items(), + timeline_end.items(), + previous_timeline_end.items(), ) } - c_ids = set(current.values()) - ts_ids = set(timeline_start.values()) - p_ids = set(previous.values()) - tc_ids = set(timeline_contains.values()) + timeline_end_ids = set(timeline_end.values()) + timeline_start_ids = set(timeline_start.values()) + previous_timeline_end_ids = set(previous_timeline_end.values()) + timeline_contains_ids = set(timeline_contains.values()) # If we are lazyloading room members, we explicitly add the membership events # for the senders in the timeline into the state block returned by /sync, # as we may not have sent them to the client before. We find these membership # events by filtering them out of timeline_start, which has already been filtered # to only include membership events for the senders in the timeline. - # In practice, we can do this by removing them from the p_ids list, - # which is the list of relevant state we know we have already sent to the client. + # In practice, we can do this by removing them from the previous_timeline_end_ids + # list, which is the list of relevant state we know we have already sent to the + # client. # see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 if lazy_load_members: - p_ids.difference_update( + previous_timeline_end_ids.difference_update( e for t, e in timeline_start.items() if t[0] == EventTypes.Member ) - state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids + state_ids = ( + (timeline_end_ids | timeline_start_ids) + - previous_timeline_end_ids + - timeline_contains_ids + ) - return {event_id_to_key[e]: e for e in state_ids} + return {event_id_to_state_key[e]: e for e in state_ids} @attr.s(slots=True, auto_attribs=True) diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 4ff840ca0e..26aaabfb34 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -23,9 +23,12 @@ from typing import ( Optional, Sequence, Tuple, + Type, + TypeVar, overload, ) +from pydantic import BaseModel, ValidationError from typing_extensions import Literal from twisted.web.server import Request @@ -694,6 +697,28 @@ def parse_json_object_from_request( return content +Model = TypeVar("Model", bound=BaseModel) + + +def parse_and_validate_json_object_from_request( + request: Request, model_type: Type[Model] +) -> Model: + """Parse a JSON object from the body of a twisted HTTP request, then deserialise and + validate using the given pydantic model. + + Raises: + SynapseError if the request body couldn't be decoded as JSON or + if it wasn't a JSON object. + """ + content = parse_json_object_from_request(request, allow_empty_body=False) + try: + instance = model_type.parse_obj(content) + except ValidationError as e: + raise SynapseError(HTTPStatus.BAD_REQUEST, str(e), errcode=Codes.BAD_JSON) + + return instance + + def assert_params_in_dict(body: JsonDict, required: Iterable[str]) -> None: absent = [] for k in required: diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index 1b509ffdcd..a250bbb204 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -280,6 +280,19 @@ class SynapseTags: # The name of the external cache CACHE_NAME = "cache.name" + # Used to tag function arguments + # + # Tag a named arg. The name of the argument should be appended to this prefix. + FUNC_ARG_PREFIX = "ARG." + # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`) + FUNC_ARGS = "args" + # Tag keyword args + FUNC_KWARGS = "kwargs" + + # Some intermediate result that's interesting to the function. The label for + # the result should be appended to this prefix. + RESULT_PREFIX = "RESULT." + class SynapseBaggage: FORCE_TRACING = "synapse-force-tracing" @@ -796,7 +809,6 @@ def _custom_sync_async_decorator( """ Decorates a function that is sync or async (coroutines), or that returns a Twisted `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`. - Example usage: ```py # Decorator to time the function and log it out @@ -812,7 +824,6 @@ def _custom_sync_async_decorator( logger.info("%s took %s seconds", func.__name__, duration) return _custom_sync_async_decorator(func, _wrapping_logic) ``` - Args: func: The function to be decorated wrapping_logic: The business logic of your custom decorator. @@ -928,9 +939,9 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]: # first argument only if it's named `self` or `cls`. This isn't fool-proof # but handles the idiomatic cases. for i, arg in enumerate(args[1:], start=1): # type: ignore[index] - set_attribute("ARG_" + argspec.args[i], str(arg)) - set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index] - set_attribute("kwargs", str(kwargs)) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i], str(arg)) + set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) # type: ignore[index] + set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs)) yield return _custom_sync_async_decorator(func, _wrapping_logic) diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 6c0cc5a6ce..440205e80c 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -14,128 +14,235 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy -from typing import Any, Dict, List - -from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP +""" +Push rules is the system used to determine which events trigger a push (and a +bump in notification counts). + +This consists of a list of "push rules" for each user, where a push rule is a +pair of "conditions" and "actions". When a user receives an event Synapse +iterates over the list of push rules until it finds one where all the conditions +match the event, at which point "actions" describe the outcome (e.g. notify, +highlight, etc). + +Push rules are split up into 5 different "kinds" (aka "priority classes"), which +are run in order: + 1. Override — highest priority rules, e.g. always ignore notices + 2. Content — content specific rules, e.g. @ notifications + 3. Room — per room rules, e.g. enable/disable notifications for all messages + in a room + 4. Sender — per sender rules, e.g. never notify for messages from a given + user + 5. Underride — the lowest priority "default" rules, e.g. notify for every + message. + +The set of "base rules" are the list of rules that every user has by default. A +user can modify their copy of the push rules in one of three ways: + + 1. Adding a new push rule of a certain kind + 2. Changing the actions of a base rule + 3. Enabling/disabling a base rule. + +The base rules are split into whether they come before or after a particular +kind, so the order of push rule evaluation would be: base rules for before +"override" kind, user defined "override" rules, base rules after "override" +kind, etc, etc. +""" + +import itertools +import logging +from typing import Dict, Iterator, List, Mapping, Sequence, Tuple, Union + +import attr + +from synapse.config.experimental import ExperimentalConfig +from synapse.push.rulekinds import PRIORITY_CLASS_MAP + +logger = logging.getLogger(__name__) + + +@attr.s(auto_attribs=True, slots=True, frozen=True) +class PushRule: + """A push rule + + Attributes: + rule_id: a unique ID for this rule + priority_class: what "kind" of push rule this is (see + `PRIORITY_CLASS_MAP` for mapping between int and kind) + conditions: the sequence of conditions that all need to match + actions: the actions to apply if all conditions are met + default: is this a base rule? + default_enabled: is this enabled by default? + """ + rule_id: str + priority_class: int + conditions: Sequence[Mapping[str, str]] + actions: Sequence[Union[str, Mapping]] + default: bool = False + default_enabled: bool = True -def list_with_base_rules(rawrules: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Combine the list of rules set by the user with the default push rules - Args: - rawrules: The rules the user has modified or set. +@attr.s(auto_attribs=True, slots=True, frozen=True, weakref_slot=False) +class PushRules: + """A collection of push rules for an account. - Returns: - A new list with the rules set by the user combined with the defaults. + Can be iterated over, producing push rules in priority order. """ - ruleslist = [] - # Grab the base rules that the user has modified. - # The modified base rules have a priority_class of -1. - modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0} + # A mapping from rule ID to push rule that overrides a base rule. These will + # be returned instead of the base rule. + overriden_base_rules: Dict[str, PushRule] = attr.Factory(dict) + + # The following stores the custom push rules at each priority class. + # + # We keep these separate (rather than combining into one big list) to avoid + # copying the base rules around all the time. + override: List[PushRule] = attr.Factory(list) + content: List[PushRule] = attr.Factory(list) + room: List[PushRule] = attr.Factory(list) + sender: List[PushRule] = attr.Factory(list) + underride: List[PushRule] = attr.Factory(list) + + def __iter__(self) -> Iterator[PushRule]: + # When iterating over the push rules we need to return the base rules + # interspersed at the correct spots. + for rule in itertools.chain( + BASE_PREPEND_OVERRIDE_RULES, + self.override, + BASE_APPEND_OVERRIDE_RULES, + self.content, + BASE_APPEND_CONTENT_RULES, + self.room, + self.sender, + self.underride, + BASE_APPEND_UNDERRIDE_RULES, + ): + # Check if a base rule has been overriden by a custom rule. If so + # return that instead. + override_rule = self.overriden_base_rules.get(rule.rule_id) + if override_rule: + yield override_rule + else: + yield rule + + def __len__(self) -> int: + # The length is mostly used by caches to get a sense of "size" / amount + # of memory this object is using, so we only count the number of custom + # rules. + return ( + len(self.overriden_base_rules) + + len(self.override) + + len(self.content) + + len(self.room) + + len(self.sender) + + len(self.underride) + ) - # Remove the modified base rules from the list, They'll be added back - # in the default positions in the list. - rawrules = [r for r in rawrules if r["priority_class"] >= 0] - # shove the server default rules for each kind onto the end of each - current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1] +@attr.s(auto_attribs=True, slots=True, frozen=True, weakref_slot=False) +class FilteredPushRules: + """A wrapper around `PushRules` that filters out disabled experimental push + rules, and includes the "enabled" state for each rule when iterated over. + """ - ruleslist.extend( - make_base_prepend_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules - ) - ) + push_rules: PushRules + enabled_map: Dict[str, bool] + experimental_config: ExperimentalConfig - for r in rawrules: - if r["priority_class"] < current_prio_class: - while r["priority_class"] < current_prio_class: - ruleslist.extend( - make_base_append_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class], - modified_base_rules, - ) - ) - current_prio_class -= 1 - if current_prio_class > 0: - ruleslist.extend( - make_base_prepend_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class], - modified_base_rules, - ) - ) - - ruleslist.append(r) - - while current_prio_class > 0: - ruleslist.extend( - make_base_append_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules - ) - ) - current_prio_class -= 1 - if current_prio_class > 0: - ruleslist.extend( - make_base_prepend_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules - ) - ) + def __iter__(self) -> Iterator[Tuple[PushRule, bool]]: + for rule in self.push_rules: + if not _is_experimental_rule_enabled( + rule.rule_id, self.experimental_config + ): + continue - return ruleslist + enabled = self.enabled_map.get(rule.rule_id, rule.default_enabled) + yield rule, enabled -def make_base_append_rules( - kind: str, modified_base_rules: Dict[str, Dict[str, Any]] -) -> List[Dict[str, Any]]: - rules = [] + def __len__(self) -> int: + return len(self.push_rules) - if kind == "override": - rules = BASE_APPEND_OVERRIDE_RULES - elif kind == "underride": - rules = BASE_APPEND_UNDERRIDE_RULES - elif kind == "content": - rules = BASE_APPEND_CONTENT_RULES - # Copy the rules before modifying them - rules = copy.deepcopy(rules) - for r in rules: - # Only modify the actions, keep the conditions the same. - assert isinstance(r["rule_id"], str) - modified = modified_base_rules.get(r["rule_id"]) - if modified: - r["actions"] = modified["actions"] +DEFAULT_EMPTY_PUSH_RULES = PushRules() - return rules +def compile_push_rules(rawrules: List[PushRule]) -> PushRules: + """Given a set of custom push rules return a `PushRules` instance (which + includes the base rules). + """ + + if not rawrules: + # Fast path to avoid allocating empty lists when there are no custom + # rules for the user. + return DEFAULT_EMPTY_PUSH_RULES + + rules = PushRules() -def make_base_prepend_rules( - kind: str, - modified_base_rules: Dict[str, Dict[str, Any]], -) -> List[Dict[str, Any]]: - rules = [] + for rule in rawrules: + # We need to decide which bucket each custom push rule goes into. - if kind == "override": - rules = BASE_PREPEND_OVERRIDE_RULES + # If it has the same ID as a base rule then it overrides that... + overriden_base_rule = BASE_RULES_BY_ID.get(rule.rule_id) + if overriden_base_rule: + rules.overriden_base_rules[rule.rule_id] = attr.evolve( + overriden_base_rule, actions=rule.actions + ) + continue + + # ... otherwise it gets added to the appropriate priority class bucket + collection: List[PushRule] + if rule.priority_class == 5: + collection = rules.override + elif rule.priority_class == 4: + collection = rules.content + elif rule.priority_class == 3: + collection = rules.room + elif rule.priority_class == 2: + collection = rules.sender + elif rule.priority_class == 1: + collection = rules.underride + elif rule.priority_class <= 0: + logger.info( + "Got rule with priority class less than zero, but doesn't override a base rule: %s", + rule, + ) + continue + else: + # We log and continue here so as not to break event sending + logger.error("Unknown priority class: %", rule.priority_class) + continue - # Copy the rules before modifying them - rules = copy.deepcopy(rules) - for r in rules: - # Only modify the actions, keep the conditions the same. - assert isinstance(r["rule_id"], str) - modified = modified_base_rules.get(r["rule_id"]) - if modified: - r["actions"] = modified["actions"] + collection.append(rule) return rules -# We have to annotate these types, otherwise mypy infers them as -# `List[Dict[str, Sequence[Collection[str]]]]`. -BASE_APPEND_CONTENT_RULES: List[Dict[str, Any]] = [ - { - "rule_id": "global/content/.m.rule.contains_user_name", - "conditions": [ +def _is_experimental_rule_enabled( + rule_id: str, experimental_config: ExperimentalConfig +) -> bool: + """Used by `FilteredPushRules` to filter out experimental rules when they + have not been enabled. + """ + if ( + rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl" + and not experimental_config.msc3786_enabled + ): + return False + if ( + rule_id == "global/underride/.org.matrix.msc3772.thread_reply" + and not experimental_config.msc3772_enabled + ): + return False + return True + + +BASE_APPEND_CONTENT_RULES = [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["content"], + rule_id="global/content/.m.rule.contains_user_name", + conditions=[ { "kind": "event_match", "key": "content.body", @@ -143,29 +250,33 @@ BASE_APPEND_CONTENT_RULES: List[Dict[str, Any]] = [ "pattern_type": "user_localpart", } ], - "actions": [ + actions=[ "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}, ], - } + ) ] -BASE_PREPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ - { - "rule_id": "global/override/.m.rule.master", - "enabled": False, - "conditions": [], - "actions": ["dont_notify"], - } +BASE_PREPEND_OVERRIDE_RULES = [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.m.rule.master", + default_enabled=False, + conditions=[], + actions=["dont_notify"], + ) ] -BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ - { - "rule_id": "global/override/.m.rule.suppress_notices", - "conditions": [ +BASE_APPEND_OVERRIDE_RULES = [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.m.rule.suppress_notices", + conditions=[ { "kind": "event_match", "key": "content.msgtype", @@ -173,13 +284,15 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_suppress_notices", } ], - "actions": ["dont_notify"], - }, + actions=["dont_notify"], + ), # NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event # otherwise invites will be matched by .m.rule.member_event - { - "rule_id": "global/override/.m.rule.invite_for_me", - "conditions": [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.m.rule.invite_for_me", + conditions=[ { "kind": "event_match", "key": "type", @@ -195,21 +308,23 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ # Match the requester's MXID. {"kind": "event_match", "key": "state_key", "pattern_type": "user_id"}, ], - "actions": [ + actions=[ "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight", "value": False}, ], - }, + ), # Will we sometimes want to know about people joining and leaving? # Perhaps: if so, this could be expanded upon. Seems the most usual case # is that we don't though. We add this override rule so that even if # the room rule is set to notify, we don't get notifications about # join/leave/avatar/displayname events. # See also: https://matrix.org/jira/browse/SYN-607 - { - "rule_id": "global/override/.m.rule.member_event", - "conditions": [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.m.rule.member_event", + conditions=[ { "kind": "event_match", "key": "type", @@ -217,24 +332,28 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_member", } ], - "actions": ["dont_notify"], - }, + actions=["dont_notify"], + ), # This was changed from underride to override so it's closer in priority # to the content rules where the user name highlight rule lives. This # way a room rule is lower priority than both but a custom override rule # is higher priority than both. - { - "rule_id": "global/override/.m.rule.contains_display_name", - "conditions": [{"kind": "contains_display_name"}], - "actions": [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.m.rule.contains_display_name", + conditions=[{"kind": "contains_display_name"}], + actions=[ "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}, ], - }, - { - "rule_id": "global/override/.m.rule.roomnotif", - "conditions": [ + ), + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.m.rule.roomnotif", + conditions=[ { "kind": "event_match", "key": "content.body", @@ -247,11 +366,13 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_roomnotif_pl", }, ], - "actions": ["notify", {"set_tweak": "highlight", "value": True}], - }, - { - "rule_id": "global/override/.m.rule.tombstone", - "conditions": [ + actions=["notify", {"set_tweak": "highlight", "value": True}], + ), + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.m.rule.tombstone", + conditions=[ { "kind": "event_match", "key": "type", @@ -265,11 +386,13 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_tombstone_statekey", }, ], - "actions": ["notify", {"set_tweak": "highlight", "value": True}], - }, - { - "rule_id": "global/override/.m.rule.reaction", - "conditions": [ + actions=["notify", {"set_tweak": "highlight", "value": True}], + ), + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.m.rule.reaction", + conditions=[ { "kind": "event_match", "key": "type", @@ -277,14 +400,16 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_reaction", } ], - "actions": ["dont_notify"], - }, + actions=["dont_notify"], + ), # XXX: This is an experimental rule that is only enabled if msc3786_enabled # is enabled, if it is not the rule gets filtered out in _load_rules() in # PushRulesWorkerStore - { - "rule_id": "global/override/.org.matrix.msc3786.rule.room.server_acl", - "conditions": [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["override"], + rule_id="global/override/.org.matrix.msc3786.rule.room.server_acl", + conditions=[ { "kind": "event_match", "key": "type", @@ -298,15 +423,17 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_room_server_acl_state_key", }, ], - "actions": [], - }, + actions=[], + ), ] -BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [ - { - "rule_id": "global/underride/.m.rule.call", - "conditions": [ +BASE_APPEND_UNDERRIDE_RULES = [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["underride"], + rule_id="global/underride/.m.rule.call", + conditions=[ { "kind": "event_match", "key": "type", @@ -314,17 +441,19 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_call", } ], - "actions": [ + actions=[ "notify", {"set_tweak": "sound", "value": "ring"}, {"set_tweak": "highlight", "value": False}, ], - }, + ), # XXX: once m.direct is standardised everywhere, we should use it to detect # a DM from the user's perspective rather than this heuristic. - { - "rule_id": "global/underride/.m.rule.room_one_to_one", - "conditions": [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["underride"], + rule_id="global/underride/.m.rule.room_one_to_one", + conditions=[ {"kind": "room_member_count", "is": "2", "_cache_key": "member_count"}, { "kind": "event_match", @@ -333,17 +462,19 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_message", }, ], - "actions": [ + actions=[ "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight", "value": False}, ], - }, + ), # XXX: this is going to fire for events which aren't m.room.messages # but are encrypted (e.g. m.call.*)... - { - "rule_id": "global/underride/.m.rule.encrypted_room_one_to_one", - "conditions": [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["underride"], + rule_id="global/underride/.m.rule.encrypted_room_one_to_one", + conditions=[ {"kind": "room_member_count", "is": "2", "_cache_key": "member_count"}, { "kind": "event_match", @@ -352,15 +483,17 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_encrypted", }, ], - "actions": [ + actions=[ "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight", "value": False}, ], - }, - { - "rule_id": "global/underride/.org.matrix.msc3772.thread_reply", - "conditions": [ + ), + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["underride"], + rule_id="global/underride/.org.matrix.msc3772.thread_reply", + conditions=[ { "kind": "org.matrix.msc3772.relation_match", "rel_type": "m.thread", @@ -368,11 +501,13 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [ "sender_type": "user_id", } ], - "actions": ["notify", {"set_tweak": "highlight", "value": False}], - }, - { - "rule_id": "global/underride/.m.rule.message", - "conditions": [ + actions=["notify", {"set_tweak": "highlight", "value": False}], + ), + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["underride"], + rule_id="global/underride/.m.rule.message", + conditions=[ { "kind": "event_match", "key": "type", @@ -380,13 +515,15 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_message", } ], - "actions": ["notify", {"set_tweak": "highlight", "value": False}], - }, + actions=["notify", {"set_tweak": "highlight", "value": False}], + ), # XXX: this is going to fire for events which aren't m.room.messages # but are encrypted (e.g. m.call.*)... - { - "rule_id": "global/underride/.m.rule.encrypted", - "conditions": [ + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["underride"], + rule_id="global/underride/.m.rule.encrypted", + conditions=[ { "kind": "event_match", "key": "type", @@ -394,11 +531,13 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_encrypted", } ], - "actions": ["notify", {"set_tweak": "highlight", "value": False}], - }, - { - "rule_id": "global/underride/.im.vector.jitsi", - "conditions": [ + actions=["notify", {"set_tweak": "highlight", "value": False}], + ), + PushRule( + default=True, + priority_class=PRIORITY_CLASS_MAP["underride"], + rule_id="global/underride/.im.vector.jitsi", + conditions=[ { "kind": "event_match", "key": "type", @@ -418,29 +557,27 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [ "_cache_key": "_is_state_event", }, ], - "actions": ["notify", {"set_tweak": "highlight", "value": False}], - }, + actions=["notify", {"set_tweak": "highlight", "value": False}], + ), ] BASE_RULE_IDS = set() +BASE_RULES_BY_ID: Dict[str, PushRule] = {} + for r in BASE_APPEND_CONTENT_RULES: - r["priority_class"] = PRIORITY_CLASS_MAP["content"] - r["default"] = True - BASE_RULE_IDS.add(r["rule_id"]) + BASE_RULE_IDS.add(r.rule_id) + BASE_RULES_BY_ID[r.rule_id] = r for r in BASE_PREPEND_OVERRIDE_RULES: - r["priority_class"] = PRIORITY_CLASS_MAP["override"] - r["default"] = True - BASE_RULE_IDS.add(r["rule_id"]) + BASE_RULE_IDS.add(r.rule_id) + BASE_RULES_BY_ID[r.rule_id] = r for r in BASE_APPEND_OVERRIDE_RULES: - r["priority_class"] = PRIORITY_CLASS_MAP["override"] - r["default"] = True - BASE_RULE_IDS.add(r["rule_id"]) + BASE_RULE_IDS.add(r.rule_id) + BASE_RULES_BY_ID[r.rule_id] = r for r in BASE_APPEND_UNDERRIDE_RULES: - r["priority_class"] = PRIORITY_CLASS_MAP["underride"] - r["default"] = True - BASE_RULE_IDS.add(r["rule_id"]) + BASE_RULE_IDS.add(r.rule_id) + BASE_RULES_BY_ID[r.rule_id] = r diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 713dcf6950..ccd512be54 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -15,7 +15,18 @@ import itertools import logging -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Iterable, + List, + Mapping, + Optional, + Set, + Tuple, + Union, +) from prometheus_client import Counter @@ -30,6 +41,7 @@ from synapse.util.caches import register_cache from synapse.util.metrics import measure_func from synapse.visibility import filter_event_for_clients_with_state +from .baserules import FilteredPushRules, PushRule from .push_rule_evaluator import PushRuleEvaluatorForEvent if TYPE_CHECKING: @@ -112,7 +124,7 @@ class BulkPushRuleEvaluator: async def _get_rules_for_event( self, event: EventBase, - ) -> Dict[str, List[Dict[str, Any]]]: + ) -> Dict[str, FilteredPushRules]: """Get the push rules for all users who may need to be notified about the event. @@ -186,7 +198,7 @@ class BulkPushRuleEvaluator: return pl_event.content if pl_event else {}, sender_level async def _get_mutual_relations( - self, event: EventBase, rules: Iterable[Dict[str, Any]] + self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]] ) -> Dict[str, Set[Tuple[str, str]]]: """ Fetch event metadata for events which related to the same event as the given event. @@ -216,12 +228,11 @@ class BulkPushRuleEvaluator: # Pre-filter to figure out which relation types are interesting. rel_types = set() - for rule in rules: - # Skip disabled rules. - if "enabled" in rule and not rule["enabled"]: + for rule, enabled in rules: + if not enabled: continue - for condition in rule["conditions"]: + for condition in rule.conditions: if condition["kind"] != "org.matrix.msc3772.relation_match": continue @@ -254,7 +265,7 @@ class BulkPushRuleEvaluator: count_as_unread = _should_count_as_unread(event, context) rules_by_user = await self._get_rules_for_event(event) - actions_by_user: Dict[str, List[Union[dict, str]]] = {} + actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} room_member_count = await self.store.get_number_joined_users_in_room( event.room_id @@ -317,15 +328,13 @@ class BulkPushRuleEvaluator: # current user, it'll be added to the dict later. actions_by_user[uid] = [] - for rule in rules: - if "enabled" in rule and not rule["enabled"]: + for rule, enabled in rules: + if not enabled: continue - matches = evaluator.check_conditions( - rule["conditions"], uid, display_name - ) + matches = evaluator.check_conditions(rule.conditions, uid, display_name) if matches: - actions = [x for x in rule["actions"] if x != "dont_notify"] + actions = [x for x in rule.actions if x != "dont_notify"] if actions and "notify" in actions: # Push rules say we should notify the user of this event actions_by_user[uid] = actions diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py index 5117ef6854..73618d9234 100644 --- a/synapse/push/clientformat.py +++ b/synapse/push/clientformat.py @@ -18,16 +18,15 @@ from typing import Any, Dict, List, Optional from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP from synapse.types import UserID +from .baserules import FilteredPushRules, PushRule + def format_push_rules_for_user( - user: UserID, ruleslist: List + user: UserID, ruleslist: FilteredPushRules ) -> Dict[str, Dict[str, list]]: """Converts a list of rawrules and a enabled map into nested dictionaries to match the Matrix client-server format for push rules""" - # We're going to be mutating this a lot, so do a deep copy - ruleslist = copy.deepcopy(ruleslist) - rules: Dict[str, Dict[str, List[Dict[str, Any]]]] = { "global": {}, "device": {}, @@ -35,11 +34,30 @@ def format_push_rules_for_user( rules["global"] = _add_empty_priority_class_arrays(rules["global"]) - for r in ruleslist: - template_name = _priority_class_to_template_name(r["priority_class"]) + for r, enabled in ruleslist: + template_name = _priority_class_to_template_name(r.priority_class) + + rulearray = rules["global"][template_name] + + template_rule = _rule_to_template(r) + if not template_rule: + continue + + rulearray.append(template_rule) + + template_rule["enabled"] = enabled + + if "conditions" not in template_rule: + # Not all formatted rules have explicit conditions, e.g. "room" + # rules omit them as they can be derived from the kind and rule ID. + # + # If the formatted rule has no conditions then we can skip the + # formatting of conditions. + continue # Remove internal stuff. - for c in r["conditions"]: + template_rule["conditions"] = copy.deepcopy(template_rule["conditions"]) + for c in template_rule["conditions"]: c.pop("_cache_key", None) pattern_type = c.pop("pattern_type", None) @@ -52,16 +70,6 @@ def format_push_rules_for_user( if sender_type == "user_id": c["sender"] = user.to_string() - rulearray = rules["global"][template_name] - - template_rule = _rule_to_template(r) - if template_rule: - if "enabled" in r: - template_rule["enabled"] = r["enabled"] - else: - template_rule["enabled"] = True - rulearray.append(template_rule) - return rules @@ -71,24 +79,24 @@ def _add_empty_priority_class_arrays(d: Dict[str, list]) -> Dict[str, list]: return d -def _rule_to_template(rule: Dict[str, Any]) -> Optional[Dict[str, Any]]: - unscoped_rule_id = None - if "rule_id" in rule: - unscoped_rule_id = _rule_id_from_namespaced(rule["rule_id"]) +def _rule_to_template(rule: PushRule) -> Optional[Dict[str, Any]]: + templaterule: Dict[str, Any] + + unscoped_rule_id = _rule_id_from_namespaced(rule.rule_id) - template_name = _priority_class_to_template_name(rule["priority_class"]) + template_name = _priority_class_to_template_name(rule.priority_class) if template_name in ["override", "underride"]: - templaterule = {k: rule[k] for k in ["conditions", "actions"]} + templaterule = {"conditions": rule.conditions, "actions": rule.actions} elif template_name in ["sender", "room"]: - templaterule = {"actions": rule["actions"]} - unscoped_rule_id = rule["conditions"][0]["pattern"] + templaterule = {"actions": rule.actions} + unscoped_rule_id = rule.conditions[0]["pattern"] elif template_name == "content": - if len(rule["conditions"]) != 1: + if len(rule.conditions) != 1: return None - thecond = rule["conditions"][0] + thecond = rule.conditions[0] if "pattern" not in thecond: return None - templaterule = {"actions": rule["actions"]} + templaterule = {"actions": rule.actions} templaterule["pattern"] = thecond["pattern"] else: # This should not be reached unless this function is not kept in sync @@ -97,8 +105,8 @@ def _rule_to_template(rule: Dict[str, Any]) -> Optional[Dict[str, Any]]: if unscoped_rule_id: templaterule["rule_id"] = unscoped_rule_id - if "default" in rule: - templaterule["default"] = rule["default"] + if rule.default: + templaterule["default"] = True return templaterule diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 2e8a017add..3c5632cd91 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -15,7 +15,18 @@ import logging import re -from typing import Any, Dict, List, Mapping, Optional, Pattern, Set, Tuple, Union +from typing import ( + Any, + Dict, + List, + Mapping, + Optional, + Pattern, + Sequence, + Set, + Tuple, + Union, +) from matrix_common.regex import glob_to_regex, to_word_pattern @@ -32,14 +43,14 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") def _room_member_count( - ev: EventBase, condition: Dict[str, Any], room_member_count: int + ev: EventBase, condition: Mapping[str, Any], room_member_count: int ) -> bool: return _test_ineq_condition(condition, room_member_count) def _sender_notification_permission( ev: EventBase, - condition: Dict[str, Any], + condition: Mapping[str, Any], sender_power_level: int, power_levels: Dict[str, Union[int, Dict[str, int]]], ) -> bool: @@ -54,7 +65,7 @@ def _sender_notification_permission( return sender_power_level >= room_notif_level -def _test_ineq_condition(condition: Dict[str, Any], number: int) -> bool: +def _test_ineq_condition(condition: Mapping[str, Any], number: int) -> bool: if "is" not in condition: return False m = INEQUALITY_EXPR.match(condition["is"]) @@ -137,7 +148,7 @@ class PushRuleEvaluatorForEvent: self._condition_cache: Dict[str, bool] = {} def check_conditions( - self, conditions: List[dict], uid: str, display_name: Optional[str] + self, conditions: Sequence[Mapping], uid: str, display_name: Optional[str] ) -> bool: """ Returns true if a user's conditions/user ID/display name match the event. @@ -169,7 +180,7 @@ class PushRuleEvaluatorForEvent: return True def matches( - self, condition: Dict[str, Any], user_id: str, display_name: Optional[str] + self, condition: Mapping[str, Any], user_id: str, display_name: Optional[str] ) -> bool: """ Returns true if a user's condition/user ID/display name match the event. @@ -204,7 +215,7 @@ class PushRuleEvaluatorForEvent: # endpoint with an unknown kind, see _rule_tuple_from_request_object. return True - def _event_match(self, condition: dict, user_id: str) -> bool: + def _event_match(self, condition: Mapping, user_id: str) -> bool: """ Check an "event_match" push rule condition. @@ -269,7 +280,7 @@ class PushRuleEvaluatorForEvent: return bool(r.search(body)) - def _relation_match(self, condition: dict, user_id: str) -> bool: + def _relation_match(self, condition: Mapping, user_id: str) -> bool: """ Check an "relation_match" push rule condition. diff --git a/synapse/res/templates/account_previously_renewed.html b/synapse/res/templates/account_previously_renewed.html index b751359bdf..bd4f7cea97 100644 --- a/synapse/res/templates/account_previously_renewed.html +++ b/synapse/res/templates/account_previously_renewed.html @@ -1 +1,12 @@ -<html><body>Your account is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</body><html> +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>Your account is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</title> +</head> +<body> + Your account is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}. +</body> +</html> \ No newline at end of file diff --git a/synapse/res/templates/account_renewed.html b/synapse/res/templates/account_renewed.html index e8c0f52f05..57b319f375 100644 --- a/synapse/res/templates/account_renewed.html +++ b/synapse/res/templates/account_renewed.html @@ -1 +1,12 @@ -<html><body>Your account has been successfully renewed and is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</body><html> +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>Your account has been successfully renewed and is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</title> +</head> +<body> + Your account has been successfully renewed and is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}. +</body> +</html> \ No newline at end of file diff --git a/synapse/res/templates/add_threepid.html b/synapse/res/templates/add_threepid.html index cc4ab07e09..71f2215b7a 100644 --- a/synapse/res/templates/add_threepid.html +++ b/synapse/res/templates/add_threepid.html @@ -1,9 +1,14 @@ -<html> +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>Request to add an email address to your Matrix account</title> +</head> <body> <p>A request to add an email address to your Matrix account has been received. If this was you, please click the link below to confirm adding this email:</p> - <a href="{{ link }}">{{ link }}</a> - <p>If this was not you, you can safely ignore this email. Thank you.</p> </body> </html> diff --git a/synapse/res/templates/add_threepid_failure.html b/synapse/res/templates/add_threepid_failure.html index 441d11c846..bd627ee9ce 100644 --- a/synapse/res/templates/add_threepid_failure.html +++ b/synapse/res/templates/add_threepid_failure.html @@ -1,8 +1,13 @@ -<html> -<head></head> +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>Request failed</title> +</head> <body> -<p>The request failed for the following reason: {{ failure_reason }}.</p> - -<p>No changes have been made to your account.</p> + <p>The request failed for the following reason: {{ failure_reason }}.</p> + <p>No changes have been made to your account.</p> </body> </html> diff --git a/synapse/res/templates/add_threepid_success.html b/synapse/res/templates/add_threepid_success.html index fbd6e4018f..49170c138e 100644 --- a/synapse/res/templates/add_threepid_success.html +++ b/synapse/res/templates/add_threepid_success.html @@ -1,6 +1,12 @@ -<html> -<head></head> +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>Your email has now been validated</title> +</head> <body> -<p>Your email has now been validated, please return to your client. You may now close this window.</p> + <p>Your email has now been validated, please return to your client. You may now close this window.</p> </body> -</html> +</html> \ No newline at end of file diff --git a/synapse/res/templates/auth_success.html b/synapse/res/templates/auth_success.html index baf4633142..2d6ac44a0e 100644 --- a/synapse/res/templates/auth_success.html +++ b/synapse/res/templates/auth_success.html @@ -1,8 +1,8 @@ <html> <head> <title>Success!</title> -<meta name='viewport' content='width=device-width, initial-scale=1, - user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <link rel="stylesheet" href="/_matrix/static/client/register/style.css"> <script> if (window.onAuthDone) { diff --git a/synapse/res/templates/invalid_token.html b/synapse/res/templates/invalid_token.html index 6bd2b98364..2c7c384fe3 100644 --- a/synapse/res/templates/invalid_token.html +++ b/synapse/res/templates/invalid_token.html @@ -1 +1,12 @@ -<html><body>Invalid renewal token.</body><html> +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>Invalid renewal token.</title> +</head> +<body> + Invalid renewal token. +</body> +</html> diff --git a/synapse/res/templates/notice_expiry.html b/synapse/res/templates/notice_expiry.html index d87311f659..865f9f7ada 100644 --- a/synapse/res/templates/notice_expiry.html +++ b/synapse/res/templates/notice_expiry.html @@ -1,6 +1,8 @@ <!doctype html> <html lang="en"> <head> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {% include 'mail.css' without context %} {% include "mail-%s.css" % app_name ignore missing without context %} diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html index 27d4182790..9dba0c0253 100644 --- a/synapse/res/templates/notif_mail.html +++ b/synapse/res/templates/notif_mail.html @@ -1,6 +1,8 @@ <!doctype html> <html lang="en"> <head> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {%- include 'mail.css' without context %} {%- include "mail-%s.css" % app_name ignore missing without context %} diff --git a/synapse/res/templates/password_reset.html b/synapse/res/templates/password_reset.html index a197bf872c..a8bdce357b 100644 --- a/synapse/res/templates/password_reset.html +++ b/synapse/res/templates/password_reset.html @@ -1,4 +1,9 @@ -<html> +<html lang="en"> + <head> + <title>Password reset</title> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + </head> <body> <p>A password reset request has been received for your Matrix account. If this was you, please click the link below to confirm resetting your password:</p> diff --git a/synapse/res/templates/password_reset_confirmation.html b/synapse/res/templates/password_reset_confirmation.html index def4b5162b..2e3fd2ec1e 100644 --- a/synapse/res/templates/password_reset_confirmation.html +++ b/synapse/res/templates/password_reset_confirmation.html @@ -1,5 +1,9 @@ -<html> -<head></head> +<html lang="en"> +<head> + <title>Password reset confirmation</title> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> +</head> <body> <!--Use a hidden form to resubmit the information necessary to reset the password--> <form method="post"> diff --git a/synapse/res/templates/password_reset_failure.html b/synapse/res/templates/password_reset_failure.html index 9e3c4446e3..2d59c463f0 100644 --- a/synapse/res/templates/password_reset_failure.html +++ b/synapse/res/templates/password_reset_failure.html @@ -1,5 +1,9 @@ -<html> -<head></head> +<html lang="en"> +<head> + <title>Password reset failure</title> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> +</head> <body> <p>The request failed for the following reason: {{ failure_reason }}.</p> diff --git a/synapse/res/templates/password_reset_success.html b/synapse/res/templates/password_reset_success.html index 7324d66d1e..5165bd1fa2 100644 --- a/synapse/res/templates/password_reset_success.html +++ b/synapse/res/templates/password_reset_success.html @@ -1,5 +1,8 @@ -<html> -<head></head> +<html lang="en"> +<head> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> +</head> <body> <p>Your email has now been validated, please return to your client to reset your password. You may now close this window.</p> </body> diff --git a/synapse/res/templates/recaptcha.html b/synapse/res/templates/recaptcha.html index b3db06ef97..615d3239c6 100644 --- a/synapse/res/templates/recaptcha.html +++ b/synapse/res/templates/recaptcha.html @@ -1,8 +1,8 @@ <html> <head> <title>Authentication</title> -<meta name='viewport' content='width=device-width, initial-scale=1, - user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <script src="https://www.recaptcha.net/recaptcha/api.js" async defer></script> <script src="//code.jquery.com/jquery-1.11.2.min.js"></script> diff --git a/synapse/res/templates/registration.html b/synapse/res/templates/registration.html index 16730a527f..20e831ff4a 100644 --- a/synapse/res/templates/registration.html +++ b/synapse/res/templates/registration.html @@ -1,4 +1,9 @@ -<html> +<html lang="en"> +<head> + <title>Registration</title> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> +</head> <body> <p>You have asked us to register this email with a new Matrix account. If this was you, please click the link below to confirm your email address:</p> diff --git a/synapse/res/templates/registration_failure.html b/synapse/res/templates/registration_failure.html index 2833d79c37..a6ed22bc90 100644 --- a/synapse/res/templates/registration_failure.html +++ b/synapse/res/templates/registration_failure.html @@ -1,5 +1,8 @@ -<html> -<head></head> +<html lang="en"> +<head> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> +</head> <body> <p>Validation failed for the following reason: {{ failure_reason }}.</p> </body> diff --git a/synapse/res/templates/registration_success.html b/synapse/res/templates/registration_success.html index fbd6e4018f..d51d5549d8 100644 --- a/synapse/res/templates/registration_success.html +++ b/synapse/res/templates/registration_success.html @@ -1,5 +1,9 @@ -<html> -<head></head> +<html lang="en"> +<head> + <title>Your email has now been validated</title> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> +</head> <body> <p>Your email has now been validated, please return to your client. You may now close this window.</p> </body> diff --git a/synapse/res/templates/registration_token.html b/synapse/res/templates/registration_token.html index 4577ce1702..59a98f564c 100644 --- a/synapse/res/templates/registration_token.html +++ b/synapse/res/templates/registration_token.html @@ -1,8 +1,8 @@ -<html> +<html lang="en"> <head> <title>Authentication</title> -<meta name='viewport' content='width=device-width, initial-scale=1, - user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <link rel="stylesheet" href="/_matrix/static/client/register/style.css"> </head> <body> diff --git a/synapse/res/templates/sso_account_deactivated.html b/synapse/res/templates/sso_account_deactivated.html index c3e4deed93..075f801cec 100644 --- a/synapse/res/templates/sso_account_deactivated.html +++ b/synapse/res/templates/sso_account_deactivated.html @@ -3,8 +3,8 @@ <head> <meta charset="UTF-8"> <title>SSO account deactivated</title> - <meta name="viewport" content="width=device-width, user-scalable=no"> - <style type="text/css"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {% include "sso.css" without context %} </style> </head> diff --git a/synapse/res/templates/sso_auth_account_details.html b/synapse/res/templates/sso_auth_account_details.html index cf72df0a2a..2d1db386e1 100644 --- a/synapse/res/templates/sso_auth_account_details.html +++ b/synapse/res/templates/sso_auth_account_details.html @@ -3,7 +3,8 @@ <head> <title>Create your account</title> <meta charset="utf-8"> - <meta name="viewport" content="width=device-width, user-scalable=no"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <script type="text/javascript"> let wasKeyboard = false; document.addEventListener("mousedown", function() { wasKeyboard = false; }); diff --git a/synapse/res/templates/sso_auth_bad_user.html b/synapse/res/templates/sso_auth_bad_user.html index da579ffe69..94403fc3ce 100644 --- a/synapse/res/templates/sso_auth_bad_user.html +++ b/synapse/res/templates/sso_auth_bad_user.html @@ -3,7 +3,8 @@ <head> <meta charset="UTF-8"> <title>Authentication failed</title> - <meta name="viewport" content="width=device-width, user-scalable=no"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {% include "sso.css" without context %} </style> diff --git a/synapse/res/templates/sso_auth_confirm.html b/synapse/res/templates/sso_auth_confirm.html index f9d0456f0a..aa1c974a6b 100644 --- a/synapse/res/templates/sso_auth_confirm.html +++ b/synapse/res/templates/sso_auth_confirm.html @@ -3,7 +3,8 @@ <head> <meta charset="UTF-8"> <title>Confirm it's you</title> - <meta name="viewport" content="width=device-width, user-scalable=no"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {% include "sso.css" without context %} </style> diff --git a/synapse/res/templates/sso_auth_success.html b/synapse/res/templates/sso_auth_success.html index 1ed3967e87..4898af6011 100644 --- a/synapse/res/templates/sso_auth_success.html +++ b/synapse/res/templates/sso_auth_success.html @@ -3,7 +3,8 @@ <head> <meta charset="UTF-8"> <title>Authentication successful</title> - <meta name="viewport" content="width=device-width, user-scalable=no"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {% include "sso.css" without context %} </style> diff --git a/synapse/res/templates/sso_error.html b/synapse/res/templates/sso_error.html index 472309c350..19992ff2ad 100644 --- a/synapse/res/templates/sso_error.html +++ b/synapse/res/templates/sso_error.html @@ -3,7 +3,8 @@ <head> <meta charset="UTF-8"> <title>Authentication failed</title> - <meta name="viewport" content="width=device-width, user-scalable=no"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {% include "sso.css" without context %} diff --git a/synapse/res/templates/sso_login_idp_picker.html b/synapse/res/templates/sso_login_idp_picker.html index 53b82db84e..56fabfa3d2 100644 --- a/synapse/res/templates/sso_login_idp_picker.html +++ b/synapse/res/templates/sso_login_idp_picker.html @@ -1,6 +1,8 @@ <!DOCTYPE html> <html lang="en"> <head> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta charset="UTF-8"> <title>Choose identity provider</title> <style type="text/css"> diff --git a/synapse/res/templates/sso_new_user_consent.html b/synapse/res/templates/sso_new_user_consent.html index 68c8b9f33a..523f64c4fc 100644 --- a/synapse/res/templates/sso_new_user_consent.html +++ b/synapse/res/templates/sso_new_user_consent.html @@ -3,7 +3,8 @@ <head> <meta charset="UTF-8"> <title>Agree to terms and conditions</title> - <meta name="viewport" content="width=device-width, user-scalable=no"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {% include "sso.css" without context %} diff --git a/synapse/res/templates/sso_redirect_confirm.html b/synapse/res/templates/sso_redirect_confirm.html index 1b01471ac8..1049a9bd92 100644 --- a/synapse/res/templates/sso_redirect_confirm.html +++ b/synapse/res/templates/sso_redirect_confirm.html @@ -3,7 +3,8 @@ <head> <meta charset="UTF-8"> <title>Continue to your account</title> - <meta name="viewport" content="width=device-width, user-scalable=no"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style type="text/css"> {% include "sso.css" without context %} diff --git a/synapse/res/templates/terms.html b/synapse/res/templates/terms.html index 369ff446d2..2081d990ab 100644 --- a/synapse/res/templates/terms.html +++ b/synapse/res/templates/terms.html @@ -1,8 +1,8 @@ <html> <head> <title>Authentication</title> -<meta name='viewport' content='width=device-width, initial-scale=1, - user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> +<meta http-equiv="X-UA-Compatible" content="IE=edge"> +<meta name="viewport" content="width=device-width, initial-scale=1.0"> <link rel="stylesheet" href="/_matrix/static/client/register/style.css"> </head> <body> diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 9d953d58de..68054ffc28 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -303,6 +303,7 @@ class RoomRestServlet(RestServlet): members = await self.store.get_users_in_room(room_id) ret["joined_local_devices"] = await self.store.count_devices_by_users(members) + ret["forgotten"] = await self.store.is_locally_forgotten_room(room_id) return HTTPStatus.OK, ret diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index 50edc6b7d3..e5ee63133b 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -15,10 +15,11 @@ # limitations under the License. import logging import random -from http import HTTPStatus from typing import TYPE_CHECKING, Optional, Tuple from urllib.parse import urlparse +from pydantic import StrictBool, StrictStr, constr + from twisted.web.server import Request from synapse.api.constants import LoginType @@ -34,12 +35,15 @@ from synapse.http.server import HttpServer, finish_request, respond_with_html from synapse.http.servlet import ( RestServlet, assert_params_in_dict, + parse_and_validate_json_object_from_request, parse_json_object_from_request, parse_string, ) from synapse.http.site import SynapseRequest from synapse.metrics import threepid_send_requests from synapse.push.mailer import Mailer +from synapse.rest.client.models import AuthenticationData, EmailRequestTokenBody +from synapse.rest.models import RequestBodyModel from synapse.types import JsonDict from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.stringutils import assert_valid_client_secret, random_string @@ -82,32 +86,16 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): 400, "Email-based password resets have been disabled on this server" ) - body = parse_json_object_from_request(request) - - assert_params_in_dict(body, ["client_secret", "email", "send_attempt"]) - - # Extract params from body - client_secret = body["client_secret"] - assert_valid_client_secret(client_secret) - - # Canonicalise the email address. The addresses are all stored canonicalised - # in the database. This allows the user to reset his password without having to - # know the exact spelling (eg. upper and lower case) of address in the database. - # Stored in the database "foo@bar.com" - # User requests with "FOO@bar.com" would raise a Not Found error - try: - email = validate_email(body["email"]) - except ValueError as e: - raise SynapseError(400, str(e)) - send_attempt = body["send_attempt"] - next_link = body.get("next_link") # Optional param + body = parse_and_validate_json_object_from_request( + request, EmailRequestTokenBody + ) - if next_link: + if body.next_link: # Raise if the provided next_link value isn't valid - assert_valid_next_link(self.hs, next_link) + assert_valid_next_link(self.hs, body.next_link) await self.identity_handler.ratelimit_request_token_requests( - request, "email", email + request, "email", body.email ) # The email will be sent to the stored address. @@ -115,7 +103,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): # an email address which is controlled by the attacker but which, after # canonicalisation, matches the one in our database. existing_user_id = await self.hs.get_datastores().main.get_user_id_by_threepid( - "email", email + "email", body.email ) if existing_user_id is None: @@ -135,26 +123,26 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): # Have the configured identity server handle the request ret = await self.identity_handler.request_email_token( self.hs.config.registration.account_threepid_delegate_email, - email, - client_secret, - send_attempt, - next_link, + body.email, + body.client_secret, + body.send_attempt, + body.next_link, ) else: # Send password reset emails from Synapse sid = await self.identity_handler.send_threepid_validation( - email, - client_secret, - send_attempt, + body.email, + body.client_secret, + body.send_attempt, self.mailer.send_password_reset_mail, - next_link, + body.next_link, ) # Wrap the session id in a JSON object ret = {"sid": sid} threepid_send_requests.labels(type="email", reason="password_reset").observe( - send_attempt + body.send_attempt ) return 200, ret @@ -172,16 +160,23 @@ class PasswordRestServlet(RestServlet): self.password_policy_handler = hs.get_password_policy_handler() self._set_password_handler = hs.get_set_password_handler() + class PostBody(RequestBodyModel): + auth: Optional[AuthenticationData] = None + logout_devices: StrictBool = True + if TYPE_CHECKING: + # workaround for https://github.com/samuelcolvin/pydantic/issues/156 + new_password: Optional[StrictStr] = None + else: + new_password: Optional[constr(max_length=512, strict=True)] = None + @interactive_auth_handler async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - body = parse_json_object_from_request(request) + body = parse_and_validate_json_object_from_request(request, self.PostBody) # we do basic sanity checks here because the auth layer will store these # in sessions. Pull out the new password provided to us. - new_password = body.pop("new_password", None) + new_password = body.new_password if new_password is not None: - if not isinstance(new_password, str) or len(new_password) > 512: - raise SynapseError(400, "Invalid password") self.password_policy_handler.validate_password(new_password) # there are two possibilities here. Either the user does not have an @@ -201,7 +196,7 @@ class PasswordRestServlet(RestServlet): params, session_id = await self.auth_handler.validate_user_via_ui_auth( requester, request, - body, + body.dict(), "modify your account password", ) except InteractiveAuthIncompleteError as e: @@ -224,7 +219,7 @@ class PasswordRestServlet(RestServlet): result, params, session_id = await self.auth_handler.check_ui_auth( [[LoginType.EMAIL_IDENTITY]], request, - body, + body.dict(), "modify your account password", ) except InteractiveAuthIncompleteError as e: @@ -299,37 +294,33 @@ class DeactivateAccountRestServlet(RestServlet): self.auth_handler = hs.get_auth_handler() self._deactivate_account_handler = hs.get_deactivate_account_handler() + class PostBody(RequestBodyModel): + auth: Optional[AuthenticationData] = None + id_server: Optional[StrictStr] = None + # Not specced, see https://github.com/matrix-org/matrix-spec/issues/297 + erase: StrictBool = False + @interactive_auth_handler async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - body = parse_json_object_from_request(request) - erase = body.get("erase", False) - if not isinstance(erase, bool): - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Param 'erase' must be a boolean, if given", - Codes.BAD_JSON, - ) + body = parse_and_validate_json_object_from_request(request, self.PostBody) requester = await self.auth.get_user_by_req(request) # allow ASes to deactivate their own users if requester.app_service: await self._deactivate_account_handler.deactivate_account( - requester.user.to_string(), erase, requester + requester.user.to_string(), body.erase, requester ) return 200, {} await self.auth_handler.validate_user_via_ui_auth( requester, request, - body, + body.dict(), "deactivate your account", ) result = await self._deactivate_account_handler.deactivate_account( - requester.user.to_string(), - erase, - requester, - id_server=body.get("id_server"), + requester.user.to_string(), body.erase, requester, id_server=body.id_server ) if result: id_server_unbind_result = "success" @@ -364,28 +355,15 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): "Adding emails have been disabled due to lack of an email config" ) raise SynapseError( - 400, "Adding an email to your account is disabled on this server" + 400, + "Adding an email to your account is disabled on this server", ) - body = parse_json_object_from_request(request) - assert_params_in_dict(body, ["client_secret", "email", "send_attempt"]) - client_secret = body["client_secret"] - assert_valid_client_secret(client_secret) - - # Canonicalise the email address. The addresses are all stored canonicalised - # in the database. - # This ensures that the validation email is sent to the canonicalised address - # as it will later be entered into the database. - # Otherwise the email will be sent to "FOO@bar.com" and stored as - # "foo@bar.com" in database. - try: - email = validate_email(body["email"]) - except ValueError as e: - raise SynapseError(400, str(e)) - send_attempt = body["send_attempt"] - next_link = body.get("next_link") # Optional param + body = parse_and_validate_json_object_from_request( + request, EmailRequestTokenBody + ) - if not await check_3pid_allowed(self.hs, "email", email): + if not await check_3pid_allowed(self.hs, "email", body.email): raise SynapseError( 403, "Your email domain is not authorized on this server", @@ -393,14 +371,14 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): ) await self.identity_handler.ratelimit_request_token_requests( - request, "email", email + request, "email", body.email ) - if next_link: + if body.next_link: # Raise if the provided next_link value isn't valid - assert_valid_next_link(self.hs, next_link) + assert_valid_next_link(self.hs, body.next_link) - existing_user_id = await self.store.get_user_id_by_threepid("email", email) + existing_user_id = await self.store.get_user_id_by_threepid("email", body.email) if existing_user_id is not None: if self.config.server.request_token_inhibit_3pid_errors: @@ -419,26 +397,26 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): # Have the configured identity server handle the request ret = await self.identity_handler.request_email_token( self.hs.config.registration.account_threepid_delegate_email, - email, - client_secret, - send_attempt, - next_link, + body.email, + body.client_secret, + body.send_attempt, + body.next_link, ) else: # Send threepid validation emails from Synapse sid = await self.identity_handler.send_threepid_validation( - email, - client_secret, - send_attempt, + body.email, + body.client_secret, + body.send_attempt, self.mailer.send_add_threepid_mail, - next_link, + body.next_link, ) # Wrap the session id in a JSON object ret = {"sid": sid} threepid_send_requests.labels(type="email", reason="add_threepid").observe( - send_attempt + body.send_attempt ) return 200, ret diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py new file mode 100644 index 0000000000..3150602997 --- /dev/null +++ b/synapse/rest/client/models.py @@ -0,0 +1,69 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import TYPE_CHECKING, Dict, Optional + +from pydantic import Extra, StrictInt, StrictStr, constr, validator + +from synapse.rest.models import RequestBodyModel +from synapse.util.threepids import validate_email + + +class AuthenticationData(RequestBodyModel): + """ + Data used during user-interactive authentication. + + (The name "Authentication Data" is taken directly from the spec.) + + Additional keys will be present, depending on the `type` field. Use `.dict()` to + access them. + """ + + class Config: + extra = Extra.allow + + session: Optional[StrictStr] = None + type: Optional[StrictStr] = None + + +class EmailRequestTokenBody(RequestBodyModel): + if TYPE_CHECKING: + client_secret: StrictStr + else: + # See also assert_valid_client_secret() + client_secret: constr( + regex="[0-9a-zA-Z.=_-]", # noqa: F722 + min_length=0, + max_length=255, + strict=True, + ) + email: StrictStr + id_server: Optional[StrictStr] + id_access_token: Optional[StrictStr] + next_link: Optional[StrictStr] + send_attempt: StrictInt + + @validator("id_access_token", always=True) + def token_required_for_identity_server( + cls, token: Optional[str], values: Dict[str, object] + ) -> Optional[str]: + if values.get("id_server") is not None and token is None: + raise ValueError("id_access_token is required if an id_server is supplied.") + return token + + # Canonicalise the email address. The addresses are all stored canonicalised + # in the database. This allows the user to reset his password without having to + # know the exact spelling (eg. upper and lower case) of address in the database. + # Without this, an email stored in the database as "foo@bar.com" would cause + # user requests for "FOO@bar.com" to raise a Not Found error. + _email_validator = validator("email", allow_reuse=True)(validate_email) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 3880846e9a..a8adf00176 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -16,9 +16,12 @@ """ This module contains REST servlets to do with rooms: /rooms/<paths> """ import logging import re +from enum import Enum from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Tuple from urllib import parse as urlparse +from prometheus_client.core import Histogram + from twisted.web.server import Request from synapse import event_auth @@ -46,6 +49,7 @@ from synapse.http.servlet import ( parse_strings_from_args, ) from synapse.http.site import SynapseRequest +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.tracing import set_attribute from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache @@ -61,6 +65,66 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +class _RoomSize(Enum): + """ + Enum to differentiate sizes of rooms. This is a pretty good approximation + about how hard it will be to get events in the room. We could also look at + room "complexity". + """ + + # This doesn't necessarily mean the room is a DM, just that there is a DM + # amount of people there. + DM_SIZE = "direct_message_size" + SMALL = "small" + SUBSTANTIAL = "substantial" + LARGE = "large" + + @staticmethod + def from_member_count(member_count: int) -> "_RoomSize": + if member_count <= 2: + return _RoomSize.DM_SIZE + elif member_count < 100: + return _RoomSize.SMALL + elif member_count < 1000: + return _RoomSize.SUBSTANTIAL + else: + return _RoomSize.LARGE + + +# This is an extra metric on top of `synapse_http_server_response_time_seconds` +# which times the same sort of thing but this one allows us to see values +# greater than 10s. We use a separate dedicated histogram with its own buckets +# so that we don't increase the cardinality of the general one because it's +# multiplied across hundreds of servlets. +messsages_response_timer = Histogram( + "synapse_room_message_list_rest_servlet_response_time_seconds", + "sec", + # We have a label for room size so we can try to see a more realistic + # picture of /messages response time for bigger rooms. We don't want the + # tiny rooms that can always respond fast skewing our results when we're trying + # to optimize the bigger cases. + ["room_size"], + buckets=( + 0.005, + 0.01, + 0.025, + 0.05, + 0.1, + 0.25, + 0.5, + 1.0, + 2.5, + 5.0, + 10.0, + 30.0, + 60.0, + 120.0, + 180.0, + "+Inf", + ), +) + + class TransactionRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() @@ -556,6 +620,7 @@ class RoomMessageListRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self._hs = hs + self.clock = hs.get_clock() self.pagination_handler = hs.get_pagination_handler() self.auth = hs.get_auth() self.store = hs.get_datastores().main @@ -563,6 +628,18 @@ class RoomMessageListRestServlet(RestServlet): async def on_GET( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: + processing_start_time = self.clock.time_msec() + # Fire off and hope that we get a result by the end. + # + # We're using the mypy type ignore comment because the `@cached` + # decorator on `get_number_joined_users_in_room` doesn't play well with + # the type system. Maybe in the future, it can use some ParamSpec + # wizardry to fix it up. + room_member_count_deferred = run_in_background( # type: ignore[call-arg] + self.store.get_number_joined_users_in_room, + room_id, # type: ignore[arg-type] + ) + requester = await self.auth.get_user_by_req(request, allow_guest=True) pagination_config = await PaginationConfig.from_request( self.store, request, default_limit=10 @@ -593,6 +670,12 @@ class RoomMessageListRestServlet(RestServlet): event_filter=event_filter, ) + processing_end_time = self.clock.time_msec() + room_member_count = await make_deferred_yieldable(room_member_count_deferred) + messsages_response_timer.labels( + room_size=_RoomSize.from_member_count(room_member_count) + ).observe((processing_start_time - processing_end_time) / 1000) + return 200, msgs diff --git a/synapse/rest/models.py b/synapse/rest/models.py new file mode 100644 index 0000000000..ac39cda8e5 --- /dev/null +++ b/synapse/rest/models.py @@ -0,0 +1,23 @@ +from pydantic import BaseModel, Extra + + +class RequestBodyModel(BaseModel): + """A custom version of Pydantic's BaseModel which + + - ignores unknown fields and + - does not allow fields to be overwritten after construction, + + but otherwise uses Pydantic's default behaviour. + + Ignoring unknown fields is a useful default. It means that clients can provide + unstable field not known to the server without the request being refused outright. + + Subclassing in this way is recommended by + https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally + """ + + class Config: + # By default, ignore fields that we don't recognise. + extra = Extra.ignore + # By default, don't allow fields to be reassigned after parsing. + allow_mutation = False diff --git a/synapse/static/client/login/index.html b/synapse/static/client/login/index.html index 9e6daf38ac..40510889ac 100644 --- a/synapse/static/client/login/index.html +++ b/synapse/static/client/login/index.html @@ -3,7 +3,8 @@ <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"> <title> Login </title> - <meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> <link rel="stylesheet" href="style.css"> <script src="js/jquery-3.4.1.min.js"></script> <script src="js/login.js"></script> diff --git a/synapse/static/client/register/index.html b/synapse/static/client/register/index.html index 140653574d..27bbd76f51 100644 --- a/synapse/static/client/register/index.html +++ b/synapse/static/client/register/index.html @@ -2,7 +2,8 @@ <html> <head> <title> Registration </title> -<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> +<meta http-equiv="X-UA-Compatible" content="IE=edge"> +<meta name="viewport" content="width=device-width, initial-scale=1.0"> <link rel="stylesheet" href="style.css"> <script src="js/jquery-3.4.1.min.js"></script> <script src="https://www.recaptcha.net/recaptcha/api/js/recaptcha_ajax.js"></script> diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f34c067515..bb14729a9d 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -46,7 +46,14 @@ from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable -from synapse.logging.tracing import Link, get_active_span, start_active_span, trace +from synapse.logging.tracing import ( + Link, + SynapseTags, + get_active_span, + set_attribute, + start_active_span, + trace, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases @@ -383,9 +390,21 @@ class EventsPersistenceStorageController: PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ + event_ids: List[str] = [] partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) + event_ids.append(event.event_id) + + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + str(event_ids), + ) + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", + str(len(event_ids)), + ) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) async def enqueue( item: Tuple[str, List[Tuple[EventBase, EventContext]]] diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 7b2084664c..5964835ea3 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -29,7 +29,8 @@ from typing import ( from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.logging.tracing import trace +from synapse.logging.tracing import tag_args, trace +from synapse.storage.roommember import ProfileInfo from synapse.storage.state import StateFilter from synapse.storage.util.partial_state_events_tracker import ( PartialCurrentStateTracker, @@ -228,10 +229,12 @@ class StateStorageController: return {event: event_to_state[event] for event in event_ids} @trace + @tag_args async def get_state_ids_for_events( self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None, + await_full_state: bool = True, ) -> Dict[str, StateMap[str]]: """ Get the state dicts corresponding to a list of events, containing the event_ids @@ -240,6 +243,9 @@ class StateStorageController: Args: event_ids: events whose state should be returned state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at these events and `state_filter` is not satisfied by partial state. + Defaults to `True`. Returns: A dict from event_id -> (type, state_key) -> event_id @@ -248,8 +254,12 @@ class StateStorageController: RuntimeError if we don't have a state group for one or more of the events (ie they are outliers or unknown) """ - await_full_state = True - if state_filter and not state_filter.must_await_full_state(self._is_mine_id): + if ( + await_full_state + and state_filter + and not state_filter.must_await_full_state(self._is_mine_id) + ): + # Full state is not required if the state filter is restrictive enough. await_full_state = False event_to_groups = await self.get_state_group_for_events( @@ -292,7 +302,10 @@ class StateStorageController: @trace async def get_state_ids_for_event( - self, event_id: str, state_filter: Optional[StateFilter] = None + self, + event_id: str, + state_filter: Optional[StateFilter] = None, + await_full_state: bool = True, ) -> StateMap[str]: """ Get the state dict corresponding to a particular event @@ -300,6 +313,9 @@ class StateStorageController: Args: event_id: event whose state should be returned state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at the event and `state_filter` is not satisfied by partial state. + Defaults to `True`. Returns: A dict from (type, state_key) -> state_event_id @@ -309,7 +325,9 @@ class StateStorageController: outlier or is unknown) """ state_map = await self.get_state_ids_for_events( - [event_id], state_filter or StateFilter.all() + [event_id], + state_filter or StateFilter.all(), + await_full_state=await_full_state, ) return state_map[event_id] @@ -332,6 +350,7 @@ class StateStorageController: ) @trace + @tag_args async def get_state_group_for_events( self, event_ids: Collection[str], @@ -473,6 +492,7 @@ class StateStorageController: prev_stream_id, max_stream_id ) + @trace async def get_current_state( self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: @@ -506,3 +526,15 @@ class StateStorageController: await self._partial_state_room_tracker.await_full_state(room_id) return await self.stores.main.get_current_hosts_in_room(room_id) + + async def get_users_in_room_with_profiles( + self, room_id: str + ) -> Dict[str, ProfileInfo]: + """ + Get the current users in the room with their profiles. + If the room is currently partial-stated, this will block until the room has + full state. + """ + await self._partial_state_room_tracker.await_full_state(room_id) + + return await self.stores.main.get_users_in_room_with_profiles(room_id) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index eec55b6478..41b015dba1 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -33,6 +33,7 @@ from synapse.api.constants import MAX_DEPTH, EventTypes from synapse.api.errors import StoreError from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict +from synapse.logging.tracing import tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( @@ -126,6 +127,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas ) return await self.get_events_as_list(event_ids) + @trace + @tag_args async def get_auth_chain_ids( self, room_id: str, @@ -709,6 +712,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas # Return all events where not all sets can reach them. return {eid for eid, n in event_to_missing_sets.items() if n} + @trace + @tag_args async def get_oldest_event_ids_with_depth_in_room( self, room_id: str ) -> List[Tuple[str, int]]: @@ -767,6 +772,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id, ) + @trace async def get_insertion_event_backward_extremities_in_room( self, room_id: str ) -> List[Tuple[str, int]]: @@ -1339,6 +1345,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas event_results.reverse() return event_results + @trace + @tag_args async def get_successor_events(self, event_id: str) -> List[str]: """Fetch all events that have the given event as a prev event @@ -1375,6 +1383,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas _delete_old_forward_extrem_cache_txn, ) + @trace async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None: await self.db_pool.simple_upsert( table="insertion_event_extremities", diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 161aad0f89..eabf9c9739 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -74,7 +74,17 @@ receipt. """ import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + List, + Mapping, + Optional, + Tuple, + Union, + cast, +) import attr @@ -154,7 +164,9 @@ class NotifCounts: highlight_count: int = 0 -def _serialize_action(actions: List[Union[dict, str]], is_highlight: bool) -> str: +def _serialize_action( + actions: Collection[Union[Mapping, str]], is_highlight: bool +) -> str: """Custom serializer for actions. This allows us to "compress" common actions. We use the fact that most users have the same actions for notifs (and for @@ -227,7 +239,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas user_id: str, ) -> NotifCounts: """Get the notification count, the highlight count and the unread message count - for a given user in a given room after the given read receipt. + for a given user in a given room after their latest read receipt. Note that this function assumes the user to be a current member of the room, since it's either called by the sync handler to handle joined room entries, or by @@ -238,9 +250,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas user_id: The user to retrieve the counts for. Returns - A dict containing the counts mentioned earlier in this docstring, - respectively under the keys "notify_count", "highlight_count" and - "unread_count". + A NotifCounts object containing the notification count, the highlight count + and the unread message count. """ return await self.db_pool.runInteraction( "get_unread_event_push_actions_by_room", @@ -255,6 +266,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas room_id: str, user_id: str, ) -> NotifCounts: + # Get the stream ordering of the user's latest receipt in the room. result = self.get_last_receipt_for_user_txn( txn, user_id, @@ -266,13 +278,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas ), ) - stream_ordering = None if result: _, stream_ordering = result - if stream_ordering is None: - # Either last_read_event_id is None, or it's an event we don't have (e.g. - # because it's been purged), in which case retrieve the stream ordering for + else: + # If the user has no receipts in the room, retrieve the stream ordering for # the latest membership event from this user in this room (which we assume is # a join). event_id = self.db_pool.simple_select_one_onecol_txn( @@ -289,10 +299,26 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas ) def _get_unread_counts_by_pos_txn( - self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int + self, + txn: LoggingTransaction, + room_id: str, + user_id: str, + receipt_stream_ordering: int, ) -> NotifCounts: """Get the number of unread messages for a user/room that have happened since the given stream ordering. + + Args: + txn: The database transaction. + room_id: The room ID to get unread counts for. + user_id: The user ID to get unread counts for. + receipt_stream_ordering: The stream ordering of the user's latest + receipt in the room. If there are no receipts, the stream ordering + of the user's join event. + + Returns + A NotifCounts object containing the notification count, the highlight count + and the unread message count. """ counts = NotifCounts() @@ -320,7 +346,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas OR last_receipt_stream_ordering = ? ) """, - (room_id, user_id, stream_ordering, stream_ordering), + (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering), ) row = txn.fetchone() @@ -338,17 +364,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas AND stream_ordering > ? AND highlight = 1 """ - txn.execute(sql, (user_id, room_id, stream_ordering)) + txn.execute(sql, (user_id, room_id, receipt_stream_ordering)) row = txn.fetchone() if row: counts.highlight_count += row[0] # Finally we need to count push actions that aren't included in the - # summary returned above, e.g. recent events that haven't been - # summarised yet, or the summary is empty due to a recent read receipt. - stream_ordering = max(stream_ordering, summary_stream_ordering) + # summary returned above. This might be due to recent events that haven't + # been summarised yet or the summary is out of date due to a recent read + # receipt. + start_unread_stream_ordering = max( + receipt_stream_ordering, summary_stream_ordering + ) notify_count, unread_count = self._get_notif_unread_count_for_user_room( - txn, room_id, user_id, stream_ordering + txn, room_id, user_id, start_unread_stream_ordering ) counts.notify_count += notify_count @@ -733,7 +762,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas async def add_push_actions_to_staging( self, event_id: str, - user_id_actions: Dict[str, List[Union[dict, str]]], + user_id_actions: Dict[str, Collection[Union[Mapping, str]]], count_as_unread: bool, ) -> None: """Add the push actions for the event to the push action staging area. @@ -750,7 +779,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas # This is a helper function for generating the necessary tuple that # can be used to insert into the `event_push_actions_staging` table. def _gen_entry( - user_id: str, actions: List[Union[dict, str]] + user_id: str, actions: Collection[Union[Mapping, str]] ) -> Tuple[str, str, str, int, int, int]: is_highlight = 1 if _action_has_highlight(actions) else 0 notif = 1 if "notify" in actions else 0 @@ -1151,8 +1180,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas txn: The database transaction. old_rotate_stream_ordering: The previous maximum event stream ordering. rotate_to_stream_ordering: The new maximum event stream ordering to summarise. - - Returns whether the archiving process has caught up or not. """ # Calculate the new counts that should be upserted into event_push_summary @@ -1238,9 +1265,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas (rotate_to_stream_ordering,), ) - async def _remove_old_push_actions_that_have_rotated( - self, - ) -> None: + async def _remove_old_push_actions_that_have_rotated(self) -> None: """Clear out old push actions that have been summarised.""" # We want to clear out anything that is older than a day that *has* already @@ -1397,7 +1422,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ] -def _action_has_highlight(actions: List[Union[dict, str]]) -> bool: +def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool: for action in actions: if not isinstance(action, dict): continue diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 5560b38a48..1c3b804da0 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext +from synapse.logging.tracing import trace from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -145,6 +146,7 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen + @trace async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e9ff6cfb34..90e6d82058 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -54,6 +54,7 @@ from synapse.logging.context import ( current_context, make_deferred_yieldable, ) +from synapse.logging.tracing import start_active_span, tag_args, trace from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -430,6 +431,8 @@ class EventsWorkerStore(SQLBaseStore): return {e.event_id: e for e in events} + @trace + @tag_args async def get_events_as_list( self, event_ids: Collection[str], @@ -1090,23 +1093,42 @@ class EventsWorkerStore(SQLBaseStore): """ fetched_event_ids: Set[str] = set() fetched_events: Dict[str, _EventRow] = {} - events_to_fetch = event_ids - while events_to_fetch: - row_map = await self._enqueue_events(events_to_fetch) + async def _fetch_event_ids_and_get_outstanding_redactions( + event_ids_to_fetch: Collection[str], + ) -> Collection[str]: + """ + Fetch all of the given event_ids and return any associated redaction event_ids + that we still need to fetch in the next iteration. + """ + row_map = await self._enqueue_events(event_ids_to_fetch) # we need to recursively fetch any redactions of those events redaction_ids: Set[str] = set() - for event_id in events_to_fetch: + for event_id in event_ids_to_fetch: row = row_map.get(event_id) fetched_event_ids.add(event_id) if row: fetched_events[event_id] = row redaction_ids.update(row.redactions) - events_to_fetch = redaction_ids.difference(fetched_event_ids) - if events_to_fetch: - logger.debug("Also fetching redaction events %s", events_to_fetch) + event_ids_to_fetch = redaction_ids.difference(fetched_event_ids) + return event_ids_to_fetch + + # Grab the initial list of events requested + event_ids_to_fetch = await _fetch_event_ids_and_get_outstanding_redactions( + event_ids + ) + # Then go and recursively find all of the associated redactions + with start_active_span("recursively fetching redactions"): + while event_ids_to_fetch: + logger.debug("Also fetching redaction events %s", event_ids_to_fetch) + + event_ids_to_fetch = ( + await _fetch_event_ids_and_get_outstanding_redactions( + event_ids_to_fetch + ) + ) # build a map from event_id to EventBase event_map: Dict[str, EventBase] = {} @@ -1424,6 +1446,8 @@ class EventsWorkerStore(SQLBaseStore): return {r["event_id"] for r in rows} + @trace + @tag_args async def have_seen_events( self, room_id: str, event_ids: Iterable[str] ) -> Set[str]: @@ -2200,3 +2224,63 @@ class EventsWorkerStore(SQLBaseStore): (room_id,), ) return [row[0] for row in txn] + + def mark_event_rejected_txn( + self, + txn: LoggingTransaction, + event_id: str, + rejection_reason: Optional[str], + ) -> None: + """Mark an event that was previously accepted as rejected, or vice versa + + This can happen, for example, when resyncing state during a faster join. + + Args: + txn: + event_id: ID of event to update + rejection_reason: reason it has been rejected, or None if it is now accepted + """ + if rejection_reason is None: + logger.info( + "Marking previously-processed event %s as accepted", + event_id, + ) + self.db_pool.simple_delete_txn( + txn, + "rejections", + keyvalues={"event_id": event_id}, + ) + else: + logger.info( + "Marking previously-processed event %s as rejected(%s)", + event_id, + rejection_reason, + ) + self.db_pool.simple_upsert_txn( + txn, + table="rejections", + keyvalues={"event_id": event_id}, + values={ + "reason": rejection_reason, + "last_check": self._clock.time_msec(), + }, + ) + self.db_pool.simple_update_txn( + txn, + table="events", + keyvalues={"event_id": event_id}, + updatevalues={"rejection_reason": rejection_reason}, + ) + + self.invalidate_get_event_cache_after_txn(txn, event_id) + + # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just + # call '_send_invalidation_to_replication', but we actually need the other + # end to call _invalidate_local_get_event_cache() rather than (just) + # _get_event_cache.invalidate(). + # + # One solution might be to (somehow) get the workers to call + # _invalidate_caches_for_event() (though that will invalidate more than + # strictly necessary). + # + # https://github.com/matrix-org/synapse/issues/12994 diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 768f95d16c..255620f996 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -14,11 +14,23 @@ # limitations under the License. import abc import logging -from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Dict, + List, + Mapping, + Optional, + Sequence, + Tuple, + Union, + cast, +) from synapse.api.errors import StoreError from synapse.config.homeserver import ExperimentalConfig -from synapse.push.baserules import list_with_base_rules +from synapse.push.baserules import FilteredPushRules, PushRule, compile_push_rules from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -50,60 +62,30 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -def _is_experimental_rule_enabled( - rule_id: str, experimental_config: ExperimentalConfig -) -> bool: - """Used by `_load_rules` to filter out experimental rules when they - have not been enabled. - """ - if ( - rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl" - and not experimental_config.msc3786_enabled - ): - return False - if ( - rule_id == "global/underride/.org.matrix.msc3772.thread_reply" - and not experimental_config.msc3772_enabled - ): - return False - return True - - def _load_rules( rawrules: List[JsonDict], enabled_map: Dict[str, bool], experimental_config: ExperimentalConfig, -) -> List[JsonDict]: - ruleslist = [] - for rawrule in rawrules: - rule = dict(rawrule) - rule["conditions"] = db_to_json(rawrule["conditions"]) - rule["actions"] = db_to_json(rawrule["actions"]) - rule["default"] = False - ruleslist.append(rule) - - # We're going to be mutating this a lot, so copy it. We also filter out - # any experimental default push rules that aren't enabled. - rules = [ - rule - for rule in list_with_base_rules(ruleslist) - if _is_experimental_rule_enabled(rule["rule_id"], experimental_config) - ] +) -> FilteredPushRules: + """Take the DB rows returned from the DB and convert them into a full + `FilteredPushRules` object. + """ - for i, rule in enumerate(rules): - rule_id = rule["rule_id"] + ruleslist = [ + PushRule( + rule_id=rawrule["rule_id"], + priority_class=rawrule["priority_class"], + conditions=db_to_json(rawrule["conditions"]), + actions=db_to_json(rawrule["actions"]), + ) + for rawrule in rawrules + ] - if rule_id not in enabled_map: - continue - if rule.get("enabled", True) == bool(enabled_map[rule_id]): - continue + push_rules = compile_push_rules(ruleslist) - # Rules are cached across users. - rule = dict(rule) - rule["enabled"] = bool(enabled_map[rule_id]) - rules[i] = rule + filtered_rules = FilteredPushRules(push_rules, enabled_map, experimental_config) - return rules + return filtered_rules # The ABCMeta metaclass ensures that it cannot be instantiated without @@ -162,7 +144,7 @@ class PushRulesWorkerStore( raise NotImplementedError() @cached(max_entries=5000) - async def get_push_rules_for_user(self, user_id: str) -> List[JsonDict]: + async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules: rows = await self.db_pool.simple_select_list( table="push_rules", keyvalues={"user_name": user_id}, @@ -216,11 +198,11 @@ class PushRulesWorkerStore( @cachedList(cached_method_name="get_push_rules_for_user", list_name="user_ids") async def bulk_get_push_rules( self, user_ids: Collection[str] - ) -> Dict[str, List[JsonDict]]: + ) -> Dict[str, FilteredPushRules]: if not user_ids: return {} - results: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids} + raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids} rows = await self.db_pool.simple_select_many_batch( table="push_rules", @@ -234,11 +216,13 @@ class PushRulesWorkerStore( rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) for row in rows: - results.setdefault(row["user_name"], []).append(row) + raw_rules.setdefault(row["user_name"], []).append(row) enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids) - for user_id, rules in results.items(): + results: Dict[str, FilteredPushRules] = {} + + for user_id, rules in raw_rules.items(): results[user_id] = _load_rules( rules, enabled_map_by_user.get(user_id, {}), self.hs.config.experimental ) @@ -345,8 +329,8 @@ class PushRuleStore(PushRulesWorkerStore): user_id: str, rule_id: str, priority_class: int, - conditions: List[Dict[str, str]], - actions: List[Union[JsonDict, str]], + conditions: Sequence[Mapping[str, str]], + actions: Sequence[Union[Mapping[str, Any], str]], before: Optional[str] = None, after: Optional[str] = None, ) -> None: @@ -817,7 +801,7 @@ class PushRuleStore(PushRulesWorkerStore): return self._push_rules_stream_id_gen.get_current_token() async def copy_push_rule_from_room_to_room( - self, new_room_id: str, user_id: str, rule: dict + self, new_room_id: str, user_id: str, rule: PushRule ) -> None: """Copy a single push rule from one room to another for a specific user. @@ -827,21 +811,27 @@ class PushRuleStore(PushRulesWorkerStore): rule: A push rule. """ # Create new rule id - rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1]) + rule_id_scope = "/".join(rule.rule_id.split("/")[:-1]) new_rule_id = rule_id_scope + "/" + new_room_id + new_conditions = [] + # Change room id in each condition - for condition in rule.get("conditions", []): + for condition in rule.conditions: + new_condition = condition if condition.get("key") == "room_id": - condition["pattern"] = new_room_id + new_condition = dict(condition) + new_condition["pattern"] = new_room_id + + new_conditions.append(new_condition) # Add the rule for the new room await self.add_push_rule( user_id=user_id, rule_id=new_rule_id, - priority_class=rule["priority_class"], - conditions=rule["conditions"], - actions=rule["actions"], + priority_class=rule.priority_class, + conditions=new_conditions, + actions=rule.actions, ) async def copy_push_rules_from_room_to_room_for_user( @@ -859,8 +849,11 @@ class PushRuleStore(PushRulesWorkerStore): user_push_rules = await self.get_push_rules_for_user(user_id) # Get rules relating to the old room and copy them to the new room - for rule in user_push_rules: - conditions = rule.get("conditions", []) + for rule, enabled in user_push_rules: + if not enabled: + continue + + conditions = rule.conditions if any( (c.get("key") == "room_id" and c.get("pattern") == old_room_id) for c in conditions diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 0090c9f225..124c70ad37 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -161,7 +161,7 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt_type: The receipt types to fetch. Returns: - The latest receipt, if one exists. + The event ID and stream ordering of the latest receipt, if one exists. """ clause, args = make_in_list_sql_clause( diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 0f1f0d11ea..b7d4baa6bb 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2001,9 +2001,15 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else "" + # We join on room_stats_state despite not using any columns from it + # because the join can influence the number of rows returned; + # e.g. a room that doesn't have state, maybe because it was deleted. + # The query returning the total count should be consistent with + # the query returning the results. sql = """ SELECT COUNT(*) as total_event_reports FROM event_reports AS er + JOIN room_stats_state ON room_stats_state.room_id = er.room_id {} """.format( where_clause diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 93ff4816c8..827c1f1efd 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -283,6 +283,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): Returns: A mapping from user ID to ProfileInfo. + + Preconditions: + - There is full state available for the room (it is not partial-stated). """ def _get_users_in_room_with_profiles( @@ -1212,6 +1215,30 @@ class RoomMemberWorkerStore(EventsWorkerStore): "get_forgotten_rooms_for_user", _get_forgotten_rooms_for_user_txn ) + async def is_locally_forgotten_room(self, room_id: str) -> bool: + """Returns whether all local users have forgotten this room_id. + + Args: + room_id: The room ID to query. + + Returns: + Whether the room is forgotten. + """ + + sql = """ + SELECT count(*) > 0 FROM local_current_membership + INNER JOIN room_memberships USING (room_id, event_id) + WHERE + room_id = ? + AND forgotten = 0; + """ + + rows = await self.db_pool.execute("is_forgotten_room", None, sql, room_id) + + # `count(*)` returns always an integer + # If any rows still exist it means someone has not forgotten this room yet + return not rows[0][0] + async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: """Get all rooms that the user has ever been in. diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index f70705a0af..0b10af0e58 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -430,6 +430,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): updatevalues={"state_group": state_group}, ) + # the event may now be rejected where it was not before, or vice versa, + # in which case we need to update the rejected flags. + if bool(context.rejected) != (event.rejected_reason is not None): + self.mark_event_rejected_txn(txn, event.event_id, context.rejected) + self.db_pool.simple_delete_one_txn( txn, table="partial_state_events", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index af3bab2c15..0004d955b4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -539,15 +539,6 @@ class StateFilter: is_mine_id: a callable which confirms if a given state_key matches a mxid of a local user """ - - # TODO(faster_joins): it's not entirely clear that this is safe. In particular, - # there may be circumstances in which we return a piece of state that, once we - # resync the state, we discover is invalid. For example: if it turns out that - # the sender of a piece of state wasn't actually in the room, then clearly that - # state shouldn't have been returned. - # We should at least add some tests around this to see what happens. - # https://github.com/matrix-org/synapse/issues/13006 - # if we haven't requested membership events, then it depends on the value of # 'include_others' if EventTypes.Member not in self.types: diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py index 466e5137f2..07af89ee31 100644 --- a/synapse/storage/util/partial_state_events_tracker.py +++ b/synapse/storage/util/partial_state_events_tracker.py @@ -20,6 +20,7 @@ from twisted.internet import defer from twisted.internet.defer import Deferred from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.logging.tracing import trace_with_opname from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore from synapse.util import unwrapFirstError @@ -58,6 +59,7 @@ class PartialStateEventsTracker: for o in observers: o.callback(None) + @trace_with_opname("PartialStateEventsTracker.await_full_state") async def await_full_state(self, event_ids: Collection[str]) -> None: """Wait for all the given events to have full state. @@ -151,6 +153,7 @@ class PartialCurrentStateTracker: for o in observers: o.callback(None) + @trace_with_opname("PartialCurrentStateTracker.await_full_state") async def await_full_state(self, room_id: str) -> None: # We add the deferred immediately so that the DB call to check for # partial state doesn't race when we unpartial the room. diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 6394cc39ac..603570c10f 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,6 +18,8 @@ import logging import typing from typing import Any, DefaultDict, Iterator, List, Set +from prometheus_client.core import Counter + from twisted.internet import defer from synapse.api.errors import LimitExceededError @@ -27,6 +29,8 @@ from synapse.logging.context import ( make_deferred_yieldable, run_in_background, ) +from synapse.logging.tracing import start_active_span +from synapse.metrics import Histogram, LaterGauge from synapse.util import Clock if typing.TYPE_CHECKING: @@ -35,6 +39,32 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) +# Track how much the ratelimiter is affecting requests +rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "") +rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "") +queue_wait_timer = Histogram( + "synapse_rate_limit_queue_wait_time_seconds", + "sec", + [], + buckets=( + 0.005, + 0.01, + 0.025, + 0.05, + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 10.0, + 20.0, + "+Inf", + ), +) + + class FederationRateLimiter: def __init__(self, clock: Clock, config: FederationRatelimitSettings): def new_limiter() -> "_PerHostRatelimiter": @@ -44,6 +74,27 @@ class FederationRateLimiter: str, "_PerHostRatelimiter" ] = collections.defaultdict(new_limiter) + # We track the number of affected hosts per time-period so we can + # differentiate one really noisy homeserver from a general + # ratelimit tuning problem across the federation. + LaterGauge( + "synapse_rate_limit_sleep_affected_hosts", + "Number of hosts that had requests put to sleep", + [], + lambda: sum( + ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values() + ), + ) + LaterGauge( + "synapse_rate_limit_reject_affected_hosts", + "Number of hosts that had requests rejected", + [], + lambda: sum( + ratelimiter.should_reject() + for ratelimiter in self.ratelimiters.values() + ), + ) + def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]": """Used to ratelimit an incoming request from a given host @@ -59,7 +110,7 @@ class FederationRateLimiter: Returns: context manager which returns a deferred. """ - return self.ratelimiters[host].ratelimit() + return self.ratelimiters[host].ratelimit(host) class _PerHostRatelimiter: @@ -94,19 +145,42 @@ class _PerHostRatelimiter: self.request_times: List[int] = [] @contextlib.contextmanager - def ratelimit(self) -> "Iterator[defer.Deferred[None]]": + def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]": # `contextlib.contextmanager` takes a generator and turns it into a # context manager. The generator should only yield once with a value # to be returned by manager. # Exceptions will be reraised at the yield. + self.host = host + request_id = object() - ret = self._on_enter(request_id) + # Ideally we'd use `Deferred.fromCoroutine()` here, to save on redundant + # type-checking, but we'd need Twisted >= 21.2. + ret = defer.ensureDeferred(self._on_enter_with_tracing(request_id)) try: yield ret finally: self._on_exit(request_id) + def should_reject(self) -> bool: + """ + Whether to reject the request if we already have too many queued up + (either sleeping or in the ready queue). + """ + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + return queue_size > self.reject_limit + + def should_sleep(self) -> bool: + """ + Whether to sleep the request if we already have too many requests coming + through within the window. + """ + return len(self.request_times) > self.sleep_limit + + async def _on_enter_with_tracing(self, request_id: object) -> None: + with start_active_span("ratelimit wait"), queue_wait_timer.time(): + await self._on_enter(request_id) + def _on_enter(self, request_id: object) -> "defer.Deferred[None]": time_now = self.clock.time_msec() @@ -117,8 +191,9 @@ class _PerHostRatelimiter: # reject the request if we already have too many queued up (either # sleeping or in the ready queue). - queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) - if queue_size > self.reject_limit: + if self.should_reject(): + logger.debug("Ratelimiter(%s): rejecting request", self.host) + rate_limit_reject_counter.inc() raise LimitExceededError( retry_after_ms=int(self.window_size / self.sleep_limit) ) @@ -130,7 +205,8 @@ class _PerHostRatelimiter: queue_defer: defer.Deferred[None] = defer.Deferred() self.ready_request_queue[request_id] = queue_defer logger.info( - "Ratelimiter: queueing request (queue now %i items)", + "Ratelimiter(%s): queueing request (queue now %i items)", + self.host, len(self.ready_request_queue), ) @@ -139,19 +215,28 @@ class _PerHostRatelimiter: return defer.succeed(None) logger.debug( - "Ratelimit [%s]: len(self.request_times)=%d", + "Ratelimit(%s) [%s]: len(self.request_times)=%d", + self.host, id(request_id), len(self.request_times), ) - if len(self.request_times) > self.sleep_limit: - logger.debug("Ratelimiter: sleeping request for %f sec", self.sleep_sec) + if self.should_sleep(): + logger.debug( + "Ratelimiter(%s) [%s]: sleeping request for %f sec", + self.host, + id(request_id), + self.sleep_sec, + ) + rate_limit_sleep_counter.inc() ret_defer = run_in_background(self.clock.sleep, self.sleep_sec) self.sleeping_requests.add(request_id) def on_wait_finished(_: Any) -> "defer.Deferred[None]": - logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + logger.debug( + "Ratelimit(%s) [%s]: Finished sleeping", self.host, id(request_id) + ) self.sleeping_requests.discard(request_id) queue_defer = queue_request() return queue_defer @@ -161,7 +246,9 @@ class _PerHostRatelimiter: ret_defer = queue_request() def on_start(r: object) -> object: - logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + logger.debug( + "Ratelimit(%s) [%s]: Processing req", self.host, id(request_id) + ) self.current_processing.add(request_id) return r @@ -183,7 +270,7 @@ class _PerHostRatelimiter: return make_deferred_yieldable(ret_defer) def _on_exit(self, request_id: object) -> None: - logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + logger.debug("Ratelimit(%s) [%s]: Processed req", self.host, id(request_id)) self.current_processing.discard(request_id) try: # start processing the next item on the queue. diff --git a/synapse/visibility.py b/synapse/visibility.py index 813fe1a155..342d60a921 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -73,8 +73,8 @@ async def filter_events_for_client( * the user is not currently a member of the room, and: * the user has not been a member of the room since the given events - always_include_ids: set of event ids to specifically - include (unless sender is ignored) + always_include_ids: set of event ids to specifically include, if present + in events (unless sender is ignored) filter_send_to_client: Whether we're checking an event that's going to be sent to a client. This might not always be the case since this function can also be called to check whether a user can see the state at a given point. |