From fb58611d212ea16be2b42d0e2441a6dc09f6f61d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 16:01:54 -0600 Subject: Refactor `backfilled` into specific behavior function arguments (`_persist_events_and_state_updates`) (#11417) Part of https://github.com/matrix-org/synapse/issues/11300 Call stack: - `_persist_events_and_state_updates` (added `use_negative_stream_ordering`) - `_persist_events_txn` - `_update_room_depths_txn` (added `update_room_forward_stream_ordering`) - `_update_metadata_tables_txn` - `_store_room_members_txn` (added `inhibit_local_membership_updates`) Using keyword-only arguments (`*`) to reduce the mistakes from `backfilled` being left as a positional argument somewhere and being interpreted wrong by our new arguments. --- synapse/storage/databases/main/events.py | 74 +++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 20 deletions(-) (limited to 'synapse/storage/databases') diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c3440de2cb..4171b904eb 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -124,10 +124,12 @@ class PersistEventsStore: async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], + *, current_state_for_room: Dict[str, StateMap[str]], state_delta_for_room: Dict[str, DeltaState], new_forward_extremeties: Dict[str, List[str]], - backfilled: bool = False, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -140,7 +142,14 @@ class PersistEventsStore: room state new_forward_extremities: Map from room_id to list of event IDs that are the new forward extremities of the room. - backfilled + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. Returns: Resolves when the events have been persisted @@ -162,7 +171,7 @@ class PersistEventsStore: # # Note: Multiple instances of this function cannot be in flight at # the same time for the same room. - if backfilled: + if use_negative_stream_ordering: stream_ordering_manager = self._backfill_id_gen.get_next_mult( len(events_and_contexts) ) @@ -179,13 +188,13 @@ class PersistEventsStore: "persist_events", self._persist_events_txn, events_and_contexts=events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc(len(events_and_contexts)) - if not backfilled: + if stream < 0: # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. 
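As an aside, a minimal sketch (not part of the patch) of the keyword-only argument pattern the commit message describes: with a bare `*` in the signature, a leftover positional `backfilled` boolean raises a `TypeError` instead of being silently bound to the wrong flag. The names here are illustrative only, not Synapse's:

```python
def persist(events, *, use_negative_stream_ordering=False,
            inhibit_local_membership_updates=False):
    # Keyword-only flags cannot be filled by stray positional arguments.
    return -1 if use_negative_stream_ordering else 1

persist(["$event:example.org"], use_negative_stream_ordering=True)  # OK
# persist(["$event:example.org"], True)  # TypeError: too many positional args
```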
synapse.metrics.event_persisted_position.set( @@ -319,8 +328,9 @@ class PersistEventsStore: def _persist_events_txn( self, txn: LoggingTransaction, + *, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + inhibit_local_membership_updates: bool = False, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, new_forward_extremeties: Optional[Dict[str, List[str]]] = None, ): @@ -333,7 +343,10 @@ class PersistEventsStore: Args: txn events_and_contexts: events to persist - backfilled: True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. @@ -366,9 +379,7 @@ class PersistEventsStore: events_and_contexts ) - self._update_room_depths_txn( - txn, events_and_contexts=events_and_contexts, backfilled=backfilled - ) + self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts) # _update_outliers_txn filters out any events which have already been # persisted, and returns the filtered list. @@ -401,7 +412,7 @@ class PersistEventsStore: txn, events_and_contexts=events_and_contexts, all_events_and_contexts=all_events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # We call this last as it assumes we've inserted the events into @@ -1203,7 +1214,6 @@ class PersistEventsStore: self, txn, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, ): """Update min_depth for each room @@ -1211,13 +1221,18 @@ class PersistEventsStore: txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting - backfilled (bool): True if the events were backfilled """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) - if not backfilled: + # Then update the `stream_ordering` position to mark the latest + # event as the front of the room. This should not be done for + # backfilled events because backfilled events have negative + # stream_ordering and happened in the past so we know that we don't + # need to update the stream_ordering tip/front for the room. + assert event.internal_metadata.stream_ordering is not None + if event.internal_metadata.stream_ordering >= 0: txn.call_after( self.store._events_stream_cache.entity_has_changed, event.room_id, @@ -1430,7 +1445,12 @@ class PersistEventsStore: return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _update_metadata_tables_txn( - self, txn, events_and_contexts, all_events_and_contexts, backfilled + self, + txn, + *, + events_and_contexts, + all_events_and_contexts, + inhibit_local_membership_updates: bool = False, ): """Update all the miscellaneous tables for new events @@ -1442,7 +1462,10 @@ class PersistEventsStore: events that we were going to persist. This includes events we've already persisted, etc, that wouldn't appear in events_and_context. - backfilled (bool): True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. 
This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. """ # Insert all the push actions into the event_push_actions table. @@ -1516,7 +1539,7 @@ class PersistEventsStore: for event, _ in events_and_contexts if event.type == EventTypes.Member ], - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # Insert event_reference_hashes table. @@ -1643,8 +1666,19 @@ class PersistEventsStore: txn, table="event_reference_hashes", values=vals ) - def _store_room_members_txn(self, txn, events, backfilled): - """Store a room member in the database.""" + def _store_room_members_txn( + self, txn, events, *, inhibit_local_membership_updates: bool = False + ): + """ + Store a room member in the database. + Args: + txn: The transaction to use. + events: List of events to store. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + """ def non_null_str_or_none(val: Any) -> Optional[str]: return val if isinstance(val, str) and "\u0000" not in val else None @@ -1687,7 +1721,7 @@ class PersistEventsStore: # band membership", like a remote invite or a rejection of a remote invite. if ( self.is_mine_id(event.state_key) - and not backfilled + and not inhibit_local_membership_updates and event.internal_metadata.is_outlier() and event.internal_metadata.is_out_of_band_membership() ): -- cgit 1.5.1 From a9481223d1d5a8b3bf0d7ce2140dd3c919481f4f Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 30 Nov 2021 12:49:20 +0100 Subject: Improved push typing (#11409) Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- changelog.d/11409.misc | 1 + docs/templates.md | 5 + synapse/push/emailpusher.py | 10 +- synapse/push/httppusher.py | 3 +- synapse/push/mailer.py | 72 ++++++----- synapse/push/push_types.py | 136 +++++++++++++++++++++ .../storage/databases/main/event_push_actions.py | 19 ++- 7 files changed, 210 insertions(+), 36 deletions(-) create mode 100644 changelog.d/11409.misc create mode 100644 synapse/push/push_types.py (limited to 'synapse/storage/databases') diff --git a/changelog.d/11409.misc b/changelog.d/11409.misc new file mode 100644 index 0000000000..f9e8ae9e3a --- /dev/null +++ b/changelog.d/11409.misc @@ -0,0 +1 @@ +Improve internal types in push code. diff --git a/docs/templates.md b/docs/templates.md index a240f58b54..2b66e9d862 100644 --- a/docs/templates.md +++ b/docs/templates.md @@ -71,7 +71,12 @@ Below are the templates Synapse will look for when generating the content of an * `sender_avatar_url`: the avatar URL (as a `mxc://` URL) for the event's sender * `sender_hash`: a hash of the user ID of the sender + * `msgtype`: the type of the message + * `body_text_html`: html representation of the message + * `body_text_plain`: plaintext representation of the message + * `image_url`: mxc url of an image, when "msgtype" is "m.image" * `link`: a `matrix.to` link to the room + * `avator_url`: url to the room's avator * `reason`: information on the event that triggered the email to be sent. 
It's an object with the following attributes: * `room_id`: the ID of the room the event was sent in diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index cf5abdfbda..4f13c0418a 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -21,6 +21,8 @@ from twisted.internet.interfaces import IDelayedCall from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher, PusherConfig, PusherConfigException, ThrottleParams from synapse.push.mailer import Mailer +from synapse.push.push_types import EmailReason +from synapse.storage.databases.main.event_push_actions import EmailPushAction from synapse.util.threepids import validate_email if TYPE_CHECKING: @@ -190,7 +192,7 @@ class EmailPusher(Pusher): # we then consider all previously outstanding notifications # to be delivered. - reason = { + reason: EmailReason = { "room_id": push_action["room_id"], "now": self.clock.time_msec(), "received_at": received_at, @@ -275,7 +277,7 @@ class EmailPusher(Pusher): return may_send_at async def sent_notif_update_throttle( - self, room_id: str, notified_push_action: dict + self, room_id: str, notified_push_action: EmailPushAction ) -> None: # We have sent a notification, so update the throttle accordingly. # If the event that triggered the notif happened more than @@ -315,7 +317,9 @@ class EmailPusher(Pusher): self.pusher_id, room_id, self.throttle_params[room_id] ) - async def send_notification(self, push_actions: List[dict], reason: dict) -> None: + async def send_notification( + self, push_actions: List[EmailPushAction], reason: EmailReason + ) -> None: logger.info("Sending notif email for user %r", self.user_id) await self.mailer.send_notification_mail( diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index dbf4ad7f97..3fa603ccb7 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -26,6 +26,7 @@ from synapse.events import EventBase from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher, PusherConfig, PusherConfigException +from synapse.storage.databases.main.event_push_actions import HttpPushAction from . 
import push_rule_evaluator, push_tools @@ -273,7 +274,7 @@ class HttpPusher(Pusher): ) break - async def _process_one(self, push_action: dict) -> bool: + async def _process_one(self, push_action: HttpPushAction) -> bool: if "notify" not in push_action["actions"]: return True diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index ce299ba3da..ba4f866487 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -14,7 +14,7 @@ import logging import urllib.parse -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, TypeVar +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, TypeVar import bleach import jinja2 @@ -28,6 +28,14 @@ from synapse.push.presentable_names import ( descriptor_from_member_events, name_from_member_event, ) +from synapse.push.push_types import ( + EmailReason, + MessageVars, + NotifVars, + RoomVars, + TemplateVars, +) +from synapse.storage.databases.main.event_push_actions import EmailPushAction from synapse.storage.state import StateFilter from synapse.types import StateMap, UserID from synapse.util.async_helpers import concurrently_execute @@ -135,7 +143,7 @@ class Mailer: % urllib.parse.urlencode(params) ) - template_vars = {"link": link} + template_vars: TemplateVars = {"link": link} await self.send_email( email_address, @@ -165,7 +173,7 @@ class Mailer: % urllib.parse.urlencode(params) ) - template_vars = {"link": link} + template_vars: TemplateVars = {"link": link} await self.send_email( email_address, @@ -196,7 +204,7 @@ class Mailer: % urllib.parse.urlencode(params) ) - template_vars = {"link": link} + template_vars: TemplateVars = {"link": link} await self.send_email( email_address, @@ -210,8 +218,8 @@ class Mailer: app_id: str, user_id: str, email_address: str, - push_actions: Iterable[Dict[str, Any]], - reason: Dict[str, Any], + push_actions: Iterable[EmailPushAction], + reason: EmailReason, ) -> None: """ Send email regarding a user's room notifications @@ -230,7 +238,7 @@ class Mailer: [pa["event_id"] for pa in push_actions] ) - notifs_by_room: Dict[str, List[Dict[str, Any]]] = {} + notifs_by_room: Dict[str, List[EmailPushAction]] = {} for pa in push_actions: notifs_by_room.setdefault(pa["room_id"], []).append(pa) @@ -258,7 +266,7 @@ class Mailer: # actually sort our so-called rooms_in_order list, most recent room first rooms_in_order.sort(key=lambda r: -(notifs_by_room[r][-1]["received_ts"] or 0)) - rooms: List[Dict[str, Any]] = [] + rooms: List[RoomVars] = [] for r in rooms_in_order: roomvars = await self._get_room_vars( @@ -289,7 +297,7 @@ class Mailer: notifs_by_room, state_by_room, notif_events, reason ) - template_vars = { + template_vars: TemplateVars = { "user_display_name": user_display_name, "unsubscribe_link": self._make_unsubscribe_link( user_id, app_id, email_address @@ -302,10 +310,10 @@ class Mailer: await self.send_email(email_address, summary_text, template_vars) async def send_email( - self, email_address: str, subject: str, extra_template_vars: Dict[str, Any] + self, email_address: str, subject: str, extra_template_vars: TemplateVars ) -> None: """Send an email with the given information and template text""" - template_vars = { + template_vars: TemplateVars = { "app_name": self.app_name, "server_name": self.hs.config.server.server_name, } @@ -327,10 +335,10 @@ class Mailer: self, room_id: str, user_id: str, - notifs: Iterable[Dict[str, Any]], + notifs: Iterable[EmailPushAction], notif_events: Dict[str, EventBase], room_state_ids: StateMap[str], - ) -> Dict[str, Any]: + ) -> RoomVars: """ 
Generate the variables for notifications on a per-room basis. @@ -356,7 +364,7 @@ class Mailer: room_name = await calculate_room_name(self.store, room_state_ids, user_id) - room_vars: Dict[str, Any] = { + room_vars: RoomVars = { "title": room_name, "hash": string_ordinal_total(room_id), # See sender avatar hash "notifs": [], @@ -417,11 +425,11 @@ class Mailer: async def _get_notif_vars( self, - notif: Dict[str, Any], + notif: EmailPushAction, user_id: str, notif_event: EventBase, room_state_ids: StateMap[str], - ) -> Dict[str, Any]: + ) -> NotifVars: """ Generate the variables for a single notification. @@ -442,7 +450,7 @@ class Mailer: after_limit=CONTEXT_AFTER, ) - ret = { + ret: NotifVars = { "link": self._make_notif_link(notif), "ts": notif["received_ts"], "messages": [], @@ -461,8 +469,8 @@ class Mailer: return ret async def _get_message_vars( - self, notif: Dict[str, Any], event: EventBase, room_state_ids: StateMap[str] - ) -> Optional[Dict[str, Any]]: + self, notif: EmailPushAction, event: EventBase, room_state_ids: StateMap[str] + ) -> Optional[MessageVars]: """ Generate the variables for a single event, if possible. @@ -494,7 +502,9 @@ class Mailer: if sender_state_event: sender_name = name_from_member_event(sender_state_event) - sender_avatar_url = sender_state_event.content.get("avatar_url") + sender_avatar_url: Optional[str] = sender_state_event.content.get( + "avatar_url" + ) else: # No state could be found, fallback to the MXID. sender_name = event.sender @@ -504,7 +514,7 @@ class Mailer: # sender_hash % the number of default images to choose from sender_hash = string_ordinal_total(event.sender) - ret = { + ret: MessageVars = { "event_type": event.type, "is_historical": event.event_id != notif["event_id"], "id": event.event_id, @@ -519,6 +529,8 @@ class Mailer: return ret msgtype = event.content.get("msgtype") + if not isinstance(msgtype, str): + msgtype = None ret["msgtype"] = msgtype @@ -533,7 +545,7 @@ class Mailer: return ret def _add_text_message_vars( - self, messagevars: Dict[str, Any], event: EventBase + self, messagevars: MessageVars, event: EventBase ) -> None: """ Potentially add a sanitised message body to the message variables. @@ -543,8 +555,8 @@ class Mailer: event: The event under consideration. """ msgformat = event.content.get("format") - - messagevars["format"] = msgformat + if not isinstance(msgformat, str): + msgformat = None formatted_body = event.content.get("formatted_body") body = event.content.get("body") @@ -555,7 +567,7 @@ class Mailer: messagevars["body_text_html"] = safe_text(body) def _add_image_message_vars( - self, messagevars: Dict[str, Any], event: EventBase + self, messagevars: MessageVars, event: EventBase ) -> None: """ Potentially add an image URL to the message variables. @@ -570,7 +582,7 @@ class Mailer: async def _make_summary_text_single_room( self, room_id: str, - notifs: List[Dict[str, Any]], + notifs: List[EmailPushAction], room_state_ids: StateMap[str], notif_events: Dict[str, EventBase], user_id: str, @@ -685,10 +697,10 @@ class Mailer: async def _make_summary_text( self, - notifs_by_room: Dict[str, List[Dict[str, Any]]], + notifs_by_room: Dict[str, List[EmailPushAction]], room_state_ids: Dict[str, StateMap[str]], notif_events: Dict[str, EventBase], - reason: Dict[str, Any], + reason: EmailReason, ) -> str: """ Make a summary text for the email when multiple rooms have notifications. 
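Stepping back from the diff for a moment: the pattern throughout this patch is to replace `Dict[str, Any]` with `TypedDict`s, so the type checker can catch misspelled keys and wrong value types while the runtime objects stay plain dicts. A minimal, self-contained sketch of that idea (the names are illustrative, not Synapse's):

```python
from typing import List
from typing_extensions import TypedDict

class ExampleTemplateVars(TypedDict, total=False):
    link: str
    summary_text: str
    rooms: List[dict]

# total=False means every key is optional, matching how templates are filled.
template_vars: ExampleTemplateVars = {"link": "https://matrix.to/#/!r:example.org"}
template_vars["summary_text"] = "1 new message"
# template_vars["sumary_text"] = "..."  # mypy: TypedDict has no key "sumary_text"
```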
@@ -718,7 +730,7 @@ class Mailer: async def _make_summary_text_from_member_events( self, room_id: str, - notifs: List[Dict[str, Any]], + notifs: List[EmailPushAction], room_state_ids: StateMap[str], notif_events: Dict[str, EventBase], ) -> str: @@ -805,7 +817,7 @@ class Mailer: base_url = "https://matrix.to/#" return "%s/%s" % (base_url, room_id) - def _make_notif_link(self, notif: Dict[str, str]) -> str: + def _make_notif_link(self, notif: EmailPushAction) -> str: """ Generate a link to open an event in the web client. diff --git a/synapse/push/push_types.py b/synapse/push/push_types.py new file mode 100644 index 0000000000..8d16ab62ce --- /dev/null +++ b/synapse/push/push_types.py @@ -0,0 +1,136 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import List, Optional + +from typing_extensions import TypedDict + + +class EmailReason(TypedDict, total=False): + """ + Information on the event that triggered the email to be sent + + room_id: the ID of the room the event was sent in + now: timestamp in ms when the email is being sent out + room_name: a human-readable name for the room the event was sent in + received_at: the time in milliseconds at which the event was received + delay_before_mail_ms: the amount of time in milliseconds Synapse always waits + before ever emailing about a notification (to give the user a chance to respond + to other push or notice the window) + last_sent_ts: the time in milliseconds at which a notification was last sent + for an event in this room + throttle_ms: the minimum amount of time in milliseconds between two + notifications can be sent for this room + """ + + room_id: str + now: int + room_name: Optional[str] + received_at: int + delay_before_mail_ms: int + last_sent_ts: int + throttle_ms: int + + +class MessageVars(TypedDict, total=False): + """ + Details about a specific message to include in a notification + + event_type: the type of the event + is_historical: a boolean, which is `False` if the message is the one + that triggered the notification, `True` otherwise + id: the ID of the event + ts: the time in milliseconds at which the event was sent + sender_name: the display name for the event's sender + sender_avatar_url: the avatar URL (as a `mxc://` URL) for the event's + sender + sender_hash: a hash of the user ID of the sender + msgtype: the type of the message + body_text_html: html representation of the message + body_text_plain: plaintext representation of the message + image_url: mxc url of an image, when "msgtype" is "m.image" + """ + + event_type: str + is_historical: bool + id: str + ts: int + sender_name: str + sender_avatar_url: Optional[str] + sender_hash: int + msgtype: Optional[str] + body_text_html: str + body_text_plain: str + image_url: str + + +class NotifVars(TypedDict): + """ + Details about an event we are about to include in a notification + + link: a `matrix.to` link to the event + ts: the time in milliseconds at which the event was received + messages: a list of 
messages containing one message before the event, the + message in the event, and one message after the event. + """ + + link: str + ts: Optional[int] + messages: List[MessageVars] + + +class RoomVars(TypedDict): + """ + Represents a room containing events to include in the email. + + title: a human-readable name for the room + hash: a hash of the ID of the room + invite: a boolean, which is `True` if the room is an invite the user hasn't + accepted yet, `False` otherwise + notifs: a list of events, or an empty list if `invite` is `True`. + link: a `matrix.to` link to the room + avator_url: url to the room's avator + """ + + title: Optional[str] + hash: int + invite: bool + notifs: List[NotifVars] + link: str + avatar_url: Optional[str] + + +class TemplateVars(TypedDict, total=False): + """ + Generic structure for passing to the email sender, can hold all the fields used in email templates. + + app_name: name of the app/service this homeserver is associated with + server_name: name of our own homeserver + link: a link to include into the email to be sent + user_display_name: the display name for the user receiving the notification + unsubscribe_link: the link users can click to unsubscribe from email notifications + summary_text: a summary of the notification(s). The text used can be customised + by configuring the various settings in the `email.subjects` section of the + configuration file. + rooms: a list of rooms containing events to include in the email + reason: information on the event that triggered the email to be sent + """ + + app_name: str + server_name: str + link: str + user_display_name: str + unsubscribe_link: str + summary_text: str + rooms: List[RoomVars] + reason: EmailReason diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index d957e770dc..3efdd0c920 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -16,6 +16,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union import attr +from typing_extensions import TypedDict from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json @@ -37,6 +38,20 @@ DEFAULT_HIGHLIGHT_ACTION = [ ] +class BasePushAction(TypedDict): + event_id: str + actions: List[Union[dict, str]] + + +class HttpPushAction(BasePushAction): + room_id: str + stream_ordering: int + + +class EmailPushAction(HttpPushAction): + received_ts: Optional[int] + + def _serialize_action(actions, is_highlight): """Custom serializer for actions. This allows us to "compress" common actions. @@ -221,7 +236,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): min_stream_ordering: int, max_stream_ordering: int, limit: int = 20, - ) -> List[dict]: + ) -> List[HttpPushAction]: """Get a list of the most recent unread push actions for a given user, within the given stream ordering range. Called by the httppusher. @@ -326,7 +341,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): min_stream_ordering: int, max_stream_ordering: int, limit: int = 20, - ) -> List[dict]: + ) -> List[EmailPushAction]: """Get a list of the most recent unread push actions for a given user, within the given stream ordering range. 
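The `BasePushAction`/`HttpPushAction`/`EmailPushAction` hierarchy above uses `TypedDict` inheritance so each pusher is annotated with exactly the fields it consumes. A small usage sketch under the same scheme (the sorting helper is hypothetical, not part of the patch):

```python
from typing import List, Optional, Union
from typing_extensions import TypedDict

class HttpAction(TypedDict):
    event_id: str
    actions: List[Union[dict, str]]
    room_id: str
    stream_ordering: int

class EmailAction(HttpAction):
    received_ts: Optional[int]

def newest_first(actions: List[EmailAction]) -> List[EmailAction]:
    # mypy knows "received_ts" exists here; a bare HttpAction would be rejected.
    return sorted(actions, key=lambda a: -(a["received_ts"] or 0))
```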
Called by the emailpusher -- cgit 1.5.1 From a6f1a3abecf8e8fd3e1bff439a06b853df18f194 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 2 Dec 2021 01:02:20 -0600 Subject: Add MSC3030 experimental client and federation API endpoints to get the closest event to a given timestamp (#9445) MSC3030: https://github.com/matrix-org/matrix-doc/pull/3030 Client API endpoint. This will also go and fetch from the federation API endpoint if unable to find an event locally or we found an extremity with possibly a closer event we don't know about. ``` GET /_matrix/client/unstable/org.matrix.msc3030/rooms//timestamp_to_event?ts=&dir= { "event_id": ... "origin_server_ts": ... } ``` Federation API endpoint: ``` GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/?ts=&dir= { "event_id": ... "origin_server_ts": ... } ``` Co-authored-by: Erik Johnston --- changelog.d/9445.feature | 1 + synapse/config/experimental.py | 3 + synapse/federation/federation_client.py | 77 +++++++++ synapse/federation/federation_server.py | 43 +++++ synapse/federation/transport/client.py | 36 ++++ synapse/federation/transport/server/__init__.py | 12 +- synapse/federation/transport/server/federation.py | 41 +++++ synapse/handlers/federation.py | 61 +++---- synapse/handlers/room.py | 144 ++++++++++++++++ synapse/http/servlet.py | 29 ++++ synapse/rest/client/room.py | 58 +++++++ synapse/server.py | 5 + synapse/storage/databases/main/events_worker.py | 195 ++++++++++++++++++++++ 13 files changed, 674 insertions(+), 31 deletions(-) create mode 100644 changelog.d/9445.feature (limited to 'synapse/storage/databases') diff --git a/changelog.d/9445.feature b/changelog.d/9445.feature new file mode 100644 index 0000000000..6d12eea71f --- /dev/null +++ b/changelog.d/9445.feature @@ -0,0 +1 @@ +Add [MSC3030](https://github.com/matrix-org/matrix-doc/pull/3030) experimental client and federation API endpoints to get the closest event to a given timestamp. diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 8b098ad48d..d78a15097c 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -46,3 +46,6 @@ class ExperimentalConfig(Config): # MSC3266 (room summary api) self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False) + + # MSC3030 (Jump to date API endpoint) + self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index bc3f96c1fc..be1423da24 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1517,6 +1517,83 @@ class FederationClient(FederationBase): self._get_room_hierarchy_cache[(room_id, suggested_only)] = result return result + async def timestamp_to_event( + self, destination: str, room_id: str, timestamp: int, direction: str + ) -> "TimestampToEventResponse": + """ + Calls a remote federating server at `destination` asking for their + closest event to the given timestamp in the given direction. Also + validates the response to always return the expected keys or raises an + error. + + Args: + destination: Domain name of the remote homeserver + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. 
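For orientation, here is how a client might exercise the new endpoint once `msc3030_enabled: true` is set. This is a sketch using the `requests` library and a valid access token; it is not part of the patch:

```python
import requests  # any HTTP client works; requests is just for illustration

def event_at_timestamp(base_url: str, token: str, room_id: str,
                       ts_ms: int, direction: str = "f") -> dict:
    # room_id should be percent-encoded in real code.
    resp = requests.get(
        f"{base_url}/_matrix/client/unstable/org.matrix.msc3030"
        f"/rooms/{room_id}/timestamp_to_event",
        params={"ts": ts_ms, "dir": direction},
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()
    return resp.json()  # {"event_id": ..., "origin_server_ts": ...}
```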
+ + Returns: + A parsed TimestampToEventResponse including the closest event_id + and origin_server_ts + + Raises: + Various exceptions when the request fails + InvalidResponseError when the response does not have the correct + keys or wrong types + """ + remote_response = await self.transport_layer.timestamp_to_event( + destination, room_id, timestamp, direction + ) + + if not isinstance(remote_response, dict): + raise InvalidResponseError( + "Response must be a JSON dictionary but received %r" % remote_response + ) + + try: + return TimestampToEventResponse.from_json_dict(remote_response) + except ValueError as e: + raise InvalidResponseError(str(e)) + + +@attr.s(frozen=True, slots=True, auto_attribs=True) +class TimestampToEventResponse: + """Typed response dictionary for the federation /timestamp_to_event endpoint""" + + event_id: str + origin_server_ts: int + + # the raw data, including the above keys + data: JsonDict + + @classmethod + def from_json_dict(cls, d: JsonDict) -> "TimestampToEventResponse": + """Parsed response from the federation /timestamp_to_event endpoint + + Args: + d: JSON object response to be parsed + + Raises: + ValueError if d does not the correct keys or they are the wrong types + """ + + event_id = d.get("event_id") + if not isinstance(event_id, str): + raise ValueError( + "Invalid response: 'event_id' must be a str but received %r" % event_id + ) + + origin_server_ts = d.get("origin_server_ts") + if not isinstance(origin_server_ts, int): + raise ValueError( + "Invalid response: 'origin_server_ts' must be a int but received %r" + % origin_server_ts + ) + + return cls(event_id, origin_server_ts, d) + @attr.s(frozen=True, slots=True, auto_attribs=True) class FederationSpaceSummaryEventResult: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8fbc75aa65..cce85526e7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -110,6 +110,7 @@ class FederationServer(FederationBase): super().__init__(hs) self.handler = hs.get_federation_handler() + self.storage = hs.get_storage() self._federation_event_handler = hs.get_federation_event_handler() self.state = hs.get_state_handler() self._event_auth_handler = hs.get_event_auth_handler() @@ -200,6 +201,48 @@ class FederationServer(FederationBase): return 200, res + async def on_timestamp_to_event_request( + self, origin: str, room_id: str, timestamp: int, direction: str + ) -> Tuple[int, Dict[str, Any]]: + """When we receive a federated `/timestamp_to_event` request, + handle all of the logic for validating and fetching the event. + + Args: + origin: The server we received the event from + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + Tuple indicating the response status code and dictionary response + body including `event_id`. 
+ """ + with (await self._server_linearizer.queue((origin, room_id))): + origin_host, _ = parse_server_name(origin) + await self.check_server_matches_acl(origin_host, room_id) + + # We only try to fetch data from the local database + event_id = await self.store.get_event_id_for_timestamp( + room_id, timestamp, direction + ) + if event_id: + event = await self.store.get_event( + event_id, allow_none=False, allow_rejected=False + ) + + return 200, { + "event_id": event_id, + "origin_server_ts": event.origin_server_ts, + } + + raise SynapseError( + 404, + "Unable to find event from %s in direction %s" % (timestamp, direction), + errcode=Codes.NOT_FOUND, + ) + async def on_incoming_transaction( self, origin: str, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index fe29bcfd4b..d1f4be641d 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -148,6 +148,42 @@ class TransportLayerClient: destination, path=path, args=args, try_trailing_slash_on_400=True ) + @log_function + async def timestamp_to_event( + self, destination: str, room_id: str, timestamp: int, direction: str + ) -> Union[JsonDict, List]: + """ + Calls a remote federating server at `destination` asking for their + closest event to the given timestamp in the given direction. + + Args: + destination: Domain name of the remote homeserver + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + Response dict received from the remote homeserver. + + Raises: + Various exceptions when the request fails + """ + path = _create_path( + FEDERATION_UNSTABLE_PREFIX, + "/org.matrix.msc3030/timestamp_to_event/%s", + room_id, + ) + + args = {"ts": [str(timestamp)], "dir": [direction]} + + remote_response = await self.client.get_json( + destination, path=path, args=args, try_trailing_slash_on_400=True + ) + + return remote_response + @log_function async def send_transaction( self, diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index c32539bf5a..abcb8728f5 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -22,7 +22,10 @@ from synapse.federation.transport.server._base import ( Authenticator, BaseFederationServlet, ) -from synapse.federation.transport.server.federation import FEDERATION_SERVLET_CLASSES +from synapse.federation.transport.server.federation import ( + FEDERATION_SERVLET_CLASSES, + FederationTimestampLookupServlet, +) from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES from synapse.federation.transport.server.groups_server import ( GROUP_SERVER_SERVLET_CLASSES, @@ -324,6 +327,13 @@ def register_servlets( ) for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]: + # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled + if ( + servletclass == FederationTimestampLookupServlet + and not hs.config.experimental.msc3030_enabled + ): + continue + servletclass( hs=hs, authenticator=authenticator, diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 66e915228c..77bfd88ad0 100644 --- a/synapse/federation/transport/server/federation.py +++ 
b/synapse/federation/transport/server/federation.py @@ -174,6 +174,46 @@ class FederationBackfillServlet(BaseFederationServerServlet): return await self.handler.on_backfill_request(origin, room_id, versions, limit) +class FederationTimestampLookupServlet(BaseFederationServerServlet): + """ + API endpoint to fetch the `event_id` of the closest event to the given + timestamp (`ts` query parameter) in the given direction (`dir` query + parameter). + + Useful for other homeservers when they're unable to find an event locally. + + `ts` is a timestamp in milliseconds where we will find the closest event in + the given direction. + + `dir` can be `f` or `b` to indicate forwards and backwards in time from the + given timestamp. + + GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/?ts=&dir= + { + "event_id": ... + } + """ + + PATH = "/timestamp_to_event/(?P[^/]*)/?" + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030" + + async def on_GET( + self, + origin: str, + content: Literal[None], + query: Dict[bytes, List[bytes]], + room_id: str, + ) -> Tuple[int, JsonDict]: + timestamp = parse_integer_from_args(query, "ts", required=True) + direction = parse_string_from_args( + query, "dir", default="f", allowed_values=["f", "b"], required=True + ) + + return await self.handler.on_timestamp_to_event_request( + origin, room_id, timestamp, direction + ) + + class FederationQueryServlet(BaseFederationServerServlet): PATH = "/query/(?P[^/]*)" @@ -683,6 +723,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationStateV1Servlet, FederationStateIdsServlet, FederationBackfillServlet, + FederationTimestampLookupServlet, FederationQueryServlet, FederationMakeJoinServlet, FederationMakeLeaveServlet, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3112cc88b1..1ea837d082 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -68,6 +68,37 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]: + """Get joined domains from state + + Args: + state: State map from type/state key to event. + + Returns: + Returns a list of servers with the lowest depth of their joins. + Sorted by lowest depth first. + """ + joined_users = [ + (state_key, int(event.depth)) + for (e_type, state_key), event in state.items() + if e_type == EventTypes.Member and event.membership == Membership.JOIN + ] + + joined_domains: Dict[str, int] = {} + for u, d in joined_users: + try: + dom = get_domain_from_id(u) + old_d = joined_domains.get(dom) + if old_d: + joined_domains[dom] = min(d, old_d) + else: + joined_domains[dom] = d + except Exception: + pass + + return sorted(joined_domains.items(), key=lambda d: d[1]) + + class FederationHandler: """Handles general incoming federation requests @@ -268,36 +299,6 @@ class FederationHandler: curr_state = await self.state_handler.get_current_state(room_id) - def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]: - """Get joined domains from state - - Args: - state: State map from type/state key to event. - - Returns: - Returns a list of servers with the lowest depth of their joins. - Sorted by lowest depth first. 
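A quick illustration (plain data, not the patch) of what "sorted by lowest depth first" buys the caller: servers whose earliest join has the lowest depth come first, the heuristic being that they are the most likely to hold old history when used as remote-lookup candidates:

```python
curr_domains = [("example.org", 7), ("matrix.org", 1), ("ours.example", 3)]
server_name = "ours.example"

likely_domains = [
    domain
    for domain, _depth in sorted(curr_domains, key=lambda d: d[1])
    if domain != server_name  # no point asking ourselves
]
assert likely_domains == ["matrix.org", "example.org"]
```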
- """ - joined_users = [ - (state_key, int(event.depth)) - for (e_type, state_key), event in state.items() - if e_type == EventTypes.Member and event.membership == Membership.JOIN - ] - - joined_domains: Dict[str, int] = {} - for u, d in joined_users: - try: - dom = get_domain_from_id(u) - old_d = joined_domains.get(dom) - if old_d: - joined_domains[dom] = min(d, old_d) - else: - joined_domains[dom] = d - except Exception: - pass - - return sorted(joined_domains.items(), key=lambda d: d[1]) - curr_domains = get_domains_from_state(curr_state) likely_domains = [ diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 88053f9869..2bcdf32dcc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -46,6 +46,7 @@ from synapse.api.constants import ( from synapse.api.errors import ( AuthError, Codes, + HttpResponseException, LimitExceededError, NotFoundError, StoreError, @@ -56,6 +57,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase from synapse.events.utils import copy_power_levels_contents +from synapse.federation.federation_client import InvalidResponseError +from synapse.handlers.federation import get_domains_from_state from synapse.rest.admin._base import assert_user_is_admin from synapse.storage.state import StateFilter from synapse.streams import EventSource @@ -1220,6 +1223,147 @@ class RoomContextHandler: return results +class TimestampLookupHandler: + def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname + self.store = hs.get_datastore() + self.state_handler = hs.get_state_handler() + self.federation_client = hs.get_federation_client() + + async def get_event_for_timestamp( + self, + requester: Requester, + room_id: str, + timestamp: int, + direction: str, + ) -> Tuple[str, int]: + """Find the closest event to the given timestamp in the given direction. + If we can't find an event locally or the event we have locally is next to a gap, + it will ask other federated homeservers for an event. + + Args: + requester: The user making the request according to the access token + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + A tuple containing the `event_id` closest to the given timestamp in + the given direction and the `origin_server_ts`. + + Raises: + SynapseError if unable to find any event locally in the given direction + """ + + local_event_id = await self.store.get_event_id_for_timestamp( + room_id, timestamp, direction + ) + logger.debug( + "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s", + local_event_id, + timestamp, + ) + + # Check for gaps in the history where events could be hiding in between + # the timestamp given and the event we were able to find locally + is_event_next_to_backward_gap = False + is_event_next_to_forward_gap = False + if local_event_id: + local_event = await self.store.get_event( + local_event_id, allow_none=False, allow_rejected=False + ) + + if direction == "f": + # We only need to check for a backward gap if we're looking forwards + # to ensure there is nothing in between. 
+ is_event_next_to_backward_gap = ( + await self.store.is_event_next_to_backward_gap(local_event) + ) + elif direction == "b": + # We only need to check for a forward gap if we're looking backwards + # to ensure there is nothing in between + is_event_next_to_forward_gap = ( + await self.store.is_event_next_to_forward_gap(local_event) + ) + + # If we found a gap, we should probably ask another homeserver first + # about more history in between + if ( + not local_event_id + or is_event_next_to_backward_gap + or is_event_next_to_forward_gap + ): + logger.debug( + "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first", + local_event_id, + timestamp, + ) + + # Find other homeservers from the given state in the room + curr_state = await self.state_handler.get_current_state(room_id) + curr_domains = get_domains_from_state(curr_state) + likely_domains = [ + domain for domain, depth in curr_domains if domain != self.server_name + ] + + # Loop through each homeserver candidate until we get a succesful response + for domain in likely_domains: + try: + remote_response = await self.federation_client.timestamp_to_event( + domain, room_id, timestamp, direction + ) + logger.debug( + "get_event_for_timestamp: response from domain(%s)=%s", + domain, + remote_response, + ) + + # TODO: Do we want to persist this as an extremity? + # TODO: I think ideally, we would try to backfill from + # this event and run this whole + # `get_event_for_timestamp` function again to make sure + # they didn't give us an event from their gappy history. + remote_event_id = remote_response.event_id + origin_server_ts = remote_response.origin_server_ts + + # Only return the remote event if it's closer than the local event + if not local_event or ( + abs(origin_server_ts - timestamp) + < abs(local_event.origin_server_ts - timestamp) + ): + return remote_event_id, origin_server_ts + except (HttpResponseException, InvalidResponseError) as ex: + # Let's not put a high priority on some other homeserver + # failing to respond or giving a random response + logger.debug( + "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", + domain, + type(ex).__name__, + ex, + ex.args, + ) + except Exception as ex: + # But we do want to see some exceptions in our code + logger.warning( + "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", + domain, + type(ex).__name__, + ex, + ex.args, + ) + + if not local_event_id: + raise SynapseError( + 404, + "Unable to find event from %s in direction %s" % (timestamp, direction), + errcode=Codes.NOT_FOUND, + ) + + return local_event_id, local_event.origin_server_ts + + class RoomEventSource(EventSource[RoomStreamToken, EventBase]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 91ba93372c..6dd9b9ad03 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -79,6 +79,35 @@ def parse_integer( return parse_integer_from_args(args, name, default, required) +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + default: Optional[int] = None, +) -> Optional[int]: + ... + + +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + *, + required: Literal[True], +) -> int: + ... 
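The `@overload` stubs above follow the standard typing trick for `required` parameters. A self-contained sketch of the same pattern, so the checker learns that `required=True` rules out `None`:

```python
from typing import Literal, Optional, overload

@overload
def parse_param(raw: Optional[str]) -> Optional[int]: ...
@overload
def parse_param(raw: Optional[str], *, required: Literal[True]) -> int: ...

def parse_param(raw: Optional[str], required: bool = False) -> Optional[int]:
    if raw is None:
        if required:
            raise ValueError("missing required parameter")
        return None
    return int(raw)

ts: int = parse_param("1638316800000", required=True)  # non-Optional result
```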
+ + +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + default: Optional[int] = None, + required: bool = False, +) -> Optional[int]: + ... + + def parse_integer_from_args( args: Mapping[bytes, Sequence[bytes]], name: str, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 99f303c88e..3598967be0 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1070,6 +1070,62 @@ def register_txn_path( ) +class TimestampLookupRestServlet(RestServlet): + """ + API endpoint to fetch the `event_id` of the closest event to the given + timestamp (`ts` query parameter) in the given direction (`dir` query + parameter). + + Useful for cases like jump to date so you can start paginating messages from + a given date in the archive. + + `ts` is a timestamp in milliseconds where we will find the closest event in + the given direction. + + `dir` can be `f` or `b` to indicate forwards and backwards in time from the + given timestamp. + + GET /_matrix/client/unstable/org.matrix.msc3030/rooms//timestamp_to_event?ts=&dir= + { + "event_id": ... + } + """ + + PATTERNS = ( + re.compile( + "^/_matrix/client/unstable/org.matrix.msc3030" + "/rooms/(?P[^/]*)/timestamp_to_event$" + ), + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self._auth = hs.get_auth() + self._store = hs.get_datastore() + self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler() + + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request) + await self._auth.check_user_in_room(room_id, requester.user.to_string()) + + timestamp = parse_integer(request, "ts", required=True) + direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"]) + + ( + event_id, + origin_server_ts, + ) = await self.timestamp_lookup_handler.get_event_for_timestamp( + requester, room_id, timestamp, direction + ) + + return 200, { + "event_id": event_id, + "origin_server_ts": origin_server_ts, + } + + class RoomSpaceSummaryRestServlet(RestServlet): PATTERNS = ( re.compile( @@ -1239,6 +1295,8 @@ def register_servlets( RoomAliasListServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) RoomCreateRestServlet(hs).register(http_server) + if hs.config.experimental.msc3030_enabled: + TimestampLookupRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. 
if not is_worker: diff --git a/synapse/server.py b/synapse/server.py index 877eba6c08..185e40e4da 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -97,6 +97,7 @@ from synapse.handlers.room import ( RoomContextHandler, RoomCreationHandler, RoomShutdownHandler, + TimestampLookupHandler, ) from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler @@ -728,6 +729,10 @@ class HomeServer(metaclass=abc.ABCMeta): def get_room_context_handler(self) -> RoomContextHandler: return RoomContextHandler(self) + @cache_in_self + def get_timestamp_lookup_handler(self) -> TimestampLookupHandler: + return TimestampLookupHandler(self) + @cache_in_self def get_registration_handler(self) -> RegistrationHandler: return RegistrationHandler(self) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4cefc0a07e..fd19674f93 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1762,3 +1762,198 @@ class EventsWorkerStore(SQLBaseStore): "_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn, ) + + async def is_event_next_to_backward_gap(self, event: EventBase) -> bool: + """Check if the given event is next to a backward gap of missing events. + A(False)--->B(False)--->C(True)---> + + Args: + room_id: room where the event lives + event_id: event to check + + Returns: + Boolean indicating whether it's an extremity + """ + + def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool: + # If the event in question has any of its prev_events listed as a + # backward extremity, it's next to a gap. + # + # We can't just check the backward edges in `event_edges` because + # when we persist events, we will also record the prev_events as + # edges to the event in question regardless of whether we have those + # prev_events yet. We need to check whether those prev_events are + # backward extremities, also known as gaps, that need to be + # backfilled. + backward_extremity_query = """ + SELECT 1 FROM event_backward_extremities + WHERE + room_id = ? + AND %s + LIMIT 1 + """ + + # If the event in question is a backward extremity or has any of its + # prev_events listed as a backward extremity, it's next to a + # backward gap. + clause, args = make_in_list_sql_clause( + self.database_engine, + "event_id", + [event.event_id] + list(event.prev_event_ids()), + ) + + txn.execute(backward_extremity_query % (clause,), [event.room_id] + args) + backward_extremities = txn.fetchall() + + # We consider any backward extremity as a backward gap + if len(backward_extremities): + return True + + return False + + return await self.db_pool.runInteraction( + "is_event_next_to_backward_gap_txn", + is_event_next_to_backward_gap_txn, + ) + + async def is_event_next_to_forward_gap(self, event: EventBase) -> bool: + """Check if the given event is next to a forward gap of missing events. + The gap in front of the latest events is not considered a gap. + A(False)--->B(False)--->C(False)---> + A(False)--->B(False)---> --->D(True)--->E(False) + + Args: + room_id: room where the event lives + event_id: event to check + + Returns: + Boolean indicating whether it's an extremity + """ + + def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool: + # If the event in question is a forward extremity, we will just + # consider any potential forward gap as not a gap since it's one of + # the latest events in the room. 
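Reduced to set logic, the two gap checks in this file look like the following sketch, where in-memory sets stand in for the SQL tables (this is a conceptual model, not the actual implementation):

```python
def next_to_backward_gap(event_id, prev_event_ids, backward_extremities):
    # Gap behind: the event or any of its prev_events is a backward extremity.
    return bool({event_id, *prev_event_ids} & backward_extremities)

def next_to_forward_gap(event_id, forward_extremities, referenced_events):
    # Forward extremities are the newest events; nothing to fetch ahead of them.
    if event_id in forward_extremities:
        return False
    # Gap ahead: no other event lists this one in its prev_events.
    return event_id not in referenced_events
```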
+ # + # `event_forward_extremities` does not include backfilled or outlier + # events so we can't rely on it to find forward gaps. We can only + # use it to determine whether a message is the latest in the room. + # + # We can't combine this query with the `forward_edge_query` below + # because if the event in question has no forward edges (isn't + # referenced by any other event's prev_events) but is in + # `event_forward_extremities`, we don't want to return 0 rows and + # say it's next to a gap. + forward_extremity_query = """ + SELECT 1 FROM event_forward_extremities + WHERE + room_id = ? + AND event_id = ? + LIMIT 1 + """ + + # Check to see whether the event in question is already referenced + # by another event. If we don't see any edges, we're next to a + # forward gap. + forward_edge_query = """ + SELECT 1 FROM event_edges + /* Check to make sure the event referencing our event in question is not rejected */ + LEFT JOIN rejections ON event_edges.event_id == rejections.event_id + WHERE + event_edges.room_id = ? + AND event_edges.prev_event_id = ? + /* It's not a valid edge if the event referencing our event in + * question is rejected. + */ + AND rejections.event_id IS NULL + LIMIT 1 + """ + + # We consider any forward extremity as the latest in the room and + # not a forward gap. + # + # To expand, even though there is technically a gap at the front of + # the room where the forward extremities are, we consider those the + # latest messages in the room so asking other homeservers for more + # is useless. The new latest messages will just be federated as + # usual. + txn.execute(forward_extremity_query, (event.room_id, event.event_id)) + forward_extremities = txn.fetchall() + if len(forward_extremities): + return False + + # If there are no forward edges to the event in question (another + # event hasn't referenced this event in their prev_events), then we + # assume there is a forward gap in the history. + txn.execute(forward_edge_query, (event.room_id, event.event_id)) + forward_edges = txn.fetchall() + if not len(forward_edges): + return True + + return False + + return await self.db_pool.runInteraction( + "is_event_next_to_gap_txn", + is_event_next_to_gap_txn, + ) + + async def get_event_id_for_timestamp( + self, room_id: str, timestamp: int, direction: str + ) -> Optional[str]: + """Find the closest event to the given timestamp in the given direction. + + Args: + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + The closest event_id otherwise None if we can't find any event in + the given direction. + """ + + sql_template = """ + SELECT event_id FROM events + LEFT JOIN rejections USING (event_id) + WHERE + origin_server_ts %s ? + AND room_id = ? + /* Make sure event is not rejected */ + AND rejections.event_id IS NULL + ORDER BY origin_server_ts %s + LIMIT 1; + """ + + def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]: + if direction == "b": + # Find closest event *before* a given timestamp. We use descending + # (which gives values largest to smallest) because we want the + # largest possible timestamp *before* the given timestamp. + comparison_operator = "<=" + order = "DESC" + else: + # Find closest event *after* a given timestamp. 
We use ascending + # (which gives values smallest to largest) because we want the + # closest possible timestamp *after* the given timestamp. + comparison_operator = ">=" + order = "ASC" + + txn.execute( + sql_template % (comparison_operator, order), (timestamp, room_id) + ) + row = txn.fetchone() + if row: + (event_id,) = row + return event_id + + return None + + if direction not in ("f", "b"): + raise ValueError("Unknown direction: %s" % (direction,)) + + return await self.db_pool.runInteraction( + "get_event_id_for_timestamp_txn", + get_event_id_for_timestamp_txn, + ) -- cgit 1.5.1 From 435f04480728c5d982e1a63c1b2777784bf9cd26 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Thu, 2 Dec 2021 15:30:05 +0000 Subject: Add type annotations to `tests.storage.test_appservice`. (#11488) --- changelog.d/11488.misc | 1 + mypy.ini | 1 - synapse/appservice/__init__.py | 3 +- synapse/storage/databases/main/appservice.py | 6 +- tests/storage/test_appservice.py | 140 ++++++++++++++++++--------- 5 files changed, 98 insertions(+), 53 deletions(-) create mode 100644 changelog.d/11488.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/11488.misc b/changelog.d/11488.misc new file mode 100644 index 0000000000..c14a7d2e98 --- /dev/null +++ b/changelog.d/11488.misc @@ -0,0 +1 @@ +Add type annotations to `tests.storage.test_appservice`. \ No newline at end of file diff --git a/mypy.ini b/mypy.ini index 51056a8f64..99b5c41ad6 100644 --- a/mypy.ini +++ b/mypy.ini @@ -111,7 +111,6 @@ exclude = (?x) |tests/server_notices/test_resource_limits_server_notices.py |tests/state/test_v2.py |tests/storage/test_account_data.py - |tests/storage/test_appservice.py |tests/storage/test_background_update.py |tests/storage/test_base.py |tests/storage/test_client_ips.py diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 6504c6bd3f..f9d3bd337d 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. import logging import re +from enum import Enum from typing import TYPE_CHECKING, Iterable, List, Match, Optional from synapse.api.constants import EventTypes @@ -27,7 +28,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class ApplicationServiceState: +class ApplicationServiceState(Enum): DOWN = "down" UP = "up" diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index baec35ee27..4a883dc166 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -143,7 +143,7 @@ class ApplicationServiceTransactionWorkerStore( A list of ApplicationServices, which may be empty. """ results = await self.db_pool.simple_select_list( - "application_services_state", {"state": state}, ["as_id"] + "application_services_state", {"state": state.value}, ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore as_list = self.get_app_services() @@ -173,7 +173,7 @@ class ApplicationServiceTransactionWorkerStore( desc="get_appservice_state", ) if result: - return result.get("state") + return ApplicationServiceState(result.get("state")) return None async def set_appservice_state( @@ -186,7 +186,7 @@ class ApplicationServiceTransactionWorkerStore( state: The connectivity state to apply. 
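The `ApplicationServiceState` change in this commit converts a plain class into an `Enum`, so the raw string lives only at the database boundary: `.value` on the way in, `ApplicationServiceState(...)` on the way out. The round trip in miniature (illustrative names):

```python
from enum import Enum

class State(Enum):
    DOWN = "down"
    UP = "up"

stored = State.UP.value          # "up" is what the table holds
assert State(stored) is State.UP  # reading parses it back to the enum member
```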
""" await self.db_pool.simple_upsert( - "application_services_state", {"as_id": service.id}, {"state": state} + "application_services_state", {"as_id": service.id}, {"state": state.value} ) async def create_appservice_txn( diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index f26d5acf9c..4b20a28ca2 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -14,19 +14,25 @@ import json import os import tempfile +from typing import Any, Generator, List, Optional, cast from unittest.mock import Mock import yaml from twisted.internet import defer +from twisted.internet.defer import Deferred +from twisted.test.proto_helpers import MemoryReactor from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.config._base import ConfigError +from synapse.events import EventBase +from synapse.server import HomeServer from synapse.storage.database import DatabasePool, make_conn from synapse.storage.databases.main.appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore, ) +from synapse.util import Clock from tests import unittest from tests.test_utils import make_awaitable @@ -36,7 +42,7 @@ from tests.utils import setup_test_homeserver class ApplicationServiceStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - self.as_yaml_files = [] + self.as_yaml_files: List[str] = [] hs = yield setup_test_homeserver( self.addCleanup, federation_sender=Mock(), federation_client=Mock() ) @@ -58,7 +64,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): database, make_conn(database._database_config, database.engine, "test"), hs ) - def tearDown(self): + def tearDown(self) -> None: # TODO: suboptimal that we need to create files for tests! 
for f in self.as_yaml_files: try: @@ -66,7 +72,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): except Exception: pass - def _add_appservice(self, as_token, id, url, hs_token, sender): + def _add_appservice(self, as_token, id, url, hs_token, sender) -> None: as_yaml = { "url": url, "as_token": as_token, @@ -80,12 +86,13 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - def test_retrieve_unknown_service_token(self): + def test_retrieve_unknown_service_token(self) -> None: service = self.store.get_app_service_by_token("invalid_token") self.assertEquals(service, None) - def test_retrieval_of_service(self): + def test_retrieval_of_service(self) -> None: stored_service = self.store.get_app_service_by_token(self.as_token) + assert stored_service is not None self.assertEquals(stored_service.token, self.as_token) self.assertEquals(stored_service.id, self.as_id) self.assertEquals(stored_service.url, self.as_url) @@ -93,7 +100,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): self.assertEquals(stored_service.namespaces[ApplicationService.NS_ROOMS], []) self.assertEquals(stored_service.namespaces[ApplicationService.NS_USERS], []) - def test_retrieval_of_all_services(self): + def test_retrieval_of_all_services(self) -> None: services = self.store.get_app_services() self.assertEquals(len(services), 3) @@ -101,7 +108,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - self.as_yaml_files = [] + self.as_yaml_files: List[str] = [] hs = yield setup_test_homeserver( self.addCleanup, federation_sender=Mock(), federation_client=Mock() @@ -117,7 +124,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): {"token": "gamma_tok", "url": "https://gamma.com", "id": "id_gamma"}, ] for s in self.as_list: - yield self._add_service(s["url"], s["token"], s["id"]) + self._add_service(s["url"], s["token"], s["id"]) self.as_yaml_files = [] @@ -131,7 +138,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): database, make_conn(db_config, self.engine, "test"), hs ) - def _add_service(self, url, as_token, id): + def _add_service(self, url, as_token, id) -> None: as_yaml = { "url": url, "as_token": as_token, @@ -145,13 +152,15 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - def _set_state(self, id, state, txn=None): + def _set_state( + self, id: str, state: ApplicationServiceState, txn: Optional[int] = None + ): return self.db_pool.runOperation( self.engine.convert_param_style( "INSERT INTO application_services_state(as_id, state, last_txn) " "VALUES(?,?,?)" ), - (id, state, txn), + (id, state.value, txn), ) def _insert_txn(self, as_id, txn_id, events): @@ -169,24 +178,30 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): "INSERT INTO application_services_state(as_id, last_txn, state) " "VALUES(?,?,?)" ), - (as_id, txn_id, ApplicationServiceState.UP), + (as_id, txn_id, ApplicationServiceState.UP.value), ) @defer.inlineCallbacks - def test_get_appservice_state_none(self): + def test_get_appservice_state_none( + self, + ) -> Generator["Deferred[object]", object, None]: service = Mock(id="999") state = yield defer.ensureDeferred(self.store.get_appservice_state(service)) self.assertEquals(None, state) @defer.inlineCallbacks - def 
test_get_appservice_state_up(self): + def test_get_appservice_state_up( + self, + ) -> Generator["Deferred[object]", object, None]: yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) service = Mock(id=self.as_list[0]["id"]) state = yield defer.ensureDeferred(self.store.get_appservice_state(service)) self.assertEquals(ApplicationServiceState.UP, state) @defer.inlineCallbacks - def test_get_appservice_state_down(self): + def test_get_appservice_state_down( + self, + ) -> Generator["Deferred[object]", object, None]: yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.DOWN) yield self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) @@ -195,14 +210,18 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(ApplicationServiceState.DOWN, state) @defer.inlineCallbacks - def test_get_appservices_by_state_none(self): + def test_get_appservices_by_state_none( + self, + ) -> Generator["Deferred[object]", Any, None]: services = yield defer.ensureDeferred( self.store.get_appservices_by_state(ApplicationServiceState.DOWN) ) self.assertEquals(0, len(services)) @defer.inlineCallbacks - def test_set_appservices_state_down(self): + def test_set_appservices_state_down( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[1]["id"]) yield defer.ensureDeferred( self.store.set_appservice_state(service, ApplicationServiceState.DOWN) @@ -211,12 +230,14 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.engine.convert_param_style( "SELECT as_id FROM application_services_state WHERE state=?" ), - (ApplicationServiceState.DOWN,), + (ApplicationServiceState.DOWN.value,), ) self.assertEquals(service.id, rows[0][0]) @defer.inlineCallbacks - def test_set_appservices_state_multiple_up(self): + def test_set_appservices_state_multiple_up( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[1]["id"]) yield defer.ensureDeferred( self.store.set_appservice_state(service, ApplicationServiceState.UP) @@ -231,14 +252,16 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.engine.convert_param_style( "SELECT as_id FROM application_services_state WHERE state=?" 
), - (ApplicationServiceState.UP,), + (ApplicationServiceState.UP.value,), ) self.assertEquals(service.id, rows[0][0]) @defer.inlineCallbacks - def test_create_appservice_txn_first(self): + def test_create_appservice_txn_first( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] + events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) txn = yield defer.ensureDeferred( self.store.create_appservice_txn(service, events, []) ) @@ -247,9 +270,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(txn.service, service) @defer.inlineCallbacks - def test_create_appservice_txn_older_last_txn(self): + def test_create_appservice_txn_older_last_txn( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] + events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) yield self._set_last_txn(service.id, 9643) # AS is falling behind yield self._insert_txn(service.id, 9644, events) yield self._insert_txn(service.id, 9645, events) @@ -261,9 +286,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(txn.service, service) @defer.inlineCallbacks - def test_create_appservice_txn_up_to_date_last_txn(self): + def test_create_appservice_txn_up_to_date_last_txn( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] + events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) yield self._set_last_txn(service.id, 9643) txn = yield defer.ensureDeferred( self.store.create_appservice_txn(service, events, []) @@ -273,9 +300,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(txn.service, service) @defer.inlineCallbacks - def test_create_appservice_txn_up_fuzzing(self): + def test_create_appservice_txn_up_fuzzing( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] + events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) yield self._set_last_txn(service.id, 9643) # dump in rows with higher IDs to make sure the queries aren't wrong. 
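These `.value` conversions are the crux of the `Enum` migration: the database layer stores and compares plain strings, so enum members have to be unwrapped on the way in and re-wrapped on the way out. A minimal sketch of that round-trip, using the enum from the diff (the `to_db`/`from_db` helpers are invented here for illustration, not Synapse code):

```python
from enum import Enum


class ApplicationServiceState(Enum):
    DOWN = "down"
    UP = "up"


def to_db(state: ApplicationServiceState) -> str:
    # Passing the member itself would store/compare its repr, not "up"/"down".
    return state.value


def from_db(raw: str) -> ApplicationServiceState:
    # Raises ValueError if the database holds an unknown state string.
    return ApplicationServiceState(raw)


assert from_db(to_db(ApplicationServiceState.UP)) is ApplicationServiceState.UP
```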
@@ -296,7 +325,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(txn.service, service) @defer.inlineCallbacks - def test_complete_appservice_txn_first_txn(self): + def test_complete_appservice_txn_first_txn( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 1 @@ -324,7 +355,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(0, len(res)) @defer.inlineCallbacks - def test_complete_appservice_txn_existing_in_state_table(self): + def test_complete_appservice_txn_existing_in_state_table( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 5 @@ -342,7 +375,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) self.assertEquals(1, len(res)) self.assertEquals(txn_id, res[0][0]) - self.assertEquals(ApplicationServiceState.UP, res[0][1]) + self.assertEquals(ApplicationServiceState.UP.value, res[0][1]) res = yield self.db_pool.runQuery( self.engine.convert_param_style( @@ -353,20 +386,23 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(0, len(res)) @defer.inlineCallbacks - def test_get_oldest_unsent_txn_none(self): + def test_get_oldest_unsent_txn_none( + self, + ) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[0]["id"]) txn = yield defer.ensureDeferred(self.store.get_oldest_unsent_txn(service)) self.assertEquals(None, txn) @defer.inlineCallbacks - def test_get_oldest_unsent_txn(self): + def test_get_oldest_unsent_txn(self) -> Generator["Deferred[object]", Any, None]: service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store.get_events_as_list = Mock(return_value=make_awaitable(events)) + # (ignore needed because Mypy won't allow us to assign to a method otherwise) + self.store.get_events_as_list = Mock(return_value=make_awaitable(events)) # type: ignore[assignment] yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) @@ -379,7 +415,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(events, txn.events) @defer.inlineCallbacks - def test_get_appservices_by_state_single(self): + def test_get_appservices_by_state_single( + self, + ) -> Generator["Deferred[object]", Any, None]: yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) @@ -390,7 +428,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.assertEquals(self.as_list[0]["id"], services[0].id) @defer.inlineCallbacks - def test_get_appservices_by_state_multiple(self): + def test_get_appservices_by_state_multiple( + self, + ) -> Generator["Deferred[object]", Any, None]: yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) yield self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) @@ -407,16 +447,20 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): - def make_homeserver(self, reactor, 
clock): + def make_homeserver(self, reactor, clock) -> "HomeServer": hs = self.setup_test_homeserver() return hs - def prepare(self, hs, reactor, clock): + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: self.service = Mock(id="foo") self.store = self.hs.get_datastore() - self.get_success(self.store.set_appservice_state(self.service, "up")) + self.get_success( + self.store.set_appservice_state(self.service, ApplicationServiceState.UP) + ) - def test_get_type_stream_id_for_appservice_no_value(self): + def test_get_type_stream_id_for_appservice_no_value(self) -> None: value = self.get_success( self.store.get_type_stream_id_for_appservice(self.service, "read_receipt") ) @@ -427,13 +471,13 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): ) self.assertEquals(value, 0) - def test_get_type_stream_id_for_appservice_invalid_type(self): + def test_get_type_stream_id_for_appservice_invalid_type(self) -> None: self.get_failure( self.store.get_type_stream_id_for_appservice(self.service, "foobar"), ValueError, ) - def test_set_type_stream_id_for_appservice(self): + def test_set_type_stream_id_for_appservice(self) -> None: read_receipt_value = 1024 self.get_success( self.store.set_type_stream_id_for_appservice( @@ -455,7 +499,7 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): ) self.assertEqual(result, read_receipt_value) - def test_set_type_stream_id_for_appservice_invalid_type(self): + def test_set_type_stream_id_for_appservice_invalid_type(self) -> None: self.get_failure( self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024), ValueError, @@ -464,12 +508,12 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): # required for ApplicationServiceTransactionStoreTestCase tests class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs) -> None: super().__init__(database, db_conn, hs) class ApplicationServiceStoreConfigTestCase(unittest.TestCase): - def _write_config(self, suffix, **kwargs): + def _write_config(self, suffix, **kwargs) -> str: vals = { "id": "id" + suffix, "url": "url" + suffix, @@ -486,7 +530,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): return path @defer.inlineCallbacks - def test_unique_works(self): + def test_unique_works(self) -> Generator["Deferred[object]", Any, None]: f1 = self._write_config(suffix="1") f2 = self._write_config(suffix="2") @@ -503,7 +547,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): ) @defer.inlineCallbacks - def test_duplicate_ids(self): + def test_duplicate_ids(self) -> Generator["Deferred[object]", Any, None]: f1 = self._write_config(id="id", suffix="1") f2 = self._write_config(id="id", suffix="2") @@ -528,7 +572,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): self.assertIn("id", str(e)) @defer.inlineCallbacks - def test_duplicate_as_tokens(self): + def test_duplicate_as_tokens(self) -> Generator["Deferred[object]", Any, None]: f1 = self._write_config(as_token="as_token", suffix="1") f2 = self._write_config(as_token="as_token", suffix="2") -- cgit 1.5.1 From d26808dd854006bd26a2366c675428ce0737238c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 2 Dec 2021 20:58:32 +0000 Subject: Comments on the /sync tentacles (#11494) This mainly consists of docstrings and inline comments. 
There are one or two type annotations and variable renames thrown in while I was here. Co-authored-by: Patrick Cloke --- changelog.d/11494.misc | 1 + synapse/handlers/sync.py | 156 +++++++++++++++++++++++-------- synapse/storage/databases/main/stream.py | 15 ++- 3 files changed, 129 insertions(+), 43 deletions(-) create mode 100644 changelog.d/11494.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/11494.misc b/changelog.d/11494.misc new file mode 100644 index 0000000000..7afd7d3a07 --- /dev/null +++ b/changelog.d/11494.misc @@ -0,0 +1 @@ +Add comments to various parts of the `/sync` handler. \ No newline at end of file diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 891435c14d..53d4627147 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -334,6 +334,19 @@ class SyncHandler: full_state: bool, cache_context: ResponseCacheContext[SyncRequestKey], ) -> SyncResult: + """The start of the machinery that produces a /sync response. + + See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details. + + This method does high-level bookkeeping: + - tracking the kind of sync in the logging context + - deleting any to_device messages whose delivery has been acknowledged. + - deciding if we should dispatch an instant or delayed response + - marking the sync as being lazily loaded, if appropriate + + Computing the body of the response begins in the next method, + `current_sync_for_user`. + """ if since_token is None: sync_type = "initial_sync" elif full_state: @@ -363,7 +376,7 @@ class SyncHandler: sync_config, since_token, full_state=full_state ) else: - + # Otherwise, we wait for something to happen and report it to the user. async def current_sync_callback( before_token: StreamToken, after_token: StreamToken ) -> SyncResult: @@ -402,7 +415,12 @@ class SyncHandler: since_token: Optional[StreamToken] = None, full_state: bool = False, ) -> SyncResult: - """Get the sync for client needed to match what the server has now.""" + """Generates the response body of a sync result, represented as a SyncResult. + + This is a wrapper around `generate_sync_result` which starts an open tracing + span to track the sync. See `generate_sync_result` for the next part of your + indoctrination. + """ with start_active_span("current_sync_for_user"): log_kv({"since_token": since_token}) sync_result = await self.generate_sync_result( @@ -560,7 +578,7 @@ class SyncHandler: # that have happened since `since_key` up to `end_key`, so we # can just use `get_room_events_stream_for_room`. # Otherwise, we want to return the last N events in the room - # in toplogical ordering. + # in topological ordering. if since_key: events, end_key = await self.store.get_room_events_stream_for_room( room_id, @@ -1042,7 +1060,18 @@ class SyncHandler: since_token: Optional[StreamToken] = None, full_state: bool = False, ) -> SyncResult: - """Generates a sync result.""" + """Generates the response body of a sync result. + + This is represented by a `SyncResult` struct, which is built from small pieces + using a `SyncResultBuilder`. See also + https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync + the `sync_result_builder` is passed as a mutable ("inout") parameter to various + helper functions. These retrieve and process the data which forms the sync body, + often writing to the `sync_result_builder` to store their output. 
+ + At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` + instance to signify that the sync calculation is complete. + """ # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. @@ -1344,14 +1373,22 @@ class SyncHandler: async def _generate_sync_entry_for_account_data( self, sync_result_builder: "SyncResultBuilder" ) -> Dict[str, Dict[str, JsonDict]]: - """Generates the account data portion of the sync response. Populates - `sync_result_builder` with the result. + """Generates the account data portion of the sync response. + + Account data (called "Client Config" in the spec) can be set either globally + or for a specific room. Account data consists of a list of events which + accumulate state, much like a room. + + This function retrieves global and per-room account data. The former is written + to the given `sync_result_builder`. The latter is returned directly, to be + later written to the `sync_result_builder` on a room-by-room basis. Args: sync_result_builder Returns: - A dictionary containing the per room account data. + A dictionary whose keys (room ids) map to the per room account data for that + room. """ sync_config = sync_result_builder.sync_config user_id = sync_result_builder.sync_config.user.to_string() @@ -1359,7 +1396,7 @@ class SyncHandler: if since_token and not sync_result_builder.full_state: ( - account_data, + global_account_data, account_data_by_room, ) = await self.store.get_updated_account_data_for_user( user_id, since_token.account_data_key @@ -1370,23 +1407,23 @@ class SyncHandler: ) if push_rules_changed: - account_data["m.push_rules"] = await self.push_rules_for_user( + global_account_data["m.push_rules"] = await self.push_rules_for_user( sync_config.user ) else: ( - account_data, + global_account_data, account_data_by_room, ) = await self.store.get_account_data_for_user(sync_config.user.to_string()) - account_data["m.push_rules"] = await self.push_rules_for_user( + global_account_data["m.push_rules"] = await self.push_rules_for_user( sync_config.user ) account_data_for_user = await sync_config.filter_collection.filter_account_data( [ {"type": account_data_type, "content": content} - for account_data_type, content in account_data.items() + for account_data_type, content in global_account_data.items() ] ) @@ -1460,15 +1497,22 @@ class SyncHandler: """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. + In the response that reaches the client, rooms are divided into four categories: + `invite`, `join`, `knock`, `leave`. These aren't the same as the four sets of + room ids returned by this function. + Args: sync_result_builder account_data_by_room: Dictionary of per room account data Returns: - Returns a 4-tuple of - `(newly_joined_rooms, newly_joined_or_invited_users, - newly_left_rooms, newly_left_users)` + Returns a 4-tuple whose entries are: + - newly_joined_rooms + - newly_joined_or_invited_or_knocked_users + - newly_left_rooms + - newly_left_users """ + # Start by fetching all ephemeral events in rooms we've joined (if required). user_id = sync_result_builder.sync_config.user.to_string() block_all_room_ephemeral = ( sync_result_builder.since_token is None @@ -1590,6 +1634,8 @@ class SyncHandler: ) -> bool: """Returns whether there may be any new events that should be sent down the sync. Returns True if there are. 
+ + Does not modify the `sync_result_builder`. """ user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token @@ -1597,12 +1643,13 @@ class SyncHandler: assert since_token - # Get a list of membership change events that have happened. - rooms_changed = await self.store.get_membership_changes_for_user( + # Get a list of membership change events that have happened to the user + # requesting the sync. + membership_changes = await self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key ) - if rooms_changed: + if membership_changes: return True stream_id = since_token.room_key.stream @@ -1614,7 +1661,25 @@ class SyncHandler: async def _get_rooms_changed( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str] ) -> _RoomChanges: - """Gets the the changes that have happened since the last sync.""" + """Determine the changes in rooms to report to the user. + + Ideally, we want to report all events whose stream ordering `s` lies in the + range `since_token < s <= now_token`, where the two tokens are read from the + sync_result_builder. + + If there are too many events in that range to report, things get complicated. + In this situation we return a truncated list of the most recent events, and + indicate in the response that there is a "gap" of omitted events. Additionally: + + - we include a "state_delta", to describe the changes in state over the gap, + - we include all membership events applying to the user making the request, + even those in the gap. + + See the spec for the rationale: + https://spec.matrix.org/v1.1/client-server-api/#syncing + + The sync_result_builder is not modified by this function. + """ user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token @@ -1622,21 +1687,36 @@ class SyncHandler: assert since_token - # Get a list of membership change events that have happened. - rooms_changed = await self.store.get_membership_changes_for_user( + # The spec + # https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync + # notes that membership events need special consideration: + # + # > When a sync is limited, the server MUST return membership events for events + # > in the gap (between since and the start of the returned timeline), regardless + # > as to whether or not they are redundant. + # + # We fetch such events here, but we only seem to use them for categorising rooms + # as newly joined, newly left, invited or knocked. + # TODO: we've already called this function and ran this query in + # _have_rooms_changed. We could keep the results in memory to avoid a + # second query, at the cost of more complicated source code. 
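One way to act on the TODO above is to cache the result for the lifetime of a single sync computation, so the second call is free. This is a hypothetical sketch, not Synapse code; the class name and wiring are invented:

```python
from typing import Any, List, Optional


class MembershipChangesCache:
    """Hypothetical per-sync cache around get_membership_changes_for_user."""

    def __init__(self, store: Any, user_id: str, from_key: Any, to_key: Any) -> None:
        self._store = store
        self._args = (user_id, from_key, to_key)
        self._result: Optional[List[Any]] = None

    async def get(self) -> List[Any]:
        # Only the first call hits the database; later calls reuse the result.
        if self._result is None:
            self._result = await self._store.get_membership_changes_for_user(
                *self._args
            )
        return self._result
```

Both `_have_rooms_changed` and `_get_rooms_changed` could then share one instance, for example hung off the `sync_result_builder`.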
+ membership_change_events = await self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key ) mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} - for event in rooms_changed: + for event in membership_change_events: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - newly_joined_rooms = [] - newly_left_rooms = [] - room_entries = [] - invited = [] - knocked = [] + newly_joined_rooms: List[str] = [] + newly_left_rooms: List[str] = [] + room_entries: List[RoomSyncResultBuilder] = [] + invited: List[InvitedSyncResult] = [] + knocked: List[KnockedSyncResult] = [] for room_id, events in mem_change_events_by_room_id.items(): + # The body of this loop will add this room to at least one of the five lists + # above. Things get messy if you've e.g. joined, left, joined then left the + # room all in the same sync period. logger.debug( "Membership changes in %s: [%s]", room_id, @@ -1781,7 +1861,9 @@ class SyncHandler: timeline_limit = sync_config.filter_collection.timeline_limit() - # Get all events for rooms we're currently joined to. + # Get all events since the `from_key` in rooms we're currently joined to. + # If there are too many, we get the most recent events only. This leaves + # a "gap" in the timeline, as described by the spec for /sync. room_to_events = await self.store.get_room_events_stream_for_rooms( room_ids=sync_result_builder.joined_room_ids, from_key=since_token.room_key, @@ -1842,6 +1924,10 @@ class SyncHandler: ) -> _RoomChanges: """Returns entries for all rooms for the user. + Like `_get_rooms_changed`, but assumes the `since_token` is `None`. + + This function does not modify the sync_result_builder. + Args: sync_result_builder ignored_users: Set of users ignored by user. @@ -1853,16 +1939,9 @@ class SyncHandler: now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config - membership_list = ( - Membership.INVITE, - Membership.KNOCK, - Membership.JOIN, - Membership.LEAVE, - Membership.BAN, - ) - room_list = await self.store.get_rooms_for_local_user_where_membership_is( - user_id=user_id, membership_list=membership_list + user_id=user_id, + membership_list=Membership.LIST, ) room_entries = [] @@ -2212,8 +2291,7 @@ def _calculate_state( # to only include membership events for the senders in the timeline. # In practice, we can do this by removing them from the p_ids list, # which is the list of relevant state we know we have already sent to the client. - # see https://github.com/matrix-org/synapse/pull/2970 - # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 + # see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 if lazy_load_members: p_ids.difference_update( diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 42dc807d17..57aab55259 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -497,7 +497,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): oldest `limit` events. Returns: - The list of events (in ascending order) and the token from the start + The list of events (in ascending stream order) and the token from the start of the chunk of events returned. 
""" if from_key == to_key: @@ -510,7 +510,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): if not has_changed: return [], from_key - def f(txn): + def f(txn: LoggingTransaction) -> List[_EventDictReturn]: # To handle tokens with a non-empty instance_map we fetch more # results than necessary and then filter down min_from_id = from_key.stream @@ -565,6 +565,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): async def get_membership_changes_for_user( self, user_id: str, from_key: RoomStreamToken, to_key: RoomStreamToken ) -> List[EventBase]: + """Fetch membership events for a given user. + + All such events whose stream ordering `s` lies in the range + `from_key < s <= to_key` are returned. Events are ordered by ascending stream + order. + """ + # Start by ruling out cases where a DB query is not necessary. if from_key == to_key: return [] @@ -575,7 +582,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): if not has_changed: return [] - def f(txn): + def f(txn: LoggingTransaction) -> List[_EventDictReturn]: # To handle tokens with a non-empty instance_map we fetch more # results than necessary and then filter down min_from_id = from_key.stream @@ -634,7 +641,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): Returns: A list of events and a token pointing to the start of the returned - events. The events returned are in ascending order. + events. The events returned are in ascending topological order. """ rows, token = await self.get_recent_event_ids_for_room( -- cgit 1.5.1 From 5640992d176a499204a0756b1677c9b1575b0a49 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 2 Dec 2021 22:42:58 +0000 Subject: Disambiguate queries on `state_key` (#11497) We're going to add a `state_key` column to the `events` table, so we need to add some disambiguation to queries which use it. --- changelog.d/11497.misc | 1 + synapse/storage/databases/main/event_federation.py | 4 ++-- synapse/storage/databases/main/events.py | 4 ++-- synapse/storage/databases/main/events_worker.py | 16 ++++++++-------- synapse/storage/databases/main/purge_events.py | 2 +- synapse/storage/databases/main/roommember.py | 4 ++-- synapse/storage/schema/__init__.py | 6 +++++- 7 files changed, 21 insertions(+), 16 deletions(-) create mode 100644 changelog.d/11497.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/11497.misc b/changelog.d/11497.misc new file mode 100644 index 0000000000..c4393f6193 --- /dev/null +++ b/changelog.d/11497.misc @@ -0,0 +1 @@ +Preparation for database schema simplifications: disambiguate queries on `state_key`. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ef5d1ef01e..9580a40785 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1552,9 +1552,9 @@ class EventFederationStore(EventFederationWorkerStore): DELETE FROM event_auth WHERE event_id IN ( SELECT event_id FROM events - LEFT JOIN state_events USING (room_id, event_id) + LEFT JOIN state_events AS se USING (room_id, event_id) WHERE ? <= stream_ordering AND stream_ordering < ? 
- AND state_key IS null + AND se.state_key IS null ) """ diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 4171b904eb..4e528612ea 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -575,9 +575,9 @@ class PersistEventsStore: # fetch their auth event info. while missing_auth_chains: sql = """ - SELECT event_id, events.type, state_key, chain_id, sequence_number + SELECT event_id, events.type, se.state_key, chain_id, sequence_number FROM events - INNER JOIN state_events USING (event_id) + INNER JOIN state_events AS se USING (event_id) LEFT JOIN event_auth_chains USING (event_id) WHERE """ diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index fd19674f93..c7b660ac5a 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1408,10 +1408,10 @@ class EventsWorkerStore(SQLBaseStore): ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: sql = ( "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN state_events AS se USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " LEFT JOIN room_memberships USING (event_id)" " LEFT JOIN rejections USING (event_id)" @@ -1449,11 +1449,11 @@ class EventsWorkerStore(SQLBaseStore): ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" " FROM events AS e" " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN state_events AS se USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " LEFT JOIN room_memberships USING (event_id)" " LEFT JOIN rejections USING (event_id)" @@ -1507,10 +1507,10 @@ class EventsWorkerStore(SQLBaseStore): ) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, str]]], int, bool]: sql = ( "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" + " se.state_key, redacts, relates_to_id" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN state_events AS se USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " WHERE ? > stream_ordering AND stream_ordering >= ?" " AND instance_name = ?" @@ -1537,11 +1537,11 @@ class EventsWorkerStore(SQLBaseStore): sql = ( "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" + " se.state_key, redacts, relates_to_id" " FROM events AS e" " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN state_events AS se USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " WHERE ? > event_stream_ordering" " AND event_stream_ordering >= ?" 
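The failure mode this commit prepares for is easy to reproduce in miniature. The following toy script (plain `sqlite3`, not Synapse code) shows the bare `state_key` reference becoming ambiguous once `events` grows its own `state_key` column, while the aliased form these hunks switch to keeps working:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Both tables now carry a state_key column, as planned for the schema change.
    CREATE TABLE events (room_id TEXT, event_id TEXT, state_key TEXT);
    CREATE TABLE state_events (room_id TEXT, event_id TEXT, state_key TEXT);
    """
)

try:
    conn.execute(
        "SELECT state_key FROM events "
        "LEFT JOIN state_events USING (room_id, event_id)"
    )
except sqlite3.OperationalError as e:
    print(e)  # ambiguous column name: state_key

# Qualifying through an alias, as the diff does, keeps the query valid:
conn.execute(
    "SELECT se.state_key FROM events "
    "LEFT JOIN state_events AS se USING (room_id, event_id)"
)
```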
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 3eb30944bf..91b0576b85 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -118,7 +118,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): logger.info("[purge] looking for events to delete") - should_delete_expr = "state_key IS NULL" + should_delete_expr = "state_events.state_key IS NULL" should_delete_params: Tuple[Any, ...] = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 033a9831d6..6b2a8d06a6 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): INNER JOIN events AS e USING (room_id, event_id) WHERE c.type = 'm.room.member' - AND state_key = ? + AND c.state_key = ? AND c.membership = ? """ else: @@ -487,7 +487,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): INNER JOIN events AS e USING (room_id, event_id) WHERE c.type = 'm.room.member' - AND state_key = ? + AND c.state_key = ? AND m.membership = ? """ diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 3a00ed6835..50d08094d5 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 65 # remember to update the list below when updating +SCHEMA_VERSION = 66 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -46,6 +46,10 @@ Changes in SCHEMA_VERSION = 65: - MSC2716: Remove unique event_id constraint from insertion_event_edges because an insertion event can have multiple edges. - Remove unused tables `user_stats_historical` and `room_stats_historical`. + +Changes in SCHEMA_VERSION = 66: + - Queries on state_key columns are now disambiguated (ie, the codebase can handle + the `events` table having a `state_key` column). """ -- cgit 1.5.1 From 8b4b153c9e86c04c7db8c74fde4b6a04becbc461 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Mon, 6 Dec 2021 17:59:50 +0100 Subject: Add admin API to get some information about federation status (#11407) --- changelog.d/11407.feature | 1 + docs/SUMMARY.md | 1 + docs/usage/administration/admin_api/federation.md | 114 ++++++ synapse/rest/admin/__init__.py | 6 + synapse/rest/admin/federation.py | 135 +++++++ synapse/storage/databases/main/transactions.py | 70 ++++ tests/rest/admin/test_federation.py | 456 ++++++++++++++++++++++ 7 files changed, 783 insertions(+) create mode 100644 changelog.d/11407.feature create mode 100644 docs/usage/administration/admin_api/federation.md create mode 100644 synapse/rest/admin/federation.py create mode 100644 tests/rest/admin/test_federation.py (limited to 'synapse/storage/databases') diff --git a/changelog.d/11407.feature b/changelog.d/11407.feature new file mode 100644 index 0000000000..1d21bde98f --- /dev/null +++ b/changelog.d/11407.feature @@ -0,0 +1 @@ +Add admin API to get some information about federation status with remote servers. 
\ No newline at end of file diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 41c8f0fbc9..b05af6d690 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -65,6 +65,7 @@ - [Statistics](admin_api/statistics.md) - [Users](admin_api/user_admin_api.md) - [Server Version](admin_api/version_api.md) + - [Federation](usage/administration/admin_api/federation.md) - [Manhole](manhole.md) - [Monitoring](metrics-howto.md) - [Understanding Synapse Through Grafana Graphs](usage/administration/understanding_synapse_through_grafana_graphs.md) diff --git a/docs/usage/administration/admin_api/federation.md b/docs/usage/administration/admin_api/federation.md new file mode 100644 index 0000000000..8f9535f57b --- /dev/null +++ b/docs/usage/administration/admin_api/federation.md @@ -0,0 +1,114 @@ +# Federation API + +This API allows a server administrator to manage Synapse's federation with other homeservers. + +Note: This API is new, experimental and "subject to change". + +## List of destinations + +This API gets the current destination retry timing info for all remote servers. + +The list contains all the servers with which the server federates, +regardless of whether an error occurred or not. +If an error occurs, it may take up to 20 minutes for the error to be displayed here, +as a complete retry must have failed. + +The API is: + +A standard request with no filtering: + +``` +GET /_synapse/admin/v1/federation/destinations +``` + +A response body like the following is returned: + +```json +{ + "destinations":[ + { + "destination": "matrix.org", + "retry_last_ts": 1557332397936, + "retry_interval": 3000000, + "failure_ts": 1557329397936, + "last_successful_stream_ordering": null + } + ], + "total": 1 +} +``` + +To paginate, check for `next_token` and if present, call the endpoint again +with `from` set to the value of `next_token`. This will return a new page. + +If the endpoint does not return a `next_token` then there are no more destinations +to paginate through. + +**Parameters** + +The following query parameters are available: + +- `from` - Offset in the returned list. Defaults to `0`. +- `limit` - Maximum amount of destinations to return. Defaults to `100`. +- `order_by` - The method in which to sort the returned list of destinations. + Valid values are: + - `destination` - Destinations are ordered alphabetically by remote server name. + This is the default. + - `retry_last_ts` - Destinations are ordered by time of last retry attempt in ms. + - `retry_interval` - Destinations are ordered by how long until next retry in ms. + - `failure_ts` - Destinations are ordered by when the server started failing in ms. + - `last_successful_stream_ordering` - Destinations are ordered by the stream ordering + of the most recent successfully-sent PDU. +- `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting + this value to `b` will reverse the above sort order. Defaults to `f`. + +*Caution:* The database only has an index on the column `destination`. +This means that if a different sort order is used, +this can cause a large load on the database, especially for large environments. + +**Response** + +The following fields are returned in the JSON response body: + +- `destinations` - An array of objects, each containing information about a destination. + Destination objects contain the following fields: + - `destination` - string - Name of the remote server to federate. + - `retry_last_ts` - integer - The last time Synapse tried and failed to reach the + remote server, in ms. 
This is `0` if the last attempt to communicate with the
+  remote server was successful.
+- `retry_interval` - integer - How long since the last time Synapse tried to reach
+  the remote server before trying again, in ms. This is `0` if no further retrying is occurring.
+- `failure_ts` - nullable integer - The first time Synapse tried and failed to reach the
+  remote server, in ms. This is `null` if communication with the remote server has never failed.
+- `last_successful_stream_ordering` - nullable integer - The stream ordering of the most
+  recent successfully-sent [PDU](understanding_synapse_through_grafana_graphs.md#federation)
+  to this destination, or `null` if this information has not been tracked yet.
+- `next_token`: string representing a positive integer - Indication for pagination. See above.
+- `total` - integer - Total number of destinations.
+
+## Destination Details API
+
+This API gets the retry timing info for a specific remote server.
+
+The API is:
+
+```
+GET /_synapse/admin/v1/federation/destinations/<destination>
+```
+
+A response body like the following is returned:
+
+```json
+{
+    "destination": "matrix.org",
+    "retry_last_ts": 1557332397936,
+    "retry_interval": 3000000,
+    "failure_ts": 1557329397936,
+    "last_successful_stream_ordering": null
+}
+```
+
+**Response**
+
+The response fields are the same as in the `destinations` array in the
+[List of destinations](#list-of-destinations) response.
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index c51a029bf3..c499afd4be 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -40,6 +40,10 @@ from synapse.rest.admin.event_reports import (
     EventReportDetailRestServlet,
     EventReportsRestServlet,
 )
+from synapse.rest.admin.federation import (
+    DestinationsRestServlet,
+    ListDestinationsRestServlet,
+)
 from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
 from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
 from synapse.rest.admin.registration_tokens import (
@@ -261,6 +265,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ListRegistrationTokensRestServlet(hs).register(http_server)
     NewRegistrationTokenRestServlet(hs).register(http_server)
     RegistrationTokenRestServlet(hs).register(http_server)
+    DestinationsRestServlet(hs).register(http_server)
+    ListDestinationsRestServlet(hs).register(http_server)
 
     # Some servlets only get registered for the main process.
     if hs.config.worker.worker_app is None:
diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py
new file mode 100644
index 0000000000..744687be35
--- /dev/null
+++ b/synapse/rest/admin/federation.py
@@ -0,0 +1,135 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
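One pattern worth noting before the servlet code below: it validates the user-supplied `order_by` against `DestinationSortOrder` (via `allowed_values`) before the storage layer ever interpolates the column name into SQL. A minimal sketch of that guard, with a trimmed-down enum and an invented `build_order_clause` helper (illustration only, not part of this patch):

```python
from enum import Enum


class DestinationSortOrder(Enum):
    DESTINATION = "destination"
    RETRY_LAST_TS = "retry_last_ts"


def build_order_clause(order_by: str, direction: str) -> str:
    # Anything outside the enum raises ValueError, so only known column
    # names can reach the f-string that builds the ORDER BY clause.
    column = DestinationSortOrder(order_by).value
    order = "DESC" if direction == "b" else "ASC"
    return f"ORDER BY {column} {order}, destination ASC"


assert build_order_clause("retry_last_ts", "b") == (
    "ORDER BY retry_last_ts DESC, destination ASC"
)
```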
+import logging
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.api.errors import Codes, NotFoundError, SynapseError
+from synapse.http.servlet import RestServlet, parse_integer, parse_string
+from synapse.http.site import SynapseRequest
+from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
+from synapse.storage.databases.main.transactions import DestinationSortOrder
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ListDestinationsRestServlet(RestServlet):
+    """Get request to list all destinations.
+    This needs the user to have administrator access in Synapse.
+
+    GET /_synapse/admin/v1/federation/destinations?from=0&limit=10
+
+    returns:
+        200 OK with a list of destinations on success, otherwise an error.
+
+    The parameters `from` and `limit` are only used for pagination.
+    By default, a `limit` of 100 is used.
+    The parameter `destination` can be used to filter by destination.
+    The parameter `order_by` can be used to order the result.
+    """
+
+    PATTERNS = admin_patterns("/federation/destinations$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastore()
+
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self._auth, request)
+
+        start = parse_integer(request, "from", default=0)
+        limit = parse_integer(request, "limit", default=100)
+
+        if start < 0:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Query parameter from must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
+        if limit < 0:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Query parameter limit must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
+        destination = parse_string(request, "destination")
+
+        order_by = parse_string(
+            request,
+            "order_by",
+            default=DestinationSortOrder.DESTINATION.value,
+            allowed_values=[dest.value for dest in DestinationSortOrder],
+        )
+
+        direction = parse_string(request, "dir", default="f", allowed_values=("f", "b"))
+
+        destinations, total = await self._store.get_destinations_paginate(
+            start, limit, destination, order_by, direction
+        )
+        response = {"destinations": destinations, "total": total}
+        if (start + limit) < total:
+            response["next_token"] = str(start + len(destinations))
+
+        return HTTPStatus.OK, response
+
+
+class DestinationsRestServlet(RestServlet):
+    """Get details of a destination.
+    This needs the user to have administrator access in Synapse.
+
+    GET /_synapse/admin/v1/federation/destinations/<destination>
+
+    returns:
+        200 OK with details of a destination on success, otherwise an error.
+    """
+
+    PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]+)$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastore()
+
+    async def on_GET(
+        self, request: SynapseRequest, destination: str
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self._auth, request)
+
+        destination_retry_timings = await self._store.get_destination_retry_timings(
+            destination
+        )
+
+        if not destination_retry_timings:
+            raise NotFoundError("Unknown destination")
+
+        last_successful_stream_ordering = (
+            await self._store.get_destination_last_successful_stream_ordering(
+                destination
+            )
+        )
+
+        response = {
+            "destination": destination,
+            "failure_ts": destination_retry_timings.failure_ts,
+            "retry_last_ts": destination_retry_timings.retry_last_ts,
+            "retry_interval": destination_retry_timings.retry_interval,
+            "last_successful_stream_ordering": last_successful_stream_ordering,
+        }
+
+        return HTTPStatus.OK, response
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index d7dc1f73ac..1622822552 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,6 +14,7 @@
 
 import logging
 from collections import namedtuple
+from enum import Enum
 from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 
 import attr
@@ -44,6 +45,16 @@ _UpdateTransactionRow = namedtuple(
 )
 
 
+class DestinationSortOrder(Enum):
+    """Enum to define the sorting method used when returning destinations."""
+
+    DESTINATION = "destination"
+    RETRY_LAST_TS = "retry_last_ts"
+    RETRY_INTERVAL = "retry_interval"
+    FAILURE_TS = "failure_ts"
+    LAST_SUCCESSFUL_STREAM_ORDERING = "last_successful_stream_ordering"
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class DestinationRetryTimings:
     """The current destination retry timing info for a remote server."""
@@ -480,3 +491,62 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             destinations = [row[0] for row in txn]
 
             return destinations
+
+    async def get_destinations_paginate(
+        self,
+        start: int,
+        limit: int,
+        destination: Optional[str] = None,
+        order_by: str = DestinationSortOrder.DESTINATION.value,
+        direction: str = "f",
+    ) -> Tuple[List[JsonDict], int]:
+        """Function to retrieve a paginated list of destinations.
+        This will return a json list of destinations and the
+        total number of destinations matching the filter criteria.
+
+        Args:
+            start: start number to begin the query from
+            limit: number of rows to retrieve
+            destination: search string in destination
+            order_by: the sort order of the returned list
+            direction: sort ascending or descending
+        Returns:
+            A tuple of a list of mappings from destination to information
+            and a count of total destinations.
+        """
+
+        def get_destinations_paginate_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[List[JsonDict], int]:
+            order_by_column = DestinationSortOrder(order_by).value
+
+            if direction == "b":
+                order = "DESC"
+            else:
+                order = "ASC"
+
+            args = []
+            where_statement = ""
+            if destination:
+                args.extend(["%" + destination.lower() + "%"])
+                where_statement = "WHERE LOWER(destination) LIKE ?"
+ + sql_base = f"FROM destinations {where_statement} " + sql = f"SELECT COUNT(*) as total_destinations {sql_base}" + txn.execute(sql, args) + count = txn.fetchone()[0] + + sql = f""" + SELECT destination, retry_last_ts, retry_interval, failure_ts, + last_successful_stream_ordering + {sql_base} + ORDER BY {order_by_column} {order}, destination ASC + LIMIT ? OFFSET ? + """ + txn.execute(sql, args + [limit, start]) + destinations = self.db_pool.cursor_to_dict(txn) + return destinations, count + + return await self.db_pool.runInteraction( + "get_destinations_paginate_txn", get_destinations_paginate_txn + ) diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py new file mode 100644 index 0000000000..5188499ef2 --- /dev/null +++ b/tests/rest/admin/test_federation.py @@ -0,0 +1,456 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from http import HTTPStatus +from typing import List, Optional + +from parameterized import parameterized + +import synapse.rest.admin +from synapse.api.errors import Codes +from synapse.rest.client import login +from synapse.server import HomeServer +from synapse.types import JsonDict + +from tests import unittest + + +class FederationTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs: HomeServer): + self.store = hs.get_datastore() + self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + self.url = "/_synapse/admin/v1/federation/destinations" + + @parameterized.expand( + [ + ("/_synapse/admin/v1/federation/destinations",), + ("/_synapse/admin/v1/federation/destinations/dummy",), + ] + ) + def test_requester_is_no_admin(self, url: str): + """ + If the user is not a server admin, an error 403 is returned. + """ + + self.register_user("user", "pass", admin=False) + other_user_tok = self.login("user", "pass") + + channel = self.make_request( + "GET", + url, + content={}, + access_token=other_user_tok, + ) + + self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) + self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + + def test_invalid_parameter(self): + """ + If parameters are invalid, an error is returned. 
+        """
+
+        # negative limit
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=-5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+        # negative from
+        channel = self.make_request(
+            "GET",
+            self.url + "?from=-5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+        # unknown order_by
+        channel = self.make_request(
+            "GET",
+            self.url + "?order_by=bar",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
+
+        # invalid search order
+        channel = self.make_request(
+            "GET",
+            self.url + "?dir=bar",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
+
+        # invalid destination
+        channel = self.make_request(
+            "GET",
+            self.url + "/dummy",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+
+    def test_limit(self):
+        """
+        Testing list of destinations with limit
+        """
+
+        number_destinations = 20
+        self._create_destinations(number_destinations)
+
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_destinations)
+        self.assertEqual(len(channel.json_body["destinations"]), 5)
+        self.assertEqual(channel.json_body["next_token"], "5")
+        self._check_fields(channel.json_body["destinations"])
+
+    def test_from(self):
+        """
+        Testing list of destinations with a defined starting point (from)
+        """
+
+        number_destinations = 20
+        self._create_destinations(number_destinations)
+
+        channel = self.make_request(
+            "GET",
+            self.url + "?from=5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_destinations)
+        self.assertEqual(len(channel.json_body["destinations"]), 15)
+        self.assertNotIn("next_token", channel.json_body)
+        self._check_fields(channel.json_body["destinations"])
+
+    def test_limit_and_from(self):
+        """
+        Testing list of destinations with a defined starting point and limit
+        """
+
+        number_destinations = 20
+        self._create_destinations(number_destinations)
+
+        channel = self.make_request(
+            "GET",
+            self.url + "?from=5&limit=10",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_destinations)
+        self.assertEqual(channel.json_body["next_token"], "15")
+        self.assertEqual(len(channel.json_body["destinations"]), 10)
+        self._check_fields(channel.json_body["destinations"])
+
+    def test_next_token(self):
+        """
+        Testing that `next_token` appears at the right place
+        """
+
+        number_destinations = 20
+        self._create_destinations(number_destinations)
+
+        # `next_token` does not appear
+        # Number of results is the number of entries
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=20",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_destinations)
+        self.assertEqual(len(channel.json_body["destinations"]), number_destinations)
+        self.assertNotIn("next_token", channel.json_body)
+
+        # `next_token` does not appear
+        # Maximum number of results is larger than the number of entries
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=21",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_destinations)
+        self.assertEqual(len(channel.json_body["destinations"]), number_destinations)
+        self.assertNotIn("next_token", channel.json_body)
+
+        # `next_token` does appear
+        # Maximum number of results is smaller than the number of entries
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=19",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_destinations)
+        self.assertEqual(len(channel.json_body["destinations"]), 19)
+        self.assertEqual(channel.json_body["next_token"], "19")
+
+        # Set `from` to the value of `next_token` to request the remaining entries
+        # `next_token` does not appear
+        channel = self.make_request(
+            "GET",
+            self.url + "?from=19",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_destinations)
+        self.assertEqual(len(channel.json_body["destinations"]), 1)
+        self.assertNotIn("next_token", channel.json_body)
+
+    def test_list_all_destinations(self):
+        """
+        List all destinations.
+        """
+        number_destinations = 5
+        self._create_destinations(number_destinations)
+
+        channel = self.make_request(
+            "GET",
+            self.url,
+            {},
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(number_destinations, len(channel.json_body["destinations"]))
+        self.assertEqual(number_destinations, channel.json_body["total"])
+
+        # Check that all fields are available
+        self._check_fields(channel.json_body["destinations"])
+
+    def test_order_by(self):
+        """
+        Testing ordering of the list with parameter `order_by`
+        """
+
+        def _order_test(
+            expected_destination_list: List[str],
+            order_by: Optional[str],
+            dir: Optional[str] = None,
+        ):
+            """Request the list of destinations in a certain order.
+            Assert that the order is what we expect.
+
+            Args:
+                expected_destination_list: The list of destinations in the order
+                    we expect the server to return them
+                order_by: The type of ordering to give the server
+                dir: The direction of ordering to give the server
+            """
+
+            url = f"{self.url}?"
+            if order_by is not None:
+                url += f"order_by={order_by}&"
+            if dir is not None and dir in ("b", "f"):
+                url += f"dir={dir}"
+            channel = self.make_request(
+                "GET",
+                url,
+                access_token=self.admin_user_tok,
+            )
+            self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+            self.assertEqual(channel.json_body["total"], len(expected_destination_list))
+
+            returned_order = [
+                row["destination"] for row in channel.json_body["destinations"]
+            ]
+            self.assertEqual(expected_destination_list, returned_order)
+            self._check_fields(channel.json_body["destinations"])
+
+        # create destinations
+        dest = [
+            ("sub-a.example.com", 100, 300, 200, 300),
+            ("sub-b.example.com", 200, 200, 100, 100),
+            ("sub-c.example.com", 300, 100, 300, 200),
+        ]
+        for (
+            destination,
+            failure_ts,
+            retry_last_ts,
+            retry_interval,
+            last_successful_stream_ordering,
+        ) in dest:
+            self.get_success(
+                self.store.set_destination_retry_timings(
+                    destination, failure_ts, retry_last_ts, retry_interval
+                )
+            )
+            self.get_success(
+                self.store.set_destination_last_successful_stream_ordering(
+                    destination, last_successful_stream_ordering
+                )
+            )
+
+        # order by default (destination)
+        _order_test([dest[0][0], dest[1][0], dest[2][0]], None)
+        _order_test([dest[0][0], dest[1][0], dest[2][0]], None, "f")
+        _order_test([dest[2][0], dest[1][0], dest[0][0]], None, "b")
+
+        # order by destination
+        _order_test([dest[0][0], dest[1][0], dest[2][0]], "destination")
+        _order_test([dest[0][0], dest[1][0], dest[2][0]], "destination", "f")
+        _order_test([dest[2][0], dest[1][0], dest[0][0]], "destination", "b")
+
+        # order by failure_ts
+        _order_test([dest[0][0], dest[1][0], dest[2][0]], "failure_ts")
+        _order_test([dest[0][0], dest[1][0], dest[2][0]], "failure_ts", "f")
+        _order_test([dest[2][0], dest[1][0], dest[0][0]], "failure_ts", "b")
+
+        # order by retry_last_ts
+        _order_test([dest[2][0], dest[1][0], dest[0][0]], "retry_last_ts")
+        _order_test([dest[2][0], dest[1][0], dest[0][0]], "retry_last_ts", "f")
+        _order_test([dest[0][0], dest[1][0], dest[2][0]], "retry_last_ts", "b")
+
+        # order by retry_interval
+        _order_test([dest[1][0], dest[0][0], dest[2][0]], "retry_interval")
+        _order_test([dest[1][0], dest[0][0], dest[2][0]], "retry_interval", "f")
+        _order_test([dest[2][0], dest[0][0], dest[1][0]], "retry_interval", "b")
+
+        # order by last_successful_stream_ordering
+        _order_test(
+            [dest[1][0], dest[2][0], dest[0][0]], "last_successful_stream_ordering"
+        )
+        _order_test(
+            [dest[1][0], dest[2][0], dest[0][0]], "last_successful_stream_ordering", "f"
+        )
+        _order_test(
+            [dest[0][0], dest[2][0], dest[1][0]], "last_successful_stream_ordering", "b"
+        )
+
+    def test_search_term(self):
+        """Test that searching for a destination works correctly"""
+
+        def _search_test(
+            expected_destination: Optional[str],
+            search_term: str,
+        ):
+            """Search for a destination and check that the returned destination is a match
+
+            Args:
+                expected_destination: The destination expected to be returned by the API.
+                    Set to None to expect zero results for the search
+                search_term: The term to search destination names with
+            """
+            url = f"{self.url}?destination={search_term}"
+            channel = self.make_request(
+                "GET",
+                url.encode("ascii"),
+                access_token=self.admin_user_tok,
+            )
+            self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+
+            # Check that destinations were returned
+            self.assertTrue("destinations" in channel.json_body)
+            self._check_fields(channel.json_body["destinations"])
+            destinations = channel.json_body["destinations"]
+
+            # Check that the expected number of destinations were returned
+            expected_destination_count = 1 if expected_destination else 0
+            self.assertEqual(len(destinations), expected_destination_count)
+            self.assertEqual(channel.json_body["total"], expected_destination_count)
+
+            if expected_destination:
+                # Check that the first returned destination is correct
+                self.assertEqual(expected_destination, destinations[0]["destination"])
+
+        number_destinations = 3
+        self._create_destinations(number_destinations)
+
+        # Test searching
+        _search_test("sub0.example.com", "0")
+        _search_test("sub0.example.com", "sub0")
+
+        _search_test("sub1.example.com", "1")
+        _search_test("sub1.example.com", "1.")
+
+        # Test case insensitivity
+        _search_test("sub0.example.com", "SUB0")
+
+        _search_test(None, "foo")
+        _search_test(None, "bar")
+
+    def test_get_single_destination(self):
+        """
+        Get one specific destination.
+        """
+        self._create_destinations(5)
+
+        channel = self.make_request(
+            "GET",
+            self.url + "/sub0.example.com",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual("sub0.example.com", channel.json_body["destination"])
+
+        # Check that all fields are available
+        # Convert channel.json_body into a List
+        self._check_fields([channel.json_body])
+
+    def _create_destinations(self, number_destinations: int):
+        """Create a number of destinations
+
+        Args:
+            number_destinations: Number of destinations to be created
+        """
+        for i in range(number_destinations):
+            dest = f"sub{i}.example.com"
+            self.get_success(self.store.set_destination_retry_timings(dest, 50, 50, 50))
+            self.get_success(
+                self.store.set_destination_last_successful_stream_ordering(dest, 100)
+            )
+
+    def _check_fields(self, content: List[JsonDict]):
+        """Checks that the expected destination attributes are present in content
+
+        Args:
+            content: List that is checked for content
+        """
+        for c in content:
+            self.assertIn("destination", c)
+            self.assertIn("retry_last_ts", c)
+            self.assertIn("retry_interval", c)
+            self.assertIn("failure_ts", c)
+            self.assertIn("last_successful_stream_ordering", c)
-- 
cgit 1.5.1


From a15a893df8428395df7cb95b729431575001c38a Mon Sep 17 00:00:00 2001
From: Quentin Gliech
Date: Mon, 6 Dec 2021 18:43:06 +0100
Subject: Save the OIDC session ID (sid) with the device on login (#11482)

As a step towards allowing back-channel logout for OIDC.
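For context, OIDC back-channel logout works by the IdP POSTing a signed logout token to the homeserver; the token's `sid` claim names the IdP session to terminate, which is why the session ID needs to be recorded against the device at login time. The following is a rough sketch of what a future consumer of that mapping could look like; it is hypothetical and not part of this change, and `jwk_set`, `store`, and `device_handler` are stand-ins for the real Synapse objects:

# Hypothetical sketch of a back-channel logout consumer; not part of this
# commit. `jwk_set`, `store` and `device_handler` are stand-in objects.
from typing import Any

from authlib.jose import JsonWebToken


async def handle_backchannel_logout(
    idp_id: str, logout_token: str, jwk_set: Any, store: Any, device_handler: Any
) -> None:
    # A logout token is a signed JWT; its `sid` claim names the IdP session.
    claims = JsonWebToken(["RS256"]).decode(logout_token, key=jwk_set)
    claims.validate()
    sid = claims.get("sid")
    if sid is None:
        return  # without a session ID there is nothing to match on

    # The (auth_provider_id, sid) pair stored at login maps back to the
    # devices that were created for that SSO session.
    for device in await store.get_devices_by_auth_provider_session_id(idp_id, sid):
        await device_handler.delete_device(device["user_id"], device["device_id"])

The lookup at the end relies on the `get_devices_by_auth_provider_session_id` store method added in this commit.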
--- changelog.d/11482.misc | 1 + synapse/handlers/auth.py | 34 +++++- synapse/handlers/device.py | 8 ++ synapse/handlers/oidc.py | 58 +++++---- synapse/handlers/register.py | 15 ++- synapse/handlers/sso.py | 4 + synapse/module_api/__init__.py | 2 + synapse/replication/http/login.py | 8 ++ synapse/rest/client/login.py | 7 +- synapse/storage/databases/main/devices.py | 50 +++++++- .../delta/65/11_devices_auth_provider_session.sql | 27 +++++ tests/handlers/test_auth.py | 6 +- tests/handlers/test_cas.py | 40 +++++- tests/handlers/test_oidc.py | 135 ++++++++++++++++++--- tests/handlers/test_saml.py | 40 +++++- 15 files changed, 370 insertions(+), 65 deletions(-) create mode 100644 changelog.d/11482.misc create mode 100644 synapse/storage/schema/main/delta/65/11_devices_auth_provider_session.sql (limited to 'synapse/storage/databases') diff --git a/changelog.d/11482.misc b/changelog.d/11482.misc new file mode 100644 index 0000000000..e78662988f --- /dev/null +++ b/changelog.d/11482.misc @@ -0,0 +1 @@ +Save the OpenID Connect session ID on login. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 4d9c4e5834..61607cf2ba 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -39,6 +39,7 @@ import attr import bcrypt import pymacaroons import unpaddedbase64 +from pymacaroons.exceptions import MacaroonVerificationFailedException from twisted.web.server import Request @@ -182,8 +183,11 @@ class LoginTokenAttributes: user_id = attr.ib(type=str) - # the SSO Identity Provider that the user authenticated with, to get this token auth_provider_id = attr.ib(type=str) + """The SSO Identity Provider that the user authenticated with, to get this token.""" + + auth_provider_session_id = attr.ib(type=Optional[str]) + """The session ID advertised by the SSO Identity Provider.""" class AuthHandler: @@ -1650,6 +1654,7 @@ class AuthHandler: client_redirect_url: str, extra_attributes: Optional[JsonDict] = None, new_user: bool = False, + auth_provider_session_id: Optional[str] = None, ) -> None: """Having figured out a mxid for this user, complete the HTTP request @@ -1665,6 +1670,7 @@ class AuthHandler: during successful login. Must be JSON serializable. new_user: True if we should use wording appropriate to a user who has just registered. + auth_provider_session_id: The session ID from the SSO IdP received during login. """ # If the account has been deactivated, do not proceed with the login # flow. @@ -1685,6 +1691,7 @@ class AuthHandler: extra_attributes, new_user=new_user, user_profile_data=profile, + auth_provider_session_id=auth_provider_session_id, ) def _complete_sso_login( @@ -1696,6 +1703,7 @@ class AuthHandler: extra_attributes: Optional[JsonDict] = None, new_user: bool = False, user_profile_data: Optional[ProfileInfo] = None, + auth_provider_session_id: Optional[str] = None, ) -> None: """ The synchronous portion of complete_sso_login. @@ -1717,7 +1725,9 @@ class AuthHandler: # Create a login token login_token = self.macaroon_gen.generate_short_term_login_token( - registered_user_id, auth_provider_id=auth_provider_id + registered_user_id, + auth_provider_id=auth_provider_id, + auth_provider_session_id=auth_provider_session_id, ) # Append the login token to the original redirect URL (i.e. 
with its query
@@ -1822,6 +1832,7 @@
         self,
         user_id: str,
         auth_provider_id: str,
+        auth_provider_session_id: Optional[str] = None,
         duration_in_ms: int = (2 * 60 * 1000),
     ) -> str:
         macaroon = self._generate_base_macaroon(user_id)
@@ -1830,6 +1841,10 @@
         expiry = now + duration_in_ms
         macaroon.add_first_party_caveat("time < %d" % (expiry,))
         macaroon.add_first_party_caveat("auth_provider_id = %s" % (auth_provider_id,))
+        if auth_provider_session_id is not None:
+            macaroon.add_first_party_caveat(
+                "auth_provider_session_id = %s" % (auth_provider_session_id,)
+            )
         return macaroon.serialize()

     def verify_short_term_login_token(self, token: str) -> LoginTokenAttributes:
@@ -1851,15 +1866,28 @@
         user_id = get_value_from_macaroon(macaroon, "user_id")
         auth_provider_id = get_value_from_macaroon(macaroon, "auth_provider_id")

+        auth_provider_session_id: Optional[str] = None
+        try:
+            auth_provider_session_id = get_value_from_macaroon(
+                macaroon, "auth_provider_session_id"
+            )
+        except MacaroonVerificationFailedException:
+            pass
+
         v = pymacaroons.Verifier()
         v.satisfy_exact("gen = 1")
         v.satisfy_exact("type = login")
         v.satisfy_general(lambda c: c.startswith("user_id = "))
         v.satisfy_general(lambda c: c.startswith("auth_provider_id = "))
+        v.satisfy_general(lambda c: c.startswith("auth_provider_session_id = "))
         satisfy_expiry(v, self.hs.get_clock().time_msec)
         v.verify(macaroon, self.hs.config.key.macaroon_secret_key)

-        return LoginTokenAttributes(user_id=user_id, auth_provider_id=auth_provider_id)
+        return LoginTokenAttributes(
+            user_id=user_id,
+            auth_provider_id=auth_provider_id,
+            auth_provider_session_id=auth_provider_session_id,
+        )

     def generate_delete_pusher_token(self, user_id: str) -> str:
         macaroon = self._generate_base_macaroon(user_id)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 68b446eb66..82ee11e921 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -301,6 +301,8 @@ class DeviceHandler(DeviceWorkerHandler):
         user_id: str,
         device_id: Optional[str],
         initial_device_display_name: Optional[str] = None,
+        auth_provider_id: Optional[str] = None,
+        auth_provider_session_id: Optional[str] = None,
     ) -> str:
         """
         If the given device has not been registered, register it with the
@@ -312,6 +314,8 @@
             user_id: @user:id
             device_id: device id supplied by client
             initial_device_display_name: device display name from client
+            auth_provider_id: The SSO IdP the user used, if any.
+            auth_provider_session_id: The session ID (sid) received from the SSO IdP.
Returns: device id (generated if none was supplied) """ @@ -323,6 +327,8 @@ class DeviceHandler(DeviceWorkerHandler): user_id=user_id, device_id=device_id, initial_device_display_name=initial_device_display_name, + auth_provider_id=auth_provider_id, + auth_provider_session_id=auth_provider_session_id, ) if new_device: await self.notify_device_update(user_id, [device_id]) @@ -337,6 +343,8 @@ class DeviceHandler(DeviceWorkerHandler): user_id=user_id, device_id=new_device_id, initial_device_display_name=initial_device_display_name, + auth_provider_id=auth_provider_id, + auth_provider_session_id=auth_provider_session_id, ) if new_device: await self.notify_device_update(user_id, [new_device_id]) diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py index 3665d91513..deb3539751 100644 --- a/synapse/handlers/oidc.py +++ b/synapse/handlers/oidc.py @@ -23,7 +23,7 @@ from authlib.common.security import generate_token from authlib.jose import JsonWebToken, jwt from authlib.oauth2.auth import ClientAuth from authlib.oauth2.rfc6749.parameters import prepare_grant_uri -from authlib.oidc.core import CodeIDToken, ImplicitIDToken, UserInfo +from authlib.oidc.core import CodeIDToken, UserInfo from authlib.oidc.discovery import OpenIDProviderMetadata, get_well_known_url from jinja2 import Environment, Template from pymacaroons.exceptions import ( @@ -117,7 +117,8 @@ class OidcHandler: for idp_id, p in self._providers.items(): try: await p.load_metadata() - await p.load_jwks() + if not p._uses_userinfo: + await p.load_jwks() except Exception as e: raise Exception( "Error while initialising OIDC provider %r" % (idp_id,) @@ -498,10 +499,6 @@ class OidcProvider: return await self._jwks.get() async def _load_jwks(self) -> JWKS: - if self._uses_userinfo: - # We're not using jwt signing, return an empty jwk set - return {"keys": []} - metadata = await self.load_metadata() # Load the JWKS using the `jwks_uri` metadata. @@ -663,7 +660,7 @@ class OidcProvider: return UserInfo(resp) - async def _parse_id_token(self, token: Token, nonce: str) -> UserInfo: + async def _parse_id_token(self, token: Token, nonce: str) -> CodeIDToken: """Return an instance of UserInfo from token's ``id_token``. Args: @@ -673,7 +670,7 @@ class OidcProvider: request. This value should match the one inside the token. Returns: - An object representing the user. + The decoded claims in the ID token. """ metadata = await self.load_metadata() claims_params = { @@ -684,9 +681,6 @@ class OidcProvider: # If we got an `access_token`, there should be an `at_hash` claim # in the `id_token` that we can check against. 
claims_params["access_token"] = token["access_token"] - claims_cls = CodeIDToken - else: - claims_cls = ImplicitIDToken alg_values = metadata.get("id_token_signing_alg_values_supported", ["RS256"]) jwt = JsonWebToken(alg_values) @@ -703,7 +697,7 @@ class OidcProvider: claims = jwt.decode( id_token, key=jwk_set, - claims_cls=claims_cls, + claims_cls=CodeIDToken, claims_options=claim_options, claims_params=claims_params, ) @@ -713,7 +707,7 @@ class OidcProvider: claims = jwt.decode( id_token, key=jwk_set, - claims_cls=claims_cls, + claims_cls=CodeIDToken, claims_options=claim_options, claims_params=claims_params, ) @@ -721,7 +715,8 @@ class OidcProvider: logger.debug("Decoded id_token JWT %r; validating", claims) claims.validate(leeway=120) # allows 2 min of clock skew - return UserInfo(claims) + + return claims async def handle_redirect_request( self, @@ -837,8 +832,22 @@ class OidcProvider: logger.debug("Successfully obtained OAuth2 token data: %r", token) - # Now that we have a token, get the userinfo, either by decoding the - # `id_token` or by fetching the `userinfo_endpoint`. + # If there is an id_token, it should be validated, regardless of the + # userinfo endpoint is used or not. + if token.get("id_token") is not None: + try: + id_token = await self._parse_id_token(token, nonce=session_data.nonce) + sid = id_token.get("sid") + except Exception as e: + logger.exception("Invalid id_token") + self._sso_handler.render_error(request, "invalid_token", str(e)) + return + else: + id_token = None + sid = None + + # Now that we have a token, get the userinfo either from the `id_token` + # claims or by fetching the `userinfo_endpoint`. if self._uses_userinfo: try: userinfo = await self._fetch_userinfo(token) @@ -846,13 +855,14 @@ class OidcProvider: logger.exception("Could not fetch userinfo") self._sso_handler.render_error(request, "fetch_error", str(e)) return + elif id_token is not None: + userinfo = UserInfo(id_token) else: - try: - userinfo = await self._parse_id_token(token, nonce=session_data.nonce) - except Exception as e: - logger.exception("Invalid id_token") - self._sso_handler.render_error(request, "invalid_token", str(e)) - return + logger.error("Missing id_token in token response") + self._sso_handler.render_error( + request, "invalid_token", "Missing id_token in token response" + ) + return # first check if we're doing a UIA if session_data.ui_auth_session_id: @@ -884,7 +894,7 @@ class OidcProvider: # Call the mapper to register/login the user try: await self._complete_oidc_login( - userinfo, token, request, session_data.client_redirect_url + userinfo, token, request, session_data.client_redirect_url, sid ) except MappingException as e: logger.exception("Could not map user") @@ -896,6 +906,7 @@ class OidcProvider: token: Token, request: SynapseRequest, client_redirect_url: str, + sid: Optional[str], ) -> None: """Given a UserInfo response, complete the login flow @@ -1008,6 +1019,7 @@ class OidcProvider: oidc_response_to_user_attributes, grandfather_existing_users, extra_attributes, + auth_provider_session_id=sid, ) def _remote_id_from_userinfo(self, userinfo: UserInfo) -> str: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index b14ddd8267..f08a516a75 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -746,6 +746,7 @@ class RegistrationHandler: is_appservice_ghost: bool = False, auth_provider_id: Optional[str] = None, should_issue_refresh_token: bool = False, + auth_provider_session_id: Optional[str] = None, ) -> Tuple[str, 
str, Optional[int], Optional[str]]: """Register a device for a user and generate an access token. @@ -756,9 +757,9 @@ class RegistrationHandler: device_id: The device ID to check, or None to generate a new one. initial_display_name: An optional display name for the device. is_guest: Whether this is a guest account - auth_provider_id: The SSO IdP the user used, if any (just used for the - prometheus metrics). + auth_provider_id: The SSO IdP the user used, if any. should_issue_refresh_token: Whether it should also issue a refresh token + auth_provider_session_id: The session ID received during login from the SSO IdP. Returns: Tuple of device ID, access token, access token expiration time and refresh token """ @@ -769,6 +770,8 @@ class RegistrationHandler: is_guest=is_guest, is_appservice_ghost=is_appservice_ghost, should_issue_refresh_token=should_issue_refresh_token, + auth_provider_id=auth_provider_id, + auth_provider_session_id=auth_provider_session_id, ) login_counter.labels( @@ -791,6 +794,8 @@ class RegistrationHandler: is_guest: bool = False, is_appservice_ghost: bool = False, should_issue_refresh_token: bool = False, + auth_provider_id: Optional[str] = None, + auth_provider_session_id: Optional[str] = None, ) -> LoginDict: """Helper for register_device @@ -822,7 +827,11 @@ class RegistrationHandler: refresh_token_id = None registered_device_id = await self.device_handler.check_device_registered( - user_id, device_id, initial_display_name + user_id, + device_id, + initial_display_name, + auth_provider_id=auth_provider_id, + auth_provider_session_id=auth_provider_session_id, ) if is_guest: assert access_token_expiry is None diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 49fde01cf0..65c27bc64a 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -365,6 +365,7 @@ class SsoHandler: sso_to_matrix_id_mapper: Callable[[int], Awaitable[UserAttributes]], grandfather_existing_users: Callable[[], Awaitable[Optional[str]]], extra_login_attributes: Optional[JsonDict] = None, + auth_provider_session_id: Optional[str] = None, ) -> None: """ Given an SSO ID, retrieve the user ID for it and possibly register the user. @@ -415,6 +416,8 @@ class SsoHandler: extra_login_attributes: An optional dictionary of extra attributes to be provided to the client in the login response. + auth_provider_session_id: An optional session ID from the IdP. + Raises: MappingException if there was a problem mapping the response to a user. 
RedirectException: if the mapping provider needs to redirect the user
@@ -490,6 +493,7 @@ class SsoHandler:
             client_redirect_url,
             extra_login_attributes,
             new_user=new_user,
+            auth_provider_session_id=auth_provider_session_id,
         )

     async def _call_attribute_mapper(
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index a8154168be..6bfb4b8d1b 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -626,6 +626,7 @@ class ModuleApi:
         user_id: str,
         duration_in_ms: int = (2 * 60 * 1000),
         auth_provider_id: str = "",
+        auth_provider_session_id: Optional[str] = None,
     ) -> str:
         """Generate a login token suitable for m.login.token authentication
@@ -643,6 +644,7 @@
         return self._hs.get_macaroon_generator().generate_short_term_login_token(
             user_id,
             auth_provider_id,
+            auth_provider_session_id,
             duration_in_ms,
         )

diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index 0db419ea57..daacc34cea 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -46,6 +46,8 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
         is_guest,
         is_appservice_ghost,
         should_issue_refresh_token,
+        auth_provider_id,
+        auth_provider_session_id,
     ):
         """
         Args:
@@ -63,6 +65,8 @@
             "is_guest": is_guest,
             "is_appservice_ghost": is_appservice_ghost,
             "should_issue_refresh_token": should_issue_refresh_token,
+            "auth_provider_id": auth_provider_id,
+            "auth_provider_session_id": auth_provider_session_id,
         }

     async def _handle_request(self, request, user_id):
@@ -73,6 +77,8 @@
         is_guest = content["is_guest"]
         is_appservice_ghost = content["is_appservice_ghost"]
         should_issue_refresh_token = content["should_issue_refresh_token"]
+        auth_provider_id = content["auth_provider_id"]
+        auth_provider_session_id = content["auth_provider_session_id"]

         res = await self.registration_handler.register_device_inner(
             user_id,
@@ -81,6 +87,8 @@
             is_guest,
             is_appservice_ghost=is_appservice_ghost,
             should_issue_refresh_token=should_issue_refresh_token,
+            auth_provider_id=auth_provider_id,
+            auth_provider_session_id=auth_provider_session_id,
         )

         return 200, res
diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py
index a66ee4fb3d..1b23fa18cf 100644
--- a/synapse/rest/client/login.py
+++ b/synapse/rest/client/login.py
@@ -303,6 +303,7 @@ class LoginRestServlet(RestServlet):
         ratelimit: bool = True,
         auth_provider_id: Optional[str] = None,
         should_issue_refresh_token: bool = False,
+        auth_provider_session_id: Optional[str] = None,
     ) -> LoginResponse:
         """Called when we've successfully authed the user and now need to
         actually login them in (e.g. create devices). This gets called on
@@ -318,10 +319,10 @@
             create_non_existent_users: Whether to create the user if they don't
                 exist. Defaults to False.
             ratelimit: Whether to ratelimit the login request.
-            auth_provider_id: The SSO IdP the user used, if any (just used for the
-                prometheus metrics).
+            auth_provider_id: The SSO IdP the user used, if any.
             should_issue_refresh_token: True if this login should issue
                 a refresh token alongside the access token.
+            auth_provider_session_id: The session ID received during login from the SSO IdP.

         Returns:
             result: Dictionary of account information after successful login.
@@ -354,6 +355,7 @@
             initial_display_name,
             auth_provider_id=auth_provider_id,
             should_issue_refresh_token=should_issue_refresh_token,
+            auth_provider_session_id=auth_provider_session_id,
         )

         result = LoginResponse(
@@ -399,6 +401,7 @@
             self.auth_handler._sso_login_callback,
             auth_provider_id=res.auth_provider_id,
             should_issue_refresh_token=should_issue_refresh_token,
+            auth_provider_session_id=res.auth_provider_session_id,
         )

     async def _do_jwt_login(
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 9ccc66e589..d5a4a661cd 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -139,6 +139,27 @@ class DeviceWorkerStore(SQLBaseStore):

         return {d["device_id"]: d for d in devices}

+    async def get_devices_by_auth_provider_session_id(
+        self, auth_provider_id: str, auth_provider_session_id: str
+    ) -> List[Dict[str, Any]]:
+        """Retrieve the list of devices associated with an SSO IdP session ID.
+
+        Args:
+            auth_provider_id: The SSO IdP ID as defined in the server config
+            auth_provider_session_id: The session ID within the IdP
+        Returns:
+            A list of dicts containing the device_id and the user_id of each device
+        """
+        return await self.db_pool.simple_select_list(
+            table="device_auth_providers",
+            keyvalues={
+                "auth_provider_id": auth_provider_id,
+                "auth_provider_session_id": auth_provider_session_id,
+            },
+            retcols=("user_id", "device_id"),
+            desc="get_devices_by_auth_provider_session_id",
+        )
+
     @trace
     async def get_device_updates_by_remote(
         self, destination: str, from_stream_id: int, limit: int
@@ -1070,7 +1091,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         )

     async def store_device(
-        self, user_id: str, device_id: str, initial_device_display_name: Optional[str]
+        self,
+        user_id: str,
+        device_id: str,
+        initial_device_display_name: Optional[str],
+        auth_provider_id: Optional[str] = None,
+        auth_provider_session_id: Optional[str] = None,
     ) -> bool:
         """Ensure the given device is known; add it to the store if not

@@ -1079,6 +1105,8 @@
             device_id: id of device
             initial_device_display_name: initial displayname of the device.
                 Ignored if device exists.
+            auth_provider_id: The SSO IdP the user used, if any.
+            auth_provider_session_id: The session ID (sid) received from an OIDC login.

         Returns:
             Whether the device was inserted or an existing device existed with that ID.
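The two store methods above are the write and read halves of the same mapping: `store_device` records the `(auth_provider_id, auth_provider_session_id)` pair when the device is created, and `get_devices_by_auth_provider_session_id` resolves it back to devices later. A rough usage sketch (the values are illustrative, and `store` stands in for the homeserver datastore):

# Illustrative only; `store` stands in for the homeserver datastore.
inserted = await store.store_device(
    user_id="@alice:example.com",
    device_id="MYDEVICE",
    initial_device_display_name="Alice's laptop",
    auth_provider_id="oidc-myidp",        # IdP ID from the server config
    auth_provider_session_id="abcdefgh",  # `sid` claim from the ID token
)

# Later, the same (IdP, sid) pair resolves back to the device(s) it created:
devices = await store.get_devices_by_auth_provider_session_id(
    "oidc-myidp", "abcdefgh"
)
# -> [{"user_id": "@alice:example.com", "device_id": "MYDEVICE"}]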
@@ -1115,6 +1143,18 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): if hidden: raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN) + if auth_provider_id and auth_provider_session_id: + await self.db_pool.simple_insert( + "device_auth_providers", + values={ + "user_id": user_id, + "device_id": device_id, + "auth_provider_id": auth_provider_id, + "auth_provider_session_id": auth_provider_session_id, + }, + desc="store_device_auth_provider", + ) + self.device_id_exists_cache.set(key, True) return inserted except StoreError: @@ -1168,6 +1208,14 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): keyvalues={"user_id": user_id}, ) + self.db_pool.simple_delete_many_txn( + txn, + table="device_auth_providers", + column="device_id", + values=device_ids, + keyvalues={"user_id": user_id}, + ) + await self.db_pool.runInteraction("delete_devices", _delete_devices_txn) for device_id in device_ids: self.device_id_exists_cache.invalidate((user_id, device_id)) diff --git a/synapse/storage/schema/main/delta/65/11_devices_auth_provider_session.sql b/synapse/storage/schema/main/delta/65/11_devices_auth_provider_session.sql new file mode 100644 index 0000000000..a65bfb520d --- /dev/null +++ b/synapse/storage/schema/main/delta/65/11_devices_auth_provider_session.sql @@ -0,0 +1,27 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +-- Track the auth provider used by each login as well as the session ID +CREATE TABLE device_auth_providers ( + user_id TEXT NOT NULL, + device_id TEXT NOT NULL, + auth_provider_id TEXT NOT NULL, + auth_provider_session_id TEXT NOT NULL +); + +CREATE INDEX device_auth_providers_devices + ON device_auth_providers (user_id, device_id); +CREATE INDEX device_auth_providers_sessions + ON device_auth_providers (auth_provider_id, auth_provider_session_id); diff --git a/tests/handlers/test_auth.py b/tests/handlers/test_auth.py index 72e176da75..03b8b8615c 100644 --- a/tests/handlers/test_auth.py +++ b/tests/handlers/test_auth.py @@ -71,7 +71,7 @@ class AuthTestCase(unittest.HomeserverTestCase): def test_short_term_login_token_gives_user_id(self): token = self.macaroon_generator.generate_short_term_login_token( - self.user1, "", 5000 + self.user1, "", duration_in_ms=5000 ) res = self.get_success(self.auth_handler.validate_short_term_login_token(token)) self.assertEqual(self.user1, res.user_id) @@ -94,7 +94,7 @@ class AuthTestCase(unittest.HomeserverTestCase): def test_short_term_login_token_cannot_replace_user_id(self): token = self.macaroon_generator.generate_short_term_login_token( - self.user1, "", 5000 + self.user1, "", duration_in_ms=5000 ) macaroon = pymacaroons.Macaroon.deserialize(token) @@ -213,6 +213,6 @@ class AuthTestCase(unittest.HomeserverTestCase): def _get_macaroon(self): token = self.macaroon_generator.generate_short_term_login_token( - self.user1, "", 5000 + self.user1, "", duration_in_ms=5000 ) return pymacaroons.Macaroon.deserialize(token) diff --git a/tests/handlers/test_cas.py b/tests/handlers/test_cas.py index b625995d12..8705ff8943 100644 --- a/tests/handlers/test_cas.py +++ b/tests/handlers/test_cas.py @@ -66,7 +66,13 @@ class CasHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "cas", request, "redirect_uri", None, new_user=True + "@test_user:test", + "cas", + request, + "redirect_uri", + None, + new_user=True, + auth_provider_session_id=None, ) def test_map_cas_user_to_existing_user(self): @@ -89,7 +95,13 @@ class CasHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "cas", request, "redirect_uri", None, new_user=False + "@test_user:test", + "cas", + request, + "redirect_uri", + None, + new_user=False, + auth_provider_session_id=None, ) # Subsequent calls should map to the same mxid. 
@@ -98,7 +110,13 @@ class CasHandlerTestCase(HomeserverTestCase): self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") ) auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "cas", request, "redirect_uri", None, new_user=False + "@test_user:test", + "cas", + request, + "redirect_uri", + None, + new_user=False, + auth_provider_session_id=None, ) def test_map_cas_user_to_invalid_localpart(self): @@ -116,7 +134,13 @@ class CasHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@f=c3=b6=c3=b6:test", "cas", request, "redirect_uri", None, new_user=True + "@f=c3=b6=c3=b6:test", + "cas", + request, + "redirect_uri", + None, + new_user=True, + auth_provider_session_id=None, ) @override_config( @@ -160,7 +184,13 @@ class CasHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "cas", request, "redirect_uri", None, new_user=True + "@test_user:test", + "cas", + request, + "redirect_uri", + None, + new_user=True, + auth_provider_session_id=None, ) diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py index a25c89bd5b..cfe3de5266 100644 --- a/tests/handlers/test_oidc.py +++ b/tests/handlers/test_oidc.py @@ -252,13 +252,6 @@ class OidcHandlerTestCase(HomeserverTestCase): with patch.object(self.provider, "load_metadata", patched_load_metadata): self.get_failure(self.provider.load_jwks(force=True), RuntimeError) - # Return empty key set if JWKS are not used - self.provider._scopes = [] # not asking the openid scope - self.http_client.get_json.reset_mock() - jwks = self.get_success(self.provider.load_jwks(force=True)) - self.http_client.get_json.assert_not_called() - self.assertEqual(jwks, {"keys": []}) - @override_config({"oidc_config": DEFAULT_CONFIG}) def test_validate_config(self): """Provider metadatas are extensively validated.""" @@ -455,7 +448,13 @@ class OidcHandlerTestCase(HomeserverTestCase): self.get_success(self.handler.handle_oidc_callback(request)) auth_handler.complete_sso_login.assert_called_once_with( - expected_user_id, "oidc", request, client_redirect_url, None, new_user=True + expected_user_id, + "oidc", + request, + client_redirect_url, + None, + new_user=True, + auth_provider_session_id=None, ) self.provider._exchange_code.assert_called_once_with(code) self.provider._parse_id_token.assert_called_once_with(token, nonce=nonce) @@ -482,17 +481,58 @@ class OidcHandlerTestCase(HomeserverTestCase): self.provider._fetch_userinfo.reset_mock() # With userinfo fetching - self.provider._scopes = [] # do not ask the "openid" scope + self.provider._user_profile_method = "userinfo_endpoint" + token = { + "type": "bearer", + "access_token": "access_token", + } + self.provider._exchange_code = simple_async_mock(return_value=token) self.get_success(self.handler.handle_oidc_callback(request)) auth_handler.complete_sso_login.assert_called_once_with( - expected_user_id, "oidc", request, client_redirect_url, None, new_user=False + expected_user_id, + "oidc", + request, + client_redirect_url, + None, + new_user=False, + auth_provider_session_id=None, ) self.provider._exchange_code.assert_called_once_with(code) self.provider._parse_id_token.assert_not_called() self.provider._fetch_userinfo.assert_called_once_with(token) self.render_error.assert_not_called() + # With an ID token, userinfo fetching and sid in the ID token + 
self.provider._user_profile_method = "userinfo_endpoint" + token = { + "type": "bearer", + "access_token": "access_token", + "id_token": "id_token", + } + id_token = { + "sid": "abcdefgh", + } + self.provider._parse_id_token = simple_async_mock(return_value=id_token) + self.provider._exchange_code = simple_async_mock(return_value=token) + auth_handler.complete_sso_login.reset_mock() + self.provider._fetch_userinfo.reset_mock() + self.get_success(self.handler.handle_oidc_callback(request)) + + auth_handler.complete_sso_login.assert_called_once_with( + expected_user_id, + "oidc", + request, + client_redirect_url, + None, + new_user=False, + auth_provider_session_id=id_token["sid"], + ) + self.provider._exchange_code.assert_called_once_with(code) + self.provider._parse_id_token.assert_called_once_with(token, nonce=nonce) + self.provider._fetch_userinfo.assert_called_once_with(token) + self.render_error.assert_not_called() + # Handle userinfo fetching error self.provider._fetch_userinfo = simple_async_mock(raises=Exception()) self.get_success(self.handler.handle_oidc_callback(request)) @@ -776,6 +816,7 @@ class OidcHandlerTestCase(HomeserverTestCase): client_redirect_url, {"phone": "1234567"}, new_user=True, + auth_provider_session_id=None, ) @override_config({"oidc_config": DEFAULT_CONFIG}) @@ -790,7 +831,13 @@ class OidcHandlerTestCase(HomeserverTestCase): } self.get_success(_make_callback_with_userinfo(self.hs, userinfo)) auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "oidc", ANY, ANY, None, new_user=True + "@test_user:test", + "oidc", + ANY, + ANY, + None, + new_user=True, + auth_provider_session_id=None, ) auth_handler.complete_sso_login.reset_mock() @@ -801,7 +848,13 @@ class OidcHandlerTestCase(HomeserverTestCase): } self.get_success(_make_callback_with_userinfo(self.hs, userinfo)) auth_handler.complete_sso_login.assert_called_once_with( - "@test_user_2:test", "oidc", ANY, ANY, None, new_user=True + "@test_user_2:test", + "oidc", + ANY, + ANY, + None, + new_user=True, + auth_provider_session_id=None, ) auth_handler.complete_sso_login.reset_mock() @@ -838,14 +891,26 @@ class OidcHandlerTestCase(HomeserverTestCase): } self.get_success(_make_callback_with_userinfo(self.hs, userinfo)) auth_handler.complete_sso_login.assert_called_once_with( - user.to_string(), "oidc", ANY, ANY, None, new_user=False + user.to_string(), + "oidc", + ANY, + ANY, + None, + new_user=False, + auth_provider_session_id=None, ) auth_handler.complete_sso_login.reset_mock() # Subsequent calls should map to the same mxid. 
self.get_success(_make_callback_with_userinfo(self.hs, userinfo)) auth_handler.complete_sso_login.assert_called_once_with( - user.to_string(), "oidc", ANY, ANY, None, new_user=False + user.to_string(), + "oidc", + ANY, + ANY, + None, + new_user=False, + auth_provider_session_id=None, ) auth_handler.complete_sso_login.reset_mock() @@ -860,7 +925,13 @@ class OidcHandlerTestCase(HomeserverTestCase): } self.get_success(_make_callback_with_userinfo(self.hs, userinfo)) auth_handler.complete_sso_login.assert_called_once_with( - user.to_string(), "oidc", ANY, ANY, None, new_user=False + user.to_string(), + "oidc", + ANY, + ANY, + None, + new_user=False, + auth_provider_session_id=None, ) auth_handler.complete_sso_login.reset_mock() @@ -896,7 +967,13 @@ class OidcHandlerTestCase(HomeserverTestCase): self.get_success(_make_callback_with_userinfo(self.hs, userinfo)) auth_handler.complete_sso_login.assert_called_once_with( - "@TEST_USER_2:test", "oidc", ANY, ANY, None, new_user=False + "@TEST_USER_2:test", + "oidc", + ANY, + ANY, + None, + new_user=False, + auth_provider_session_id=None, ) @override_config({"oidc_config": DEFAULT_CONFIG}) @@ -934,7 +1011,13 @@ class OidcHandlerTestCase(HomeserverTestCase): # test_user is already taken, so test_user1 gets registered instead. auth_handler.complete_sso_login.assert_called_once_with( - "@test_user1:test", "oidc", ANY, ANY, None, new_user=True + "@test_user1:test", + "oidc", + ANY, + ANY, + None, + new_user=True, + auth_provider_session_id=None, ) auth_handler.complete_sso_login.reset_mock() @@ -1018,7 +1101,13 @@ class OidcHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@tester:test", "oidc", ANY, ANY, None, new_user=True + "@tester:test", + "oidc", + ANY, + ANY, + None, + new_user=True, + auth_provider_session_id=None, ) @override_config( @@ -1043,7 +1132,13 @@ class OidcHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@tester:test", "oidc", ANY, ANY, None, new_user=True + "@tester:test", + "oidc", + ANY, + ANY, + None, + new_user=True, + auth_provider_session_id=None, ) @override_config( @@ -1156,7 +1251,7 @@ async def _make_callback_with_userinfo( handler = hs.get_oidc_handler() provider = handler._providers["oidc"] - provider._exchange_code = simple_async_mock(return_value={}) + provider._exchange_code = simple_async_mock(return_value={"id_token": ""}) provider._parse_id_token = simple_async_mock(return_value=userinfo) provider._fetch_userinfo = simple_async_mock(return_value=userinfo) diff --git a/tests/handlers/test_saml.py b/tests/handlers/test_saml.py index 8cfc184fef..50551aa6e3 100644 --- a/tests/handlers/test_saml.py +++ b/tests/handlers/test_saml.py @@ -130,7 +130,13 @@ class SamlHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "saml", request, "redirect_uri", None, new_user=True + "@test_user:test", + "saml", + request, + "redirect_uri", + None, + new_user=True, + auth_provider_session_id=None, ) @override_config({"saml2_config": {"grandfathered_mxid_source_attribute": "mxid"}}) @@ -156,7 +162,13 @@ class SamlHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "saml", request, "", None, new_user=False + 
"@test_user:test", + "saml", + request, + "", + None, + new_user=False, + auth_provider_session_id=None, ) # Subsequent calls should map to the same mxid. @@ -165,7 +177,13 @@ class SamlHandlerTestCase(HomeserverTestCase): self.handler._handle_authn_response(request, saml_response, "") ) auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "saml", request, "", None, new_user=False + "@test_user:test", + "saml", + request, + "", + None, + new_user=False, + auth_provider_session_id=None, ) def test_map_saml_response_to_invalid_localpart(self): @@ -213,7 +231,13 @@ class SamlHandlerTestCase(HomeserverTestCase): # test_user is already taken, so test_user1 gets registered instead. auth_handler.complete_sso_login.assert_called_once_with( - "@test_user1:test", "saml", request, "", None, new_user=True + "@test_user1:test", + "saml", + request, + "", + None, + new_user=True, + auth_provider_session_id=None, ) auth_handler.complete_sso_login.reset_mock() @@ -309,7 +333,13 @@ class SamlHandlerTestCase(HomeserverTestCase): # check that the auth handler got called as expected auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", "saml", request, "redirect_uri", None, new_user=True + "@test_user:test", + "saml", + request, + "redirect_uri", + None, + new_user=True, + auth_provider_session_id=None, ) -- cgit 1.5.1 From 8541809cb952ebf0da2a95dd93eccd5644dab49d Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 8 Dec 2021 05:01:38 -0500 Subject: Send and handle cross-signing messages using the stable prefix. (#10520) --- changelog.d/10520.misc | 1 + synapse/handlers/e2e_keys.py | 8 ++++++-- synapse/storage/databases/main/devices.py | 4 +++- tests/federation/test_federation_sender.py | 5 +++-- 4 files changed, 13 insertions(+), 5 deletions(-) create mode 100644 changelog.d/10520.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/10520.misc b/changelog.d/10520.misc new file mode 100644 index 0000000000..a911e165da --- /dev/null +++ b/changelog.d/10520.misc @@ -0,0 +1 @@ +Send and handle cross-signing messages using the stable prefix. 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 60c11e3d21..b2554bda04 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -65,8 +65,12 @@ class E2eKeysHandler: else: # Only register this edu handler on master as it requires writing # device updates to the db - # - # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec + federation_registry.register_edu_handler( + "m.signing_key_update", + self._edu_updater.incoming_signing_key_update, + ) + # also handle the unstable version + # FIXME: remove this when enough servers have upgraded federation_registry.register_edu_handler( "org.matrix.signing_key_update", self._edu_updater.incoming_signing_key_update, diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index d5a4a661cd..838a2a6a3d 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -274,7 +274,9 @@ class DeviceWorkerStore(SQLBaseStore): # add the updated cross-signing keys to the results list for user_id, result in cross_signing_keys_by_user.items(): result["user_id"] = user_id - # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec + results.append(("m.signing_key_update", result)) + # also send the unstable version + # FIXME: remove this when enough servers have upgraded results.append(("org.matrix.signing_key_update", result)) return now_stream_id, results diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index b457dad6d2..b2376e2db9 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -266,7 +266,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): ) # expect signing key update edu - self.assertEqual(len(self.edus), 1) + self.assertEqual(len(self.edus), 2) + self.assertEqual(self.edus.pop(0)["edu_type"], "m.signing_key_update") self.assertEqual(self.edus.pop(0)["edu_type"], "org.matrix.signing_key_update") # sign the devices @@ -491,7 +492,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): ) -> None: """Check that the txn has an EDU with a signing key update.""" edus = txn["edus"] - self.assertEqual(len(edus), 1) + self.assertEqual(len(edus), 2) def generate_and_upload_device_signing_key( self, user_id: str, device_id: str -- cgit 1.5.1