From 9d1971a5c440fe2bb92ea092d17ac00d37e36466 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 29 Nov 2021 10:43:20 -0500 Subject: Return the stable `event` field from `/send_join` per MSC3083. (#11413) This does not remove the unstable field and still parses both. Handling of the unstable field will need to be removed in the future. --- synapse/federation/federation_server.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9a8758e9a6..8fbc75aa65 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -613,8 +613,11 @@ class FederationServer(FederationBase): state = await self.store.get_events(state_ids) time_now = self._clock.time_msec() + event_json = event.get_pdu_json() return { - "org.matrix.msc3083.v2.event": event.get_pdu_json(), + # TODO Remove the unstable prefix when servers have updated. + "org.matrix.msc3083.v2.event": event_json, + "event": event_json, "state": [p.get_pdu_json(time_now) for p in state.values()], "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain], } -- cgit 1.5.1 From a6f1a3abecf8e8fd3e1bff439a06b853df18f194 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 2 Dec 2021 01:02:20 -0600 Subject: Add MSC3030 experimental client and federation API endpoints to get the closest event to a given timestamp (#9445) MSC3030: https://github.com/matrix-org/matrix-doc/pull/3030 Client API endpoint. This will also go and fetch from the federation API endpoint if unable to find an event locally or we found an extremity with possibly a closer event we don't know about. ``` GET /_matrix/client/unstable/org.matrix.msc3030/rooms//timestamp_to_event?ts=&dir= { "event_id": ... "origin_server_ts": ... } ``` Federation API endpoint: ``` GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/?ts=&dir= { "event_id": ... "origin_server_ts": ... } ``` Co-authored-by: Erik Johnston --- changelog.d/9445.feature | 1 + synapse/config/experimental.py | 3 + synapse/federation/federation_client.py | 77 +++++++++ synapse/federation/federation_server.py | 43 +++++ synapse/federation/transport/client.py | 36 ++++ synapse/federation/transport/server/__init__.py | 12 +- synapse/federation/transport/server/federation.py | 41 +++++ synapse/handlers/federation.py | 61 +++---- synapse/handlers/room.py | 144 ++++++++++++++++ synapse/http/servlet.py | 29 ++++ synapse/rest/client/room.py | 58 +++++++ synapse/server.py | 5 + synapse/storage/databases/main/events_worker.py | 195 ++++++++++++++++++++++ 13 files changed, 674 insertions(+), 31 deletions(-) create mode 100644 changelog.d/9445.feature (limited to 'synapse/federation/federation_server.py') diff --git a/changelog.d/9445.feature b/changelog.d/9445.feature new file mode 100644 index 0000000000..6d12eea71f --- /dev/null +++ b/changelog.d/9445.feature @@ -0,0 +1 @@ +Add [MSC3030](https://github.com/matrix-org/matrix-doc/pull/3030) experimental client and federation API endpoints to get the closest event to a given timestamp. diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 8b098ad48d..d78a15097c 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -46,3 +46,6 @@ class ExperimentalConfig(Config): # MSC3266 (room summary api) self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False) + + # MSC3030 (Jump to date API endpoint) + self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index bc3f96c1fc..be1423da24 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1517,6 +1517,83 @@ class FederationClient(FederationBase): self._get_room_hierarchy_cache[(room_id, suggested_only)] = result return result + async def timestamp_to_event( + self, destination: str, room_id: str, timestamp: int, direction: str + ) -> "TimestampToEventResponse": + """ + Calls a remote federating server at `destination` asking for their + closest event to the given timestamp in the given direction. Also + validates the response to always return the expected keys or raises an + error. + + Args: + destination: Domain name of the remote homeserver + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + A parsed TimestampToEventResponse including the closest event_id + and origin_server_ts + + Raises: + Various exceptions when the request fails + InvalidResponseError when the response does not have the correct + keys or wrong types + """ + remote_response = await self.transport_layer.timestamp_to_event( + destination, room_id, timestamp, direction + ) + + if not isinstance(remote_response, dict): + raise InvalidResponseError( + "Response must be a JSON dictionary but received %r" % remote_response + ) + + try: + return TimestampToEventResponse.from_json_dict(remote_response) + except ValueError as e: + raise InvalidResponseError(str(e)) + + +@attr.s(frozen=True, slots=True, auto_attribs=True) +class TimestampToEventResponse: + """Typed response dictionary for the federation /timestamp_to_event endpoint""" + + event_id: str + origin_server_ts: int + + # the raw data, including the above keys + data: JsonDict + + @classmethod + def from_json_dict(cls, d: JsonDict) -> "TimestampToEventResponse": + """Parsed response from the federation /timestamp_to_event endpoint + + Args: + d: JSON object response to be parsed + + Raises: + ValueError if d does not the correct keys or they are the wrong types + """ + + event_id = d.get("event_id") + if not isinstance(event_id, str): + raise ValueError( + "Invalid response: 'event_id' must be a str but received %r" % event_id + ) + + origin_server_ts = d.get("origin_server_ts") + if not isinstance(origin_server_ts, int): + raise ValueError( + "Invalid response: 'origin_server_ts' must be a int but received %r" + % origin_server_ts + ) + + return cls(event_id, origin_server_ts, d) + @attr.s(frozen=True, slots=True, auto_attribs=True) class FederationSpaceSummaryEventResult: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8fbc75aa65..cce85526e7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -110,6 +110,7 @@ class FederationServer(FederationBase): super().__init__(hs) self.handler = hs.get_federation_handler() + self.storage = hs.get_storage() self._federation_event_handler = hs.get_federation_event_handler() self.state = hs.get_state_handler() self._event_auth_handler = hs.get_event_auth_handler() @@ -200,6 +201,48 @@ class FederationServer(FederationBase): return 200, res + async def on_timestamp_to_event_request( + self, origin: str, room_id: str, timestamp: int, direction: str + ) -> Tuple[int, Dict[str, Any]]: + """When we receive a federated `/timestamp_to_event` request, + handle all of the logic for validating and fetching the event. + + Args: + origin: The server we received the event from + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + Tuple indicating the response status code and dictionary response + body including `event_id`. + """ + with (await self._server_linearizer.queue((origin, room_id))): + origin_host, _ = parse_server_name(origin) + await self.check_server_matches_acl(origin_host, room_id) + + # We only try to fetch data from the local database + event_id = await self.store.get_event_id_for_timestamp( + room_id, timestamp, direction + ) + if event_id: + event = await self.store.get_event( + event_id, allow_none=False, allow_rejected=False + ) + + return 200, { + "event_id": event_id, + "origin_server_ts": event.origin_server_ts, + } + + raise SynapseError( + 404, + "Unable to find event from %s in direction %s" % (timestamp, direction), + errcode=Codes.NOT_FOUND, + ) + async def on_incoming_transaction( self, origin: str, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index fe29bcfd4b..d1f4be641d 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -148,6 +148,42 @@ class TransportLayerClient: destination, path=path, args=args, try_trailing_slash_on_400=True ) + @log_function + async def timestamp_to_event( + self, destination: str, room_id: str, timestamp: int, direction: str + ) -> Union[JsonDict, List]: + """ + Calls a remote federating server at `destination` asking for their + closest event to the given timestamp in the given direction. + + Args: + destination: Domain name of the remote homeserver + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + Response dict received from the remote homeserver. + + Raises: + Various exceptions when the request fails + """ + path = _create_path( + FEDERATION_UNSTABLE_PREFIX, + "/org.matrix.msc3030/timestamp_to_event/%s", + room_id, + ) + + args = {"ts": [str(timestamp)], "dir": [direction]} + + remote_response = await self.client.get_json( + destination, path=path, args=args, try_trailing_slash_on_400=True + ) + + return remote_response + @log_function async def send_transaction( self, diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index c32539bf5a..abcb8728f5 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -22,7 +22,10 @@ from synapse.federation.transport.server._base import ( Authenticator, BaseFederationServlet, ) -from synapse.federation.transport.server.federation import FEDERATION_SERVLET_CLASSES +from synapse.federation.transport.server.federation import ( + FEDERATION_SERVLET_CLASSES, + FederationTimestampLookupServlet, +) from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES from synapse.federation.transport.server.groups_server import ( GROUP_SERVER_SERVLET_CLASSES, @@ -324,6 +327,13 @@ def register_servlets( ) for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]: + # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled + if ( + servletclass == FederationTimestampLookupServlet + and not hs.config.experimental.msc3030_enabled + ): + continue + servletclass( hs=hs, authenticator=authenticator, diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 66e915228c..77bfd88ad0 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -174,6 +174,46 @@ class FederationBackfillServlet(BaseFederationServerServlet): return await self.handler.on_backfill_request(origin, room_id, versions, limit) +class FederationTimestampLookupServlet(BaseFederationServerServlet): + """ + API endpoint to fetch the `event_id` of the closest event to the given + timestamp (`ts` query parameter) in the given direction (`dir` query + parameter). + + Useful for other homeservers when they're unable to find an event locally. + + `ts` is a timestamp in milliseconds where we will find the closest event in + the given direction. + + `dir` can be `f` or `b` to indicate forwards and backwards in time from the + given timestamp. + + GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/?ts=&dir= + { + "event_id": ... + } + """ + + PATH = "/timestamp_to_event/(?P[^/]*)/?" + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030" + + async def on_GET( + self, + origin: str, + content: Literal[None], + query: Dict[bytes, List[bytes]], + room_id: str, + ) -> Tuple[int, JsonDict]: + timestamp = parse_integer_from_args(query, "ts", required=True) + direction = parse_string_from_args( + query, "dir", default="f", allowed_values=["f", "b"], required=True + ) + + return await self.handler.on_timestamp_to_event_request( + origin, room_id, timestamp, direction + ) + + class FederationQueryServlet(BaseFederationServerServlet): PATH = "/query/(?P[^/]*)" @@ -683,6 +723,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationStateV1Servlet, FederationStateIdsServlet, FederationBackfillServlet, + FederationTimestampLookupServlet, FederationQueryServlet, FederationMakeJoinServlet, FederationMakeLeaveServlet, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3112cc88b1..1ea837d082 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -68,6 +68,37 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]: + """Get joined domains from state + + Args: + state: State map from type/state key to event. + + Returns: + Returns a list of servers with the lowest depth of their joins. + Sorted by lowest depth first. + """ + joined_users = [ + (state_key, int(event.depth)) + for (e_type, state_key), event in state.items() + if e_type == EventTypes.Member and event.membership == Membership.JOIN + ] + + joined_domains: Dict[str, int] = {} + for u, d in joined_users: + try: + dom = get_domain_from_id(u) + old_d = joined_domains.get(dom) + if old_d: + joined_domains[dom] = min(d, old_d) + else: + joined_domains[dom] = d + except Exception: + pass + + return sorted(joined_domains.items(), key=lambda d: d[1]) + + class FederationHandler: """Handles general incoming federation requests @@ -268,36 +299,6 @@ class FederationHandler: curr_state = await self.state_handler.get_current_state(room_id) - def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]: - """Get joined domains from state - - Args: - state: State map from type/state key to event. - - Returns: - Returns a list of servers with the lowest depth of their joins. - Sorted by lowest depth first. - """ - joined_users = [ - (state_key, int(event.depth)) - for (e_type, state_key), event in state.items() - if e_type == EventTypes.Member and event.membership == Membership.JOIN - ] - - joined_domains: Dict[str, int] = {} - for u, d in joined_users: - try: - dom = get_domain_from_id(u) - old_d = joined_domains.get(dom) - if old_d: - joined_domains[dom] = min(d, old_d) - else: - joined_domains[dom] = d - except Exception: - pass - - return sorted(joined_domains.items(), key=lambda d: d[1]) - curr_domains = get_domains_from_state(curr_state) likely_domains = [ diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 88053f9869..2bcdf32dcc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -46,6 +46,7 @@ from synapse.api.constants import ( from synapse.api.errors import ( AuthError, Codes, + HttpResponseException, LimitExceededError, NotFoundError, StoreError, @@ -56,6 +57,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase from synapse.events.utils import copy_power_levels_contents +from synapse.federation.federation_client import InvalidResponseError +from synapse.handlers.federation import get_domains_from_state from synapse.rest.admin._base import assert_user_is_admin from synapse.storage.state import StateFilter from synapse.streams import EventSource @@ -1220,6 +1223,147 @@ class RoomContextHandler: return results +class TimestampLookupHandler: + def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname + self.store = hs.get_datastore() + self.state_handler = hs.get_state_handler() + self.federation_client = hs.get_federation_client() + + async def get_event_for_timestamp( + self, + requester: Requester, + room_id: str, + timestamp: int, + direction: str, + ) -> Tuple[str, int]: + """Find the closest event to the given timestamp in the given direction. + If we can't find an event locally or the event we have locally is next to a gap, + it will ask other federated homeservers for an event. + + Args: + requester: The user making the request according to the access token + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + A tuple containing the `event_id` closest to the given timestamp in + the given direction and the `origin_server_ts`. + + Raises: + SynapseError if unable to find any event locally in the given direction + """ + + local_event_id = await self.store.get_event_id_for_timestamp( + room_id, timestamp, direction + ) + logger.debug( + "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s", + local_event_id, + timestamp, + ) + + # Check for gaps in the history where events could be hiding in between + # the timestamp given and the event we were able to find locally + is_event_next_to_backward_gap = False + is_event_next_to_forward_gap = False + if local_event_id: + local_event = await self.store.get_event( + local_event_id, allow_none=False, allow_rejected=False + ) + + if direction == "f": + # We only need to check for a backward gap if we're looking forwards + # to ensure there is nothing in between. + is_event_next_to_backward_gap = ( + await self.store.is_event_next_to_backward_gap(local_event) + ) + elif direction == "b": + # We only need to check for a forward gap if we're looking backwards + # to ensure there is nothing in between + is_event_next_to_forward_gap = ( + await self.store.is_event_next_to_forward_gap(local_event) + ) + + # If we found a gap, we should probably ask another homeserver first + # about more history in between + if ( + not local_event_id + or is_event_next_to_backward_gap + or is_event_next_to_forward_gap + ): + logger.debug( + "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first", + local_event_id, + timestamp, + ) + + # Find other homeservers from the given state in the room + curr_state = await self.state_handler.get_current_state(room_id) + curr_domains = get_domains_from_state(curr_state) + likely_domains = [ + domain for domain, depth in curr_domains if domain != self.server_name + ] + + # Loop through each homeserver candidate until we get a succesful response + for domain in likely_domains: + try: + remote_response = await self.federation_client.timestamp_to_event( + domain, room_id, timestamp, direction + ) + logger.debug( + "get_event_for_timestamp: response from domain(%s)=%s", + domain, + remote_response, + ) + + # TODO: Do we want to persist this as an extremity? + # TODO: I think ideally, we would try to backfill from + # this event and run this whole + # `get_event_for_timestamp` function again to make sure + # they didn't give us an event from their gappy history. + remote_event_id = remote_response.event_id + origin_server_ts = remote_response.origin_server_ts + + # Only return the remote event if it's closer than the local event + if not local_event or ( + abs(origin_server_ts - timestamp) + < abs(local_event.origin_server_ts - timestamp) + ): + return remote_event_id, origin_server_ts + except (HttpResponseException, InvalidResponseError) as ex: + # Let's not put a high priority on some other homeserver + # failing to respond or giving a random response + logger.debug( + "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", + domain, + type(ex).__name__, + ex, + ex.args, + ) + except Exception as ex: + # But we do want to see some exceptions in our code + logger.warning( + "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", + domain, + type(ex).__name__, + ex, + ex.args, + ) + + if not local_event_id: + raise SynapseError( + 404, + "Unable to find event from %s in direction %s" % (timestamp, direction), + errcode=Codes.NOT_FOUND, + ) + + return local_event_id, local_event.origin_server_ts + + class RoomEventSource(EventSource[RoomStreamToken, EventBase]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 91ba93372c..6dd9b9ad03 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -79,6 +79,35 @@ def parse_integer( return parse_integer_from_args(args, name, default, required) +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + default: Optional[int] = None, +) -> Optional[int]: + ... + + +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + *, + required: Literal[True], +) -> int: + ... + + +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + default: Optional[int] = None, + required: bool = False, +) -> Optional[int]: + ... + + def parse_integer_from_args( args: Mapping[bytes, Sequence[bytes]], name: str, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 99f303c88e..3598967be0 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1070,6 +1070,62 @@ def register_txn_path( ) +class TimestampLookupRestServlet(RestServlet): + """ + API endpoint to fetch the `event_id` of the closest event to the given + timestamp (`ts` query parameter) in the given direction (`dir` query + parameter). + + Useful for cases like jump to date so you can start paginating messages from + a given date in the archive. + + `ts` is a timestamp in milliseconds where we will find the closest event in + the given direction. + + `dir` can be `f` or `b` to indicate forwards and backwards in time from the + given timestamp. + + GET /_matrix/client/unstable/org.matrix.msc3030/rooms//timestamp_to_event?ts=&dir= + { + "event_id": ... + } + """ + + PATTERNS = ( + re.compile( + "^/_matrix/client/unstable/org.matrix.msc3030" + "/rooms/(?P[^/]*)/timestamp_to_event$" + ), + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self._auth = hs.get_auth() + self._store = hs.get_datastore() + self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler() + + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request) + await self._auth.check_user_in_room(room_id, requester.user.to_string()) + + timestamp = parse_integer(request, "ts", required=True) + direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"]) + + ( + event_id, + origin_server_ts, + ) = await self.timestamp_lookup_handler.get_event_for_timestamp( + requester, room_id, timestamp, direction + ) + + return 200, { + "event_id": event_id, + "origin_server_ts": origin_server_ts, + } + + class RoomSpaceSummaryRestServlet(RestServlet): PATTERNS = ( re.compile( @@ -1239,6 +1295,8 @@ def register_servlets( RoomAliasListServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) RoomCreateRestServlet(hs).register(http_server) + if hs.config.experimental.msc3030_enabled: + TimestampLookupRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. if not is_worker: diff --git a/synapse/server.py b/synapse/server.py index 877eba6c08..185e40e4da 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -97,6 +97,7 @@ from synapse.handlers.room import ( RoomContextHandler, RoomCreationHandler, RoomShutdownHandler, + TimestampLookupHandler, ) from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler @@ -728,6 +729,10 @@ class HomeServer(metaclass=abc.ABCMeta): def get_room_context_handler(self) -> RoomContextHandler: return RoomContextHandler(self) + @cache_in_self + def get_timestamp_lookup_handler(self) -> TimestampLookupHandler: + return TimestampLookupHandler(self) + @cache_in_self def get_registration_handler(self) -> RegistrationHandler: return RegistrationHandler(self) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4cefc0a07e..fd19674f93 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1762,3 +1762,198 @@ class EventsWorkerStore(SQLBaseStore): "_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn, ) + + async def is_event_next_to_backward_gap(self, event: EventBase) -> bool: + """Check if the given event is next to a backward gap of missing events. + A(False)--->B(False)--->C(True)---> + + Args: + room_id: room where the event lives + event_id: event to check + + Returns: + Boolean indicating whether it's an extremity + """ + + def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool: + # If the event in question has any of its prev_events listed as a + # backward extremity, it's next to a gap. + # + # We can't just check the backward edges in `event_edges` because + # when we persist events, we will also record the prev_events as + # edges to the event in question regardless of whether we have those + # prev_events yet. We need to check whether those prev_events are + # backward extremities, also known as gaps, that need to be + # backfilled. + backward_extremity_query = """ + SELECT 1 FROM event_backward_extremities + WHERE + room_id = ? + AND %s + LIMIT 1 + """ + + # If the event in question is a backward extremity or has any of its + # prev_events listed as a backward extremity, it's next to a + # backward gap. + clause, args = make_in_list_sql_clause( + self.database_engine, + "event_id", + [event.event_id] + list(event.prev_event_ids()), + ) + + txn.execute(backward_extremity_query % (clause,), [event.room_id] + args) + backward_extremities = txn.fetchall() + + # We consider any backward extremity as a backward gap + if len(backward_extremities): + return True + + return False + + return await self.db_pool.runInteraction( + "is_event_next_to_backward_gap_txn", + is_event_next_to_backward_gap_txn, + ) + + async def is_event_next_to_forward_gap(self, event: EventBase) -> bool: + """Check if the given event is next to a forward gap of missing events. + The gap in front of the latest events is not considered a gap. + A(False)--->B(False)--->C(False)---> + A(False)--->B(False)---> --->D(True)--->E(False) + + Args: + room_id: room where the event lives + event_id: event to check + + Returns: + Boolean indicating whether it's an extremity + """ + + def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool: + # If the event in question is a forward extremity, we will just + # consider any potential forward gap as not a gap since it's one of + # the latest events in the room. + # + # `event_forward_extremities` does not include backfilled or outlier + # events so we can't rely on it to find forward gaps. We can only + # use it to determine whether a message is the latest in the room. + # + # We can't combine this query with the `forward_edge_query` below + # because if the event in question has no forward edges (isn't + # referenced by any other event's prev_events) but is in + # `event_forward_extremities`, we don't want to return 0 rows and + # say it's next to a gap. + forward_extremity_query = """ + SELECT 1 FROM event_forward_extremities + WHERE + room_id = ? + AND event_id = ? + LIMIT 1 + """ + + # Check to see whether the event in question is already referenced + # by another event. If we don't see any edges, we're next to a + # forward gap. + forward_edge_query = """ + SELECT 1 FROM event_edges + /* Check to make sure the event referencing our event in question is not rejected */ + LEFT JOIN rejections ON event_edges.event_id == rejections.event_id + WHERE + event_edges.room_id = ? + AND event_edges.prev_event_id = ? + /* It's not a valid edge if the event referencing our event in + * question is rejected. + */ + AND rejections.event_id IS NULL + LIMIT 1 + """ + + # We consider any forward extremity as the latest in the room and + # not a forward gap. + # + # To expand, even though there is technically a gap at the front of + # the room where the forward extremities are, we consider those the + # latest messages in the room so asking other homeservers for more + # is useless. The new latest messages will just be federated as + # usual. + txn.execute(forward_extremity_query, (event.room_id, event.event_id)) + forward_extremities = txn.fetchall() + if len(forward_extremities): + return False + + # If there are no forward edges to the event in question (another + # event hasn't referenced this event in their prev_events), then we + # assume there is a forward gap in the history. + txn.execute(forward_edge_query, (event.room_id, event.event_id)) + forward_edges = txn.fetchall() + if not len(forward_edges): + return True + + return False + + return await self.db_pool.runInteraction( + "is_event_next_to_gap_txn", + is_event_next_to_gap_txn, + ) + + async def get_event_id_for_timestamp( + self, room_id: str, timestamp: int, direction: str + ) -> Optional[str]: + """Find the closest event to the given timestamp in the given direction. + + Args: + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + The closest event_id otherwise None if we can't find any event in + the given direction. + """ + + sql_template = """ + SELECT event_id FROM events + LEFT JOIN rejections USING (event_id) + WHERE + origin_server_ts %s ? + AND room_id = ? + /* Make sure event is not rejected */ + AND rejections.event_id IS NULL + ORDER BY origin_server_ts %s + LIMIT 1; + """ + + def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]: + if direction == "b": + # Find closest event *before* a given timestamp. We use descending + # (which gives values largest to smallest) because we want the + # largest possible timestamp *before* the given timestamp. + comparison_operator = "<=" + order = "DESC" + else: + # Find closest event *after* a given timestamp. We use ascending + # (which gives values smallest to largest) because we want the + # closest possible timestamp *after* the given timestamp. + comparison_operator = ">=" + order = "ASC" + + txn.execute( + sql_template % (comparison_operator, order), (timestamp, room_id) + ) + row = txn.fetchone() + if row: + (event_id,) = row + return event_id + + return None + + if direction not in ("f", "b"): + raise ValueError("Unknown direction: %s" % (direction,)) + + return await self.db_pool.runInteraction( + "get_event_id_for_timestamp_txn", + get_event_id_for_timestamp_txn, + ) -- cgit 1.5.1 From d2279f471ba8f44d9f578e62b286897a338d8aa1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 2 Dec 2021 11:18:10 -0500 Subject: Add most of the missing type hints to `synapse.federation`. (#11483) This skips a few methods which are difficult to type. --- changelog.d/11483.misc | 1 + mypy.ini | 6 +++ synapse/federation/federation_client.py | 4 +- synapse/federation/federation_server.py | 10 +++-- synapse/federation/persistence.py | 4 +- synapse/federation/send_queue.py | 25 +++++------ synapse/federation/sender/per_destination_queue.py | 13 ++++-- synapse/federation/transport/client.py | 20 +++++---- synapse/federation/transport/server/__init__.py | 2 +- synapse/federation/transport/server/_base.py | 48 +++++++++++++--------- 10 files changed, 84 insertions(+), 49 deletions(-) create mode 100644 changelog.d/11483.misc (limited to 'synapse/federation/federation_server.py') diff --git a/changelog.d/11483.misc b/changelog.d/11483.misc new file mode 100644 index 0000000000..4cc47fcab9 --- /dev/null +++ b/changelog.d/11483.misc @@ -0,0 +1 @@ +Add missing type hints to `synapse.federation`. diff --git a/mypy.ini b/mypy.ini index 99b5c41ad6..d8296b4fa3 100644 --- a/mypy.ini +++ b/mypy.ini @@ -158,6 +158,12 @@ disallow_untyped_defs = True [mypy-synapse.events.*] disallow_untyped_defs = True +[mypy-synapse.federation.*] +disallow_untyped_defs = True + +[mypy-synapse.federation.transport.client] +disallow_untyped_defs = False + [mypy-synapse.handlers.*] disallow_untyped_defs = True diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index be1423da24..fee1477ab6 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -128,7 +128,7 @@ class FederationClient(FederationBase): reset_expiry_on_get=False, ) - def _clear_tried_cache(self): + def _clear_tried_cache(self) -> None: """Clear pdu_destination_tried cache""" now = self._clock.time_msec() @@ -800,7 +800,7 @@ class FederationClient(FederationBase): no servers successfully handle the request. """ - async def send_request(destination) -> SendJoinResult: + async def send_request(destination: str) -> SendJoinResult: response = await self._do_send_join(room_version, destination, pdu) # If an event was returned (and expected to be returned): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index cce85526e7..8e37e76206 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1,6 +1,6 @@ # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd -# Copyright 2019 Matrix.org Federation C.I.C +# Copyright 2019-2021 Matrix.org Federation C.I.C # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -450,7 +450,7 @@ class FederationServer(FederationBase): # require callouts to other servers to fetch missing events), but # impose a limit to avoid going too crazy with ram/cpu. - async def process_pdus_for_room(room_id: str): + async def process_pdus_for_room(room_id: str) -> None: with nested_logging_context(room_id): logger.debug("Processing PDUs for %s", room_id) @@ -547,7 +547,7 @@ class FederationServer(FederationBase): async def on_state_ids_request( self, origin: str, room_id: str, event_id: str - ) -> Tuple[int, Dict[str, Any]]: + ) -> Tuple[int, JsonDict]: if not event_id: raise NotImplementedError("Specify an event") @@ -567,7 +567,9 @@ class FederationServer(FederationBase): return 200, resp - async def _on_state_ids_request_compute(self, room_id, event_id): + async def _on_state_ids_request_compute( + self, room_id: str, event_id: str + ) -> JsonDict: state_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id) auth_chain_ids = await self.store.get_auth_chain_ids(room_id, state_ids) return {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 4fead6ca29..523ab1c51e 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -1,4 +1,5 @@ # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,6 +24,7 @@ from typing import Optional, Tuple from synapse.federation.units import Transaction from synapse.logging.utils import log_function +from synapse.storage.databases.main import DataStore from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -31,7 +33,7 @@ logger = logging.getLogger(__name__) class TransactionActions: """Defines persistence actions that relate to handling Transactions.""" - def __init__(self, datastore): + def __init__(self, datastore: DataStore): self.store = datastore @log_function diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 1fbf325fdc..63289a5a33 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -1,4 +1,5 @@ # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -350,7 +351,7 @@ class BaseFederationRow: TypeId = "" # Unique string that ids the type. Must be overridden in sub classes. @staticmethod - def from_data(data): + def from_data(data: JsonDict) -> "BaseFederationRow": """Parse the data from the federation stream into a row. Args: @@ -359,7 +360,7 @@ class BaseFederationRow: """ raise NotImplementedError() - def to_data(self): + def to_data(self) -> JsonDict: """Serialize this row to be sent over the federation stream. Returns: @@ -368,7 +369,7 @@ class BaseFederationRow: """ raise NotImplementedError() - def add_to_buffer(self, buff): + def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None: """Add this row to the appropriate field in the buffer ready for this to be sent over federation. @@ -391,15 +392,15 @@ class PresenceDestinationsRow( TypeId = "pd" @staticmethod - def from_data(data): + def from_data(data: JsonDict) -> "PresenceDestinationsRow": return PresenceDestinationsRow( state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"] ) - def to_data(self): + def to_data(self) -> JsonDict: return {"state": self.state.as_dict(), "dests": self.destinations} - def add_to_buffer(self, buff): + def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None: buff.presence_destinations.append((self.state, self.destinations)) @@ -417,13 +418,13 @@ class KeyedEduRow( TypeId = "k" @staticmethod - def from_data(data): + def from_data(data: JsonDict) -> "KeyedEduRow": return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"])) - def to_data(self): + def to_data(self) -> JsonDict: return {"key": self.key, "edu": self.edu.get_internal_dict()} - def add_to_buffer(self, buff): + def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None: buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu @@ -433,13 +434,13 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu TypeId = "e" @staticmethod - def from_data(data): + def from_data(data: JsonDict) -> "EduRow": return EduRow(Edu(**data)) - def to_data(self): + def to_data(self) -> JsonDict: return self.edu.get_internal_dict() - def add_to_buffer(self, buff): + def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None: buff.edus.setdefault(self.edu.destination, []).append(self.edu) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index afe35e72b6..391b30fbb5 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -1,5 +1,6 @@ # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +15,8 @@ # limitations under the License. import datetime import logging -from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple +from types import TracebackType +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type import attr from prometheus_client import Counter @@ -213,7 +215,7 @@ class PerDestinationQueue: self._pending_edus_keyed[(edu.edu_type, key)] = edu self.attempt_new_transaction() - def send_edu(self, edu) -> None: + def send_edu(self, edu: Edu) -> None: self._pending_edus.append(edu) self.attempt_new_transaction() @@ -701,7 +703,12 @@ class _TransactionQueueManager: return self._pdus, pending_edus - async def __aexit__(self, exc_type, exc, tb): + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: if exc_type is not None: # Failed to send transaction, so we bail out. return diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d1f4be641d..9fc4c31c93 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -21,6 +21,7 @@ from typing import ( Callable, Collection, Dict, + Generator, Iterable, List, Mapping, @@ -235,11 +236,16 @@ class TransportLayerClient: @log_function async def make_query( - self, destination, query_type, args, retry_on_dns_fail, ignore_backoff=False - ): + self, + destination: str, + query_type: str, + args: dict, + retry_on_dns_fail: bool, + ignore_backoff: bool = False, + ) -> JsonDict: path = _create_v1_path("/query/%s", query_type) - content = await self.client.get_json( + return await self.client.get_json( destination=destination, path=path, args=args, @@ -248,8 +254,6 @@ class TransportLayerClient: ignore_backoff=ignore_backoff, ) - return content - @log_function async def make_membership_event( self, @@ -1317,7 +1321,7 @@ class SendJoinResponse: @ijson.coroutine -def _event_parser(event_dict: JsonDict): +def _event_parser(event_dict: JsonDict) -> Generator[None, Tuple[str, Any], None]: """Helper function for use with `ijson.kvitems_coro` to parse key-value pairs to add them to a given dictionary. """ @@ -1328,7 +1332,9 @@ def _event_parser(event_dict: JsonDict): @ijson.coroutine -def _event_list_parser(room_version: RoomVersion, events: List[EventBase]): +def _event_list_parser( + room_version: RoomVersion, events: List[EventBase] +) -> Generator[None, JsonDict, None]: """Helper function for use with `ijson.items_coro` to parse an array of events and add them to the given list. """ diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index abcb8728f5..77b936361a 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -302,7 +302,7 @@ def register_servlets( authenticator: Authenticator, ratelimiter: FederationRateLimiter, servlet_groups: Optional[Iterable[str]] = None, -): +) -> None: """Initialize and register servlet classes. Will by default register all servlets. For custom behaviour, pass in diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index cef65929c5..dc39e3537b 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -15,10 +15,13 @@ import functools import logging import re +from typing import Any, Awaitable, Callable, Optional, Tuple, cast from synapse.api.errors import Codes, FederationDeniedError, SynapseError from synapse.api.urls import FEDERATION_V1_PREFIX +from synapse.http.server import HttpServer, ServletCallback from synapse.http.servlet import parse_json_object_from_request +from synapse.http.site import SynapseRequest from synapse.logging import opentracing from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( @@ -29,6 +32,7 @@ from synapse.logging.opentracing import ( whitelisted_homeserver, ) from synapse.server import HomeServer +from synapse.types import JsonDict from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.stringutils import parse_and_validate_server_name @@ -59,9 +63,11 @@ class Authenticator: self.replication_client = hs.get_tcp_replication() # A method just so we can pass 'self' as the authenticator to the Servlets - async def authenticate_request(self, request, content): + async def authenticate_request( + self, request: SynapseRequest, content: Optional[JsonDict] + ) -> str: now = self._clock.time_msec() - json_request = { + json_request: JsonDict = { "method": request.method.decode("ascii"), "uri": request.uri.decode("ascii"), "destination": self.server_name, @@ -114,7 +120,7 @@ class Authenticator: return origin - async def _reset_retry_timings(self, origin): + async def _reset_retry_timings(self, origin: str) -> None: try: logger.info("Marking origin %r as up", origin) await self.store.set_destination_retry_timings(origin, None, 0, 0) @@ -133,14 +139,14 @@ class Authenticator: logger.exception("Error resetting retry timings on %s", origin) -def _parse_auth_header(header_bytes): +def _parse_auth_header(header_bytes: bytes) -> Tuple[str, str, str]: """Parse an X-Matrix auth header Args: - header_bytes (bytes): header value + header_bytes: header value Returns: - Tuple[str, str, str]: origin, key id, signature. + origin, key id, signature. Raises: AuthenticationError if the header could not be parsed @@ -148,9 +154,9 @@ def _parse_auth_header(header_bytes): try: header_str = header_bytes.decode("utf-8") params = header_str.split(" ")[1].split(",") - param_dict = dict(kv.split("=") for kv in params) + param_dict = {k: v for k, v in (kv.split("=", maxsplit=1) for kv in params)} - def strip_quotes(value): + def strip_quotes(value: str) -> str: if value.startswith('"'): return value[1:-1] else: @@ -233,23 +239,25 @@ class BaseFederationServlet: self.ratelimiter = ratelimiter self.server_name = server_name - def _wrap(self, func): + def _wrap(self, func: Callable[..., Awaitable[Tuple[int, Any]]]) -> ServletCallback: authenticator = self.authenticator ratelimiter = self.ratelimiter @functools.wraps(func) - async def new_func(request, *args, **kwargs): + async def new_func( + request: SynapseRequest, *args: Any, **kwargs: str + ) -> Optional[Tuple[int, Any]]: """A callback which can be passed to HttpServer.RegisterPaths Args: - request (twisted.web.http.Request): + request: *args: unused? - **kwargs (dict[unicode, unicode]): the dict mapping keys to path - components as specified in the path match regexp. + **kwargs: the dict mapping keys to path components as specified + in the path match regexp. Returns: - Tuple[int, object]|None: (response code, response object) as returned by - the callback method. None if the request has already been handled. + (response code, response object) as returned by the callback method. + None if the request has already been handled. """ content = None if request.method in [b"PUT", b"POST"]: @@ -257,7 +265,9 @@ class BaseFederationServlet: content = parse_json_object_from_request(request) try: - origin = await authenticator.authenticate_request(request, content) + origin: Optional[str] = await authenticator.authenticate_request( + request, content + ) except NoAuthenticationError: origin = None if self.REQUIRE_AUTH: @@ -301,7 +311,7 @@ class BaseFederationServlet: "client disconnected before we started processing " "request" ) - return -1, None + return None response = await func( origin, content, request.args, *args, **kwargs ) @@ -312,9 +322,9 @@ class BaseFederationServlet: return response - return new_func + return cast(ServletCallback, new_func) - def register(self, server): + def register(self, server: HttpServer) -> None: pattern = re.compile("^" + self.PREFIX + self.PATH + "$") for method in ("GET", "PUT", "POST"): -- cgit 1.5.1 From a77c36989785c0d5565ab9a1169f4f88e512ce8a Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Mon, 6 Dec 2021 11:36:08 +0000 Subject: Move `glob_to_regex` and `re_word_boundary` to `matrix-python-common` (#11505) --- changelog.d/11505.misc | 1 + synapse/config/room_directory.py | 3 +- synapse/config/tls.py | 3 +- synapse/federation/federation_server.py | 3 +- synapse/push/push_rule_evaluator.py | 7 ++-- synapse/python_dependencies.py | 1 + synapse/util/__init__.py | 59 +-------------------------------- tests/util/test_glob_to_regex.py | 59 --------------------------------- 8 files changed, 13 insertions(+), 123 deletions(-) create mode 100644 changelog.d/11505.misc delete mode 100644 tests/util/test_glob_to_regex.py (limited to 'synapse/federation/federation_server.py') diff --git a/changelog.d/11505.misc b/changelog.d/11505.misc new file mode 100644 index 0000000000..926b562fad --- /dev/null +++ b/changelog.d/11505.misc @@ -0,0 +1 @@ +Move `glob_to_regex` and `re_word_boundary` to `matrix-python-common`. diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index 57316c59b6..3c5e0f7ce7 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -15,8 +15,9 @@ from typing import List +from matrix_common.regex import glob_to_regex + from synapse.types import JsonDict -from synapse.util import glob_to_regex from ._base import Config, ConfigError diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 4ca111618f..3e235b57a7 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -16,11 +16,12 @@ import logging import os from typing import List, Optional, Pattern +from matrix_common.regex import glob_to_regex + from OpenSSL import SSL, crypto from twisted.internet._sslverify import Certificate, trustRootFromCertificates from synapse.config._base import Config, ConfigError -from synapse.util import glob_to_regex logger = logging.getLogger(__name__) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8e37e76206..4697a62c18 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -28,6 +28,7 @@ from typing import ( Union, ) +from matrix_common.regex import glob_to_regex from prometheus_client import Counter, Gauge, Histogram from twisted.internet import defer @@ -66,7 +67,7 @@ from synapse.replication.http.federation import ( ) from synapse.storage.databases.main.lock import Lock from synapse.types import JsonDict, get_domain_from_id -from synapse.util import glob_to_regex, json_decoder, unwrapFirstError +from synapse.util import json_decoder, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_server_name diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 7f68092ec5..659a53805d 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -17,9 +17,10 @@ import logging import re from typing import Any, Dict, List, Optional, Pattern, Tuple, Union +from matrix_common.regex import glob_to_regex, to_word_pattern + from synapse.events import EventBase from synapse.types import JsonDict, UserID -from synapse.util import glob_to_regex, re_word_boundary from synapse.util.caches.lrucache import LruCache logger = logging.getLogger(__name__) @@ -184,7 +185,7 @@ class PushRuleEvaluatorForEvent: r = regex_cache.get((display_name, False, True), None) if not r: r1 = re.escape(display_name) - r1 = re_word_boundary(r1) + r1 = to_word_pattern(r1) r = re.compile(r1, flags=re.IGNORECASE) regex_cache[(display_name, False, True)] = r @@ -213,7 +214,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: try: r = regex_cache.get((glob, True, word_boundary), None) if not r: - r = glob_to_regex(glob, word_boundary) + r = glob_to_regex(glob, word_boundary=word_boundary) regex_cache[(glob, True, word_boundary)] = r return bool(r.search(value)) except re.error: diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 7d26954244..386debd7db 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -87,6 +87,7 @@ REQUIREMENTS = [ # with the latest security patches. "cryptography>=3.4.7", "ijson>=3.1", + "matrix-common==1.0.0", ] CONDITIONAL_REQUIREMENTS = { diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 95f23e27b6..f157132210 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -14,9 +14,8 @@ import json import logging -import re import typing -from typing import Any, Callable, Dict, Generator, Optional, Pattern +from typing import Any, Callable, Dict, Generator, Optional import attr from frozendict import frozendict @@ -35,9 +34,6 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) -_WILDCARD_RUN = re.compile(r"([\?\*]+)") - - def _reject_invalid_json(val: Any) -> None: """Do not allow Infinity, -Infinity, or NaN values in JSON.""" raise ValueError("Invalid JSON value: '%s'" % val) @@ -185,56 +181,3 @@ def log_failure( if not consumeErrors: return failure return None - - -def glob_to_regex(glob: str, word_boundary: bool = False) -> Pattern: - """Converts a glob to a compiled regex object. - - Args: - glob: pattern to match - word_boundary: If True, the pattern will be allowed to match at word boundaries - anywhere in the string. Otherwise, the pattern is anchored at the start and - end of the string. - - Returns: - compiled regex pattern - """ - - # Patterns with wildcards must be simplified to avoid performance cliffs - # - The glob `?**?**?` is equivalent to the glob `???*` - # - The glob `???*` is equivalent to the regex `.{3,}` - chunks = [] - for chunk in _WILDCARD_RUN.split(glob): - # No wildcards? re.escape() - if not _WILDCARD_RUN.match(chunk): - chunks.append(re.escape(chunk)) - continue - - # Wildcards? Simplify. - qmarks = chunk.count("?") - if "*" in chunk: - chunks.append(".{%d,}" % qmarks) - else: - chunks.append(".{%d}" % qmarks) - - res = "".join(chunks) - - if word_boundary: - res = re_word_boundary(res) - else: - # \A anchors at start of string, \Z at end of string - res = r"\A" + res + r"\Z" - - return re.compile(res, re.IGNORECASE) - - -def re_word_boundary(r: str) -> str: - """ - Adds word boundary characters to the start and end of an - expression to require that the match occur as a whole word, - but do so respecting the fact that strings starting or ending - with non-word characters will change word boundaries. - """ - # we can't use \b as it chokes on unicode. however \W seems to be okay - # as shorthand for [^0-9A-Za-z_]. - return r"(^|\W)%s(\W|$)" % (r,) diff --git a/tests/util/test_glob_to_regex.py b/tests/util/test_glob_to_regex.py deleted file mode 100644 index 220accb92b..0000000000 --- a/tests/util/test_glob_to_regex.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2021 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from synapse.util import glob_to_regex - -from tests.unittest import TestCase - - -class GlobToRegexTestCase(TestCase): - def test_literal_match(self): - """patterns without wildcards should match""" - pat = glob_to_regex("foobaz") - self.assertTrue( - pat.match("FoobaZ"), "patterns should match and be case-insensitive" - ) - self.assertFalse( - pat.match("x foobaz"), "pattern should not match at word boundaries" - ) - - def test_wildcard_match(self): - pat = glob_to_regex("f?o*baz") - - self.assertTrue( - pat.match("FoobarbaZ"), - "* should match string and pattern should be case-insensitive", - ) - self.assertTrue(pat.match("foobaz"), "* should match 0 characters") - self.assertFalse(pat.match("fooxaz"), "the character after * must match") - self.assertFalse(pat.match("fobbaz"), "? should not match 0 characters") - self.assertFalse(pat.match("fiiobaz"), "? should not match 2 characters") - - def test_multi_wildcard(self): - """patterns with multiple wildcards in a row should match""" - pat = glob_to_regex("**baz") - self.assertTrue(pat.match("agsgsbaz"), "** should match any string") - self.assertTrue(pat.match("baz"), "** should match the empty string") - self.assertEqual(pat.pattern, r"\A.{0,}baz\Z") - - pat = glob_to_regex("*?baz") - self.assertTrue(pat.match("agsgsbaz"), "*? should match any string") - self.assertTrue(pat.match("abaz"), "*? should match a single char") - self.assertFalse(pat.match("baz"), "*? should not match the empty string") - self.assertEqual(pat.pattern, r"\A.{1,}baz\Z") - - pat = glob_to_regex("a?*?*?baz") - self.assertTrue(pat.match("a g baz"), "?*?*? should match 3 chars") - self.assertFalse(pat.match("a..baz"), "?*?*? should not match 2 chars") - self.assertTrue(pat.match("a.gg.baz"), "?*?*? should match 4 chars") - self.assertEqual(pat.pattern, r"\Aa.{3,}baz\Z") -- cgit 1.5.1 From 088d748f2cb51f03f3bcacc0fb3af1e0f9607737 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Tue, 7 Dec 2021 13:51:11 +0000 Subject: Revert "Move `glob_to_regex` and `re_word_boundary` to `matrix-python-common` (#11505) (#11527) This reverts commit a77c36989785c0d5565ab9a1169f4f88e512ce8a. --- changelog.d/11527.misc | 1 + synapse/config/room_directory.py | 3 +- synapse/config/tls.py | 3 +- synapse/federation/federation_server.py | 3 +- synapse/push/push_rule_evaluator.py | 7 ++-- synapse/python_dependencies.py | 1 - synapse/util/__init__.py | 59 ++++++++++++++++++++++++++++++++- tests/util/test_glob_to_regex.py | 59 +++++++++++++++++++++++++++++++++ 8 files changed, 124 insertions(+), 12 deletions(-) create mode 100644 changelog.d/11527.misc create mode 100644 tests/util/test_glob_to_regex.py (limited to 'synapse/federation/federation_server.py') diff --git a/changelog.d/11527.misc b/changelog.d/11527.misc new file mode 100644 index 0000000000..081eae317c --- /dev/null +++ b/changelog.d/11527.misc @@ -0,0 +1 @@ +Temporarily revert usage of `matrix-python-common`. diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index 3c5e0f7ce7..57316c59b6 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -15,9 +15,8 @@ from typing import List -from matrix_common.regex import glob_to_regex - from synapse.types import JsonDict +from synapse.util import glob_to_regex from ._base import Config, ConfigError diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 3e235b57a7..4ca111618f 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -16,12 +16,11 @@ import logging import os from typing import List, Optional, Pattern -from matrix_common.regex import glob_to_regex - from OpenSSL import SSL, crypto from twisted.internet._sslverify import Certificate, trustRootFromCertificates from synapse.config._base import Config, ConfigError +from synapse.util import glob_to_regex logger = logging.getLogger(__name__) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 4697a62c18..8e37e76206 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -28,7 +28,6 @@ from typing import ( Union, ) -from matrix_common.regex import glob_to_regex from prometheus_client import Counter, Gauge, Histogram from twisted.internet import defer @@ -67,7 +66,7 @@ from synapse.replication.http.federation import ( ) from synapse.storage.databases.main.lock import Lock from synapse.types import JsonDict, get_domain_from_id -from synapse.util import json_decoder, unwrapFirstError +from synapse.util import glob_to_regex, json_decoder, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_server_name diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 659a53805d..7f68092ec5 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -17,10 +17,9 @@ import logging import re from typing import Any, Dict, List, Optional, Pattern, Tuple, Union -from matrix_common.regex import glob_to_regex, to_word_pattern - from synapse.events import EventBase from synapse.types import JsonDict, UserID +from synapse.util import glob_to_regex, re_word_boundary from synapse.util.caches.lrucache import LruCache logger = logging.getLogger(__name__) @@ -185,7 +184,7 @@ class PushRuleEvaluatorForEvent: r = regex_cache.get((display_name, False, True), None) if not r: r1 = re.escape(display_name) - r1 = to_word_pattern(r1) + r1 = re_word_boundary(r1) r = re.compile(r1, flags=re.IGNORECASE) regex_cache[(display_name, False, True)] = r @@ -214,7 +213,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: try: r = regex_cache.get((glob, True, word_boundary), None) if not r: - r = glob_to_regex(glob, word_boundary=word_boundary) + r = glob_to_regex(glob, word_boundary) regex_cache[(glob, True, word_boundary)] = r return bool(r.search(value)) except re.error: diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 386debd7db..7d26954244 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -87,7 +87,6 @@ REQUIREMENTS = [ # with the latest security patches. "cryptography>=3.4.7", "ijson>=3.1", - "matrix-common==1.0.0", ] CONDITIONAL_REQUIREMENTS = { diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index f157132210..95f23e27b6 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -14,8 +14,9 @@ import json import logging +import re import typing -from typing import Any, Callable, Dict, Generator, Optional +from typing import Any, Callable, Dict, Generator, Optional, Pattern import attr from frozendict import frozendict @@ -34,6 +35,9 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) +_WILDCARD_RUN = re.compile(r"([\?\*]+)") + + def _reject_invalid_json(val: Any) -> None: """Do not allow Infinity, -Infinity, or NaN values in JSON.""" raise ValueError("Invalid JSON value: '%s'" % val) @@ -181,3 +185,56 @@ def log_failure( if not consumeErrors: return failure return None + + +def glob_to_regex(glob: str, word_boundary: bool = False) -> Pattern: + """Converts a glob to a compiled regex object. + + Args: + glob: pattern to match + word_boundary: If True, the pattern will be allowed to match at word boundaries + anywhere in the string. Otherwise, the pattern is anchored at the start and + end of the string. + + Returns: + compiled regex pattern + """ + + # Patterns with wildcards must be simplified to avoid performance cliffs + # - The glob `?**?**?` is equivalent to the glob `???*` + # - The glob `???*` is equivalent to the regex `.{3,}` + chunks = [] + for chunk in _WILDCARD_RUN.split(glob): + # No wildcards? re.escape() + if not _WILDCARD_RUN.match(chunk): + chunks.append(re.escape(chunk)) + continue + + # Wildcards? Simplify. + qmarks = chunk.count("?") + if "*" in chunk: + chunks.append(".{%d,}" % qmarks) + else: + chunks.append(".{%d}" % qmarks) + + res = "".join(chunks) + + if word_boundary: + res = re_word_boundary(res) + else: + # \A anchors at start of string, \Z at end of string + res = r"\A" + res + r"\Z" + + return re.compile(res, re.IGNORECASE) + + +def re_word_boundary(r: str) -> str: + """ + Adds word boundary characters to the start and end of an + expression to require that the match occur as a whole word, + but do so respecting the fact that strings starting or ending + with non-word characters will change word boundaries. + """ + # we can't use \b as it chokes on unicode. however \W seems to be okay + # as shorthand for [^0-9A-Za-z_]. + return r"(^|\W)%s(\W|$)" % (r,) diff --git a/tests/util/test_glob_to_regex.py b/tests/util/test_glob_to_regex.py new file mode 100644 index 0000000000..220accb92b --- /dev/null +++ b/tests/util/test_glob_to_regex.py @@ -0,0 +1,59 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.util import glob_to_regex + +from tests.unittest import TestCase + + +class GlobToRegexTestCase(TestCase): + def test_literal_match(self): + """patterns without wildcards should match""" + pat = glob_to_regex("foobaz") + self.assertTrue( + pat.match("FoobaZ"), "patterns should match and be case-insensitive" + ) + self.assertFalse( + pat.match("x foobaz"), "pattern should not match at word boundaries" + ) + + def test_wildcard_match(self): + pat = glob_to_regex("f?o*baz") + + self.assertTrue( + pat.match("FoobarbaZ"), + "* should match string and pattern should be case-insensitive", + ) + self.assertTrue(pat.match("foobaz"), "* should match 0 characters") + self.assertFalse(pat.match("fooxaz"), "the character after * must match") + self.assertFalse(pat.match("fobbaz"), "? should not match 0 characters") + self.assertFalse(pat.match("fiiobaz"), "? should not match 2 characters") + + def test_multi_wildcard(self): + """patterns with multiple wildcards in a row should match""" + pat = glob_to_regex("**baz") + self.assertTrue(pat.match("agsgsbaz"), "** should match any string") + self.assertTrue(pat.match("baz"), "** should match the empty string") + self.assertEqual(pat.pattern, r"\A.{0,}baz\Z") + + pat = glob_to_regex("*?baz") + self.assertTrue(pat.match("agsgsbaz"), "*? should match any string") + self.assertTrue(pat.match("abaz"), "*? should match a single char") + self.assertFalse(pat.match("baz"), "*? should not match the empty string") + self.assertEqual(pat.pattern, r"\A.{1,}baz\Z") + + pat = glob_to_regex("a?*?*?baz") + self.assertTrue(pat.match("a g baz"), "?*?*? should match 3 chars") + self.assertFalse(pat.match("a..baz"), "?*?*? should not match 2 chars") + self.assertTrue(pat.match("a.gg.baz"), "?*?*? should match 4 chars") + self.assertEqual(pat.pattern, r"\Aa.{3,}baz\Z") -- cgit 1.5.1