From dfa536490ee2295d664160362c6339d5e39b6b85 Mon Sep 17 00:00:00 2001 From: Aaron R Date: Tue, 16 Nov 2021 07:47:58 -0600 Subject: Add support for `/_matrix/client/v3` APIs (#11318) This is one of the changes required to support Matrix 1.1 Signed-off-by: Aaron Raimist --- synapse/app/homeserver.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7bb3744f04..4efadde57e 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -193,6 +193,7 @@ class SynapseHomeServer(HomeServer): { "/_matrix/client/api/v1": client_resource, "/_matrix/client/r0": client_resource, + "/_matrix/client/v3": client_resource, "/_matrix/client/unstable": client_resource, "/_matrix/client/v2_alpha": client_resource, "/_matrix/client/versions": client_resource, -- cgit 1.5.1 From d993c3bb1e89f77d91af6302bfb118494c6f6664 Mon Sep 17 00:00:00 2001 From: Aaron R Date: Wed, 17 Nov 2021 09:30:24 -0600 Subject: Add support for `/_matrix/media/v3` APIs (#11371) * Add support for `/_matrix/media/v3` APIs Signed-off-by: Aaron Raimist * Update `workers.md` to use v3 client and media APIs Signed-off-by: Aaron Raimist * Add changelog Signed-off-by: Aaron Raimist --- changelog.d/11371.feature | 1 + docker/configure_workers_and_start.py | 22 +++++------ docs/workers.md | 70 +++++++++++++++++------------------ synapse/api/urls.py | 3 +- synapse/app/generic_worker.py | 6 ++- synapse/app/homeserver.py | 9 ++++- 6 files changed, 60 insertions(+), 51 deletions(-) create mode 100644 changelog.d/11371.feature (limited to 'synapse/app') diff --git a/changelog.d/11371.feature b/changelog.d/11371.feature new file mode 100644 index 0000000000..8e9ca2d633 --- /dev/null +++ b/changelog.d/11371.feature @@ -0,0 +1 @@ +Add support for the `/_matrix/media/v3` APIs from Matrix v1.1. \ No newline at end of file diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index efb9476cd6..f4ac1c22a4 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -48,7 +48,7 @@ WORKERS_CONFIG = { "app": "synapse.app.user_dir", "listener_resources": ["client"], "endpoint_patterns": [ - "^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$" + "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" ], "shared_extra_conf": {"update_user_directory": False}, "worker_extra_conf": "", @@ -85,10 +85,10 @@ WORKERS_CONFIG = { "app": "synapse.app.generic_worker", "listener_resources": ["client"], "endpoint_patterns": [ - "^/_matrix/client/(v2_alpha|r0)/sync$", - "^/_matrix/client/(api/v1|v2_alpha|r0)/events$", - "^/_matrix/client/(api/v1|r0)/initialSync$", - "^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$", + "^/_matrix/client/(v2_alpha|r0|v3)/sync$", + "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$", + "^/_matrix/client/(api/v1|r0|v3)/initialSync$", + "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", ], "shared_extra_conf": {}, "worker_extra_conf": "", @@ -146,11 +146,11 @@ WORKERS_CONFIG = { "app": "synapse.app.generic_worker", "listener_resources": ["client"], "endpoint_patterns": [ - "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact", - "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send", - "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$", - "^/_matrix/client/(api/v1|r0|unstable)/join/", - "^/_matrix/client/(api/v1|r0|unstable)/profile/", + "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact", + "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send", + "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$", + "^/_matrix/client/(api/v1|r0|v3|unstable)/join/", + "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", ], "shared_extra_conf": {}, "worker_extra_conf": "", @@ -158,7 +158,7 @@ WORKERS_CONFIG = { "frontend_proxy": { "app": "synapse.app.frontend_proxy", "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|unstable)/keys/upload"], + "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], "shared_extra_conf": {}, "worker_extra_conf": ( "worker_main_http_uri: http://127.0.0.1:%d" diff --git a/docs/workers.md b/docs/workers.md index f88e2c1de3..17c8bfeef6 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -182,10 +182,10 @@ This worker can handle API requests matching the following regular expressions: # Sync requests - ^/_matrix/client/(v2_alpha|r0)/sync$ - ^/_matrix/client/(api/v1|v2_alpha|r0)/events$ - ^/_matrix/client/(api/v1|r0)/initialSync$ - ^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$ + ^/_matrix/client/(v2_alpha|r0|v3)/sync$ + ^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$ + ^/_matrix/client/(api/v1|r0|v3)/initialSync$ + ^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$ # Federation requests ^/_matrix/federation/v1/event/ @@ -216,40 +216,40 @@ expressions: ^/_matrix/federation/v1/send/ # Client API requests - ^/_matrix/client/(api/v1|r0|unstable)/createRoom$ - ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/createRoom$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$ ^/_matrix/client/unstable/org.matrix.msc2946/rooms/.*/spaces$ ^/_matrix/client/unstable/org.matrix.msc2946/rooms/.*/hierarchy$ ^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$ - ^/_matrix/client/(api/v1|r0|unstable)/account/3pid$ - ^/_matrix/client/(api/v1|r0|unstable)/devices$ - ^/_matrix/client/(api/v1|r0|unstable)/keys/query$ - ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/devices$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/query$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/changes$ ^/_matrix/client/versions$ - ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$ - ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$ - ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$ - ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/event/ - ^/_matrix/client/(api/v1|r0|unstable)/joined_rooms$ - ^/_matrix/client/(api/v1|r0|unstable)/search$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/joined_groups$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/publicised_groups$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/publicised_groups/ + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event/ + ^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/search$ # Registration/login requests - ^/_matrix/client/(api/v1|r0|unstable)/login$ - ^/_matrix/client/(r0|unstable)/register$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/login$ + ^/_matrix/client/(r0|v3|unstable)/register$ ^/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity$ # Event sending requests - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ - ^/_matrix/client/(api/v1|r0|unstable)/join/ - ^/_matrix/client/(api/v1|r0|unstable)/profile/ + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state/ + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/join/ + ^/_matrix/client/(api/v1|r0|v3|unstable)/profile/ Additionally, the following REST endpoints can be handled for GET requests: @@ -261,14 +261,14 @@ room must be routed to the same instance. Additionally, care must be taken to ensure that the purge history admin API is not used while pagination requests for the room are in flight: - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$ Additionally, the following endpoints should be included if Synapse is configured to use SSO (you only need to include the ones for whichever SSO provider you're using): # for all SSO providers - ^/_matrix/client/(api/v1|r0|unstable)/login/sso/redirect + ^/_matrix/client/(api/v1|r0|v3|unstable)/login/sso/redirect ^/_synapse/client/pick_idp$ ^/_synapse/client/pick_username ^/_synapse/client/new_user_consent$ @@ -281,7 +281,7 @@ using): ^/_synapse/client/saml2/authn_response$ # CAS requests. - ^/_matrix/client/(api/v1|r0|unstable)/login/cas/ticket$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/login/cas/ticket$ Ensure that all SSO logins go to a single process. For multiple workers not handling the SSO endpoints properly, see @@ -465,7 +465,7 @@ Note that if a reverse proxy is used , then `/_matrix/media/` must be routed for Handles searches in the user directory. It can handle REST endpoints matching the following regular expressions: - ^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$ + ^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$ When using this worker you must also set `update_user_directory: False` in the shared configuration file to stop the main synapse running background @@ -477,12 +477,12 @@ Proxies some frequently-requested client endpoints to add caching and remove load from the main synapse. It can handle REST endpoints matching the following regular expressions: - ^/_matrix/client/(api/v1|r0|unstable)/keys/upload + ^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload If `use_presence` is False in the homeserver config, it can also handle REST endpoints matching the following regular expressions: - ^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status + ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/[^/]+/status This "stub" presence handler will pass through `GET` request but make the `PUT` effectively a no-op. diff --git a/synapse/api/urls.py b/synapse/api/urls.py index 4486b3bc7d..f9f9467dc1 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -30,7 +30,8 @@ FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable" STATIC_PREFIX = "/_matrix/static" WEB_CLIENT_PREFIX = "/_matrix/client" SERVER_KEY_V2_PREFIX = "/_matrix/key/v2" -MEDIA_PREFIX = "/_matrix/media/r0" +MEDIA_R0_PREFIX = "/_matrix/media/r0" +MEDIA_V3_PREFIX = "/_matrix/media/v3" LEGACY_MEDIA_PREFIX = "/_matrix/media/v1" diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 46f0feff70..502cc8e8d1 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -26,7 +26,8 @@ from synapse.api.urls import ( CLIENT_API_PREFIX, FEDERATION_PREFIX, LEGACY_MEDIA_PREFIX, - MEDIA_PREFIX, + MEDIA_R0_PREFIX, + MEDIA_V3_PREFIX, SERVER_KEY_V2_PREFIX, ) from synapse.app import _base @@ -338,7 +339,8 @@ class GenericWorkerServer(HomeServer): resources.update( { - MEDIA_PREFIX: media_repo, + MEDIA_R0_PREFIX: media_repo, + MEDIA_V3_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo, "/_synapse/admin": admin_resource, } diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 4efadde57e..7e09530ad2 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -29,7 +29,8 @@ from synapse import events from synapse.api.urls import ( FEDERATION_PREFIX, LEGACY_MEDIA_PREFIX, - MEDIA_PREFIX, + MEDIA_R0_PREFIX, + MEDIA_V3_PREFIX, SERVER_KEY_V2_PREFIX, STATIC_PREFIX, WEB_CLIENT_PREFIX, @@ -245,7 +246,11 @@ class SynapseHomeServer(HomeServer): if self.config.server.enable_media_repo: media_repo = self.get_media_repository_resource() resources.update( - {MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo} + { + MEDIA_R0_PREFIX: media_repo, + MEDIA_V3_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, + } ) elif name == "media": raise ConfigError( -- cgit 1.5.1 From 84fac0f814f69645ff1ad564ef8294b31203dc95 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Wed, 17 Nov 2021 19:07:02 +0000 Subject: Add type annotations to `synapse.metrics` (#10847) --- changelog.d/10847.misc | 1 + mypy.ini | 3 + synapse/app/_base.py | 2 +- synapse/groups/attestations.py | 4 +- synapse/handlers/typing.py | 2 +- synapse/metrics/__init__.py | 101 ++++++++++++++++++-------- synapse/metrics/_exposition.py | 34 ++++----- synapse/metrics/background_process_metrics.py | 78 +++++++++++++++----- synapse/metrics/jemalloc.py | 10 ++- synapse/storage/database.py | 6 +- synapse/util/caches/expiringcache.py | 2 +- synapse/util/metrics.py | 15 ++-- 12 files changed, 173 insertions(+), 85 deletions(-) create mode 100644 changelog.d/10847.misc (limited to 'synapse/app') diff --git a/changelog.d/10847.misc b/changelog.d/10847.misc new file mode 100644 index 0000000000..7933a38dca --- /dev/null +++ b/changelog.d/10847.misc @@ -0,0 +1 @@ +Add type annotations to `synapse.metrics`. diff --git a/mypy.ini b/mypy.ini index f32c6c41a3..308cfd95d8 100644 --- a/mypy.ini +++ b/mypy.ini @@ -160,6 +160,9 @@ disallow_untyped_defs = True [mypy-synapse.handlers.*] disallow_untyped_defs = True +[mypy-synapse.metrics.*] +disallow_untyped_defs = True + [mypy-synapse.push.*] disallow_untyped_defs = True diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 573bb487b2..807ee3d46e 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -402,7 +402,7 @@ async def start(hs: "HomeServer") -> None: if hasattr(signal, "SIGHUP"): @wrap_as_background_process("sighup") - def handle_sighup(*args: Any, **kwargs: Any) -> None: + async def handle_sighup(*args: Any, **kwargs: Any) -> None: # Tell systemd our state, if we're using it. This will silently fail if # we're not using systemd. sdnotify(b"RELOADING=1") diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 53f99031b1..a87896e538 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -40,6 +40,8 @@ from typing import TYPE_CHECKING, Optional, Tuple from signedjson.sign import sign_json +from twisted.internet.defer import Deferred + from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import JsonDict, get_domain_from_id @@ -166,7 +168,7 @@ class GroupAttestionRenewer: return {} - def _start_renew_attestations(self) -> None: + def _start_renew_attestations(self) -> "Deferred[None]": return run_as_background_process("renew_attestations", self._renew_attestations) async def _renew_attestations(self) -> None: diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 22c6174821..1676ebd057 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -90,7 +90,7 @@ class FollowerTypingHandler: self.wheel_timer = WheelTimer(bucket_size=5000) @wrap_as_background_process("typing._handle_timeouts") - def _handle_timeouts(self) -> None: + async def _handle_timeouts(self) -> None: logger.debug("Checking for typing timeouts") now = self.clock.time_msec() diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 91ee5c8193..ceef57ad88 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -20,10 +20,25 @@ import os import platform import threading import time -from typing import Callable, Dict, Iterable, Mapping, Optional, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + Generic, + Iterable, + Mapping, + Optional, + Sequence, + Set, + Tuple, + Type, + TypeVar, + Union, + cast, +) import attr -from prometheus_client import Counter, Gauge, Histogram +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric from prometheus_client.core import ( REGISTRY, CounterMetricFamily, @@ -32,6 +47,7 @@ from prometheus_client.core import ( ) from twisted.internet import reactor +from twisted.internet.base import ReactorBase from twisted.python.threadpool import ThreadPool import synapse @@ -54,7 +70,7 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") class RegistryProxy: @staticmethod - def collect(): + def collect() -> Iterable[Metric]: for metric in REGISTRY.collect(): if not metric.name.startswith("__"): yield metric @@ -74,7 +90,7 @@ class LaterGauge: ] ) - def collect(self): + def collect(self) -> Iterable[Metric]: g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) @@ -93,10 +109,10 @@ class LaterGauge: yield g - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: self._register() - def _register(self): + def _register(self) -> None: if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) @@ -105,7 +121,12 @@ class LaterGauge: all_gauges[self.name] = self -class InFlightGauge: +# `MetricsEntry` only makes sense when it is a `Protocol`, +# but `Protocol` can't be used as a `TypeVar` bound. +MetricsEntry = TypeVar("MetricsEntry") + + +class InFlightGauge(Generic[MetricsEntry]): """Tracks number of things (e.g. requests, Measure blocks, etc) in flight at any given time. @@ -115,14 +136,19 @@ class InFlightGauge: callbacks. Args: - name (str) - desc (str) - labels (list[str]) - sub_metrics (list[str]): A list of sub metrics that the callbacks - will update. + name + desc + labels + sub_metrics: A list of sub metrics that the callbacks will update. """ - def __init__(self, name, desc, labels, sub_metrics): + def __init__( + self, + name: str, + desc: str, + labels: Sequence[str], + sub_metrics: Sequence[str], + ): self.name = name self.desc = desc self.labels = labels @@ -130,19 +156,25 @@ class InFlightGauge: # Create a class which have the sub_metrics values as attributes, which # default to 0 on initialization. Used to pass to registered callbacks. - self._metrics_class = attr.make_class( + self._metrics_class: Type[MetricsEntry] = attr.make_class( "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True ) # Counts number of in flight blocks for a given set of label values - self._registrations: Dict = {} + self._registrations: Dict[ + Tuple[str, ...], Set[Callable[[MetricsEntry], None]] + ] = {} # Protects access to _registrations self._lock = threading.Lock() self._register_with_collector() - def register(self, key, callback): + def register( + self, + key: Tuple[str, ...], + callback: Callable[[MetricsEntry], None], + ) -> None: """Registers that we've entered a new block with labels `key`. `callback` gets called each time the metrics are collected. The same @@ -158,13 +190,17 @@ class InFlightGauge: with self._lock: self._registrations.setdefault(key, set()).add(callback) - def unregister(self, key, callback): + def unregister( + self, + key: Tuple[str, ...], + callback: Callable[[MetricsEntry], None], + ) -> None: """Registers that we've exited a block with labels `key`.""" with self._lock: self._registrations.setdefault(key, set()).discard(callback) - def collect(self): + def collect(self) -> Iterable[Metric]: """Called by prometheus client when it reads metrics. Note: may be called by a separate thread. @@ -200,7 +236,7 @@ class InFlightGauge: gauge.add_metric(key, getattr(metrics, name)) yield gauge - def _register_with_collector(self): + def _register_with_collector(self) -> None: if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) @@ -230,7 +266,7 @@ class GaugeBucketCollector: name: str, documentation: str, buckets: Iterable[float], - registry=REGISTRY, + registry: CollectorRegistry = REGISTRY, ): """ Args: @@ -257,12 +293,12 @@ class GaugeBucketCollector: registry.register(self) - def collect(self): + def collect(self) -> Iterable[Metric]: # Don't report metrics unless we've already collected some data if self._metric is not None: yield self._metric - def update_data(self, values: Iterable[float]): + def update_data(self, values: Iterable[float]) -> None: """Update the data to be reported by the metric The existing data is cleared, and each measurement in the input is assigned @@ -304,7 +340,7 @@ class GaugeBucketCollector: class CPUMetrics: - def __init__(self): + def __init__(self) -> None: ticks_per_sec = 100 try: # Try and get the system config @@ -314,7 +350,7 @@ class CPUMetrics: self.ticks_per_sec = ticks_per_sec - def collect(self): + def collect(self) -> Iterable[Metric]: if not HAVE_PROC_SELF_STAT: return @@ -364,7 +400,7 @@ gc_time = Histogram( class GCCounts: - def collect(self): + def collect(self) -> Iterable[Metric]: cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): cm.add_metric([str(n)], m) @@ -382,7 +418,7 @@ if not running_on_pypy: class PyPyGCStats: - def collect(self): + def collect(self) -> Iterable[Metric]: # @stats is a pretty-printer object with __str__() returning a nice table, # plus some fields that contain data from that table. @@ -565,7 +601,7 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None: class ReactorLastSeenMetric: - def collect(self): + def collect(self) -> Iterable[Metric]: cm = GaugeMetricFamily( "python_twisted_reactor_last_seen", "Seconds since the Twisted reactor was last seen", @@ -584,9 +620,12 @@ MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0) _last_gc = [0.0, 0.0, 0.0] -def runUntilCurrentTimer(reactor, func): +F = TypeVar("F", bound=Callable[..., Any]) + + +def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F: @functools.wraps(func) - def f(*args, **kwargs): + def f(*args: Any, **kwargs: Any) -> Any: now = reactor.seconds() num_pending = 0 @@ -649,7 +688,7 @@ def runUntilCurrentTimer(reactor, func): return ret - return f + return cast(F, f) try: @@ -677,5 +716,5 @@ __all__ = [ "start_http_server", "LaterGauge", "InFlightGauge", - "BucketCollector", + "GaugeBucketCollector", ] diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index bb9bcb5592..353d0a63b6 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -25,27 +25,25 @@ import math import threading from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn -from typing import Dict, List +from typing import Any, Dict, List, Type, Union from urllib.parse import parse_qs, urlparse -from prometheus_client import REGISTRY +from prometheus_client import REGISTRY, CollectorRegistry +from prometheus_client.core import Sample from twisted.web.resource import Resource +from twisted.web.server import Request from synapse.util import caches CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8" -INF = float("inf") -MINUS_INF = float("-inf") - - -def floatToGoString(d): +def floatToGoString(d: Union[int, float]) -> str: d = float(d) - if d == INF: + if d == math.inf: return "+Inf" - elif d == MINUS_INF: + elif d == -math.inf: return "-Inf" elif math.isnan(d): return "NaN" @@ -60,7 +58,7 @@ def floatToGoString(d): return s -def sample_line(line, name): +def sample_line(line: Sample, name: str) -> str: if line.labels: labelstr = "{{{0}}}".format( ",".join( @@ -82,7 +80,7 @@ def sample_line(line, name): return "{}{} {}{}\n".format(name, labelstr, floatToGoString(line.value), timestamp) -def generate_latest(registry, emit_help=False): +def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> bytes: # Trigger the cache metrics to be rescraped, which updates the common # metrics but do not produce metrics themselves @@ -187,7 +185,7 @@ class MetricsHandler(BaseHTTPRequestHandler): registry = REGISTRY - def do_GET(self): + def do_GET(self) -> None: registry = self.registry params = parse_qs(urlparse(self.path).query) @@ -207,11 +205,11 @@ class MetricsHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(output) - def log_message(self, format, *args): + def log_message(self, format: str, *args: Any) -> None: """Log nothing.""" @classmethod - def factory(cls, registry): + def factory(cls, registry: CollectorRegistry) -> Type: """Returns a dynamic MetricsHandler class tied to the passed registry. """ @@ -236,7 +234,9 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer): daemon_threads = True -def start_http_server(port, addr="", registry=REGISTRY): +def start_http_server( + port: int, addr: str = "", registry: CollectorRegistry = REGISTRY +) -> None: """Starts an HTTP server for prometheus metrics as a daemon thread""" CustomMetricsHandler = MetricsHandler.factory(registry) httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler) @@ -252,10 +252,10 @@ class MetricsResource(Resource): isLeaf = True - def __init__(self, registry=REGISTRY): + def __init__(self, registry: CollectorRegistry = REGISTRY): self.registry = registry - def render_GET(self, request): + def render_GET(self, request: Request) -> bytes: request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii")) response = generate_latest(self.registry) request.setHeader(b"Content-Length", str(len(response))) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 2ab599a334..53c508af91 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -15,19 +15,37 @@ import logging import threading from functools import wraps -from typing import TYPE_CHECKING, Dict, Optional, Set, Union +from types import TracebackType +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + Iterable, + Optional, + Set, + Type, + TypeVar, + Union, + cast, +) +from prometheus_client import Metric from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer -from synapse.logging.context import LoggingContext, PreserveLoggingContext +from synapse.logging.context import ( + ContextResourceUsage, + LoggingContext, + PreserveLoggingContext, +) from synapse.logging.opentracing import ( SynapseTags, noop_context_manager, start_active_span, ) -from synapse.util.async_helpers import maybe_awaitable if TYPE_CHECKING: import resource @@ -116,7 +134,7 @@ class _Collector: before they are returned. """ - def collect(self): + def collect(self) -> Iterable[Metric]: global _background_processes_active_since_last_scrape # We swap out the _background_processes set with an empty one so that @@ -144,12 +162,12 @@ REGISTRY.register(_Collector()) class _BackgroundProcess: - def __init__(self, desc, ctx): + def __init__(self, desc: str, ctx: LoggingContext): self.desc = desc self._context = ctx - self._reported_stats = None + self._reported_stats: Optional[ContextResourceUsage] = None - def update_metrics(self): + def update_metrics(self) -> None: """Updates the metrics with values from this process.""" new_stats = self._context.get_resource_usage() if self._reported_stats is None: @@ -169,7 +187,16 @@ class _BackgroundProcess: ) -def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs): +R = TypeVar("R") + + +def run_as_background_process( + desc: str, + func: Callable[..., Awaitable[Optional[R]]], + *args: Any, + bg_start_span: bool = True, + **kwargs: Any, +) -> "defer.Deferred[Optional[R]]": """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -189,11 +216,13 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar args: positional args for func kwargs: keyword args for func - Returns: Deferred which returns the result of func, but note that it does not - follow the synapse logcontext rules. + Returns: + Deferred which returns the result of func, or `None` if func raises. + Note that the returned Deferred does not follow the synapse logcontext + rules. """ - async def run(): + async def run() -> Optional[R]: with _bg_metrics_lock: count = _background_process_counts.get(desc, 0) _background_process_counts[desc] = count + 1 @@ -210,12 +239,13 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar else: ctx = noop_context_manager() with ctx: - return await maybe_awaitable(func(*args, **kwargs)) + return await func(*args, **kwargs) except Exception: logger.exception( "Background process '%s' threw an exception", desc, ) + return None finally: _background_process_in_flight_count.labels(desc).dec() @@ -225,19 +255,24 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar return defer.ensureDeferred(run()) -def wrap_as_background_process(desc): +F = TypeVar("F", bound=Callable[..., Awaitable[Optional[Any]]]) + + +def wrap_as_background_process(desc: str) -> Callable[[F], F]: """Decorator that wraps a function that gets called as a background process. - Equivalent of calling the function with `run_as_background_process` + Equivalent to calling the function with `run_as_background_process` """ - def wrap_as_background_process_inner(func): + def wrap_as_background_process_inner(func: F) -> F: @wraps(func) - def wrap_as_background_process_inner_2(*args, **kwargs): + def wrap_as_background_process_inner_2( + *args: Any, **kwargs: Any + ) -> "defer.Deferred[Optional[R]]": return run_as_background_process(desc, func, *args, **kwargs) - return wrap_as_background_process_inner_2 + return cast(F, wrap_as_background_process_inner_2) return wrap_as_background_process_inner @@ -265,7 +300,7 @@ class BackgroundProcessLoggingContext(LoggingContext): super().__init__("%s-%s" % (name, instance_id)) self._proc = _BackgroundProcess(name, self) - def start(self, rusage: "Optional[resource.struct_rusage]"): + def start(self, rusage: "Optional[resource.struct_rusage]") -> None: """Log context has started running (again).""" super().start(rusage) @@ -276,7 +311,12 @@ class BackgroundProcessLoggingContext(LoggingContext): with _bg_metrics_lock: _background_processes_active_since_last_scrape.add(self._proc) - def __exit__(self, type, value, traceback) -> None: + def __exit__( + self, + type: Optional[Type[BaseException]], + value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: """Log context has finished.""" super().__exit__(type, value, traceback) diff --git a/synapse/metrics/jemalloc.py b/synapse/metrics/jemalloc.py index 29ab6c0229..98ed9c0829 100644 --- a/synapse/metrics/jemalloc.py +++ b/synapse/metrics/jemalloc.py @@ -16,14 +16,16 @@ import ctypes import logging import os import re -from typing import Optional +from typing import Iterable, Optional + +from prometheus_client import Metric from synapse.metrics import REGISTRY, GaugeMetricFamily logger = logging.getLogger(__name__) -def _setup_jemalloc_stats(): +def _setup_jemalloc_stats() -> None: """Checks to see if jemalloc is loaded, and hooks up a collector to record statistics exposed by jemalloc. """ @@ -135,7 +137,7 @@ def _setup_jemalloc_stats(): class JemallocCollector: """Metrics for internal jemalloc stats.""" - def collect(self): + def collect(self) -> Iterable[Metric]: _jemalloc_refresh_stats() g = GaugeMetricFamily( @@ -185,7 +187,7 @@ def _setup_jemalloc_stats(): logger.debug("Added jemalloc stats") -def setup_jemalloc_stats(): +def setup_jemalloc_stats() -> None: """Try to setup jemalloc stats, if jemalloc is loaded.""" try: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index d4cab69ebf..0693d39006 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -188,7 +188,7 @@ class LoggingDatabaseConnection: # The type of entry which goes on our after_callbacks and exception_callbacks lists. -_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]] +_CallbackListEntry = Tuple[Callable[..., object], Iterable[Any], Dict[str, Any]] R = TypeVar("R") @@ -235,7 +235,7 @@ class LoggingTransaction: self.after_callbacks = after_callbacks self.exception_callbacks = exception_callbacks - def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any): + def call_after(self, callback: Callable[..., object], *args: Any, **kwargs: Any): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. @@ -247,7 +247,7 @@ class LoggingTransaction: self.after_callbacks.append((callback, args, kwargs)) def call_on_exception( - self, callback: Callable[..., None], *args: Any, **kwargs: Any + self, callback: Callable[..., object], *args: Any, **kwargs: Any ): # if self.exception_callbacks is None, that means that whatever constructed the # LoggingTransaction isn't expecting there to be any callbacks; assert that diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 6a7e534576..67ee4c693b 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -159,7 +159,7 @@ class ExpiringCache(Generic[KT, VT]): self[key] = value return value - def _prune_cache(self) -> None: + async def _prune_cache(self) -> None: if not self._expiry_ms: # zero expiry time means don't expire. This should never get called # since we have this check in start too. diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index ad775dfc7d..98ee49af6e 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -56,8 +56,15 @@ block_db_sched_duration = Counter( "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"] ) + +# This is dynamically created in InFlightGauge.__init__. +class _InFlightMetric(Protocol): + real_time_max: float + real_time_sum: float + + # Tracks the number of blocks currently active -in_flight = InFlightGauge( +in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge( "synapse_util_metrics_block_in_flight", "", labels=["block_name"], @@ -65,12 +72,6 @@ in_flight = InFlightGauge( ) -# This is dynamically created in InFlightGauge.__init__. -class _InFlightMetric(Protocol): - real_time_max: float - real_time_sum: float - - T = TypeVar("T", bound=Callable[..., Any]) -- cgit 1.5.1 From 7b4e228e415a6ed85bbd6a2fc37f305e1cb81c82 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 29 Nov 2021 15:13:23 +0200 Subject: Fix using MSC2716 batch sending with event persistence workers (#11220) Signed-off-by: Tulir Asokan --- changelog.d/11220.bugfix | 1 + synapse/app/generic_worker.py | 2 ++ synapse/events/snapshot.py | 5 +++++ 3 files changed, 8 insertions(+) create mode 100644 changelog.d/11220.bugfix (limited to 'synapse/app') diff --git a/changelog.d/11220.bugfix b/changelog.d/11220.bugfix new file mode 100644 index 0000000000..8baae28d5b --- /dev/null +++ b/changelog.d/11220.bugfix @@ -0,0 +1 @@ +Fix using MSC2716 batch sending in combination with event persistence workers. Contributed by @tulir at Beeper. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 502cc8e8d1..b4bed5bf40 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -113,6 +113,7 @@ from synapse.storage.databases.main.monthly_active_users import ( ) from synapse.storage.databases.main.presence import PresenceStore from synapse.storage.databases.main.room import RoomWorkerStore +from synapse.storage.databases.main.room_batch import RoomBatchStore from synapse.storage.databases.main.search import SearchStore from synapse.storage.databases.main.session import SessionStore from synapse.storage.databases.main.stats import StatsStore @@ -240,6 +241,7 @@ class GenericWorkerSlavedStore( SlavedEventStore, SlavedKeyStore, RoomWorkerStore, + RoomBatchStore, DirectoryStore, SlavedApplicationServiceStore, SlavedRegistrationStore, diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index d7527008c4..f251402ed8 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -322,6 +322,11 @@ class _AsyncEventContextImpl(EventContext): attributes by loading from the database. """ if self.state_group is None: + # No state group means the event is an outlier. Usually the state_ids dicts are also + # pre-set to empty dicts, but they get reset when the context is serialized, so set + # them to empty dicts again here. + self._current_state_ids = {} + self._prev_state_ids = {} return current_state_ids = await self._storage.state.get_state_ids_for_group( -- cgit 1.5.1 From a4521ce0a8d252e77ca8bd261ecf40ba67511a31 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 29 Nov 2021 14:32:20 -0500 Subject: Support the stable /hierarchy endpoint from MSC2946 (#11329) This also makes additional updates where the implementation had drifted from the approved MSC. Unstable endpoints will be removed at a later data. --- .github/workflows/tests.yml | 2 +- changelog.d/11329.feature | 1 + docs/workers.md | 4 +- scripts-dev/complement.sh | 2 +- synapse/app/homeserver.py | 1 + synapse/federation/federation_client.py | 31 ++++++-- synapse/federation/transport/client.py | 22 +++++- synapse/federation/transport/server/federation.py | 6 +- synapse/handlers/room_summary.py | 14 +++- synapse/rest/client/room.py | 8 +- tests/handlers/test_room_summary.py | 94 ++++++++++++++++------- 11 files changed, 134 insertions(+), 51 deletions(-) create mode 100644 changelog.d/11329.feature (limited to 'synapse/app') diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3bee4ee9f3..21c9ee7823 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -374,7 +374,7 @@ jobs: working-directory: complement/dockerfiles # Run Complement - - run: go test -v -tags synapse_blacklist,msc2403,msc2946 ./tests/... + - run: go test -v -tags synapse_blacklist,msc2403 ./tests/... env: COMPLEMENT_BASE_IMAGE: complement-synapse:latest working-directory: complement diff --git a/changelog.d/11329.feature b/changelog.d/11329.feature new file mode 100644 index 0000000000..7e0efb3b00 --- /dev/null +++ b/changelog.d/11329.feature @@ -0,0 +1 @@ +Support the stable API endpoints for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946): the room `/hierarchy` endpoint. diff --git a/docs/workers.md b/docs/workers.md index 17c8bfeef6..fd83e2ddeb 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -210,7 +210,7 @@ expressions: ^/_matrix/federation/v1/get_groups_publicised$ ^/_matrix/key/v2/query ^/_matrix/federation/unstable/org.matrix.msc2946/spaces/ - ^/_matrix/federation/unstable/org.matrix.msc2946/hierarchy/ + ^/_matrix/federation/(v1|unstable/org.matrix.msc2946)/hierarchy/ # Inbound federation transaction request ^/_matrix/federation/v1/send/ @@ -223,7 +223,7 @@ expressions: ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$ ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$ ^/_matrix/client/unstable/org.matrix.msc2946/rooms/.*/spaces$ - ^/_matrix/client/unstable/org.matrix.msc2946/rooms/.*/hierarchy$ + ^/_matrix/client/(v1|unstable/org.matrix.msc2946)/rooms/.*/hierarchy$ ^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$ ^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$ ^/_matrix/client/(api/v1|r0|v3|unstable)/devices$ diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 972244f4c9..53295b58fc 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then fi # Run the tests! -go test -v -tags synapse_blacklist,msc2946,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/... +go test -v -tags synapse_blacklist,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/... diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7e09530ad2..52541faab2 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -194,6 +194,7 @@ class SynapseHomeServer(HomeServer): { "/_matrix/client/api/v1": client_resource, "/_matrix/client/r0": client_resource, + "/_matrix/client/v1": client_resource, "/_matrix/client/v3": client_resource, "/_matrix/client/unstable": client_resource, "/_matrix/client/v2_alpha": client_resource, diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 3b85b135e0..bc3f96c1fc 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1395,11 +1395,28 @@ class FederationClient(FederationBase): async def send_request( destination: str, ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]: - res = await self.transport_layer.get_room_hierarchy( - destination=destination, - room_id=room_id, - suggested_only=suggested_only, - ) + try: + res = await self.transport_layer.get_room_hierarchy( + destination=destination, + room_id=room_id, + suggested_only=suggested_only, + ) + except HttpResponseException as e: + # If an error is received that is due to an unrecognised endpoint, + # fallback to the unstable endpoint. Otherwise consider it a + # legitmate error and raise. + if not self._is_unknown_endpoint(e): + raise + + logger.debug( + "Couldn't fetch room hierarchy with the v1 API, falling back to the unstable API" + ) + + res = await self.transport_layer.get_room_hierarchy_unstable( + destination=destination, + room_id=room_id, + suggested_only=suggested_only, + ) room = res.get("room") if not isinstance(room, dict): @@ -1449,6 +1466,10 @@ class FederationClient(FederationBase): if e.code != 502: raise + logger.debug( + "Couldn't fetch room hierarchy, falling back to the spaces API" + ) + # Fallback to the old federation API and translate the results if # no servers implement the new API. # diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 0fea221165..fe29bcfd4b 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -1192,10 +1192,24 @@ class TransportLayerClient: ) async def get_room_hierarchy( - self, - destination: str, - room_id: str, - suggested_only: bool, + self, destination: str, room_id: str, suggested_only: bool + ) -> JsonDict: + """ + Args: + destination: The remote server + room_id: The room ID to ask about. + suggested_only: if True, only suggested rooms will be returned + """ + path = _create_v1_path("/hierarchy/%s", room_id) + + return await self.client.get_json( + destination=destination, + path=path, + args={"suggested_only": "true" if suggested_only else "false"}, + ) + + async def get_room_hierarchy_unstable( + self, destination: str, room_id: str, suggested_only: bool ) -> JsonDict: """ Args: diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 2fdf6cc99e..66e915228c 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -611,7 +611,6 @@ class FederationSpaceSummaryServlet(BaseFederationServlet): class FederationRoomHierarchyServlet(BaseFederationServlet): - PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946" PATH = "/hierarchy/(?P[^/]*)" def __init__( @@ -637,6 +636,10 @@ class FederationRoomHierarchyServlet(BaseFederationServlet): ) +class FederationRoomHierarchyUnstableServlet(FederationRoomHierarchyServlet): + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946" + + class RoomComplexityServlet(BaseFederationServlet): """ Indicates to other servers how complex (and therefore likely @@ -701,6 +704,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( RoomComplexityServlet, FederationSpaceSummaryServlet, FederationRoomHierarchyServlet, + FederationRoomHierarchyUnstableServlet, FederationV1SendKnockServlet, FederationMakeKnockServlet, ) diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index 8181cc0b52..b2cfe537df 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -36,8 +36,9 @@ from synapse.api.errors import ( SynapseError, UnsupportedRoomVersionError, ) +from synapse.api.ratelimiting import Ratelimiter from synapse.events import EventBase -from synapse.types import JsonDict +from synapse.types import JsonDict, Requester from synapse.util.caches.response_cache import ResponseCache if TYPE_CHECKING: @@ -93,6 +94,9 @@ class RoomSummaryHandler: self._event_serializer = hs.get_event_client_serializer() self._server_name = hs.hostname self._federation_client = hs.get_federation_client() + self._ratelimiter = Ratelimiter( + store=self._store, clock=hs.get_clock(), rate_hz=5, burst_count=10 + ) # If a user tries to fetch the same page multiple times in quick succession, # only process the first attempt and return its result to subsequent requests. @@ -249,7 +253,7 @@ class RoomSummaryHandler: async def get_room_hierarchy( self, - requester: str, + requester: Requester, requested_room_id: str, suggested_only: bool = False, max_depth: Optional[int] = None, @@ -276,6 +280,8 @@ class RoomSummaryHandler: Returns: The JSON hierarchy dictionary. """ + await self._ratelimiter.ratelimit(requester) + # If a user tries to fetch the same page multiple times in quick succession, # only process the first attempt and return its result to subsequent requests. # @@ -283,7 +289,7 @@ class RoomSummaryHandler: # to process multiple requests for the same page will result in errors. return await self._pagination_response_cache.wrap( ( - requester, + requester.user.to_string(), requested_room_id, suggested_only, max_depth, @@ -291,7 +297,7 @@ class RoomSummaryHandler: from_token, ), self._get_room_hierarchy, - requester, + requester.user.to_string(), requested_room_id, suggested_only, max_depth, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 955d4e8641..73d0f7c950 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1138,12 +1138,12 @@ class RoomSpaceSummaryRestServlet(RestServlet): class RoomHierarchyRestServlet(RestServlet): - PATTERNS = ( + PATTERNS = [ re.compile( - "^/_matrix/client/unstable/org.matrix.msc2946" + "^/_matrix/client/(v1|unstable/org.matrix.msc2946)" "/rooms/(?P[^/]*)/hierarchy$" ), - ) + ] def __init__(self, hs: "HomeServer"): super().__init__() @@ -1168,7 +1168,7 @@ class RoomHierarchyRestServlet(RestServlet): ) return 200, await self._room_summary_handler.get_room_hierarchy( - requester.user.to_string(), + requester, room_id, suggested_only=parse_boolean(request, "suggested_only", default=False), max_depth=max_depth, diff --git a/tests/handlers/test_room_summary.py b/tests/handlers/test_room_summary.py index 7b95844b55..e5a6a6c747 100644 --- a/tests/handlers/test_room_summary.py +++ b/tests/handlers/test_room_summary.py @@ -32,7 +32,7 @@ from synapse.handlers.room_summary import _child_events_comparison_key, _RoomEnt from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, UserID, create_requester from tests import unittest @@ -249,7 +249,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): self._assert_rooms(result, expected) result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) self._assert_hierarchy(result, expected) @@ -263,7 +263,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): expected = [(self.space, [self.room]), (self.room, ())] self._assert_rooms(result, expected) - result = self.get_success(self.handler.get_room_hierarchy(user2, self.space)) + result = self.get_success( + self.handler.get_room_hierarchy(create_requester(user2), self.space) + ) self._assert_hierarchy(result, expected) # If the space is made invite-only, it should no longer be viewable. @@ -274,7 +276,10 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): tok=self.token, ) self.get_failure(self.handler.get_space_summary(user2, self.space), AuthError) - self.get_failure(self.handler.get_room_hierarchy(user2, self.space), AuthError) + self.get_failure( + self.handler.get_room_hierarchy(create_requester(user2), self.space), + AuthError, + ) # If the space is made world-readable it should return a result. self.helper.send_state( @@ -286,7 +291,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): result = self.get_success(self.handler.get_space_summary(user2, self.space)) self._assert_rooms(result, expected) - result = self.get_success(self.handler.get_room_hierarchy(user2, self.space)) + result = self.get_success( + self.handler.get_room_hierarchy(create_requester(user2), self.space) + ) self._assert_hierarchy(result, expected) # Make it not world-readable again and confirm it results in an error. @@ -297,7 +304,10 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): tok=self.token, ) self.get_failure(self.handler.get_space_summary(user2, self.space), AuthError) - self.get_failure(self.handler.get_room_hierarchy(user2, self.space), AuthError) + self.get_failure( + self.handler.get_room_hierarchy(create_requester(user2), self.space), + AuthError, + ) # Join the space and results should be returned. self.helper.invite(self.space, targ=user2, tok=self.token) @@ -305,7 +315,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): result = self.get_success(self.handler.get_space_summary(user2, self.space)) self._assert_rooms(result, expected) - result = self.get_success(self.handler.get_room_hierarchy(user2, self.space)) + result = self.get_success( + self.handler.get_room_hierarchy(create_requester(user2), self.space) + ) self._assert_hierarchy(result, expected) # Attempting to view an unknown room returns the same error. @@ -314,7 +326,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): AuthError, ) self.get_failure( - self.handler.get_room_hierarchy(user2, "#not-a-space:" + self.hs.hostname), + self.handler.get_room_hierarchy( + create_requester(user2), "#not-a-space:" + self.hs.hostname + ), AuthError, ) @@ -322,10 +336,10 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): """In-flight room hierarchy requests are deduplicated.""" # Run two `get_room_hierarchy` calls up until they block. deferred1 = ensureDeferred( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) deferred2 = ensureDeferred( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) # Complete the two calls. @@ -340,7 +354,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # A subsequent `get_room_hierarchy` call should not reuse the result. result3 = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) self._assert_hierarchy(result3, expected) self.assertIsNot(result1, result3) @@ -359,9 +373,11 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # Run two `get_room_hierarchy` calls for different users up until they block. deferred1 = ensureDeferred( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) + ) + deferred2 = ensureDeferred( + self.handler.get_room_hierarchy(create_requester(user2), self.space) ) - deferred2 = ensureDeferred(self.handler.get_room_hierarchy(user2, self.space)) # Complete the two calls. result1 = self.get_success(deferred1) @@ -465,7 +481,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): ] self._assert_rooms(result, expected) - result = self.get_success(self.handler.get_room_hierarchy(user2, self.space)) + result = self.get_success( + self.handler.get_room_hierarchy(create_requester(user2), self.space) + ) self._assert_hierarchy(result, expected) def test_complex_space(self): @@ -507,7 +525,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): self._assert_rooms(result, expected) result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) self._assert_hierarchy(result, expected) @@ -522,7 +540,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): room_ids.append(self.room) result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space, limit=7) + self.handler.get_room_hierarchy( + create_requester(self.user), self.space, limit=7 + ) ) # The result should have the space and all of the links, plus some of the # rooms and a pagination token. @@ -534,7 +554,10 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # Check the next page. result = self.get_success( self.handler.get_room_hierarchy( - self.user, self.space, limit=5, from_token=result["next_batch"] + create_requester(self.user), + self.space, + limit=5, + from_token=result["next_batch"], ) ) # The result should have the space and the room in it, along with a link @@ -554,20 +577,22 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): room_ids.append(self.room) result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space, limit=7) + self.handler.get_room_hierarchy( + create_requester(self.user), self.space, limit=7 + ) ) self.assertIn("next_batch", result) # Changing the room ID, suggested-only, or max-depth causes an error. self.get_failure( self.handler.get_room_hierarchy( - self.user, self.room, from_token=result["next_batch"] + create_requester(self.user), self.room, from_token=result["next_batch"] ), SynapseError, ) self.get_failure( self.handler.get_room_hierarchy( - self.user, + create_requester(self.user), self.space, suggested_only=True, from_token=result["next_batch"], @@ -576,14 +601,19 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): ) self.get_failure( self.handler.get_room_hierarchy( - self.user, self.space, max_depth=0, from_token=result["next_batch"] + create_requester(self.user), + self.space, + max_depth=0, + from_token=result["next_batch"], ), SynapseError, ) # An invalid token is ignored. self.get_failure( - self.handler.get_room_hierarchy(self.user, self.space, from_token="foo"), + self.handler.get_room_hierarchy( + create_requester(self.user), self.space, from_token="foo" + ), SynapseError, ) @@ -609,14 +639,18 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # Test just the space itself. result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space, max_depth=0) + self.handler.get_room_hierarchy( + create_requester(self.user), self.space, max_depth=0 + ) ) expected: List[Tuple[str, Iterable[str]]] = [(spaces[0], [rooms[0], spaces[1]])] self._assert_hierarchy(result, expected) # A single additional layer. result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space, max_depth=1) + self.handler.get_room_hierarchy( + create_requester(self.user), self.space, max_depth=1 + ) ) expected += [ (rooms[0], ()), @@ -626,7 +660,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # A few layers. result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space, max_depth=3) + self.handler.get_room_hierarchy( + create_requester(self.user), self.space, max_depth=3 + ) ) expected += [ (rooms[1], ()), @@ -657,7 +693,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): self._assert_rooms(result, expected) result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) self._assert_hierarchy(result, expected) @@ -739,7 +775,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): new=summarize_remote_room_hierarchy, ): result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) self._assert_hierarchy(result, expected) @@ -906,7 +942,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): new=summarize_remote_room_hierarchy, ): result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) self._assert_hierarchy(result, expected) @@ -964,7 +1000,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): new=summarize_remote_room_hierarchy, ): result = self.get_success( - self.handler.get_room_hierarchy(self.user, self.space) + self.handler.get_room_hierarchy(create_requester(self.user), self.space) ) self._assert_hierarchy(result, expected) -- cgit 1.5.1