From 62894673e69f7beb0d0a748ad01c2e95c5fed106 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 2 Oct 2020 08:23:15 -0400 Subject: Allow background tasks to be run on a separate worker. (#8369) --- synapse/app/_base.py | 6 ++ synapse/app/admin_cmd.py | 1 + synapse/app/generic_worker.py | 4 + synapse/app/homeserver.py | 182 ------------------------------------ synapse/app/phone_stats_home.py | 202 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 213 insertions(+), 182 deletions(-) create mode 100644 synapse/app/phone_stats_home.py (limited to 'synapse/app') diff --git a/synapse/app/_base.py b/synapse/app/_base.py index fb476ddaf5..8bb0b142ca 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -28,6 +28,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory import synapse from synapse.app import check_bind_error +from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config.server import ListenerConfig from synapse.crypto import context_factory from synapse.logging.context import PreserveLoggingContext @@ -274,6 +275,11 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): setup_sentry(hs) setup_sdnotify(hs) + # If background tasks are running on the main process, start collecting the + # phone home stats. + if hs.config.run_background_tasks: + start_phone_stats_home(hs) + # We now freeze all allocated objects in the hopes that (almost) # everything currently allocated are things that will be used for the # rest of time. Doing so means less work each GC (hopefully). diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 7d309b1bb0..f0d65d08d7 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -208,6 +208,7 @@ def start(config_options): # Explicitly disable background processes config.update_user_directory = False + config.run_background_tasks = False config.start_pushers = False config.send_federation = False diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index c38413c893..fc5188ce95 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -128,11 +128,13 @@ from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer, cache_in_self from synapse.storage.databases.main.censor_events import CensorEventsStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore +from synapse.storage.databases.main.metrics import ServerMetricsStore from synapse.storage.databases.main.monthly_active_users import ( MonthlyActiveUsersWorkerStore, ) from synapse.storage.databases.main.presence import UserPresenceState from synapse.storage.databases.main.search import SearchWorkerStore +from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore from synapse.types import ReadReceipt @@ -454,6 +456,7 @@ class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. UserDirectoryStore, + StatsStore, UIAuthWorkerStore, SlavedDeviceInboxStore, SlavedDeviceStore, @@ -476,6 +479,7 @@ class GenericWorkerSlavedStore( SlavedFilteringStore, MonthlyActiveUsersWorkerStore, MediaRepositoryStore, + ServerMetricsStore, SearchWorkerStore, BaseSlavedStore, ): diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index dff739e106..4ed4a2c253 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -17,14 +17,10 @@ import gc import logging -import math import os -import resource import sys from typing import Iterable -from prometheus_client import Gauge - from twisted.application import service from twisted.internet import defer, reactor from twisted.python.failure import Failure @@ -60,7 +56,6 @@ from synapse.http.server import ( from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import ModuleApi from synapse.python_dependencies import check_requirements from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource @@ -334,20 +329,6 @@ class SynapseHomeServer(HomeServer): logger.warning("Unrecognized listener type: %s", listener.type) -# Gauges to expose monthly active user control metrics -current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") -current_mau_by_service_gauge = Gauge( - "synapse_admin_mau_current_mau_by_service", - "Current MAU by service", - ["app_service"], -) -max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") -registered_reserved_users_mau_gauge = Gauge( - "synapse_admin_mau:registered_reserved_users", - "Registered users with reserved threepids", -) - - def setup(config_options): """ Args: @@ -389,8 +370,6 @@ def setup(config_options): except UpgradeDatabaseException as e: quit_with_error("Failed to upgrade database: %s" % (e,)) - hs.setup_master() - async def do_acme() -> bool: """ Reprovision an ACME certificate, if it's required. @@ -486,92 +465,6 @@ class SynapseService(service.Service): return self._port.stopListening() -# Contains the list of processes we will be monitoring -# currently either 0 or 1 -_stats_process = [] - - -async def phone_stats_home(hs, stats, stats_process=_stats_process): - logger.info("Gathering stats for reporting") - now = int(hs.get_clock().time()) - uptime = int(now - hs.start_time) - if uptime < 0: - uptime = 0 - - # - # Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test. - # - old = stats_process[0] - new = (now, resource.getrusage(resource.RUSAGE_SELF)) - stats_process[0] = new - - # Get RSS in bytes - stats["memory_rss"] = new[1].ru_maxrss - - # Get CPU time in % of a single core, not % of all cores - used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - ( - old[1].ru_utime + old[1].ru_stime - ) - if used_cpu_time == 0 or new[0] == old[0]: - stats["cpu_average"] = 0 - else: - stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100) - - # - # General statistics - # - - stats["homeserver"] = hs.config.server_name - stats["server_context"] = hs.config.server_context - stats["timestamp"] = now - stats["uptime_seconds"] = uptime - version = sys.version_info - stats["python_version"] = "{}.{}.{}".format( - version.major, version.minor, version.micro - ) - stats["total_users"] = await hs.get_datastore().count_all_users() - - total_nonbridged_users = await hs.get_datastore().count_nonbridged_users() - stats["total_nonbridged_users"] = total_nonbridged_users - - daily_user_type_results = await hs.get_datastore().count_daily_user_type() - for name, count in daily_user_type_results.items(): - stats["daily_user_type_" + name] = count - - room_count = await hs.get_datastore().get_room_count() - stats["total_room_count"] = room_count - - stats["daily_active_users"] = await hs.get_datastore().count_daily_users() - stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users() - stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms() - stats["daily_messages"] = await hs.get_datastore().count_daily_messages() - - r30_results = await hs.get_datastore().count_r30_users() - for name, count in r30_results.items(): - stats["r30_users_" + name] = count - - daily_sent_messages = await hs.get_datastore().count_daily_sent_messages() - stats["daily_sent_messages"] = daily_sent_messages - stats["cache_factor"] = hs.config.caches.global_factor - stats["event_cache_size"] = hs.config.caches.event_cache_size - - # - # Database version - # - - # This only reports info about the *main* database. - stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__ - stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version - - logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)) - try: - await hs.get_proxied_http_client().put_json( - hs.config.report_stats_endpoint, stats - ) - except Exception as e: - logger.warning("Error reporting stats: %s", e) - - def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: @@ -597,81 +490,6 @@ def run(hs): ThreadPool._worker = profile(ThreadPool._worker) reactor.run = profile(reactor.run) - clock = hs.get_clock() - - stats = {} - - def performance_stats_init(): - _stats_process.clear() - _stats_process.append( - (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF)) - ) - - def start_phone_stats_home(): - return run_as_background_process( - "phone_stats_home", phone_stats_home, hs, stats - ) - - def generate_user_daily_visit_stats(): - return run_as_background_process( - "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits - ) - - # Rather than update on per session basis, batch up the requests. - # If you increase the loop period, the accuracy of user_daily_visits - # table will decrease - clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) - - # monthly active user limiting functionality - def reap_monthly_active_users(): - return run_as_background_process( - "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users - ) - - clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) - reap_monthly_active_users() - - async def generate_monthly_active_users(): - current_mau_count = 0 - current_mau_count_by_service = {} - reserved_users = () - store = hs.get_datastore() - if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: - current_mau_count = await store.get_monthly_active_count() - current_mau_count_by_service = ( - await store.get_monthly_active_count_by_service() - ) - reserved_users = await store.get_registered_reserved_users() - current_mau_gauge.set(float(current_mau_count)) - - for app_service, count in current_mau_count_by_service.items(): - current_mau_by_service_gauge.labels(app_service).set(float(count)) - - registered_reserved_users_mau_gauge.set(float(len(reserved_users))) - max_mau_gauge.set(float(hs.config.max_mau_value)) - - def start_generate_monthly_active_users(): - return run_as_background_process( - "generate_monthly_active_users", generate_monthly_active_users - ) - - start_generate_monthly_active_users() - if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: - clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) - # End of monthly active user settings - - if hs.config.report_stats: - logger.info("Scheduling stats reporting for 3 hour intervals") - clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) - - # We need to defer this init for the cases that we daemonize - # otherwise the process ID we get is that of the non-daemon process - clock.call_later(0, performance_stats_init) - - # We wait 5 minutes to send the first set of stats as the server can - # be quite busy the first few minutes - clock.call_later(5 * 60, start_phone_stats_home) - _base.start_reactor( "synapse-homeserver", soft_file_limit=hs.config.soft_file_limit, diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py new file mode 100644 index 0000000000..2c8e14a8c0 --- /dev/null +++ b/synapse/app/phone_stats_home.py @@ -0,0 +1,202 @@ +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import math +import resource +import sys + +from prometheus_client import Gauge + +from synapse.metrics.background_process_metrics import run_as_background_process + +logger = logging.getLogger("synapse.app.homeserver") + +# Contains the list of processes we will be monitoring +# currently either 0 or 1 +_stats_process = [] + +# Gauges to expose monthly active user control metrics +current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") +current_mau_by_service_gauge = Gauge( + "synapse_admin_mau_current_mau_by_service", + "Current MAU by service", + ["app_service"], +) +max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") +registered_reserved_users_mau_gauge = Gauge( + "synapse_admin_mau:registered_reserved_users", + "Registered users with reserved threepids", +) + + +async def phone_stats_home(hs, stats, stats_process=_stats_process): + logger.info("Gathering stats for reporting") + now = int(hs.get_clock().time()) + uptime = int(now - hs.start_time) + if uptime < 0: + uptime = 0 + + # + # Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test. + # + old = stats_process[0] + new = (now, resource.getrusage(resource.RUSAGE_SELF)) + stats_process[0] = new + + # Get RSS in bytes + stats["memory_rss"] = new[1].ru_maxrss + + # Get CPU time in % of a single core, not % of all cores + used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - ( + old[1].ru_utime + old[1].ru_stime + ) + if used_cpu_time == 0 or new[0] == old[0]: + stats["cpu_average"] = 0 + else: + stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100) + + # + # General statistics + # + + stats["homeserver"] = hs.config.server_name + stats["server_context"] = hs.config.server_context + stats["timestamp"] = now + stats["uptime_seconds"] = uptime + version = sys.version_info + stats["python_version"] = "{}.{}.{}".format( + version.major, version.minor, version.micro + ) + stats["total_users"] = await hs.get_datastore().count_all_users() + + total_nonbridged_users = await hs.get_datastore().count_nonbridged_users() + stats["total_nonbridged_users"] = total_nonbridged_users + + daily_user_type_results = await hs.get_datastore().count_daily_user_type() + for name, count in daily_user_type_results.items(): + stats["daily_user_type_" + name] = count + + room_count = await hs.get_datastore().get_room_count() + stats["total_room_count"] = room_count + + stats["daily_active_users"] = await hs.get_datastore().count_daily_users() + stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users() + stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms() + stats["daily_messages"] = await hs.get_datastore().count_daily_messages() + + r30_results = await hs.get_datastore().count_r30_users() + for name, count in r30_results.items(): + stats["r30_users_" + name] = count + + daily_sent_messages = await hs.get_datastore().count_daily_sent_messages() + stats["daily_sent_messages"] = daily_sent_messages + stats["cache_factor"] = hs.config.caches.global_factor + stats["event_cache_size"] = hs.config.caches.event_cache_size + + # + # Database version + # + + # This only reports info about the *main* database. + stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__ + stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version + + logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)) + try: + await hs.get_proxied_http_client().put_json( + hs.config.report_stats_endpoint, stats + ) + except Exception as e: + logger.warning("Error reporting stats: %s", e) + + +def start_phone_stats_home(hs): + """ + Start the background tasks which report phone home stats. + """ + clock = hs.get_clock() + + stats = {} + + def performance_stats_init(): + _stats_process.clear() + _stats_process.append( + (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF)) + ) + + def start_phone_stats_home(): + return run_as_background_process( + "phone_stats_home", phone_stats_home, hs, stats + ) + + def generate_user_daily_visit_stats(): + return run_as_background_process( + "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits + ) + + # Rather than update on per session basis, batch up the requests. + # If you increase the loop period, the accuracy of user_daily_visits + # table will decrease + clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + + # monthly active user limiting functionality + def reap_monthly_active_users(): + return run_as_background_process( + "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users + ) + + clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) + reap_monthly_active_users() + + async def generate_monthly_active_users(): + current_mau_count = 0 + current_mau_count_by_service = {} + reserved_users = () + store = hs.get_datastore() + if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: + current_mau_count = await store.get_monthly_active_count() + current_mau_count_by_service = ( + await store.get_monthly_active_count_by_service() + ) + reserved_users = await store.get_registered_reserved_users() + current_mau_gauge.set(float(current_mau_count)) + + for app_service, count in current_mau_count_by_service.items(): + current_mau_by_service_gauge.labels(app_service).set(float(count)) + + registered_reserved_users_mau_gauge.set(float(len(reserved_users))) + max_mau_gauge.set(float(hs.config.max_mau_value)) + + def start_generate_monthly_active_users(): + return run_as_background_process( + "generate_monthly_active_users", generate_monthly_active_users + ) + + if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: + start_generate_monthly_active_users() + clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) + # End of monthly active user settings + + if hs.config.report_stats: + logger.info("Scheduling stats reporting for 3 hour intervals") + clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) + + # We need to defer this init for the cases that we daemonize + # otherwise the process ID we get is that of the non-daemon process + clock.call_later(0, performance_stats_init) + + # We wait 5 minutes to send the first set of stats as the server can + # be quite busy the first few minutes + clock.call_later(5 * 60, start_phone_stats_home) -- cgit 1.5.1 From e3debf9682ed59b2972f236fe2982b6af0a9bb9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Oct 2020 15:20:45 +0100 Subject: Add logging on startup/shutdown (#8448) This is so we can tell what is going on when things are taking a while to start up. The main change here is to ensure that transactions that are created during startup get correctly logged like normal transactions. --- changelog.d/8448.misc | 1 + scripts/synapse_port_db | 2 +- synapse/app/_base.py | 5 ++ synapse/storage/database.py | 89 ++++++++++++++++++---- synapse/storage/databases/__init__.py | 2 +- synapse/storage/databases/main/__init__.py | 1 - .../storage/databases/main/event_push_actions.py | 8 +- .../storage/databases/main/monthly_active_users.py | 1 - synapse/storage/databases/main/roommember.py | 13 +--- .../databases/main/schema/delta/20/pushers.py | 19 +++-- .../storage/databases/main/schema/delta/25/fts.py | 2 - .../storage/databases/main/schema/delta/27/ts.py | 2 - .../databases/main/schema/delta/30/as_users.py | 6 +- .../databases/main/schema/delta/31/pushers.py | 19 +++-- .../main/schema/delta/31/search_update.py | 2 - .../databases/main/schema/delta/33/event_fields.py | 2 - .../main/schema/delta/33/remote_media_ts.py | 5 +- .../schema/delta/56/unique_user_filter_index.py | 7 +- .../schema/delta/57/local_current_membership.py | 1 - synapse/storage/prepare_database.py | 33 +++----- synapse/storage/types.py | 6 ++ synapse/storage/util/id_generators.py | 8 +- synapse/storage/util/sequence.py | 15 +++- tests/storage/test_appservice.py | 14 ++-- tests/utils.py | 2 + 25 files changed, 152 insertions(+), 113 deletions(-) create mode 100644 changelog.d/8448.misc (limited to 'synapse/app') diff --git a/changelog.d/8448.misc b/changelog.d/8448.misc new file mode 100644 index 0000000000..5ddda1803b --- /dev/null +++ b/changelog.d/8448.misc @@ -0,0 +1 @@ +Add SQL logging on queries that happen during startup. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index ae2887b7d2..7e12f5440c 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -489,7 +489,7 @@ class Porter(object): hs = MockHomeserver(self.hs_config) - with make_conn(db_config, engine) as db_conn: + with make_conn(db_config, engine, "portdb") as db_conn: engine.check_database( db_conn, allow_outdated_version=allow_outdated_version ) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 8bb0b142ca..f6f7b2bf42 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -272,6 +272,11 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): hs.get_datastore().db_pool.start_profiling() hs.get_pusherpool().start() + # Log when we start the shut down process. + hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", logger.info, "Shutting down..." + ) + setup_sentry(hs) setup_sdnotify(hs) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 79ec8f119d..0d9d9b7cc0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -32,6 +32,7 @@ from typing import ( overload, ) +import attr from prometheus_client import Histogram from typing_extensions import Literal @@ -90,13 +91,17 @@ def make_pool( return adbapi.ConnectionPool( db_config.config["name"], cp_reactor=reactor, - cp_openfun=engine.on_new_connection, + cp_openfun=lambda conn: engine.on_new_connection( + LoggingDatabaseConnection(conn, engine, "on_new_connection") + ), **db_config.config.get("args", {}) ) def make_conn( - db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine + db_config: DatabaseConnectionConfig, + engine: BaseDatabaseEngine, + default_txn_name: str, ) -> Connection: """Make a new connection to the database and return it. @@ -109,11 +114,60 @@ def make_conn( for k, v in db_config.config.get("args", {}).items() if not k.startswith("cp_") } - db_conn = engine.module.connect(**db_params) + native_db_conn = engine.module.connect(**db_params) + db_conn = LoggingDatabaseConnection(native_db_conn, engine, default_txn_name) + engine.on_new_connection(db_conn) return db_conn +@attr.s(slots=True) +class LoggingDatabaseConnection: + """A wrapper around a database connection that returns `LoggingTransaction` + as its cursor class. + + This is mainly used on startup to ensure that queries get logged correctly + """ + + conn = attr.ib(type=Connection) + engine = attr.ib(type=BaseDatabaseEngine) + default_txn_name = attr.ib(type=str) + + def cursor( + self, *, txn_name=None, after_callbacks=None, exception_callbacks=None + ) -> "LoggingTransaction": + if not txn_name: + txn_name = self.default_txn_name + + return LoggingTransaction( + self.conn.cursor(), + name=txn_name, + database_engine=self.engine, + after_callbacks=after_callbacks, + exception_callbacks=exception_callbacks, + ) + + def close(self) -> None: + self.conn.close() + + def commit(self) -> None: + self.conn.commit() + + def rollback(self, *args, **kwargs) -> None: + self.conn.rollback(*args, **kwargs) + + def __enter__(self) -> "Connection": + self.conn.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback) -> bool: + return self.conn.__exit__(exc_type, exc_value, traceback) + + # Proxy through any unknown lookups to the DB conn class. + def __getattr__(self, name): + return getattr(self.conn, name) + + # The type of entry which goes on our after_callbacks and exception_callbacks lists. # # Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so @@ -247,6 +301,12 @@ class LoggingTransaction: def close(self) -> None: self.txn.close() + def __enter__(self) -> "LoggingTransaction": + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + class PerformanceCounters: def __init__(self): @@ -395,7 +455,7 @@ class DatabasePool: def new_transaction( self, - conn: Connection, + conn: LoggingDatabaseConnection, desc: str, after_callbacks: List[_CallbackListEntry], exception_callbacks: List[_CallbackListEntry], @@ -418,12 +478,10 @@ class DatabasePool: i = 0 N = 5 while True: - cursor = LoggingTransaction( - conn.cursor(), - name, - self.engine, - after_callbacks, - exception_callbacks, + cursor = conn.cursor( + txn_name=name, + after_callbacks=after_callbacks, + exception_callbacks=exception_callbacks, ) try: r = func(cursor, *args, **kwargs) @@ -584,7 +642,10 @@ class DatabasePool: logger.debug("Reconnecting closed database connection") conn.reconnect() - return func(conn, *args, **kwargs) + db_conn = LoggingDatabaseConnection( + conn, self.engine, "runWithConnection" + ) + return func(db_conn, *args, **kwargs) return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) @@ -1621,7 +1682,7 @@ class DatabasePool: def get_cache_dict( self, - db_conn: Connection, + db_conn: LoggingDatabaseConnection, table: str, entity_column: str, stream_column: str, @@ -1642,9 +1703,7 @@ class DatabasePool: "limit": limit, } - sql = self.engine.convert_param_style(sql) - - txn = db_conn.cursor() + txn = db_conn.cursor(txn_name="get_cache_dict") txn.execute(sql, (int(max_value),)) cache = {row[0]: int(row[1]) for row in txn} diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index aa5d490624..0c24325011 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -46,7 +46,7 @@ class Databases: db_name = database_config.name engine = create_engine(database_config.config) - with make_conn(database_config, engine) as db_conn: + with make_conn(database_config, engine, "startup") as db_conn: logger.info("[database config %r]: Checking database server", db_name) engine.check_database(db_conn) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index f823d66709..9b16f45f3e 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -284,7 +284,6 @@ class DataStore( " last_user_sync_ts, status_msg, currently_active FROM presence_stream" " WHERE state != ?" ) - sql = self.database_engine.convert_param_style(sql) txn = db_conn.cursor() txn.execute(sql, (PresenceState.OFFLINE,)) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 62f1738732..80f3b4d740 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -20,7 +20,7 @@ from typing import Dict, List, Optional, Tuple, Union import attr from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json +from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -74,11 +74,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): self.stream_ordering_month_ago = None self.stream_ordering_day_ago = None - cur = LoggingTransaction( - db_conn.cursor(), - name="_find_stream_orderings_for_times_txn", - database_engine=self.database_engine, - ) + cur = db_conn.cursor(txn_name="_find_stream_orderings_for_times_txn") self._find_stream_orderings_for_times_txn(cur) cur.close() diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index b2127598ef..c66f558567 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -214,7 +214,6 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): self._mau_stats_only = hs.config.mau_stats_only # Do not add more reserved users than the total allowable number - # cur = LoggingTransaction( self.db_pool.new_transaction( db_conn, "initialise_mau_threepids", diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 86ffe2479e..bae1bd22d3 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -21,12 +21,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage._base import ( - LoggingTransaction, - SQLBaseStore, - db_to_json, - make_in_list_sql_clause, -) +from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine @@ -60,10 +55,8 @@ class RoomMemberWorkerStore(EventsWorkerStore): # background update still running? self._current_state_events_membership_up_to_date = False - txn = LoggingTransaction( - db_conn.cursor(), - name="_check_safe_current_state_events_membership_updated", - database_engine=self.database_engine, + txn = db_conn.cursor( + txn_name="_check_safe_current_state_events_membership_updated" ) self._check_safe_current_state_events_membership_updated_txn(txn) txn.close() diff --git a/synapse/storage/databases/main/schema/delta/20/pushers.py b/synapse/storage/databases/main/schema/delta/20/pushers.py index 3edfcfd783..45b846e6a7 100644 --- a/synapse/storage/databases/main/schema/delta/20/pushers.py +++ b/synapse/storage/databases/main/schema/delta/20/pushers.py @@ -66,16 +66,15 @@ def run_create(cur, database_engine, *args, **kwargs): row[8] = bytes(row[8]).decode("utf-8") row[11] = bytes(row[11]).decode("utf-8") cur.execute( - database_engine.convert_param_style( - """ - INSERT into pushers2 ( - id, user_name, access_token, profile_tag, kind, - app_id, app_display_name, device_display_name, - pushkey, ts, lang, data, last_token, last_success, - failing_since - ) values (%s)""" - % (",".join(["?" for _ in range(len(row))])) - ), + """ + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + ) values (%s) + """ + % (",".join(["?" for _ in range(len(row))])), row, ) count += 1 diff --git a/synapse/storage/databases/main/schema/delta/25/fts.py b/synapse/storage/databases/main/schema/delta/25/fts.py index ee675e71ff..21f57825d4 100644 --- a/synapse/storage/databases/main/schema/delta/25/fts.py +++ b/synapse/storage/databases/main/schema/delta/25/fts.py @@ -71,8 +71,6 @@ def run_create(cur, database_engine, *args, **kwargs): " VALUES (?, ?)" ) - sql = database_engine.convert_param_style(sql) - cur.execute(sql, ("event_search", progress_json)) diff --git a/synapse/storage/databases/main/schema/delta/27/ts.py b/synapse/storage/databases/main/schema/delta/27/ts.py index b7972cfa8e..1c6058063f 100644 --- a/synapse/storage/databases/main/schema/delta/27/ts.py +++ b/synapse/storage/databases/main/schema/delta/27/ts.py @@ -50,8 +50,6 @@ def run_create(cur, database_engine, *args, **kwargs): " VALUES (?, ?)" ) - sql = database_engine.convert_param_style(sql) - cur.execute(sql, ("event_origin_server_ts", progress_json)) diff --git a/synapse/storage/databases/main/schema/delta/30/as_users.py b/synapse/storage/databases/main/schema/delta/30/as_users.py index b42c02710a..7f08fabe9f 100644 --- a/synapse/storage/databases/main/schema/delta/30/as_users.py +++ b/synapse/storage/databases/main/schema/delta/30/as_users.py @@ -59,9 +59,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n)) for chunk in user_chunks: cur.execute( - database_engine.convert_param_style( - "UPDATE users SET appservice_id = ? WHERE name IN (%s)" - % (",".join("?" for _ in chunk),) - ), + "UPDATE users SET appservice_id = ? WHERE name IN (%s)" + % (",".join("?" for _ in chunk),), [as_id] + chunk, ) diff --git a/synapse/storage/databases/main/schema/delta/31/pushers.py b/synapse/storage/databases/main/schema/delta/31/pushers.py index 9bb504aad5..5be81c806a 100644 --- a/synapse/storage/databases/main/schema/delta/31/pushers.py +++ b/synapse/storage/databases/main/schema/delta/31/pushers.py @@ -65,16 +65,15 @@ def run_create(cur, database_engine, *args, **kwargs): row = list(row) row[12] = token_to_stream_ordering(row[12]) cur.execute( - database_engine.convert_param_style( - """ - INSERT into pushers2 ( - id, user_name, access_token, profile_tag, kind, - app_id, app_display_name, device_display_name, - pushkey, ts, lang, data, last_stream_ordering, last_success, - failing_since - ) values (%s)""" - % (",".join(["?" for _ in range(len(row))])) - ), + """ + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_stream_ordering, last_success, + failing_since + ) values (%s) + """ + % (",".join(["?" for _ in range(len(row))])), row, ) count += 1 diff --git a/synapse/storage/databases/main/schema/delta/31/search_update.py b/synapse/storage/databases/main/schema/delta/31/search_update.py index 63b757ade6..b84c844e3a 100644 --- a/synapse/storage/databases/main/schema/delta/31/search_update.py +++ b/synapse/storage/databases/main/schema/delta/31/search_update.py @@ -55,8 +55,6 @@ def run_create(cur, database_engine, *args, **kwargs): " VALUES (?, ?)" ) - sql = database_engine.convert_param_style(sql) - cur.execute(sql, ("event_search_order", progress_json)) diff --git a/synapse/storage/databases/main/schema/delta/33/event_fields.py b/synapse/storage/databases/main/schema/delta/33/event_fields.py index a3e81eeac7..e928c66a8f 100644 --- a/synapse/storage/databases/main/schema/delta/33/event_fields.py +++ b/synapse/storage/databases/main/schema/delta/33/event_fields.py @@ -50,8 +50,6 @@ def run_create(cur, database_engine, *args, **kwargs): " VALUES (?, ?)" ) - sql = database_engine.convert_param_style(sql) - cur.execute(sql, ("event_fields_sender_url", progress_json)) diff --git a/synapse/storage/databases/main/schema/delta/33/remote_media_ts.py b/synapse/storage/databases/main/schema/delta/33/remote_media_ts.py index a26057dfb6..ad875c733a 100644 --- a/synapse/storage/databases/main/schema/delta/33/remote_media_ts.py +++ b/synapse/storage/databases/main/schema/delta/33/remote_media_ts.py @@ -23,8 +23,5 @@ def run_create(cur, database_engine, *args, **kwargs): def run_upgrade(cur, database_engine, *args, **kwargs): cur.execute( - database_engine.convert_param_style( - "UPDATE remote_media_cache SET last_access_ts = ?" - ), - (int(time.time() * 1000),), + "UPDATE remote_media_cache SET last_access_ts = ?", (int(time.time() * 1000),), ) diff --git a/synapse/storage/databases/main/schema/delta/56/unique_user_filter_index.py b/synapse/storage/databases/main/schema/delta/56/unique_user_filter_index.py index 1de8b54961..bb7296852a 100644 --- a/synapse/storage/databases/main/schema/delta/56/unique_user_filter_index.py +++ b/synapse/storage/databases/main/schema/delta/56/unique_user_filter_index.py @@ -1,6 +1,8 @@ import logging +from io import StringIO from synapse.storage.engines import PostgresEngine +from synapse.storage.prepare_database import execute_statements_from_stream logger = logging.getLogger(__name__) @@ -46,7 +48,4 @@ def run_create(cur, database_engine, *args, **kwargs): select_clause, ) - if isinstance(database_engine, PostgresEngine): - cur.execute(sql) - else: - cur.executescript(sql) + execute_statements_from_stream(cur, StringIO(sql)) diff --git a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py index 63b5acdcf7..44917f0a2e 100644 --- a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py +++ b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py @@ -68,7 +68,6 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): INNER JOIN room_memberships AS r USING (event_id) WHERE type = 'm.room.member' AND state_key LIKE ? """ - sql = database_engine.convert_param_style(sql) cur.execute(sql, ("%:" + config.server_name,)) cur.execute( diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 4957e77f4c..459754feab 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import imp import logging import os @@ -24,9 +23,10 @@ from typing import Optional, TextIO import attr from synapse.config.homeserver import HomeServerConfig +from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.engines.postgres import PostgresEngine -from synapse.storage.types import Connection, Cursor +from synapse.storage.types import Cursor from synapse.types import Collection logger = logging.getLogger(__name__) @@ -67,7 +67,7 @@ UNAPPLIED_DELTA_ON_WORKER_ERROR = ( def prepare_database( - db_conn: Connection, + db_conn: LoggingDatabaseConnection, database_engine: BaseDatabaseEngine, config: Optional[HomeServerConfig], databases: Collection[str] = ["main", "state"], @@ -89,7 +89,7 @@ def prepare_database( """ try: - cur = db_conn.cursor() + cur = db_conn.cursor(txn_name="prepare_database") # sqlite does not automatically start transactions for DDL / SELECT statements, # so we start one before running anything. This ensures that any upgrades @@ -258,9 +258,7 @@ def _setup_new_database(cur, database_engine, databases): executescript(cur, entry.absolute_path) cur.execute( - database_engine.convert_param_style( - "INSERT INTO schema_version (version, upgraded) VALUES (?,?)" - ), + "INSERT INTO schema_version (version, upgraded) VALUES (?,?)", (max_current_ver, False), ) @@ -486,17 +484,13 @@ def _upgrade_existing_database( # Mark as done. cur.execute( - database_engine.convert_param_style( - "INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)" - ), + "INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)", (v, relative_path), ) cur.execute("DELETE FROM schema_version") cur.execute( - database_engine.convert_param_style( - "INSERT INTO schema_version (version, upgraded) VALUES (?,?)" - ), + "INSERT INTO schema_version (version, upgraded) VALUES (?,?)", (v, True), ) @@ -532,10 +526,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) schemas to be applied """ cur.execute( - database_engine.convert_param_style( - "SELECT file FROM applied_module_schemas WHERE module_name = ?" - ), - (modname,), + "SELECT file FROM applied_module_schemas WHERE module_name = ?", (modname,), ) applied_deltas = {d for d, in cur} for (name, stream) in names_and_streams: @@ -553,9 +544,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) # Mark as done. cur.execute( - database_engine.convert_param_style( - "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)" - ), + "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)", (modname, name), ) @@ -627,9 +616,7 @@ def _get_or_create_schema_state(txn, database_engine): if current_version: txn.execute( - database_engine.convert_param_style( - "SELECT file FROM applied_schema_deltas WHERE version >= ?" - ), + "SELECT file FROM applied_schema_deltas WHERE version >= ?", (current_version,), ) applied_deltas = [d for d, in txn] diff --git a/synapse/storage/types.py b/synapse/storage/types.py index 2d2b560e74..970bb1b9da 100644 --- a/synapse/storage/types.py +++ b/synapse/storage/types.py @@ -61,3 +61,9 @@ class Connection(Protocol): def rollback(self, *args, **kwargs) -> None: ... + + def __enter__(self) -> "Connection": + ... + + def __exit__(self, exc_type, exc_value, traceback) -> bool: + ... diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index c92cd4a6ba..51f680d05d 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -54,7 +54,7 @@ def _load_current_id(db_conn, table, column, step=1): """ # debug logging for https://github.com/matrix-org/synapse/issues/7968 logger.info("initialising stream generator for %s(%s)", table, column) - cur = db_conn.cursor() + cur = db_conn.cursor(txn_name="_load_current_id") if step == 1: cur.execute("SELECT MAX(%s) FROM %s" % (column, table)) else: @@ -269,7 +269,7 @@ class MultiWriterIdGenerator: def _load_current_ids( self, db_conn, table: str, instance_column: str, id_column: str ): - cur = db_conn.cursor() + cur = db_conn.cursor(txn_name="_load_current_ids") # Load the current positions of all writers for the stream. if self._writers: @@ -283,15 +283,12 @@ class MultiWriterIdGenerator: stream_name = ? AND instance_name != ALL(?) """ - sql = self._db.engine.convert_param_style(sql) cur.execute(sql, (self._stream_name, self._writers)) sql = """ SELECT instance_name, stream_id FROM stream_positions WHERE stream_name = ? """ - sql = self._db.engine.convert_param_style(sql) - cur.execute(sql, (self._stream_name,)) self._current_positions = { @@ -340,7 +337,6 @@ class MultiWriterIdGenerator: "instance": instance_column, "cmp": "<=" if self._positive else ">=", } - sql = self._db.engine.convert_param_style(sql) cur.execute(sql, (min_stream_id * self._return_factor,)) self._persisted_upto_position = min_stream_id diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 2dd95e2709..ff2d038ad2 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -17,6 +17,7 @@ import logging import threading from typing import Callable, List, Optional +from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.engines import ( BaseDatabaseEngine, IncorrectDatabaseSetup, @@ -53,7 +54,11 @@ class SequenceGenerator(metaclass=abc.ABCMeta): @abc.abstractmethod def check_consistency( - self, db_conn: Connection, table: str, id_column: str, positive: bool = True + self, + db_conn: LoggingDatabaseConnection, + table: str, + id_column: str, + positive: bool = True, ): """Should be called during start up to test that the current value of the sequence is greater than or equal to the maximum ID in the table. @@ -82,9 +87,13 @@ class PostgresSequenceGenerator(SequenceGenerator): return [i for (i,) in txn] def check_consistency( - self, db_conn: Connection, table: str, id_column: str, positive: bool = True + self, + db_conn: LoggingDatabaseConnection, + table: str, + id_column: str, + positive: bool = True, ): - txn = db_conn.cursor() + txn = db_conn.cursor(txn_name="sequence.check_consistency") # First we get the current max ID from the table. table_sql = "SELECT GREATEST(%(agg)s(%(id)s), 0) FROM %(table)s" % { diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 46f94914ff..c905a38930 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -58,7 +58,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): # must be done after inserts database = hs.get_datastores().databases[0] self.store = ApplicationServiceStore( - database, make_conn(database._database_config, database.engine), hs + database, make_conn(database._database_config, database.engine, "test"), hs ) def tearDown(self): @@ -132,7 +132,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): db_config = hs.config.get_single_database() self.store = TestTransactionStore( - database, make_conn(db_config, self.engine), hs + database, make_conn(db_config, self.engine, "test"), hs ) def _add_service(self, url, as_token, id): @@ -448,7 +448,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): database = hs.get_datastores().databases[0] ApplicationServiceStore( - database, make_conn(database._database_config, database.engine), hs + database, make_conn(database._database_config, database.engine, "test"), hs ) @defer.inlineCallbacks @@ -467,7 +467,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): with self.assertRaises(ConfigError) as cm: database = hs.get_datastores().databases[0] ApplicationServiceStore( - database, make_conn(database._database_config, database.engine), hs + database, + make_conn(database._database_config, database.engine, "test"), + hs, ) e = cm.exception @@ -491,7 +493,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): with self.assertRaises(ConfigError) as cm: database = hs.get_datastores().databases[0] ApplicationServiceStore( - database, make_conn(database._database_config, database.engine), hs + database, + make_conn(database._database_config, database.engine, "test"), + hs, ) e = cm.exception diff --git a/tests/utils.py b/tests/utils.py index 7a927c7f74..af563ffe0f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -38,6 +38,7 @@ from synapse.http.server import HttpServer from synapse.logging.context import current_context, set_current_context from synapse.server import HomeServer from synapse.storage import DataStore +from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.engines import PostgresEngine, create_engine from synapse.storage.prepare_database import prepare_database from synapse.util.ratelimitutils import FederationRateLimiter @@ -88,6 +89,7 @@ def setupdb(): host=POSTGRES_HOST, password=POSTGRES_PASSWORD, ) + db_conn = LoggingDatabaseConnection(db_conn, db_engine, "tests") prepare_database(db_conn, db_engine, None) db_conn.close() -- cgit 1.5.1 From 4f0637346a194a3343b4fea6cf38c1548e56648d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 7 Oct 2020 12:03:26 +0100 Subject: Combine `SpamCheckerApi` with the more generic `ModuleApi`. (#8464) Lots of different module apis is not easy to maintain. Rather than adding yet another ModuleApi(hs, hs.get_auth_handler()) incantation, first add an hs.get_module_api() method and use it where possible. --- changelog.d/8464.misc | 1 + docs/spam_checker.md | 9 +++----- synapse/app/homeserver.py | 3 +-- synapse/events/spamcheck.py | 5 +++-- synapse/events/third_party_rules.py | 3 +-- synapse/handlers/auth.py | 7 ++++++ synapse/module_api/__init__.py | 29 +++++++++++++++++++++++- synapse/server.py | 5 +++++ synapse/spam_checker_api/__init__.py | 43 ------------------------------------ tests/module_api/test_api.py | 4 ++-- 10 files changed, 51 insertions(+), 58 deletions(-) create mode 100644 changelog.d/8464.misc (limited to 'synapse/app') diff --git a/changelog.d/8464.misc b/changelog.d/8464.misc new file mode 100644 index 0000000000..a552e88f9f --- /dev/null +++ b/changelog.d/8464.misc @@ -0,0 +1 @@ +Combine `SpamCheckerApi` with the more generic `ModuleApi`. diff --git a/docs/spam_checker.md b/docs/spam_checker.md index eb10e115f9..7fc08f1b70 100644 --- a/docs/spam_checker.md +++ b/docs/spam_checker.md @@ -11,7 +11,7 @@ able to be imported by the running Synapse. The Python class is instantiated with two objects: * Any configuration (see below). -* An instance of `synapse.spam_checker_api.SpamCheckerApi`. +* An instance of `synapse.module_api.ModuleApi`. It then implements methods which return a boolean to alter behavior in Synapse. @@ -26,11 +26,8 @@ well as some specific methods: The details of the each of these methods (as well as their inputs and outputs) are documented in the `synapse.events.spamcheck.SpamChecker` class. -The `SpamCheckerApi` class provides a way for the custom spam checker class to -call back into the homeserver internals. It currently implements the following -methods: - -* `get_state_events_in_room` +The `ModuleApi` class provides a way for the custom spam checker class to +call back into the homeserver internals. ### Example diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 4ed4a2c253..2b5465417f 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -56,7 +56,6 @@ from synapse.http.server import ( from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.module_api import ModuleApi from synapse.python_dependencies import check_requirements from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory @@ -106,7 +105,7 @@ class SynapseHomeServer(HomeServer): additional_resources = listener_config.http_options.additional_resources logger.debug("Configuring additional resources: %r", additional_resources) - module_api = ModuleApi(self, self.get_auth_handler()) + module_api = self.get_module_api() for path, resmodule in additional_resources.items(): handler_cls, config = load_module(resmodule) handler = handler_cls(config, module_api) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index b0fc859a47..bad18f7fdf 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -17,24 +17,25 @@ import inspect from typing import Any, Dict, List, Optional, Tuple -from synapse.spam_checker_api import RegistrationBehaviour, SpamCheckerApi +from synapse.spam_checker_api import RegistrationBehaviour from synapse.types import Collection MYPY = False if MYPY: + import synapse.events import synapse.server class SpamChecker: def __init__(self, hs: "synapse.server.HomeServer"): self.spam_checkers = [] # type: List[Any] + api = hs.get_module_api() for module, config in hs.config.spam_checkers: # Older spam checkers don't accept the `api` argument, so we # try and detect support. spam_args = inspect.getfullargspec(module) if "api" in spam_args.args: - api = SpamCheckerApi(hs) self.spam_checkers.append(module(config=config, api=api)) else: self.spam_checkers.append(module(config=config)) diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index e38b8e67fb..1535cc5339 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -16,7 +16,6 @@ from typing import Callable from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.module_api import ModuleApi from synapse.types import Requester, StateMap @@ -40,7 +39,7 @@ class ThirdPartyEventRules: if module is not None: self.third_party_rules = module( - config=config, module_api=ModuleApi(hs, hs.get_auth_handler()), + config=config, module_api=hs.get_module_api(), ) async def check_event_allowed( diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 7c4b716b28..f6d17c53b1 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -164,7 +164,14 @@ class AuthHandler(BaseHandler): self.bcrypt_rounds = hs.config.bcrypt_rounds + # we can't use hs.get_module_api() here, because to do so will create an + # import loop. + # + # TODO: refactor this class to separate the lower-level stuff that + # ModuleApi can use from the higher-level stuff that uses ModuleApi, as + # better way to break the loop account_handler = ModuleApi(hs, self) + self.password_providers = [ module(config=config, account_handler=account_handler) for module, config in hs.config.password_providers diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 646f09d2bc..b410e3ad9c 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -14,13 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Iterable, Optional, Tuple from twisted.internet import defer from synapse.http.client import SimpleHttpClient from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.storage.state import StateFilter from synapse.types import UserID if TYPE_CHECKING: @@ -293,6 +294,32 @@ class ModuleApi: registered_user_id, request, client_redirect_url, ) + @defer.inlineCallbacks + def get_state_events_in_room( + self, room_id: str, types: Iterable[Tuple[str, Optional[str]]] + ) -> defer.Deferred: + """Gets current state events for the given room. + + (This is exposed for compatibility with the old SpamCheckerApi. We should + probably deprecate it and replace it with an async method in a subclass.) + + Args: + room_id: The room ID to get state events in. + types: The event type and state key (using None + to represent 'any') of the room state to acquire. + + Returns: + twisted.internet.defer.Deferred[list(synapse.events.FrozenEvent)]: + The filtered state events in the room. + """ + state_ids = yield defer.ensureDeferred( + self._store.get_filtered_current_state_ids( + room_id=room_id, state_filter=StateFilter.from_types(types) + ) + ) + state = yield defer.ensureDeferred(self._store.get_events(state_ids.values())) + return state.values() + class PublicRoomListManager: """Contains methods for adding to, removing from and querying whether a room diff --git a/synapse/server.py b/synapse/server.py index aa2273955c..f83dd6148c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -91,6 +91,7 @@ from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler from synapse.handlers.user_directory import UserDirectoryHandler from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient from synapse.http.matrixfederationclient import MatrixFederationHttpClient +from synapse.module_api import ModuleApi from synapse.notifier import Notifier from synapse.push.action_generator import ActionGenerator from synapse.push.pusherpool import PusherPool @@ -656,6 +657,10 @@ class HomeServer(metaclass=abc.ABCMeta): def get_federation_ratelimiter(self) -> FederationRateLimiter: return FederationRateLimiter(self.clock, config=self.config.rc_federation) + @cache_in_self + def get_module_api(self) -> ModuleApi: + return ModuleApi(self, self.get_auth_handler()) + async def remove_pusher(self, app_id: str, push_key: str, user_id: str): return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/spam_checker_api/__init__.py b/synapse/spam_checker_api/__init__.py index 395ac5ab02..3ce25bb012 100644 --- a/synapse/spam_checker_api/__init__.py +++ b/synapse/spam_checker_api/__init__.py @@ -12,19 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging from enum import Enum -from twisted.internet import defer - -from synapse.storage.state import StateFilter - -MYPY = False -if MYPY: - import synapse.server - -logger = logging.getLogger(__name__) - class RegistrationBehaviour(Enum): """ @@ -34,35 +23,3 @@ class RegistrationBehaviour(Enum): ALLOW = "allow" SHADOW_BAN = "shadow_ban" DENY = "deny" - - -class SpamCheckerApi: - """A proxy object that gets passed to spam checkers so they can get - access to rooms and other relevant information. - """ - - def __init__(self, hs: "synapse.server.HomeServer"): - self.hs = hs - - self._store = hs.get_datastore() - - @defer.inlineCallbacks - def get_state_events_in_room(self, room_id: str, types: tuple) -> defer.Deferred: - """Gets state events for the given room. - - Args: - room_id: The room ID to get state events in. - types: The event type and state key (using None - to represent 'any') of the room state to acquire. - - Returns: - twisted.internet.defer.Deferred[list(synapse.events.FrozenEvent)]: - The filtered state events in the room. - """ - state_ids = yield defer.ensureDeferred( - self._store.get_filtered_current_state_ids( - room_id=room_id, state_filter=StateFilter.from_types(types) - ) - ) - state = yield defer.ensureDeferred(self._store.get_events(state_ids.values())) - return state.values() diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 54600ad983..7c790bee7d 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.module_api import ModuleApi + from synapse.rest import admin from synapse.rest.client.v1 import login, room @@ -28,7 +28,7 @@ class ModuleApiTestCase(HomeserverTestCase): def prepare(self, reactor, clock, homeserver): self.store = homeserver.get_datastore() - self.module_api = ModuleApi(homeserver, homeserver.get_auth_handler()) + self.module_api = homeserver.get_module_api() def test_can_register_user(self): """Tests that an external module can register a user""" -- cgit 1.5.1 From 8dbf62fada36f11a915cea4b6445f716e931dea3 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 7 Oct 2020 11:13:38 -0400 Subject: Include the configured log level in phone home stats. (#8477) By reporting the log level of the synapse logger as a string. --- changelog.d/8477.misc | 1 + synapse/app/phone_stats_home.py | 7 +++++++ 2 files changed, 8 insertions(+) create mode 100644 changelog.d/8477.misc (limited to 'synapse/app') diff --git a/changelog.d/8477.misc b/changelog.d/8477.misc new file mode 100644 index 0000000000..2ee1606b6e --- /dev/null +++ b/changelog.d/8477.misc @@ -0,0 +1 @@ +Include the log level in the phone home stats. diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index 2c8e14a8c0..daed8ccfe9 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -113,6 +113,13 @@ async def phone_stats_home(hs, stats, stats_process=_stats_process): stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__ stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version + # + # Logging configuration + # + synapse_logger = logging.getLogger("synapse") + log_level = synapse_logger.getEffectiveLevel() + stats["log_level"] = logging.getLevelName(log_level) + logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)) try: await hs.get_proxied_http_client().put_json( -- cgit 1.5.1 From e4f72ddc44367d0cd53e6cfc5ba310b6f55319b6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 7 Oct 2020 11:27:56 -0400 Subject: Move additional tasks to the background worker (#8458) --- changelog.d/8458.feature | 1 + synapse/app/generic_worker.py | 4 + synapse/app/phone_stats_home.py | 33 ++--- synapse/storage/databases/main/client_ips.py | 109 ++++++++------- synapse/storage/databases/main/metrics.py | 14 +- synapse/storage/databases/main/registration.py | 184 ++++++++++++------------- synapse/storage/databases/main/roommember.py | 5 +- synapse/storage/databases/main/transactions.py | 42 +++--- 8 files changed, 195 insertions(+), 197 deletions(-) create mode 100644 changelog.d/8458.feature (limited to 'synapse/app') diff --git a/changelog.d/8458.feature b/changelog.d/8458.feature new file mode 100644 index 0000000000..542993110b --- /dev/null +++ b/changelog.d/8458.feature @@ -0,0 +1 @@ +Allow running background tasks in a separate worker process. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index fc5188ce95..d53181deb1 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -127,6 +127,7 @@ from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer, cache_in_self from synapse.storage.databases.main.censor_events import CensorEventsStore +from synapse.storage.databases.main.client_ips import ClientIpWorkerStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore from synapse.storage.databases.main.metrics import ServerMetricsStore from synapse.storage.databases.main.monthly_active_users import ( @@ -135,6 +136,7 @@ from synapse.storage.databases.main.monthly_active_users import ( from synapse.storage.databases.main.presence import UserPresenceState from synapse.storage.databases.main.search import SearchWorkerStore from synapse.storage.databases.main.stats import StatsStore +from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore from synapse.types import ReadReceipt @@ -466,6 +468,7 @@ class GenericWorkerSlavedStore( SlavedAccountDataStore, SlavedPusherStore, CensorEventsStore, + ClientIpWorkerStore, SlavedEventStore, SlavedKeyStore, RoomStore, @@ -481,6 +484,7 @@ class GenericWorkerSlavedStore( MediaRepositoryStore, ServerMetricsStore, SearchWorkerStore, + TransactionWorkerStore, BaseSlavedStore, ): pass diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index daed8ccfe9..8a69104a04 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import math import resource @@ -19,7 +18,10 @@ import sys from prometheus_client import Gauge -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) logger = logging.getLogger("synapse.app.homeserver") @@ -41,6 +43,7 @@ registered_reserved_users_mau_gauge = Gauge( ) +@wrap_as_background_process("phone_stats_home") async def phone_stats_home(hs, stats, stats_process=_stats_process): logger.info("Gathering stats for reporting") now = int(hs.get_clock().time()) @@ -143,20 +146,10 @@ def start_phone_stats_home(hs): (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF)) ) - def start_phone_stats_home(): - return run_as_background_process( - "phone_stats_home", phone_stats_home, hs, stats - ) - - def generate_user_daily_visit_stats(): - return run_as_background_process( - "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits - ) - # Rather than update on per session basis, batch up the requests. # If you increase the loop period, the accuracy of user_daily_visits # table will decrease - clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + clock.looping_call(hs.get_datastore().generate_user_daily_visits, 5 * 60 * 1000) # monthly active user limiting functionality def reap_monthly_active_users(): @@ -167,6 +160,7 @@ def start_phone_stats_home(hs): clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) reap_monthly_active_users() + @wrap_as_background_process("generate_monthly_active_users") async def generate_monthly_active_users(): current_mau_count = 0 current_mau_count_by_service = {} @@ -186,19 +180,14 @@ def start_phone_stats_home(hs): registered_reserved_users_mau_gauge.set(float(len(reserved_users))) max_mau_gauge.set(float(hs.config.max_mau_value)) - def start_generate_monthly_active_users(): - return run_as_background_process( - "generate_monthly_active_users", generate_monthly_active_users - ) - if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: - start_generate_monthly_active_users() - clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) + generate_monthly_active_users() + clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) # End of monthly active user settings if hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") - clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) + clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats) # We need to defer this init for the cases that we daemonize # otherwise the process ID we get is that of the non-daemon process @@ -206,4 +195,4 @@ def start_phone_stats_home(hs): # We wait 5 minutes to send the first set of stats as the server can # be quite busy the first few minutes - clock.call_later(5 * 60, start_phone_stats_home) + clock.call_later(5 * 60, phone_stats_home, hs, stats) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 239c7a949c..a25a888443 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -351,7 +351,63 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): return updated -class ClientIpStore(ClientIpBackgroundUpdateStore): +class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + self.user_ips_max_age = hs.config.user_ips_max_age + + if hs.config.run_background_tasks and self.user_ips_max_age: + self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) + + @wrap_as_background_process("prune_old_user_ips") + async def _prune_old_user_ips(self): + """Removes entries in user IPs older than the configured period. + """ + + if self.user_ips_max_age is None: + # Nothing to do + return + + if not await self.db_pool.updates.has_completed_background_update( + "devices_last_seen" + ): + # Only start pruning if we have finished populating the devices + # last seen info. + return + + # We do a slightly funky SQL delete to ensure we don't try and delete + # too much at once (as the table may be very large from before we + # started pruning). + # + # This works by finding the max last_seen that is less than the given + # time, but has no more than N rows before it, deleting all rows with + # a lesser last_seen time. (We COALESCE so that the sub-SELECT always + # returns exactly one row). + sql = """ + DELETE FROM user_ips + WHERE last_seen <= ( + SELECT COALESCE(MAX(last_seen), -1) + FROM ( + SELECT last_seen FROM user_ips + WHERE last_seen <= ? + ORDER BY last_seen ASC + LIMIT 5000 + ) AS u + ) + """ + + timestamp = self.clock.time_msec() - self.user_ips_max_age + + def _prune_old_user_ips_txn(txn): + txn.execute(sql, (timestamp,)) + + await self.db_pool.runInteraction( + "_prune_old_user_ips", _prune_old_user_ips_txn + ) + + +class ClientIpStore(ClientIpWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): self.client_ip_last_seen = Cache( @@ -360,8 +416,6 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): super().__init__(database, db_conn, hs) - self.user_ips_max_age = hs.config.user_ips_max_age - # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) self._batch_row_update = {} @@ -372,9 +426,6 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): "before", "shutdown", self._update_client_ips_batch ) - if self.user_ips_max_age: - self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) - async def insert_client_ip( self, user_id, access_token, ip, user_agent, device_id, now=None ): @@ -525,49 +576,3 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): } for (access_token, ip), (user_agent, last_seen) in results.items() ] - - @wrap_as_background_process("prune_old_user_ips") - async def _prune_old_user_ips(self): - """Removes entries in user IPs older than the configured period. - """ - - if self.user_ips_max_age is None: - # Nothing to do - return - - if not await self.db_pool.updates.has_completed_background_update( - "devices_last_seen" - ): - # Only start pruning if we have finished populating the devices - # last seen info. - return - - # We do a slightly funky SQL delete to ensure we don't try and delete - # too much at once (as the table may be very large from before we - # started pruning). - # - # This works by finding the max last_seen that is less than the given - # time, but has no more than N rows before it, deleting all rows with - # a lesser last_seen time. (We COALESCE so that the sub-SELECT always - # returns exactly one row). - sql = """ - DELETE FROM user_ips - WHERE last_seen <= ( - SELECT COALESCE(MAX(last_seen), -1) - FROM ( - SELECT last_seen FROM user_ips - WHERE last_seen <= ? - ORDER BY last_seen ASC - LIMIT 5000 - ) AS u - ) - """ - - timestamp = self.clock.time_msec() - self.user_ips_max_age - - def _prune_old_user_ips_txn(txn): - txn.execute(sql, (timestamp,)) - - await self.db_pool.runInteraction( - "_prune_old_user_ips", _prune_old_user_ips_txn - ) diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 2c5a4fdbf6..0acf0617ca 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -18,7 +18,7 @@ import time from typing import Dict from synapse.metrics import GaugeBucketCollector -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool from synapse.storage.databases.main.event_push_actions import ( @@ -57,18 +57,13 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): super().__init__(database, db_conn, hs) # Read the extrems every 60 minutes - def read_forward_extremities(): - # run as a background process to make sure that the database transactions - # have a logcontext to report to - return run_as_background_process( - "read_forward_extremities", self._read_forward_extremities - ) - - hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + if hs.config.run_background_tasks: + self._clock.looping_call(self._read_forward_extremities, 60 * 60 * 1000) # Used in _generate_user_daily_visits to keep track of progress self._last_user_visit_update = self._get_start_of_day() + @wrap_as_background_process("read_forward_extremities") async def _read_forward_extremities(self): def fetch(txn): txn.execute( @@ -274,6 +269,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0)) return today_start * 1000 + @wrap_as_background_process("generate_user_daily_visits") async def generate_user_daily_visits(self) -> None: """ Generates daily visit data for use in cohort/ retention analysis diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 16ba545740..a85867936f 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -14,14 +14,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import re from typing import Any, Dict, List, Optional, Tuple from synapse.api.constants import UserTypes from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool from synapse.storage.types import Cursor @@ -48,6 +50,21 @@ class RegistrationWorkerStore(SQLBaseStore): database.engine, find_max_generated_user_id_localpart, "user_id_seq", ) + self._account_validity = hs.config.account_validity + if hs.config.run_background_tasks and self._account_validity.enabled: + self._clock.call_later( + 0.0, + run_as_background_process, + "account_validity_set_expiration_dates", + self._set_expiration_date_when_missing, + ) + + # Create a background job for culling expired 3PID validity tokens + if hs.config.run_background_tasks: + self.clock.looping_call( + self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS + ) + @cached() async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: return await self.db_pool.simple_select_one( @@ -778,6 +795,78 @@ class RegistrationWorkerStore(SQLBaseStore): "delete_threepid_session", delete_threepid_session_txn ) + @wrap_as_background_process("cull_expired_threepid_validation_tokens") + async def cull_expired_threepid_validation_tokens(self) -> None: + """Remove threepid validation tokens with expiry dates that have passed""" + + def cull_expired_threepid_validation_tokens_txn(txn, ts): + sql = """ + DELETE FROM threepid_validation_token WHERE + expires < ? + """ + txn.execute(sql, (ts,)) + + await self.db_pool.runInteraction( + "cull_expired_threepid_validation_tokens", + cull_expired_threepid_validation_tokens_txn, + self.clock.time_msec(), + ) + + async def _set_expiration_date_when_missing(self): + """ + Retrieves the list of registered users that don't have an expiration date, and + adds an expiration date for each of them. + """ + + def select_users_with_no_expiration_date_txn(txn): + """Retrieves the list of registered users with no expiration date from the + database, filtering out deactivated users. + """ + sql = ( + "SELECT users.name FROM users" + " LEFT JOIN account_validity ON (users.name = account_validity.user_id)" + " WHERE account_validity.user_id is NULL AND users.deactivated = 0;" + ) + txn.execute(sql, []) + + res = self.db_pool.cursor_to_dict(txn) + if res: + for user in res: + self.set_expiration_date_for_user_txn( + txn, user["name"], use_delta=True + ) + + await self.db_pool.runInteraction( + "get_users_with_no_expiration_date", + select_users_with_no_expiration_date_txn, + ) + + def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False): + """Sets an expiration date to the account with the given user ID. + + Args: + user_id (str): User ID to set an expiration date for. + use_delta (bool): If set to False, the expiration date for the user will be + now + validity period. If set to True, this expiration date will be a + random value in the [now + period - d ; now + period] range, d being a + delta equal to 10% of the validity period. + """ + now_ms = self._clock.time_msec() + expiration_ts = now_ms + self._account_validity.period + + if use_delta: + expiration_ts = self.rand.randrange( + expiration_ts - self._account_validity.startup_job_max_delta, + expiration_ts, + ) + + self.db_pool.simple_upsert_txn( + txn, + "account_validity", + keyvalues={"user_id": user_id}, + values={"expiration_ts_ms": expiration_ts, "email_sent": False}, + ) + class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): @@ -911,28 +1000,8 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) - self._account_validity = hs.config.account_validity self._ignore_unknown_session_error = hs.config.request_token_inhibit_3pid_errors - if self._account_validity.enabled: - self._clock.call_later( - 0.0, - run_as_background_process, - "account_validity_set_expiration_dates", - self._set_expiration_date_when_missing, - ) - - # Create a background job for culling expired 3PID validity tokens - def start_cull(): - # run as a background process to make sure that the database transactions - # have a logcontext to report to - return run_as_background_process( - "cull_expired_threepid_validation_tokens", - self.cull_expired_threepid_validation_tokens, - ) - - hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) - async def add_access_token_to_user( self, user_id: str, @@ -1477,22 +1546,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): start_or_continue_validation_session_txn, ) - async def cull_expired_threepid_validation_tokens(self) -> None: - """Remove threepid validation tokens with expiry dates that have passed""" - - def cull_expired_threepid_validation_tokens_txn(txn, ts): - sql = """ - DELETE FROM threepid_validation_token WHERE - expires < ? - """ - txn.execute(sql, (ts,)) - - await self.db_pool.runInteraction( - "cull_expired_threepid_validation_tokens", - cull_expired_threepid_validation_tokens_txn, - self.clock.time_msec(), - ) - async def set_user_deactivated_status( self, user_id: str, deactivated: bool ) -> None: @@ -1522,61 +1575,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): ) txn.call_after(self.is_guest.invalidate, (user_id,)) - async def _set_expiration_date_when_missing(self): - """ - Retrieves the list of registered users that don't have an expiration date, and - adds an expiration date for each of them. - """ - - def select_users_with_no_expiration_date_txn(txn): - """Retrieves the list of registered users with no expiration date from the - database, filtering out deactivated users. - """ - sql = ( - "SELECT users.name FROM users" - " LEFT JOIN account_validity ON (users.name = account_validity.user_id)" - " WHERE account_validity.user_id is NULL AND users.deactivated = 0;" - ) - txn.execute(sql, []) - - res = self.db_pool.cursor_to_dict(txn) - if res: - for user in res: - self.set_expiration_date_for_user_txn( - txn, user["name"], use_delta=True - ) - - await self.db_pool.runInteraction( - "get_users_with_no_expiration_date", - select_users_with_no_expiration_date_txn, - ) - - def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False): - """Sets an expiration date to the account with the given user ID. - - Args: - user_id (str): User ID to set an expiration date for. - use_delta (bool): If set to False, the expiration date for the user will be - now + validity period. If set to True, this expiration date will be a - random value in the [now + period - d ; now + period] range, d being a - delta equal to 10% of the validity period. - """ - now_ms = self._clock.time_msec() - expiration_ts = now_ms + self._account_validity.period - - if use_delta: - expiration_ts = self.rand.randrange( - expiration_ts - self._account_validity.startup_job_max_delta, - expiration_ts, - ) - - self.db_pool.simple_upsert_txn( - txn, - "account_validity", - keyvalues={"user_id": user_id}, - values={"expiration_ts_ms": expiration_ts, "email_sent": False}, - ) - def find_max_generated_user_id_localpart(cur: Cursor) -> int: """ diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index bae1bd22d3..20fcdaa529 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -61,7 +61,10 @@ class RoomMemberWorkerStore(EventsWorkerStore): self._check_safe_current_state_events_membership_updated_txn(txn) txn.close() - if self.hs.config.metrics_flags.known_servers: + if ( + self.hs.config.run_background_tasks + and self.hs.config.metrics_flags.known_servers + ): self._known_servers_count = 1 self.hs.get_clock().looping_call( run_as_background_process, diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 97aed1500e..7d46090267 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -19,7 +19,7 @@ from typing import Iterable, List, Optional, Tuple from canonicaljson import encode_canonical_json -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.engines import PostgresEngine, Sqlite3Engine @@ -43,15 +43,33 @@ _UpdateTransactionRow = namedtuple( SENTINEL = object() -class TransactionStore(SQLBaseStore): +class TransactionWorkerStore(SQLBaseStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + if hs.config.run_background_tasks: + self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) + + @wrap_as_background_process("cleanup_transactions") + async def _cleanup_transactions(self) -> None: + now = self._clock.time_msec() + month_ago = now - 30 * 24 * 60 * 60 * 1000 + + def _cleanup_transactions_txn(txn): + txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + + await self.db_pool.runInteraction( + "_cleanup_transactions", _cleanup_transactions_txn + ) + + +class TransactionStore(TransactionWorkerStore): """A collection of queries for handling PDUs. """ def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) - self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) - self._destination_retry_cache = ExpiringCache( cache_name="get_destination_retry_timings", clock=self._clock, @@ -266,22 +284,6 @@ class TransactionStore(SQLBaseStore): }, ) - def _start_cleanup_transactions(self): - return run_as_background_process( - "cleanup_transactions", self._cleanup_transactions - ) - - async def _cleanup_transactions(self) -> None: - now = self._clock.time_msec() - month_ago = now - 30 * 24 * 60 * 60 * 1000 - - def _cleanup_transactions_txn(txn): - txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) - - await self.db_pool.runInteraction( - "_cleanup_transactions", _cleanup_transactions_txn - ) - async def store_destination_rooms_entries( self, destinations: Iterable[str], room_id: str, stream_ordering: int, ) -> None: -- cgit 1.5.1 From c9c0ad5e204f309f2686dbe250382e481e0f82c2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 9 Oct 2020 07:24:34 -0400 Subject: Remove the deprecated Handlers object (#8494) All handlers now available via get_*_handler() methods on the HomeServer. --- changelog.d/8494.misc | 1 + synapse/app/admin_cmd.py | 2 +- synapse/federation/federation_server.py | 7 ++++- synapse/handlers/__init__.py | 33 ----------------------- synapse/handlers/auth.py | 2 +- synapse/handlers/deactivate_account.py | 2 +- synapse/handlers/message.py | 4 +-- synapse/handlers/pagination.py | 2 +- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/room_member.py | 6 ++--- synapse/handlers/ui_auth/checkers.py | 2 +- synapse/replication/http/federation.py | 2 +- synapse/replication/http/membership.py | 2 +- synapse/rest/admin/rooms.py | 4 +-- synapse/rest/admin/users.py | 13 +++++---- synapse/rest/client/v1/directory.py | 25 +++++++++-------- synapse/rest/client/v1/login.py | 1 - synapse/rest/client/v1/room.py | 10 +++---- synapse/rest/client/v2_alpha/account.py | 16 +++++------ synapse/rest/client/v2_alpha/register.py | 6 ++--- synapse/server.py | 30 +++++++++++++++++---- tests/api/test_auth.py | 12 +++------ tests/api/test_filtering.py | 5 +--- tests/crypto/test_keyring.py | 6 ++--- tests/handlers/test_admin.py | 2 +- tests/handlers/test_auth.py | 11 ++------ tests/handlers/test_directory.py | 10 +++---- tests/handlers/test_e2e_keys.py | 2 +- tests/handlers/test_e2e_room_keys.py | 2 +- tests/handlers/test_federation.py | 2 +- tests/handlers/test_presence.py | 2 +- tests/handlers/test_profile.py | 7 ----- tests/handlers/test_register.py | 22 ++++++--------- tests/replication/test_federation_sender_shard.py | 2 +- tests/rest/client/test_shadow_banned.py | 2 +- tests/rest/client/v1/test_events.py | 2 +- tests/rest/client/v1/test_rooms.py | 6 ++++- tests/rest/client/v1/test_typing.py | 2 +- tests/test_federation.py | 2 +- 40 files changed, 116 insertions(+), 157 deletions(-) create mode 100644 changelog.d/8494.misc (limited to 'synapse/app') diff --git a/changelog.d/8494.misc b/changelog.d/8494.misc new file mode 100644 index 0000000000..6e56c6b854 --- /dev/null +++ b/changelog.d/8494.misc @@ -0,0 +1 @@ +Remove the deprecated `Handlers` object. diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index f0d65d08d7..b4bd4d8e7a 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -89,7 +89,7 @@ async def export_data_command(hs, args): user_id = args.user_id directory = args.output_directory - res = await hs.get_handlers().admin_handler.export_user_data( + res = await hs.get_admin_handler().export_user_data( user_id, FileExfiltrationWriter(user_id, directory=directory) ) print(res) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 1c7ea886c9..e8039e244c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -99,10 +99,15 @@ class FederationServer(FederationBase): super().__init__(hs) self.auth = hs.get_auth() - self.handler = hs.get_handlers().federation_handler + self.handler = hs.get_federation_handler() self.state = hs.get_state_handler() self.device_handler = hs.get_device_handler() + + # Ensure the following handlers are loaded since they register callbacks + # with FederationHandlerRegistry. + hs.get_directory_handler() + self._federation_ratelimiter = hs.get_federation_ratelimiter() self._server_linearizer = Linearizer("fed_server") diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 286f0054be..bfebb0f644 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -12,36 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from .admin import AdminHandler -from .directory import DirectoryHandler -from .federation import FederationHandler -from .identity import IdentityHandler -from .search import SearchHandler - - -class Handlers: - - """ Deprecated. A collection of handlers. - - At some point most of the classes whose name ended "Handler" were - accessed through this class. - - However this makes it painful to unit test the handlers and to run cut - down versions of synapse that only use specific handlers because using a - single handler required creating all of the handlers. So some of the - handlers have been lifted out of the Handlers object and are now accessed - directly through the homeserver object itself. - - Any new handlers should follow the new pattern of being accessed through - the homeserver object and should not be added to the Handlers object. - - The remaining handlers should be moved out of the handlers object. - """ - - def __init__(self, hs): - self.federation_handler = FederationHandler(hs) - self.directory_handler = DirectoryHandler(hs) - self.admin_handler = AdminHandler(hs) - self.identity_handler = IdentityHandler(hs) - self.search_handler = SearchHandler(hs) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index f6d17c53b1..1d1ddc2245 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1080,7 +1080,7 @@ class AuthHandler(BaseHandler): if medium == "email": address = canonicalise_email(address) - identity_handler = self.hs.get_handlers().identity_handler + identity_handler = self.hs.get_identity_handler() result = await identity_handler.try_unbind_threepid( user_id, {"medium": medium, "address": address, "id_server": id_server} ) diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 72a5831531..58c9f12686 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -37,7 +37,7 @@ class DeactivateAccountHandler(BaseHandler): self._auth_handler = hs.get_auth_handler() self._device_handler = hs.get_device_handler() self._room_member_handler = hs.get_room_member_handler() - self._identity_handler = hs.get_handlers().identity_handler + self._identity_handler = hs.get_identity_handler() self.user_directory_handler = hs.get_user_directory_handler() # Flag that indicates whether the process to part users from rooms is running diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3e9a22e8f3..33d133a4b2 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1014,7 +1014,7 @@ class EventCreationHandler: # Check the alias is currently valid (if it has changed). room_alias_str = event.content.get("alias", None) - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() if room_alias_str and room_alias_str != original_alias: await self._validate_canonical_alias( directory_handler, room_alias_str, event.room_id @@ -1040,7 +1040,7 @@ class EventCreationHandler: directory_handler, alias_str, event.room_id ) - federation_handler = self.hs.get_handlers().federation_handler + federation_handler = self.hs.get_federation_handler() if event.type == EventTypes.Member: if event.content["membership"] == Membership.INVITE: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 2c2a633938..085b685959 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -383,7 +383,7 @@ class PaginationHandler: "room_key", leave_token ) - await self.hs.get_handlers().federation_handler.maybe_backfill( + await self.hs.get_federation_handler().maybe_backfill( room_id, curr_topo, limit=pagin_config.limit, ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 538f4b2a61..a6f1d21674 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -48,7 +48,7 @@ class RegistrationHandler(BaseHandler): self._auth_handler = hs.get_auth_handler() self.profile_handler = hs.get_profile_handler() self.user_directory_handler = hs.get_user_directory_handler() - self.identity_handler = self.hs.get_handlers().identity_handler + self.identity_handler = self.hs.get_identity_handler() self.ratelimiter = hs.get_registration_ratelimiter() self.macaroon_gen = hs.get_macaroon_generator() self._server_notices_mxid = hs.config.server_notices_mxid diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d0530a446c..1d04d41e98 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -691,7 +691,7 @@ class RoomCreationHandler(BaseHandler): if not allowed_by_third_party_rules: raise SynapseError(403, "Room visibility value not allowed.") - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() if room_alias: await directory_handler.create_association( requester=requester, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index fd8114a64d..ffbc62ff44 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -64,9 +64,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self.state_handler = hs.get_state_handler() self.config = hs.config - self.federation_handler = hs.get_handlers().federation_handler - self.directory_handler = hs.get_handlers().directory_handler - self.identity_handler = hs.get_handlers().identity_handler + self.federation_handler = hs.get_federation_handler() + self.directory_handler = hs.get_directory_handler() + self.identity_handler = hs.get_identity_handler() self.registration_handler = hs.get_registration_handler() self.profile_handler = hs.get_profile_handler() self.event_creation_handler = hs.get_event_creation_handler() diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py index 9146dc1a3b..3d66bf305e 100644 --- a/synapse/handlers/ui_auth/checkers.py +++ b/synapse/handlers/ui_auth/checkers.py @@ -143,7 +143,7 @@ class _BaseThreepidAuthChecker: threepid_creds = authdict["threepid_creds"] - identity_handler = self.hs.get_handlers().identity_handler + identity_handler = self.hs.get_identity_handler() logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,)) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 5393b9a9e7..b4f4a68b5c 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -62,7 +62,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.store = hs.get_datastore() self.storage = hs.get_storage() self.clock = hs.get_clock() - self.federation_handler = hs.get_handlers().federation_handler + self.federation_handler = hs.get_federation_handler() @staticmethod async def _serialize_payload(store, room_id, event_and_contexts, backfilled): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 30680baee8..e7cc74a5d2 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -47,7 +47,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): def __init__(self, hs): super().__init__(hs) - self.federation_handler = hs.get_handlers().federation_handler + self.federation_handler = hs.get_federation_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 09726d52d6..f5304ff43d 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -138,7 +138,7 @@ class ListRoomRestServlet(RestServlet): def __init__(self, hs): self.store = hs.get_datastore() self.auth = hs.get_auth() - self.admin_handler = hs.get_handlers().admin_handler + self.admin_handler = hs.get_admin_handler() async def on_GET(self, request): requester = await self.auth.get_user_by_req(request) @@ -273,7 +273,7 @@ class JoinRoomAliasServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.room_member_handler = hs.get_room_member_handler() - self.admin_handler = hs.get_handlers().admin_handler + self.admin_handler = hs.get_admin_handler() self.state_handler = hs.get_state_handler() async def on_POST(self, request, room_identifier): diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 20dc1d0e05..8efefbc0a0 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -45,7 +45,7 @@ class UsersRestServlet(RestServlet): self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() - self.admin_handler = hs.get_handlers().admin_handler + self.admin_handler = hs.get_admin_handler() async def on_GET(self, request, user_id): target_user = UserID.from_string(user_id) @@ -82,7 +82,7 @@ class UsersRestServletV2(RestServlet): self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() - self.admin_handler = hs.get_handlers().admin_handler + self.admin_handler = hs.get_admin_handler() async def on_GET(self, request): await assert_requester_is_admin(self.auth, request) @@ -135,7 +135,7 @@ class UserRestServletV2(RestServlet): def __init__(self, hs): self.hs = hs self.auth = hs.get_auth() - self.admin_handler = hs.get_handlers().admin_handler + self.admin_handler = hs.get_admin_handler() self.store = hs.get_datastore() self.auth_handler = hs.get_auth_handler() self.profile_handler = hs.get_profile_handler() @@ -448,7 +448,7 @@ class WhoisRestServlet(RestServlet): def __init__(self, hs): self.hs = hs self.auth = hs.get_auth() - self.handlers = hs.get_handlers() + self.admin_handler = hs.get_admin_handler() async def on_GET(self, request, user_id): target_user = UserID.from_string(user_id) @@ -461,7 +461,7 @@ class WhoisRestServlet(RestServlet): if not self.hs.is_mine(target_user): raise SynapseError(400, "Can only whois a local user") - ret = await self.handlers.admin_handler.get_whois(target_user) + ret = await self.admin_handler.get_whois(target_user) return 200, ret @@ -591,7 +591,6 @@ class SearchUsersRestServlet(RestServlet): self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() - self.handlers = hs.get_handlers() async def on_GET(self, request, target_user_id): """Get request to search user table for specific users according to @@ -612,7 +611,7 @@ class SearchUsersRestServlet(RestServlet): term = parse_string(request, "term", required=True) logger.info("term: %s ", term) - ret = await self.handlers.store.search_users(term) + ret = await self.store.search_users(term) return 200, ret diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index faabeeb91c..e5af26b176 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -42,14 +42,13 @@ class ClientDirectoryServer(RestServlet): def __init__(self, hs): super().__init__() self.store = hs.get_datastore() - self.handlers = hs.get_handlers() + self.directory_handler = hs.get_directory_handler() self.auth = hs.get_auth() async def on_GET(self, request, room_alias): room_alias = RoomAlias.from_string(room_alias) - dir_handler = self.handlers.directory_handler - res = await dir_handler.get_association(room_alias) + res = await self.directory_handler.get_association(room_alias) return 200, res @@ -79,19 +78,19 @@ class ClientDirectoryServer(RestServlet): requester = await self.auth.get_user_by_req(request) - await self.handlers.directory_handler.create_association( + await self.directory_handler.create_association( requester, room_alias, room_id, servers ) return 200, {} async def on_DELETE(self, request, room_alias): - dir_handler = self.handlers.directory_handler - try: service = self.auth.get_appservice_by_req(request) room_alias = RoomAlias.from_string(room_alias) - await dir_handler.delete_appservice_association(service, room_alias) + await self.directory_handler.delete_appservice_association( + service, room_alias + ) logger.info( "Application service at %s deleted alias %s", service.url, @@ -107,7 +106,7 @@ class ClientDirectoryServer(RestServlet): room_alias = RoomAlias.from_string(room_alias) - await dir_handler.delete_association(requester, room_alias) + await self.directory_handler.delete_association(requester, room_alias) logger.info( "User %s deleted alias %s", user.to_string(), room_alias.to_string() @@ -122,7 +121,7 @@ class ClientDirectoryListServer(RestServlet): def __init__(self, hs): super().__init__() self.store = hs.get_datastore() - self.handlers = hs.get_handlers() + self.directory_handler = hs.get_directory_handler() self.auth = hs.get_auth() async def on_GET(self, request, room_id): @@ -138,7 +137,7 @@ class ClientDirectoryListServer(RestServlet): content = parse_json_object_from_request(request) visibility = content.get("visibility", "public") - await self.handlers.directory_handler.edit_published_room_list( + await self.directory_handler.edit_published_room_list( requester, room_id, visibility ) @@ -147,7 +146,7 @@ class ClientDirectoryListServer(RestServlet): async def on_DELETE(self, request, room_id): requester = await self.auth.get_user_by_req(request) - await self.handlers.directory_handler.edit_published_room_list( + await self.directory_handler.edit_published_room_list( requester, room_id, "private" ) @@ -162,7 +161,7 @@ class ClientAppserviceDirectoryListServer(RestServlet): def __init__(self, hs): super().__init__() self.store = hs.get_datastore() - self.handlers = hs.get_handlers() + self.directory_handler = hs.get_directory_handler() self.auth = hs.get_auth() def on_PUT(self, request, network_id, room_id): @@ -180,7 +179,7 @@ class ClientAppserviceDirectoryListServer(RestServlet): 403, "Only appservices can edit the appservice published room list" ) - await self.handlers.directory_handler.edit_published_appservice_room_list( + await self.directory_handler.edit_published_appservice_room_list( requester.app_service.id, network_id, room_id, visibility ) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 3d1693d7ac..d7deb9300d 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -67,7 +67,6 @@ class LoginRestServlet(RestServlet): self.auth_handler = self.hs.get_auth_handler() self.registration_handler = hs.get_registration_handler() - self.handlers = hs.get_handlers() self._well_known_builder = WellKnownBuilder(hs) self._address_ratelimiter = Ratelimiter( clock=hs.get_clock(), diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index b63389e5fe..00b4397082 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -112,7 +112,6 @@ class RoomCreateRestServlet(TransactionRestServlet): class RoomStateEventRestServlet(TransactionRestServlet): def __init__(self, hs): super().__init__(hs) - self.handlers = hs.get_handlers() self.event_creation_handler = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() self.message_handler = hs.get_message_handler() @@ -798,7 +797,6 @@ class RoomMembershipRestServlet(TransactionRestServlet): class RoomRedactEventRestServlet(TransactionRestServlet): def __init__(self, hs): super().__init__(hs) - self.handlers = hs.get_handlers() self.event_creation_handler = hs.get_event_creation_handler() self.auth = hs.get_auth() @@ -903,7 +901,7 @@ class RoomAliasListServlet(RestServlet): def __init__(self, hs: "synapse.server.HomeServer"): super().__init__() self.auth = hs.get_auth() - self.directory_handler = hs.get_handlers().directory_handler + self.directory_handler = hs.get_directory_handler() async def on_GET(self, request, room_id): requester = await self.auth.get_user_by_req(request) @@ -920,7 +918,7 @@ class SearchRestServlet(RestServlet): def __init__(self, hs): super().__init__() - self.handlers = hs.get_handlers() + self.search_handler = hs.get_search_handler() self.auth = hs.get_auth() async def on_POST(self, request): @@ -929,9 +927,7 @@ class SearchRestServlet(RestServlet): content = parse_json_object_from_request(request) batch = parse_string(request, "next_batch") - results = await self.handlers.search_handler.search( - requester.user, content, batch - ) + results = await self.search_handler.search(requester.user, content, batch) return 200, results diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index ab5815e7f7..e857cff176 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -56,7 +56,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): self.hs = hs self.datastore = hs.get_datastore() self.config = hs.config - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() if self.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL: self.mailer = Mailer( @@ -327,7 +327,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): super().__init__() self.hs = hs self.config = hs.config - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() self.store = self.hs.get_datastore() if self.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL: @@ -424,7 +424,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): self.hs = hs super().__init__() self.store = self.hs.get_datastore() - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() async def on_POST(self, request): body = parse_json_object_from_request(request) @@ -574,7 +574,7 @@ class AddThreepidMsisdnSubmitTokenServlet(RestServlet): self.config = hs.config self.clock = hs.get_clock() self.store = hs.get_datastore() - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() async def on_POST(self, request): if not self.config.account_threepid_delegate_msisdn: @@ -604,7 +604,7 @@ class ThreepidRestServlet(RestServlet): def __init__(self, hs): super().__init__() self.hs = hs - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() self.datastore = self.hs.get_datastore() @@ -660,7 +660,7 @@ class ThreepidAddRestServlet(RestServlet): def __init__(self, hs): super().__init__() self.hs = hs - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() @@ -711,7 +711,7 @@ class ThreepidBindRestServlet(RestServlet): def __init__(self, hs): super().__init__() self.hs = hs - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() self.auth = hs.get_auth() async def on_POST(self, request): @@ -740,7 +740,7 @@ class ThreepidUnbindRestServlet(RestServlet): def __init__(self, hs): super().__init__() self.hs = hs - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() self.auth = hs.get_auth() self.datastore = self.hs.get_datastore() diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index ffa2dfce42..395b6a82a9 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -78,7 +78,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): """ super().__init__() self.hs = hs - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() self.config = hs.config if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL: @@ -176,7 +176,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): """ super().__init__() self.hs = hs - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() async def on_POST(self, request): body = parse_json_object_from_request(request) @@ -370,7 +370,7 @@ class RegisterRestServlet(RestServlet): self.store = hs.get_datastore() self.auth_handler = hs.get_auth_handler() self.registration_handler = hs.get_registration_handler() - self.identity_handler = hs.get_handlers().identity_handler + self.identity_handler = hs.get_identity_handler() self.room_member_handler = hs.get_room_member_handler() self.macaroon_gen = hs.get_macaroon_generator() self.ratelimiter = hs.get_registration_ratelimiter() diff --git a/synapse/server.py b/synapse/server.py index f83dd6148c..e793793cdc 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -54,19 +54,22 @@ from synapse.federation.sender import FederationSender from synapse.federation.transport.client import TransportLayerClient from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler -from synapse.handlers import Handlers from synapse.handlers.account_validity import AccountValidityHandler from synapse.handlers.acme import AcmeHandler +from synapse.handlers.admin import AdminHandler from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler, MacaroonGenerator from synapse.handlers.cas_handler import CasHandler from synapse.handlers.deactivate_account import DeactivateAccountHandler from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler from synapse.handlers.devicemessage import DeviceMessageHandler +from synapse.handlers.directory import DirectoryHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler +from synapse.handlers.federation import FederationHandler from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler +from synapse.handlers.identity import IdentityHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.message import EventCreationHandler, MessageHandler from synapse.handlers.pagination import PaginationHandler @@ -84,6 +87,7 @@ from synapse.handlers.room import ( from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler +from synapse.handlers.search import SearchHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler @@ -318,10 +322,6 @@ class HomeServer(metaclass=abc.ABCMeta): def get_federation_server(self) -> FederationServer: return FederationServer(self) - @cache_in_self - def get_handlers(self) -> Handlers: - return Handlers(self) - @cache_in_self def get_notifier(self) -> Notifier: return Notifier(self) @@ -408,6 +408,10 @@ class HomeServer(metaclass=abc.ABCMeta): def get_device_message_handler(self) -> DeviceMessageHandler: return DeviceMessageHandler(self) + @cache_in_self + def get_directory_handler(self) -> DirectoryHandler: + return DirectoryHandler(self) + @cache_in_self def get_e2e_keys_handler(self) -> E2eKeysHandler: return E2eKeysHandler(self) @@ -420,6 +424,10 @@ class HomeServer(metaclass=abc.ABCMeta): def get_acme_handler(self) -> AcmeHandler: return AcmeHandler(self) + @cache_in_self + def get_admin_handler(self) -> AdminHandler: + return AdminHandler(self) + @cache_in_self def get_application_service_api(self) -> ApplicationServiceApi: return ApplicationServiceApi(self) @@ -440,6 +448,14 @@ class HomeServer(metaclass=abc.ABCMeta): def get_event_stream_handler(self) -> EventStreamHandler: return EventStreamHandler(self) + @cache_in_self + def get_federation_handler(self) -> FederationHandler: + return FederationHandler(self) + + @cache_in_self + def get_identity_handler(self) -> IdentityHandler: + return IdentityHandler(self) + @cache_in_self def get_initial_sync_handler(self) -> InitialSyncHandler: return InitialSyncHandler(self) @@ -459,6 +475,10 @@ class HomeServer(metaclass=abc.ABCMeta): def get_deactivate_account_handler(self) -> DeactivateAccountHandler: return DeactivateAccountHandler(self) + @cache_in_self + def get_search_handler(self) -> SearchHandler: + return SearchHandler(self) + @cache_in_self def get_set_password_handler(self) -> SetPasswordHandler: return SetPasswordHandler(self) diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 8ab56ec94c..cb6f29d670 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -19,7 +19,6 @@ import pymacaroons from twisted.internet import defer -import synapse.handlers.auth from synapse.api.auth import Auth from synapse.api.constants import UserTypes from synapse.api.errors import ( @@ -36,20 +35,15 @@ from tests import unittest from tests.utils import mock_getRawHeaders, setup_test_homeserver -class TestHandlers: - def __init__(self, hs): - self.auth_handler = synapse.handlers.auth.AuthHandler(hs) - - class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): self.state_handler = Mock() self.store = Mock() - self.hs = yield setup_test_homeserver(self.addCleanup, handlers=None) + self.hs = yield setup_test_homeserver(self.addCleanup) self.hs.get_datastore = Mock(return_value=self.store) - self.hs.handlers = TestHandlers(self.hs) + self.hs.get_auth_handler().store = self.store self.auth = Auth(self.hs) # AuthBlocking reads from the hs' config on initialization. We need to @@ -283,7 +277,7 @@ class AuthTestCase(unittest.TestCase): self.store.get_device = Mock(return_value=defer.succeed(None)) token = yield defer.ensureDeferred( - self.hs.handlers.auth_handler.get_access_token_for_user_id( + self.hs.get_auth_handler().get_access_token_for_user_id( USER_ID, "DEVICE", valid_until_ms=None ) ) diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index d2d535d23c..c98ae75974 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -50,10 +50,7 @@ class FilteringTestCase(unittest.TestCase): self.mock_http_client.put_json = DeferredMockCallable() hs = yield setup_test_homeserver( - self.addCleanup, - handlers=None, - http_client=self.mock_http_client, - keyring=Mock(), + self.addCleanup, http_client=self.mock_http_client, keyring=Mock(), ) self.filtering = hs.get_filtering() diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 8ff1460c0d..697916a019 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -315,7 +315,7 @@ class KeyringTestCase(unittest.HomeserverTestCase): class ServerKeyFetcherTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): self.http_client = Mock() - hs = self.setup_test_homeserver(handlers=None, http_client=self.http_client) + hs = self.setup_test_homeserver(http_client=self.http_client) return hs def test_get_keys_from_server(self): @@ -395,9 +395,7 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase): } ] - return self.setup_test_homeserver( - handlers=None, http_client=self.http_client, config=config - ) + return self.setup_test_homeserver(http_client=self.http_client, config=config) def build_perspectives_response( self, server_name: str, signing_key: SigningKey, valid_until_ts: int, diff --git a/tests/handlers/test_admin.py b/tests/handlers/test_admin.py index fc37c4328c..5c2b4de1a6 100644 --- a/tests/handlers/test_admin.py +++ b/tests/handlers/test_admin.py @@ -35,7 +35,7 @@ class ExfiltrateData(unittest.HomeserverTestCase): ] def prepare(self, reactor, clock, hs): - self.admin_handler = hs.get_handlers().admin_handler + self.admin_handler = hs.get_admin_handler() self.user1 = self.register_user("user1", "password") self.token1 = self.login("user1", "password") diff --git a/tests/handlers/test_auth.py b/tests/handlers/test_auth.py index 97877c2e42..b5055e018c 100644 --- a/tests/handlers/test_auth.py +++ b/tests/handlers/test_auth.py @@ -21,24 +21,17 @@ from twisted.internet import defer import synapse import synapse.api.errors from synapse.api.errors import ResourceLimitError -from synapse.handlers.auth import AuthHandler from tests import unittest from tests.test_utils import make_awaitable from tests.utils import setup_test_homeserver -class AuthHandlers: - def __init__(self, hs): - self.auth_handler = AuthHandler(hs) - - class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - self.hs = yield setup_test_homeserver(self.addCleanup, handlers=None) - self.hs.handlers = AuthHandlers(self.hs) - self.auth_handler = self.hs.handlers.auth_handler + self.hs = yield setup_test_homeserver(self.addCleanup) + self.auth_handler = self.hs.get_auth_handler() self.macaroon_generator = self.hs.get_macaroon_generator() # MAU tests diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py index bc0c5aefdc..2ce6dc9528 100644 --- a/tests/handlers/test_directory.py +++ b/tests/handlers/test_directory.py @@ -48,7 +48,7 @@ class DirectoryTestCase(unittest.HomeserverTestCase): federation_registry=self.mock_registry, ) - self.handler = hs.get_handlers().directory_handler + self.handler = hs.get_directory_handler() self.store = hs.get_datastore() @@ -110,7 +110,7 @@ class TestCreateAlias(unittest.HomeserverTestCase): ] def prepare(self, reactor, clock, hs): - self.handler = hs.get_handlers().directory_handler + self.handler = hs.get_directory_handler() # Create user self.admin_user = self.register_user("admin", "pass", admin=True) @@ -173,7 +173,7 @@ class TestDeleteAlias(unittest.HomeserverTestCase): def prepare(self, reactor, clock, hs): self.store = hs.get_datastore() - self.handler = hs.get_handlers().directory_handler + self.handler = hs.get_directory_handler() self.state_handler = hs.get_state_handler() # Create user @@ -289,7 +289,7 @@ class CanonicalAliasTestCase(unittest.HomeserverTestCase): def prepare(self, reactor, clock, hs): self.store = hs.get_datastore() - self.handler = hs.get_handlers().directory_handler + self.handler = hs.get_directory_handler() self.state_handler = hs.get_state_handler() # Create user @@ -442,7 +442,7 @@ class TestRoomListSearchDisabled(unittest.HomeserverTestCase): self.assertEquals(200, channel.code, channel.result) self.room_list_handler = hs.get_room_list_handler() - self.directory_handler = hs.get_handlers().directory_handler + self.directory_handler = hs.get_directory_handler() return hs diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index e79d612f7a..924f29f051 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -38,7 +38,7 @@ class E2eKeysHandlerTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): self.hs = yield utils.setup_test_homeserver( - self.addCleanup, handlers=None, federation_client=mock.Mock() + self.addCleanup, federation_client=mock.Mock() ) self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs) self.store = self.hs.get_datastore() diff --git a/tests/handlers/test_e2e_room_keys.py b/tests/handlers/test_e2e_room_keys.py index 7adde9b9de..45f201a399 100644 --- a/tests/handlers/test_e2e_room_keys.py +++ b/tests/handlers/test_e2e_room_keys.py @@ -54,7 +54,7 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): self.hs = yield utils.setup_test_homeserver( - self.addCleanup, handlers=None, replication_layer=mock.Mock() + self.addCleanup, replication_layer=mock.Mock() ) self.handler = synapse.handlers.e2e_room_keys.E2eRoomKeysHandler(self.hs) self.local_user = "@boris:" + self.hs.hostname diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 96fea58673..9ef80fe502 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -38,7 +38,7 @@ class FederationTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver(http_client=None) - self.handler = hs.get_handlers().federation_handler + self.handler = hs.get_federation_handler() self.store = hs.get_datastore() return hs diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 306dcfe944..914c82e7a8 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -470,7 +470,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): def prepare(self, reactor, clock, hs): self.federation_sender = hs.get_federation_sender() self.event_builder_factory = hs.get_event_builder_factory() - self.federation_handler = hs.get_handlers().federation_handler + self.federation_handler = hs.get_federation_handler() self.presence_handler = hs.get_presence_handler() # self.event_builder_for_2 = EventBuilderFactory(hs) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 8e95e53d9e..a69fa28b41 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -20,7 +20,6 @@ from twisted.internet import defer import synapse.types from synapse.api.errors import AuthError, SynapseError -from synapse.handlers.profile import MasterProfileHandler from synapse.types import UserID from tests import unittest @@ -28,11 +27,6 @@ from tests.test_utils import make_awaitable from tests.utils import setup_test_homeserver -class ProfileHandlers: - def __init__(self, hs): - self.profile_handler = MasterProfileHandler(hs) - - class ProfileTestCase(unittest.TestCase): """ Tests profile management. """ @@ -51,7 +45,6 @@ class ProfileTestCase(unittest.TestCase): hs = yield setup_test_homeserver( self.addCleanup, http_client=None, - handlers=None, resource_for_federation=Mock(), federation_client=self.mock_federation, federation_server=Mock(), diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py index 702c6aa089..bdf3d0a8a2 100644 --- a/tests/handlers/test_register.py +++ b/tests/handlers/test_register.py @@ -18,7 +18,6 @@ from mock import Mock from synapse.api.auth import Auth from synapse.api.constants import UserTypes from synapse.api.errors import Codes, ResourceLimitError, SynapseError -from synapse.handlers.register import RegistrationHandler from synapse.spam_checker_api import RegistrationBehaviour from synapse.types import RoomAlias, UserID, create_requester @@ -29,11 +28,6 @@ from tests.utils import mock_getRawHeaders from .. import unittest -class RegistrationHandlers: - def __init__(self, hs): - self.registration_handler = RegistrationHandler(hs) - - class RegistrationTestCase(unittest.HomeserverTestCase): """ Tests the RegistrationHandler. """ @@ -154,7 +148,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): room_alias_str = "#room:test" user_id = self.get_success(self.handler.register_user(localpart="jeff")) rooms = self.get_success(self.store.get_rooms_for_user(user_id)) - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() room_alias = RoomAlias.from_string(room_alias_str) room_id = self.get_success(directory_handler.get_association(room_alias)) @@ -193,7 +187,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): user_id = self.get_success(self.handler.register_user(localpart="support")) rooms = self.get_success(self.store.get_rooms_for_user(user_id)) self.assertEqual(len(rooms), 0) - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() room_alias = RoomAlias.from_string(room_alias_str) self.get_failure(directory_handler.get_association(room_alias), SynapseError) @@ -205,7 +199,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): self.store.is_real_user = Mock(return_value=make_awaitable(True)) user_id = self.get_success(self.handler.register_user(localpart="real")) rooms = self.get_success(self.store.get_rooms_for_user(user_id)) - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() room_alias = RoomAlias.from_string(room_alias_str) room_id = self.get_success(directory_handler.get_association(room_alias)) @@ -237,7 +231,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): user_id = self.get_success(self.handler.register_user(localpart="jeff")) # Ensure the room was created. - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() room_alias = RoomAlias.from_string(room_alias_str) room_id = self.get_success(directory_handler.get_association(room_alias)) @@ -266,7 +260,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): user_id = self.get_success(self.handler.register_user(localpart="jeff")) # Ensure the room was created. - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() room_alias = RoomAlias.from_string(room_alias_str) room_id = self.get_success(directory_handler.get_association(room_alias)) @@ -304,7 +298,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): user_id = self.get_success(self.handler.register_user(localpart="jeff")) # Ensure the room was created. - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() room_alias = RoomAlias.from_string(room_alias_str) room_id = self.get_success(directory_handler.get_association(room_alias)) @@ -347,7 +341,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): ) # Ensure the room was created. - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() room_alias = RoomAlias.from_string(room_alias_str) room_id = self.get_success(directory_handler.get_association(room_alias)) @@ -384,7 +378,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase): user_id = self.get_success(self.handler.register_user(localpart="jeff")) # Ensure the room was created. - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.hs.get_directory_handler() room_alias = RoomAlias.from_string(room_alias_str) room_id = self.get_success(directory_handler.get_association(room_alias)) diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 1d7edee5ba..9c4a9c3563 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -207,7 +207,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): def create_room_with_remote_server(self, user, token, remote_server="other_server"): room = self.helper.create_room_as(user, tok=token) store = self.hs.get_datastore() - federation = self.hs.get_handlers().federation_handler + federation = self.hs.get_federation_handler() prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room)) room_version = self.get_success(store.get_room_version(room)) diff --git a/tests/rest/client/test_shadow_banned.py b/tests/rest/client/test_shadow_banned.py index dfe4bf7762..6bb02b9630 100644 --- a/tests/rest/client/test_shadow_banned.py +++ b/tests/rest/client/test_shadow_banned.py @@ -78,7 +78,7 @@ class RoomTestCase(_ShadowBannedBase): def test_invite_3pid(self): """Ensure that a 3PID invite does not attempt to contact the identity server.""" - identity_handler = self.hs.get_handlers().identity_handler + identity_handler = self.hs.get_identity_handler() identity_handler.lookup_3pid = Mock( side_effect=AssertionError("This should not get called") ) diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index f75520877f..3397ba5579 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -42,7 +42,7 @@ class EventStreamPermissionsTestCase(unittest.HomeserverTestCase): hs = self.setup_test_homeserver(config=config) - hs.get_handlers().federation_handler = Mock() + hs.get_federation_handler = Mock() return hs diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 0d809d25d5..9ba5f9d943 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -32,6 +32,7 @@ from synapse.types import JsonDict, RoomAlias, UserID from synapse.util.stringutils import random_string from tests import unittest +from tests.test_utils import make_awaitable PATH_PREFIX = b"/_matrix/client/api/v1" @@ -47,7 +48,10 @@ class RoomBase(unittest.HomeserverTestCase): "red", http_client=None, federation_client=Mock(), ) - self.hs.get_federation_handler = Mock(return_value=Mock()) + self.hs.get_federation_handler = Mock() + self.hs.get_federation_handler.return_value.maybe_backfill = Mock( + return_value=make_awaitable(None) + ) async def _insert_client_ip(*args, **kwargs): return None diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 94d2bf2eb1..cd58ee7792 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -44,7 +44,7 @@ class RoomTypingTestCase(unittest.HomeserverTestCase): self.event_source = hs.get_event_sources().sources["typing"] - hs.get_handlers().federation_handler = Mock() + hs.get_federation_handler = Mock() async def get_user_by_access_token(token=None, allow_guest=False): return { diff --git a/tests/test_federation.py b/tests/test_federation.py index 27a7fc9ed7..d39e792580 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -75,7 +75,7 @@ class MessageAcceptTests(unittest.HomeserverTestCase): } ) - self.handler = self.homeserver.get_handlers().federation_handler + self.handler = self.homeserver.get_federation_handler() self.handler.do_auth = lambda origin, event, context, auth_events: succeed( context ) -- cgit 1.5.1 From fe0f4a3591302176c7eea48a54f6ed83d9eb4aa9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 9 Oct 2020 07:37:51 -0400 Subject: Move additional tasks to the background worker, part 3 (#8489) --- changelog.d/8489.feature | 1 + synapse/app/phone_stats_home.py | 14 +- synapse/storage/databases/main/censor_events.py | 15 +- synapse/storage/databases/main/devices.py | 196 ++++++++-------- synapse/storage/databases/main/event_federation.py | 60 ++--- .../storage/databases/main/event_push_actions.py | 259 ++++++++++----------- .../storage/databases/main/monthly_active_users.py | 2 + synapse/storage/databases/main/registration.py | 11 +- 8 files changed, 276 insertions(+), 282 deletions(-) create mode 100644 changelog.d/8489.feature (limited to 'synapse/app') diff --git a/changelog.d/8489.feature b/changelog.d/8489.feature new file mode 100644 index 0000000000..22591870a4 --- /dev/null +++ b/changelog.d/8489.feature @@ -0,0 +1 @@ + Allow running background tasks in a separate worker process. diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index 8a69104a04..c38cf8231f 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -18,10 +18,7 @@ import sys from prometheus_client import Gauge -from synapse.metrics.background_process_metrics import ( - run_as_background_process, - wrap_as_background_process, -) +from synapse.metrics.background_process_metrics import wrap_as_background_process logger = logging.getLogger("synapse.app.homeserver") @@ -152,13 +149,8 @@ def start_phone_stats_home(hs): clock.looping_call(hs.get_datastore().generate_user_daily_visits, 5 * 60 * 1000) # monthly active user limiting functionality - def reap_monthly_active_users(): - return run_as_background_process( - "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users - ) - - clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) - reap_monthly_active_users() + clock.looping_call(hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60) + hs.get_datastore().reap_monthly_active_users() @wrap_as_background_process("generate_monthly_active_users") async def generate_monthly_active_users(): diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 4bb2b9c28c..849bd5ba7a 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -17,7 +17,7 @@ import logging from typing import TYPE_CHECKING from synapse.events.utils import prune_event_dict -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore @@ -35,14 +35,13 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) - def _censor_redactions(): - return run_as_background_process( - "_censor_redactions", self._censor_redactions - ) - - if self.hs.config.redaction_retention_period is not None: - hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + if ( + hs.config.run_background_tasks + and self.hs.config.redaction_retention_period is not None + ): + hs.get_clock().looping_call(self._censor_redactions, 5 * 60 * 1000) + @wrap_as_background_process("_censor_redactions") async def _censor_redactions(self): """Censors all redactions older than the configured period that haven't been censored yet. diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 2d0a6408b5..88fd97e1df 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -25,7 +25,7 @@ from synapse.logging.opentracing import ( trace, whitelisted_homeserver, ) -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -48,6 +48,14 @@ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" class DeviceWorkerStore(SQLBaseStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + if hs.config.run_background_tasks: + self._clock.looping_call( + self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + ) + async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]: """Retrieve a device. Only returns devices that are not marked as hidden. @@ -772,6 +780,98 @@ class DeviceWorkerStore(SQLBaseStore): ) return count >= 1 + @wrap_as_background_process("prune_old_outbound_device_pokes") + async def _prune_old_outbound_device_pokes( + self, prune_age: int = 24 * 60 * 60 * 1000 + ) -> None: + """Delete old entries out of the device_lists_outbound_pokes to ensure + that we don't fill up due to dead servers. + + Normally, we try to send device updates as a delta since a previous known point: + this is done by setting the prev_id in the m.device_list_update EDU. However, + for that to work, we have to have a complete record of each change to + each device, which can add up to quite a lot of data. + + An alternative mechanism is that, if the remote server sees that it has missed + an entry in the stream_id sequence for a given user, it will request a full + list of that user's devices. Hence, we can reduce the amount of data we have to + store (and transmit in some future transaction), by clearing almost everything + for a given destination out of the database, and having the remote server + resync. + + All we need to do is make sure we keep at least one row for each + (user, destination) pair, to remind us to send a m.device_list_update EDU for + that user when the destination comes back. It doesn't matter which device + we keep. + """ + yesterday = self._clock.time_msec() - prune_age + + def _prune_txn(txn): + # look for (user, destination) pairs which have an update older than + # the cutoff. + # + # For each pair, we also need to know the most recent stream_id, and + # an arbitrary device_id at that stream_id. + select_sql = """ + SELECT + dlop1.destination, + dlop1.user_id, + MAX(dlop1.stream_id) AS stream_id, + (SELECT MIN(dlop2.device_id) AS device_id FROM + device_lists_outbound_pokes dlop2 + WHERE dlop2.destination = dlop1.destination AND + dlop2.user_id=dlop1.user_id AND + dlop2.stream_id=MAX(dlop1.stream_id) + ) + FROM device_lists_outbound_pokes dlop1 + GROUP BY destination, user_id + HAVING min(ts) < ? AND count(*) > 1 + """ + + txn.execute(select_sql, (yesterday,)) + rows = txn.fetchall() + + if not rows: + return + + logger.info( + "Pruning old outbound device list updates for %i users/destinations: %s", + len(rows), + shortstr((row[0], row[1]) for row in rows), + ) + + # we want to keep the update with the highest stream_id for each user. + # + # there might be more than one update (with different device_ids) with the + # same stream_id, so we also delete all but one rows with the max stream id. + delete_sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE destination = ? AND user_id = ? AND ( + stream_id < ? OR + (stream_id = ? AND device_id != ?) + ) + """ + count = 0 + for (destination, user_id, stream_id, device_id) in rows: + txn.execute( + delete_sql, (destination, user_id, stream_id, stream_id, device_id) + ) + count += txn.rowcount + + # Since we've deleted unsent deltas, we need to remove the entry + # of last successful sent so that the prev_ids are correctly set. + sql = """ + DELETE FROM device_lists_outbound_last_success + WHERE destination = ? AND user_id = ? + """ + txn.executemany(sql, ((row[0], row[1]) for row in rows)) + + logger.info("Pruned %d device list outbound pokes", count) + + await self.db_pool.runInteraction( + "_prune_old_outbound_device_pokes", _prune_txn, + ) + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): @@ -908,8 +1008,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): name="device_id_exists", keylen=2, max_entries=10000 ) - self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) - async def store_device( self, user_id: str, device_id: str, initial_device_display_name: Optional[str] ) -> bool: @@ -1267,95 +1365,3 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): for device_id in device_ids ], ) - - def _prune_old_outbound_device_pokes(self, prune_age: int = 24 * 60 * 60 * 1000): - """Delete old entries out of the device_lists_outbound_pokes to ensure - that we don't fill up due to dead servers. - - Normally, we try to send device updates as a delta since a previous known point: - this is done by setting the prev_id in the m.device_list_update EDU. However, - for that to work, we have to have a complete record of each change to - each device, which can add up to quite a lot of data. - - An alternative mechanism is that, if the remote server sees that it has missed - an entry in the stream_id sequence for a given user, it will request a full - list of that user's devices. Hence, we can reduce the amount of data we have to - store (and transmit in some future transaction), by clearing almost everything - for a given destination out of the database, and having the remote server - resync. - - All we need to do is make sure we keep at least one row for each - (user, destination) pair, to remind us to send a m.device_list_update EDU for - that user when the destination comes back. It doesn't matter which device - we keep. - """ - yesterday = self._clock.time_msec() - prune_age - - def _prune_txn(txn): - # look for (user, destination) pairs which have an update older than - # the cutoff. - # - # For each pair, we also need to know the most recent stream_id, and - # an arbitrary device_id at that stream_id. - select_sql = """ - SELECT - dlop1.destination, - dlop1.user_id, - MAX(dlop1.stream_id) AS stream_id, - (SELECT MIN(dlop2.device_id) AS device_id FROM - device_lists_outbound_pokes dlop2 - WHERE dlop2.destination = dlop1.destination AND - dlop2.user_id=dlop1.user_id AND - dlop2.stream_id=MAX(dlop1.stream_id) - ) - FROM device_lists_outbound_pokes dlop1 - GROUP BY destination, user_id - HAVING min(ts) < ? AND count(*) > 1 - """ - - txn.execute(select_sql, (yesterday,)) - rows = txn.fetchall() - - if not rows: - return - - logger.info( - "Pruning old outbound device list updates for %i users/destinations: %s", - len(rows), - shortstr((row[0], row[1]) for row in rows), - ) - - # we want to keep the update with the highest stream_id for each user. - # - # there might be more than one update (with different device_ids) with the - # same stream_id, so we also delete all but one rows with the max stream id. - delete_sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND user_id = ? AND ( - stream_id < ? OR - (stream_id = ? AND device_id != ?) - ) - """ - count = 0 - for (destination, user_id, stream_id, device_id) in rows: - txn.execute( - delete_sql, (destination, user_id, stream_id, stream_id, device_id) - ) - count += txn.rowcount - - # Since we've deleted unsent deltas, we need to remove the entry - # of last successful sent so that the prev_ids are correctly set. - sql = """ - DELETE FROM device_lists_outbound_last_success - WHERE destination = ? AND user_id = ? - """ - txn.executemany(sql, ((row[0], row[1]) for row in rows)) - - logger.info("Pruned %d device list outbound pokes", count) - - return run_as_background_process( - "prune_old_outbound_device_pokes", - self.db_pool.runInteraction, - "_prune_old_outbound_device_pokes", - _prune_txn, - ) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 6d3689c09e..a6279a6c13 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -19,7 +19,7 @@ from typing import Dict, Iterable, List, Set, Tuple from synapse.api.errors import StoreError from synapse.events import EventBase -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.events_worker import EventsWorkerStore @@ -32,6 +32,14 @@ logger = logging.getLogger(__name__) class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + if hs.config.run_background_tasks: + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) + async def get_auth_chain( self, event_ids: Collection[str], include_given: bool = False ) -> List[EventBase]: @@ -586,6 +594,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return [row["event_id"] for row in rows] + @wrap_as_background_process("delete_old_forward_extrem_cache") + async def _delete_old_forward_extrem_cache(self) -> None: + def _delete_old_forward_extrem_cache_txn(txn): + # Delete entries older than a month, while making sure we don't delete + # the only entries for a room. + sql = """ + DELETE FROM stream_ordering_to_exterm + WHERE + room_id IN ( + SELECT room_id + FROM stream_ordering_to_exterm + WHERE stream_ordering > ? + ) AND stream_ordering < ? + """ + txn.execute( + sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) + ) + + await self.db_pool.runInteraction( + "_delete_old_forward_extrem_cache", _delete_old_forward_extrem_cache_txn, + ) + class EventFederationStore(EventFederationWorkerStore): """ Responsible for storing and serving up the various graphs associated @@ -606,34 +636,6 @@ class EventFederationStore(EventFederationWorkerStore): self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth ) - hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 - ) - - def _delete_old_forward_extrem_cache(self): - def _delete_old_forward_extrem_cache_txn(txn): - # Delete entries older than a month, while making sure we don't delete - # the only entries for a room. - sql = """ - DELETE FROM stream_ordering_to_exterm - WHERE - room_id IN ( - SELECT room_id - FROM stream_ordering_to_exterm - WHERE stream_ordering > ? - ) AND stream_ordering < ? - """ - txn.execute( - sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) - ) - - return run_as_background_process( - "delete_old_forward_extrem_cache", - self.db_pool.runInteraction, - "_delete_old_forward_extrem_cache", - _delete_old_forward_extrem_cache_txn, - ) - async def clean_room_for_join(self, room_id): return await self.db_pool.runInteraction( "clean_room_for_join", self._clean_room_for_join_txn, room_id diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 80f3b4d740..2e56dfaf31 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -13,15 +13,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import Dict, List, Optional, Tuple, Union import attr -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -81,8 +80,14 @@ class EventPushActionsWorkerStore(SQLBaseStore): self.find_stream_orderings_looping_call = self._clock.looping_call( self._find_stream_orderings_for_times, 10 * 60 * 1000 ) + self._rotate_delay = 3 self._rotate_count = 10000 + self._doing_notif_rotation = False + if hs.config.run_background_tasks: + self._rotate_notif_loop = self._clock.looping_call( + self._rotate_notifs, 30 * 60 * 1000 + ) @cached(num_args=3, tree=True, max_entries=5000) async def get_unread_event_push_actions_by_room_for_user( @@ -514,15 +519,14 @@ class EventPushActionsWorkerStore(SQLBaseStore): "Error removing push actions after event persistence failure" ) - def _find_stream_orderings_for_times(self): - return run_as_background_process( - "event_push_action_stream_orderings", - self.db_pool.runInteraction, + @wrap_as_background_process("event_push_action_stream_orderings") + async def _find_stream_orderings_for_times(self) -> None: + await self.db_pool.runInteraction( "_find_stream_orderings_for_times", self._find_stream_orderings_for_times_txn, ) - def _find_stream_orderings_for_times_txn(self, txn): + def _find_stream_orderings_for_times_txn(self, txn: LoggingTransaction) -> None: logger.info("Searching for stream ordering 1 month ago") self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 @@ -652,129 +656,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): ) return result[0] if result else None - -class EventPushActionsStore(EventPushActionsWorkerStore): - EPA_HIGHLIGHT_INDEX = "epa_highlight_index" - - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - - self.db_pool.updates.register_background_index_update( - self.EPA_HIGHLIGHT_INDEX, - index_name="event_push_actions_u_highlight", - table="event_push_actions", - columns=["user_id", "stream_ordering"], - ) - - self.db_pool.updates.register_background_index_update( - "event_push_actions_highlights_index", - index_name="event_push_actions_highlights_index", - table="event_push_actions", - columns=["user_id", "room_id", "topological_ordering", "stream_ordering"], - where_clause="highlight=1", - ) - - self._doing_notif_rotation = False - self._rotate_notif_loop = self._clock.looping_call( - self._start_rotate_notifs, 30 * 60 * 1000 - ) - - async def get_push_actions_for_user( - self, user_id, before=None, limit=50, only_highlight=False - ): - def f(txn): - before_clause = "" - if before: - before_clause = "AND epa.stream_ordering < ?" - args = [user_id, before, limit] - else: - args = [user_id, limit] - - if only_highlight: - if len(before_clause) > 0: - before_clause += " " - before_clause += "AND epa.highlight = 1" - - # NB. This assumes event_ids are globally unique since - # it makes the query easier to index - sql = ( - "SELECT epa.event_id, epa.room_id," - " epa.stream_ordering, epa.topological_ordering," - " epa.actions, epa.highlight, epa.profile_tag, e.received_ts" - " FROM event_push_actions epa, events e" - " WHERE epa.event_id = e.event_id" - " AND epa.user_id = ? %s" - " AND epa.notif = 1" - " ORDER BY epa.stream_ordering DESC" - " LIMIT ?" % (before_clause,) - ) - txn.execute(sql, args) - return self.db_pool.cursor_to_dict(txn) - - push_actions = await self.db_pool.runInteraction("get_push_actions_for_user", f) - for pa in push_actions: - pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) - return push_actions - - async def get_latest_push_action_stream_ordering(self): - def f(txn): - txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") - return txn.fetchone() - - result = await self.db_pool.runInteraction( - "get_latest_push_action_stream_ordering", f - ) - return result[0] or 0 - - def _remove_old_push_actions_before_txn( - self, txn, room_id, user_id, stream_ordering - ): - """ - Purges old push actions for a user and room before a given - stream_ordering. - - We however keep a months worth of highlighted notifications, so that - users can still get a list of recent highlights. - - Args: - txn: The transcation - room_id: Room ID to delete from - user_id: user ID to delete for - stream_ordering: The lowest stream ordering which will - not be deleted. - """ - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (room_id, user_id), - ) - - # We need to join on the events table to get the received_ts for - # event_push_actions and sqlite won't let us use a join in a delete so - # we can't just delete where received_ts < x. Furthermore we can - # only identify event_push_actions by a tuple of room_id, event_id - # we we can't use a subquery. - # Instead, we look up the stream ordering for the last event in that - # room received before the threshold time and delete event_push_actions - # in the room with a stream_odering before that. - txn.execute( - "DELETE FROM event_push_actions " - " WHERE user_id = ? AND room_id = ? AND " - " stream_ordering <= ?" - " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", - (user_id, room_id, stream_ordering, self.stream_ordering_month_ago), - ) - - txn.execute( - """ - DELETE FROM event_push_summary - WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? - """, - (room_id, user_id, stream_ordering), - ) - - def _start_rotate_notifs(self): - return run_as_background_process("rotate_notifs", self._rotate_notifs) - + @wrap_as_background_process("rotate_notifs") async def _rotate_notifs(self): if self._doing_notif_rotation or self.stream_ordering_day_ago is None: return @@ -954,6 +836,121 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) +class EventPushActionsStore(EventPushActionsWorkerStore): + EPA_HIGHLIGHT_INDEX = "epa_highlight_index" + + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_index_update( + self.EPA_HIGHLIGHT_INDEX, + index_name="event_push_actions_u_highlight", + table="event_push_actions", + columns=["user_id", "stream_ordering"], + ) + + self.db_pool.updates.register_background_index_update( + "event_push_actions_highlights_index", + index_name="event_push_actions_highlights_index", + table="event_push_actions", + columns=["user_id", "room_id", "topological_ordering", "stream_ordering"], + where_clause="highlight=1", + ) + + async def get_push_actions_for_user( + self, user_id, before=None, limit=50, only_highlight=False + ): + def f(txn): + before_clause = "" + if before: + before_clause = "AND epa.stream_ordering < ?" + args = [user_id, before, limit] + else: + args = [user_id, limit] + + if only_highlight: + if len(before_clause) > 0: + before_clause += " " + before_clause += "AND epa.highlight = 1" + + # NB. This assumes event_ids are globally unique since + # it makes the query easier to index + sql = ( + "SELECT epa.event_id, epa.room_id," + " epa.stream_ordering, epa.topological_ordering," + " epa.actions, epa.highlight, epa.profile_tag, e.received_ts" + " FROM event_push_actions epa, events e" + " WHERE epa.event_id = e.event_id" + " AND epa.user_id = ? %s" + " AND epa.notif = 1" + " ORDER BY epa.stream_ordering DESC" + " LIMIT ?" % (before_clause,) + ) + txn.execute(sql, args) + return self.db_pool.cursor_to_dict(txn) + + push_actions = await self.db_pool.runInteraction("get_push_actions_for_user", f) + for pa in push_actions: + pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) + return push_actions + + async def get_latest_push_action_stream_ordering(self): + def f(txn): + txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") + return txn.fetchone() + + result = await self.db_pool.runInteraction( + "get_latest_push_action_stream_ordering", f + ) + return result[0] or 0 + + def _remove_old_push_actions_before_txn( + self, txn, room_id, user_id, stream_ordering + ): + """ + Purges old push actions for a user and room before a given + stream_ordering. + + We however keep a months worth of highlighted notifications, so that + users can still get a list of recent highlights. + + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + stream_ordering: The lowest stream ordering which will + not be deleted. + """ + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id), + ) + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. + txn.execute( + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " stream_ordering <= ?" + " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", + (user_id, room_id, stream_ordering, self.stream_ordering_month_ago), + ) + + txn.execute( + """ + DELETE FROM event_push_summary + WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? + """, + (room_id, user_id, stream_ordering), + ) + + def _action_has_highlight(actions): for action in actions: try: diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index c66f558567..d788dc0fc6 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -15,6 +15,7 @@ import logging from typing import Dict, List +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_in_list_sql_clause from synapse.util.caches.descriptors import cached @@ -127,6 +128,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): desc="user_last_seen_monthly_active", ) + @wrap_as_background_process("reap_monthly_active_users") async def reap_monthly_active_users(self): """Cleans out monthly active user table to ensure that no stale entries exist. diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 7fd7b0b952..236d3cdbe3 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -20,10 +20,7 @@ from typing import Any, Dict, List, Optional, Tuple from synapse.api.constants import UserTypes from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError -from synapse.metrics.background_process_metrics import ( - run_as_background_process, - wrap_as_background_process, -) +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool from synapse.storage.types import Cursor @@ -53,10 +50,7 @@ class RegistrationWorkerStore(SQLBaseStore): self._account_validity = hs.config.account_validity if hs.config.run_background_tasks and self._account_validity.enabled: self._clock.call_later( - 0.0, - run_as_background_process, - "account_validity_set_expiration_dates", - self._set_expiration_date_when_missing, + 0.0, self._set_expiration_date_when_missing, ) # Create a background job for culling expired 3PID validity tokens @@ -812,6 +806,7 @@ class RegistrationWorkerStore(SQLBaseStore): self.clock.time_msec(), ) + @wrap_as_background_process("account_validity_set_expiration_dates") async def _set_expiration_date_when_missing(self): """ Retrieves the list of registered users that don't have an expiration date, and -- cgit 1.5.1 From 921a3f8a59da0f8fe706a22627f464a74b54c992 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Oct 2020 13:27:51 +0100 Subject: Fix not sending events over federation when using sharded event persisters (#8536) * Fix outbound federaion with multiple event persisters. We incorrectly notified federation senders that the minimum persisted stream position had advanced when we got an `RDATA` from an event persister. Notifying of federation senders already correctly happens in the notifier, so we just delete the offending line. * Change some interfaces to use RoomStreamToken. By enforcing use of `RoomStreamTokens` we make it less likely that people pass in random ints that they got from somewhere random. --- changelog.d/8536.bugfix | 1 + synapse/app/generic_worker.py | 4 ---- synapse/federation/send_queue.py | 2 +- synapse/federation/sender/__init__.py | 9 +++++++-- synapse/handlers/appservice.py | 11 +++++++---- synapse/notifier.py | 6 +++--- synapse/push/emailpusher.py | 8 +++++++- synapse/push/httppusher.py | 8 +++++++- synapse/push/pusherpool.py | 10 ++++++++-- tests/handlers/test_appservice.py | 13 ++++++++++--- 10 files changed, 51 insertions(+), 21 deletions(-) create mode 100644 changelog.d/8536.bugfix (limited to 'synapse/app') diff --git a/changelog.d/8536.bugfix b/changelog.d/8536.bugfix new file mode 100644 index 0000000000..8d238cc008 --- /dev/null +++ b/changelog.d/8536.bugfix @@ -0,0 +1 @@ +Fix not sending events over federation when using sharded event writers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d53181deb1..1b511890aa 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -790,10 +790,6 @@ class FederationSenderHandler: send_queue.process_rows_for_federation(self.federation_sender, rows) await self.update_token(token) - # We also need to poke the federation sender when new events happen - elif stream_name == "events": - self.federation_sender.notify_new_events(token) - # ... and when new receipts happen elif stream_name == ReceiptsStream.NAME: await self._on_new_receipts(rows) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 8e46957d15..5f1bf492c1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -188,7 +188,7 @@ class FederationRemoteSendQueue: for key in keys[:i]: del self.edus[key] - def notify_new_events(self, current_id): + def notify_new_events(self, max_token): """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index e33b29a42c..604cfd1935 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -40,7 +40,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -154,10 +154,15 @@ class FederationSender: self._per_destination_queues[destination] = queue return queue - def notify_new_events(self, current_id: int) -> None: + def notify_new_events(self, max_token: RoomStreamToken) -> None: """This gets called when we have some new events we might want to send out to other servers. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + self._last_poked_id = max(current_id, self._last_poked_id) if self._is_processing: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9d4e87dad6..c8d5e58035 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,6 +27,7 @@ from synapse.metrics import ( event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -47,15 +48,17 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False - async def notify_interested_services(self, current_id): + async def notify_interested_services(self, max_token: RoomStreamToken): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. - - Args: - current_id(int): The current maximum ID. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + services = self.store.get_app_services() if not services or not self.notify_appservices: return diff --git a/synapse/notifier.py b/synapse/notifier.py index 13adeed01e..51c830c91e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -319,19 +319,19 @@ class Notifier: ) if self.federation_sender: - self.federation_sender.notify_new_events(max_room_stream_token.stream) + self.federation_sender.notify_new_events(max_room_stream_token) async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: await self.appservice_handler.notify_interested_services( - max_room_stream_token.stream + max_room_stream_token ) except Exception: logger.exception("Error notifying application services of event") async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: - await self._pusher_pool.on_new_notifications(max_room_stream_token.stream) + await self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: logger.exception("Error pusher pool of event") diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 28bd8ab748..c6763971ee 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -18,6 +18,7 @@ import logging from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) @@ -91,7 +92,12 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 26706bf3e1..793d0db2d9 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException +from synapse.types import RoomStreamToken from . import push_rule_evaluator, push_tools @@ -114,7 +115,12 @@ class HttpPusher: if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering or 0 ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 76150e117b..0080c68ce2 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -24,6 +24,7 @@ from synapse.push import PusherConfigException from synapse.push.emailpusher import EmailPusher from synapse.push.httppusher import HttpPusher from synapse.push.pusher import PusherFactory +from synapse.types import RoomStreamToken from synapse.util.async_helpers import concurrently_execute if TYPE_CHECKING: @@ -186,11 +187,16 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_stream_id: int): + async def on_new_notifications(self, max_token: RoomStreamToken): if not self.pushers: # nothing to do here. return + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_id = max_token.stream + if max_stream_id < self._last_room_stream_id_seen: # Nothing to do return @@ -214,7 +220,7 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(max_stream_id) + p.on_new_notifications(max_token) except Exception: logger.exception("Exception in pusher on_new_notifications") diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 2a0b7c1b56..ee4f3da31c 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -18,6 +18,7 @@ from mock import Mock from twisted.internet import defer from synapse.handlers.appservice import ApplicationServicesHandler +from synapse.types import RoomStreamToken from tests.test_utils import make_awaitable from tests.utils import MockClock @@ -61,7 +62,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) @@ -80,7 +83,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) @defer.inlineCallbacks @@ -97,7 +102,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been.", -- cgit 1.5.1 From fb56dfdccd129ecdc07effbb6e2e18d3b304d821 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 6 Nov 2020 11:42:07 +0000 Subject: Fix SIGHUP handler (#8697) Fixes: ``` builtins.TypeError: _reload_logging_config() takes 1 positional argument but 2 were given ``` --- changelog.d/8697.misc | 1 + synapse/app/_base.py | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) create mode 100644 changelog.d/8697.misc (limited to 'synapse/app') diff --git a/changelog.d/8697.misc b/changelog.d/8697.misc new file mode 100644 index 0000000000..7982a4e46d --- /dev/null +++ b/changelog.d/8697.misc @@ -0,0 +1 @@ + Re-organize the structured logging code to separate the TCP transport handling from the JSON formatting. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index f6f7b2bf42..9c8dc785c6 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -49,7 +49,6 @@ def register_sighup(func, *args, **kwargs): Args: func (function): Function to be called when sent a SIGHUP signal. - Will be called with a single default argument, the homeserver. *args, **kwargs: args and kwargs to be passed to the target function. """ _sighup_callbacks.append((func, args, kwargs)) @@ -251,13 +250,13 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): sdnotify(b"RELOADING=1") for i, args, kwargs in _sighup_callbacks: - i(hs, *args, **kwargs) + i(*args, **kwargs) sdnotify(b"READY=1") signal.signal(signal.SIGHUP, handle_sighup) - register_sighup(refresh_certificate) + register_sighup(refresh_certificate, hs) # Load the certificate from disk. refresh_certificate(hs) -- cgit 1.5.1 From 382b4e83f1e5f4c9c1e233193241db8eb6e6ffa4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Nov 2020 11:18:10 +0000 Subject: Defer SIGHUP handlers to reactor. (#8817) We can get a SIGHUP at any point, including times where we are not in a sane state. By deferring calling the handlers until the next reactor tick we ensure that we don't get unexpected conflicts, e.g. trying to flush logs from the signal handler while the code was in the process of writing a log entry. Fixes #8769. --- changelog.d/8817.bugfix | 1 + synapse/app/_base.py | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 changelog.d/8817.bugfix (limited to 'synapse/app') diff --git a/changelog.d/8817.bugfix b/changelog.d/8817.bugfix new file mode 100644 index 0000000000..e45dbd2ba4 --- /dev/null +++ b/changelog.d/8817.bugfix @@ -0,0 +1 @@ +Fix bug where logging could break after a call to SIGHUP. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 9c8dc785c6..895b38ae76 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -32,6 +32,7 @@ from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config.server import ListenerConfig from synapse.crypto import context_factory from synapse.logging.context import PreserveLoggingContext +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.util.async_helpers import Linearizer from synapse.util.daemonize import daemonize_process from synapse.util.rlimit import change_resource_limit @@ -244,6 +245,7 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): # Set up the SIGHUP machinery. if hasattr(signal, "SIGHUP"): + @wrap_as_background_process("sighup") def handle_sighup(*args, **kwargs): # Tell systemd our state, if we're using it. This will silently fail if # we're not using systemd. @@ -254,7 +256,13 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): sdnotify(b"READY=1") - signal.signal(signal.SIGHUP, handle_sighup) + # We defer running the sighup handlers until next reactor tick. This + # is so that we're in a sane state, e.g. flushing the logs may fail + # if the sighup happens in the middle of writing a log entry. + def run_sighup(*args, **kwargs): + hs.get_clock().call_later(0, handle_sighup, *args, **kwargs) + + signal.signal(signal.SIGHUP, run_sighup) register_sighup(refresh_certificate, hs) -- cgit 1.5.1