Diffstat
-rw-r--r--  synapse/app/generic_worker.py    26
-rw-r--r--  synapse/app/homeserver.py        24
-rw-r--r--  synapse/app/phone_stats_home.py  40
-rw-r--r--  synapse/appservice/__init__.py    2
-rw-r--r--  synapse/appservice/scheduler.py  37
5 files changed, 90 insertions, 39 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py

index 53f1859256..75c65ccc0d 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -3,7 +3,7 @@
 #
 # Copyright 2020 The Matrix.org Foundation C.I.C.
 # Copyright 2016 OpenMarket Ltd
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023-2024 New Vector, Ltd
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as
@@ -51,8 +51,7 @@ from synapse.http.server import JsonResource, OptionsResource
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.rest import ClientRestResource
-from synapse.rest.admin import register_servlets_for_media_repo
+from synapse.rest import ClientRestResource, admin
 from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyResource
 from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -65,6 +64,7 @@ from synapse.storage.databases.main.appservice import (
 )
 from synapse.storage.databases.main.censor_events import CensorEventsStore
 from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
+from synapse.storage.databases.main.delayed_events import DelayedEventsStore
 from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
 from synapse.storage.databases.main.devices import DeviceWorkerStore
 from synapse.storage.databases.main.directory import DirectoryWorkerStore
@@ -98,6 +98,7 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.databases.main.search import SearchStore
 from synapse.storage.databases.main.session import SessionStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
+from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
 from synapse.storage.databases.main.state import StateGroupWorkerStore
 from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -159,6 +160,8 @@ class GenericWorkerStore(
     SessionStore,
     TaskSchedulerWorkerStore,
     ExperimentalFeaturesStore,
+    SlidingSyncStore,
+    DelayedEventsStore,
 ):
     # Properties that multiple storage classes define. Tell mypy what the
     # expected type is.
@@ -172,8 +175,13 @@ class GenericWorkerServer(HomeServer):
     def _listen_http(self, listener_config: ListenerConfig) -> None:
         assert listener_config.http_options is not None

-        # We always include a health resource.
-        resources: Dict[str, Resource] = {"/health": HealthResource()}
+        # We always include an admin resource that we populate with servlets as needed
+        admin_resource = JsonResource(self, canonical_json=False)
+        resources: Dict[str, Resource] = {
+            # We always include a health resource.
+            "/health": HealthResource(),
+            "/_synapse/admin": admin_resource,
+        }

         for res in listener_config.http_options.resources:
             for name in res.names:
@@ -186,6 +194,7 @@ class GenericWorkerServer(HomeServer):

                     resources.update(build_synapse_client_resource_tree(self))
                     resources["/.well-known"] = well_known_resource(self)
+                    admin.register_servlets(self, admin_resource)

                 elif name == "federation":
                     resources[FEDERATION_PREFIX] = TransportLayerServer(self)
@@ -195,15 +204,13 @@ class GenericWorkerServer(HomeServer):

                         # We need to serve the admin servlets for media on the
                         # worker.
-                        admin_resource = JsonResource(self, canonical_json=False)
-                        register_servlets_for_media_repo(self, admin_resource)
+                        admin.register_servlets_for_media_repo(self, admin_resource)

                         resources.update(
                             {
                                 MEDIA_R0_PREFIX: media_repo,
                                 MEDIA_V3_PREFIX: media_repo,
                                 LEGACY_MEDIA_PREFIX: media_repo,
-                                "/_synapse/admin": admin_resource,
                             }
                         )

@@ -280,8 +287,7 @@ class GenericWorkerServer(HomeServer):
             elif listener.type == "metrics":
                 if not self.config.metrics.enable_metrics:
                     logger.warning(
-                        "Metrics listener configured, but "
-                        "enable_metrics is not True!"
+                        "Metrics listener configured, but enable_metrics is not True!"
                     )
                 else:
                     if isinstance(listener, TCPListenerConfig):
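For orientation, here is a minimal, self-contained sketch of the resource-map pattern the worker moves to above: a single admin resource is created unconditionally, mounted at "/_synapse/admin", and then populated with servlets as each enabled listener resource is discovered. The names below (FakeResource, register_*) are illustrative stand-ins, not Synapse APIs.

from typing import Callable, Dict, List


class FakeResource:
    """Stand-in for a Twisted Resource; it just records which servlets were attached."""

    def __init__(self) -> None:
        self.servlets: List[str] = []


def register_admin_servlets(resource: FakeResource) -> None:
    resource.servlets.append("general admin servlets")


def register_media_admin_servlets(resource: FakeResource) -> None:
    resource.servlets.append("media admin servlets")


def build_resource_map(enabled_names: List[str]) -> Dict[str, FakeResource]:
    # The admin resource always exists, mirroring the new "/_synapse/admin" entry.
    admin_resource = FakeResource()
    resources: Dict[str, FakeResource] = {
        "/health": FakeResource(),
        "/_synapse/admin": admin_resource,
    }

    # Each enabled listener resource attaches its servlets to the *same* admin resource.
    registrars: Dict[str, Callable[[FakeResource], None]] = {
        "client": register_admin_servlets,
        "media": register_media_admin_servlets,
    }
    for name in enabled_names:
        registrars[name](admin_resource)

    return resources


if __name__ == "__main__":
    tree = build_resource_map(["client", "media"])
    print(tree["/_synapse/admin"].servlets)
    # ['general admin servlets', 'media admin servlets']

Creating the admin resource up front is what lets both the "client" and "media" branches register onto the same object instead of each branch building its own.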
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2a824e8457..9b5ecf2c68 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -54,6 +54,7 @@ from synapse.config.server import ListenerConfig, TCPListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import (
+    JsonResource,
     OptionsResource,
     RootOptionsRedirectResource,
     StaticResource,
@@ -61,8 +62,7 @@ from synapse.http.server import (
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.rest import ClientRestResource
-from synapse.rest.admin import AdminRestResource
+from synapse.rest import ClientRestResource, admin
 from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyResource
 from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -180,24 +180,18 @@ class SynapseHomeServer(HomeServer):
             if compress:
                 client_resource = gz_wrap(client_resource)

+            admin_resource = JsonResource(self, canonical_json=False)
+            admin.register_servlets(self, admin_resource)
+
             resources.update(
                 {
                     CLIENT_API_PREFIX: client_resource,
                     "/.well-known": well_known_resource(self),
-                    "/_synapse/admin": AdminRestResource(self),
+                    "/_synapse/admin": admin_resource,
                     **build_synapse_client_resource_tree(self),
                 }
             )

-            if self.config.email.can_verify_email:
-                from synapse.rest.synapse.client.password_reset import (
-                    PasswordResetSubmitTokenResource,
-                )
-
-                resources["/_synapse/client/password_reset/email/submit_token"] = (
-                    PasswordResetSubmitTokenResource(self)
-                )
-
         if name == "consent":
             from synapse.rest.consent.consent_resource import ConsentResource

@@ -286,8 +280,7 @@ class SynapseHomeServer(HomeServer):
             elif listener.type == "metrics":
                 if not self.config.metrics.enable_metrics:
                     logger.warning(
-                        "Metrics listener configured, but "
-                        "enable_metrics is not True!"
+                        "Metrics listener configured, but enable_metrics is not True!"
                     )
                 else:
                     if isinstance(listener, TCPListenerConfig):
@@ -349,12 +342,11 @@ def setup(config_options: List[str]) -> SynapseHomeServer:
     ):
         if (
             not config.captcha.enable_registration_captcha
-            and not config.registration.registrations_require_3pid
             and not config.registration.registration_requires_token
         ):
             raise ConfigError(
                 "You have enabled open registration without any verification. This is a known vector for "
-                "spam and abuse. If you would like to allow public registration, please consider adding email, "
+                "spam and abuse. If you would like to allow public registration, please consider adding "
                "captcha, or token-based verification. Otherwise this check can be removed by setting the "
                "`enable_registration_without_verification` config option to `true`."
             )
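The last hunk tightens the open-registration guard: with 3PID-based registration removed, only captcha or token registration counts as verification. A standalone sketch of the resulting check, assuming simplified dataclasses in place of Synapse's real config objects (which raise ConfigError from setup()):

from dataclasses import dataclass


@dataclass
class CaptchaConfig:
    enable_registration_captcha: bool = False


@dataclass
class RegistrationConfig:
    enable_registration: bool = False
    enable_registration_without_verification: bool = False
    registration_requires_token: bool = False


def check_open_registration(captcha: CaptchaConfig, registration: RegistrationConfig) -> None:
    # Mirrors the condition after this diff: open registration is refused unless
    # captcha or token verification is enabled, or the explicit override is set.
    if (
        registration.enable_registration
        and not registration.enable_registration_without_verification
    ):
        if (
            not captcha.enable_registration_captcha
            and not registration.registration_requires_token
        ):
            raise ValueError(
                "Open registration without captcha or token verification is refused."
            )


# Passes: registration is gated behind a registration token.
check_open_registration(
    CaptchaConfig(),
    RegistrationConfig(enable_registration=True, registration_requires_token=True),
)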
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index f602bbbeea..07870a16ee 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -34,6 +34,22 @@ if TYPE_CHECKING:

 logger = logging.getLogger("synapse.app.homeserver")

+ONE_MINUTE_SECONDS = 60
+ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
+
+MILLISECONDS_PER_SECOND = 1000
+
+INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
+"""
+We wait 5 minutes to send the first set of stats as the server can be quite busy the
+first few minutes
+"""
+
+PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS
+"""
+Phone home stats are sent every 3 hours
+"""
+
 # Contains the list of processes we will be monitoring
 # currently either 0 or 1
 _stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
@@ -46,10 +62,6 @@ current_mau_by_service_gauge = Gauge(
     ["app_service"],
 )
 max_mau_gauge = Gauge("synapse_admin_mau_max", "MAU Limit")
-registered_reserved_users_mau_gauge = Gauge(
-    "synapse_admin_mau_registered_reserved_users",
-    "Registered users with reserved threepids",
-)


 @wrap_as_background_process("phone_stats_home")
@@ -185,12 +197,14 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
     # If you increase the loop period, the accuracy of user_daily_visits
     # table will decrease
     clock.looping_call(
-        hs.get_datastores().main.generate_user_daily_visits, 5 * 60 * 1000
+        hs.get_datastores().main.generate_user_daily_visits,
+        5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
     )

     # monthly active user limiting functionality
     clock.looping_call(
-        hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
+        hs.get_datastores().main.reap_monthly_active_users,
+        ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND,
     )
     hs.get_datastores().main.reap_monthly_active_users()

@@ -198,20 +212,17 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
     async def generate_monthly_active_users() -> None:
         current_mau_count = 0
         current_mau_count_by_service: Mapping[str, int] = {}
-        reserved_users: Sized = ()
         store = hs.get_datastores().main
         if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
             current_mau_count = await store.get_monthly_active_count()
             current_mau_count_by_service = (
                 await store.get_monthly_active_count_by_service()
             )
-            reserved_users = await store.get_registered_reserved_users()

         current_mau_gauge.set(float(current_mau_count))
         for app_service, count in current_mau_count_by_service.items():
             current_mau_by_service_gauge.labels(app_service).set(float(count))

-        registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
         max_mau_gauge.set(float(hs.config.server.max_mau_value))

     if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
@@ -221,7 +232,12 @@ def start_phone_stats_home(hs: "HomeServer") -> None:

     if hs.config.metrics.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
+        clock.looping_call(
+            phone_stats_home,
+            PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND,
+            hs,
+            stats,
+        )

     # We need to defer this init for the cases that we daemonize
     # otherwise the process ID we get is that of the non-daemon process
@@ -229,4 +245,6 @@ def start_phone_stats_home(hs: "HomeServer") -> None:

     # We wait 5 minutes to send the first set of stats as the server can
     # be quite busy the first few minutes
-    clock.call_later(5 * 60, phone_stats_home, hs, stats)
+    clock.call_later(
+        INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS, phone_stats_home, hs, stats
+    )
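The renaming is purely cosmetic: every scheduled interval stays numerically the same, just spelled with named factors, and the looping-call arguments remain in milliseconds while call_later takes seconds. A standalone sanity check (constants copied from the diff above):

ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
MILLISECONDS_PER_SECOND = 1000

INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS

# user_daily_visits loop: unchanged 5-minute period.
assert 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND == 5 * 60 * 1000 == 300_000
# MAU reaping loop: unchanged 1-hour period.
assert ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND == 1000 * 60 * 60 == 3_600_000
# Phone-home stats loop: unchanged 3-hour period.
assert PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND == 3 * 60 * 60 * 1000
# First report still fires after 5 minutes (call_later takes seconds, not ms).
assert INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS == 5 * 60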
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index a96cdbf1e7..6ee5240c4e 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -87,6 +87,7 @@ class ApplicationService:
         ip_range_whitelist: Optional[IPSet] = None,
         supports_ephemeral: bool = False,
         msc3202_transaction_extensions: bool = False,
+        msc4190_device_management: bool = False,
     ):
         self.token = token
         self.url = (
@@ -100,6 +101,7 @@ class ApplicationService:
         self.ip_range_whitelist = ip_range_whitelist
         self.supports_ephemeral = supports_ephemeral
         self.msc3202_transaction_extensions = msc3202_transaction_extensions
+        self.msc4190_device_management = msc4190_device_management

         if "|" in self.id:
             raise Exception("application service ID cannot contain '|' character")
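The new MSC4190 flag is plumbed through the same way as the existing msc3202 flag: a constructor keyword that defaults to False and is stored as an instance attribute for later feature checks. A trimmed-down illustration of that pattern, not the real ApplicationService signature (which takes many more arguments):

class AppServiceFlags:
    """Illustrative stand-in showing only the keyword-to-attribute plumbing."""

    def __init__(
        self,
        supports_ephemeral: bool = False,
        msc3202_transaction_extensions: bool = False,
        msc4190_device_management: bool = False,
    ) -> None:
        self.supports_ephemeral = supports_ephemeral
        self.msc3202_transaction_extensions = msc3202_transaction_extensions
        self.msc4190_device_management = msc4190_device_management


flags = AppServiceFlags(msc4190_device_management=True)
assert flags.msc4190_device_management is True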
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index bec83419a2..cba08dde85 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -2,7 +2,7 @@
 # This file is licensed under the Affero General Public License (AGPL) version 3.
 #
 # Copyright 2015, 2016 OpenMarket Ltd
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023, 2025 New Vector, Ltd
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as
@@ -54,6 +54,7 @@ UP & quit +---------- YES SUCCESS
 This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
+
 import logging
 from typing import (
     TYPE_CHECKING,
@@ -69,6 +70,8 @@ from typing import (
     Tuple,
 )

+from twisted.internet.interfaces import IDelayedCall
+
 from synapse.appservice import (
     ApplicationService,
     ApplicationServiceState,
@@ -449,6 +452,20 @@ class _TransactionController:
             recoverer.recover()
             logger.info("Now %i active recoverers", len(self.recoverers))

+    def force_retry(self, service: ApplicationService) -> None:
+        """Forces a Recoverer to attempt delivery of transactions immediately.
+
+        Args:
+            service: the application service whose queued transactions should be retried.
+        """
+        recoverer = self.recoverers.get(service.id)
+        if not recoverer:
+            # No need to force a retry on a happy AS.
+            logger.info(f"{service.id} is not in recovery, not forcing retry")
+            return
+
+        recoverer.force_retry()
+
     async def _is_service_up(self, service: ApplicationService) -> bool:
         state = await self.store.get_appservice_state(service)
         return state == ApplicationServiceState.UP or state is None
@@ -481,11 +498,12 @@ class _Recoverer:
         self.service = service
         self.callback = callback
         self.backoff_counter = 1
+        self.scheduled_recovery: Optional[IDelayedCall] = None

     def recover(self) -> None:
         delay = 2**self.backoff_counter
         logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
-        self.clock.call_later(
+        self.scheduled_recovery = self.clock.call_later(
             delay, run_as_background_process, "as-recoverer", self.retry
         )

@@ -495,6 +513,21 @@ class _Recoverer:
             self.backoff_counter += 1
         self.recover()

+    def force_retry(self) -> None:
+        """Cancels the existing timer and forces an immediate retry in the background.
+
+        Any retry already scheduled by the backoff timer is cancelled first, so
+        that only one retry loop runs at a time.
+        """
+        # Prevent the existing backoff from occurring
+        if self.scheduled_recovery:
+            self.clock.cancel_call_later(self.scheduled_recovery)
+        # Run a retry, which will reschedule a recovery if it fails.
+        run_as_background_process(
+            "retry",
+            self.retry,
+        )
+
     async def retry(self) -> None:
         logger.info("Starting retries on %s", self.service.id)
         try:
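The cancellation pattern added to _Recoverer generalises to plain Twisted: keep the IDelayedCall returned by callLater, cancel it if it is still pending, and run the retry immediately. A self-contained sketch using the raw reactor instead of Synapse's Clock wrapper and background-process helpers (class and method names here are simplified stand-ins):

from typing import Optional

from twisted.internet import reactor
from twisted.internet.interfaces import IDelayedCall


class Recoverer:
    def __init__(self) -> None:
        self.backoff_counter = 1
        self.scheduled_recovery: Optional[IDelayedCall] = None

    def recover(self) -> None:
        # Schedule a retry with exponential backoff and remember the handle.
        delay = 2**self.backoff_counter
        self.scheduled_recovery = reactor.callLater(delay, self.retry)

    def force_retry(self) -> None:
        # Cancel the pending backoff call, if any, so we do not retry twice.
        if self.scheduled_recovery is not None and self.scheduled_recovery.active():
            self.scheduled_recovery.cancel()
        self.retry()

    def retry(self) -> None:
        print("retrying now")


if __name__ == "__main__":
    r = Recoverer()
    r.recover()      # schedules a retry ~2s in the future
    r.force_retry()  # cancels it and retries immediately
    reactor.callLater(0.1, reactor.stop)
    reactor.run()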