From 66f4949e7f57bad44c582098f702fda057c6c7ad Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 22 Feb 2021 21:14:42 +0000
Subject: Fix deleting pushers when using sharded pushers. (#9465)

---
 synapse/app/generic_worker.py        |  3 --
 synapse/push/httppusher.py           |  3 +-
 synapse/push/pusherpool.py           | 23 +++++++++---
 synapse/replication/http/__init__.py |  2 +
 synapse/replication/http/push.py     | 72 ++++++++++++++++++++++++++++++++++++
 synapse/replication/tcp/commands.py  | 27 --------------
 synapse/replication/tcp/handler.py   | 23 ------------
 synapse/server.py                    |  3 --
 8 files changed, 93 insertions(+), 63 deletions(-)
 create mode 100644 synapse/replication/http/push.py

diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 6526acb2f2..6ed405e66b 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -645,9 +645,6 @@ class GenericWorkerServer(HomeServer):
 
         self.get_tcp_replication().start_replication(self)
 
-    async def remove_pusher(self, app_id, push_key, user_id):
-        self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
-
     @cache_in_self
     def get_replication_data_handler(self):
         return GenericWorkerReplicationHandler(self)

diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index b9d3da2e0a..f4d7e199e9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -74,6 +74,7 @@ class HttpPusher(Pusher):
         self.timed_call = None
         self._is_processing = False
         self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room
+        self._pusherpool = hs.get_pusherpool()
 
         self.data = pusher_config.data
         if self.data is None:
@@ -299,7 +300,7 @@ class HttpPusher(Pusher):
             )
         else:
             logger.info("Pushkey %s was rejected: removing", pk)
-            await self.hs.remove_pusher(self.app_id, pk, self.user_id)
+            await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
         return True
 
     async def _build_notification_dict(

diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index ae1145be0e..3936bf8784 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,6 +25,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.push import Pusher, PusherConfig, PusherConfigException
 from synapse.push.pusher import PusherFactory
+from synapse.replication.http.push import ReplicationRemovePusherRestServlet
 from synapse.types import JsonDict, RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 
@@ -68,6 +69,13 @@ class PusherPool:
         self._pusher_shard_config = hs.config.push.pusher_shard_config
         self._instance_name = hs.get_instance_name()
 
+        # We can only delete pushers on master.
+        self._remove_pusher_client = None
+        if hs.config.worker.worker_app:
+            self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
+                hs
+            )
+
         # Record the last stream ID that we were poked about so we can get
         # changes since then. We set this to the current max stream ID on
         # startup as every individual pusher will have checked for changes on
@@ -175,9 +183,6 @@ class PusherPool:
             user_id: user to remove pushers for
             access_tokens: access token *ids* to remove pushers for
         """
-        if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
-            return
-
         tokens = set(access_tokens)
         for p in await self.store.get_pushers_by_user_id(user_id):
             if p.access_token in tokens:
@@ -380,6 +385,12 @@ class PusherPool:
 
         synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
 
-        await self.store.delete_pusher_by_app_id_pushkey_user_id(
-            app_id, pushkey, user_id
-        )
+        # We can only delete pushers on master.
+        if self._remove_pusher_client:
+            await self._remove_pusher_client(
+                app_id=app_id, pushkey=pushkey, user_id=user_id
+            )
+        else:
+            await self.store.delete_pusher_by_app_id_pushkey_user_id(
+                app_id, pushkey, user_id
+            )

diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index dd527e807f..cb4a52dbe9 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -21,6 +21,7 @@ from synapse.replication.http import (
     login,
     membership,
     presence,
+    push,
     register,
     send_event,
     streams,
@@ -42,6 +43,7 @@ class ReplicationRestResource(JsonResource):
         membership.register_servlets(hs, self)
         streams.register_servlets(hs, self)
         account_data.register_servlets(hs, self)
+        push.register_servlets(hs, self)
 
         # The following can't currently be instantiated on workers.
         if hs.config.worker.worker_app is None:

diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
new file mode 100644
index 0000000000..054ed64d34
--- /dev/null
+++ b/synapse/replication/http/push.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
+    """Deletes the given pusher.
+
+    Request format:
+
+        POST /_synapse/replication/remove_pusher/:user_id
+
+        {
+            "app_id": "",
+            "pushkey": ""
+        }
+
+    """
+
+    NAME = "remove_pusher"
+    PATH_ARGS = ("user_id",)
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self.pusher_pool = hs.get_pusherpool()
+
+    @staticmethod
+    async def _serialize_payload(app_id, pushkey, user_id):
+        payload = {
+            "app_id": app_id,
+            "pushkey": pushkey,
+        }
+
+        return payload
+
+    async def _handle_request(self, request, user_id):
+        content = parse_json_object_from_request(request)
+
+        app_id = content["app_id"]
+        pushkey = content["pushkey"]
+
+        await self.pusher_pool.remove_pusher(app_id, pushkey, user_id)
+
+        return 200, {}
+
+
+def register_servlets(hs, http_server):
+    ReplicationRemovePusherRestServlet(hs).register(http_server)

diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 0a9da79c32..bb447f75b4 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -325,31 +325,6 @@ class FederationAckCommand(Command):
         return "%s %s" % (self.instance_name, self.token)
 
 
-class RemovePusherCommand(Command):
-    """Sent by the client to request the master remove the given pusher.
-
-    Format::
-
-        REMOVE_PUSHER <app_id> <push_key> <user_id>
-    """
-
-    NAME = "REMOVE_PUSHER"
-
-    def __init__(self, app_id, push_key, user_id):
-        self.user_id = user_id
-        self.app_id = app_id
-        self.push_key = push_key
-
-    @classmethod
-    def from_line(cls, line):
-        app_id, push_key, user_id = line.split(" ", 2)
-
-        return cls(app_id, push_key, user_id)
-
-    def to_line(self):
-        return " ".join((self.app_id, self.push_key, self.user_id))
-
-
 class UserIpCommand(Command):
     """Sent periodically when a worker sees activity from a client.
@@ -416,7 +391,6 @@ _COMMANDS = (
     ReplicateCommand,
     UserSyncCommand,
     FederationAckCommand,
-    RemovePusherCommand,
     UserIpCommand,
     RemoteServerUpCommand,
     ClearUserSyncsCommand,
@@ -443,7 +417,6 @@ VALID_CLIENT_COMMANDS = (
     UserSyncCommand.NAME,
     ClearUserSyncsCommand.NAME,
     FederationAckCommand.NAME,
-    RemovePusherCommand.NAME,
     UserIpCommand.NAME,
     ErrorCommand.NAME,
     RemoteServerUpCommand.NAME,

diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index d1d00c3717..a7245da152 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -44,7 +44,6 @@ from synapse.replication.tcp.commands import (
     PositionCommand,
     RdataCommand,
     RemoteServerUpCommand,
-    RemovePusherCommand,
     ReplicateCommand,
     UserIpCommand,
     UserSyncCommand,
@@ -373,23 +372,6 @@ class ReplicationCommandHandler:
         if self._federation_sender:
             self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
 
-    def on_REMOVE_PUSHER(
-        self, conn: AbstractConnection, cmd: RemovePusherCommand
-    ) -> Optional[Awaitable[None]]:
-        remove_pusher_counter.inc()
-
-        if self._is_master:
-            return self._handle_remove_pusher(cmd)
-        else:
-            return None
-
-    async def _handle_remove_pusher(self, cmd: RemovePusherCommand):
-        await self._store.delete_pusher_by_app_id_pushkey_user_id(
-            app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
-        )
-
-        self._notifier.on_new_replication_data()
-
     def on_USER_IP(
         self, conn: AbstractConnection, cmd: UserIpCommand
     ) -> Optional[Awaitable[None]]:
@@ -684,11 +666,6 @@ class ReplicationCommandHandler:
             UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
         )
 
-    def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
-        """Poke the master to remove a pusher for a user"""
-        cmd = RemovePusherCommand(app_id, push_key, user_id)
-        self.send_command(cmd)
-
     def send_user_ip(
         self,
         user_id: str,

diff --git a/synapse/server.py b/synapse/server.py
index 6b3892e3cd..5de8782000 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -758,9 +758,6 @@ class HomeServer(metaclass=abc.ABCMeta):
             reconnect=True,
         )
 
-    async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
-        return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
-
     def should_send_federation(self) -> bool:
         "Should this server be sending federation traffic directly?"
         return self.config.send_federation and (
--
cgit 1.5.1
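
A note on the pattern this commit introduces: `ReplicationEndpoint` turns the servlet's `NAME` and `PATH_ARGS` into a route of the form `/_synapse/replication/<NAME>/<path args>`, and `_serialize_payload` into the JSON body. The following is a rough, self-contained sketch of the request a worker ends up making; it is illustrative only, not Synapse's actual client (which goes through `make_client` and additionally handles things like retries), and the function name is made up for the example:

    import json
    from urllib.parse import quote
    from urllib.request import Request, urlopen

    def remove_pusher_via_replication(
        master_base_url: str, app_id: str, pushkey: str, user_id: str
    ) -> None:
        # user_id travels as a path argument (PATH_ARGS); app_id and pushkey
        # travel in the JSON body (_serialize_payload), mirroring
        # ReplicationRemovePusherRestServlet above.
        url = "%s/_synapse/replication/remove_pusher/%s" % (
            master_base_url.rstrip("/"),
            quote(user_id, safe=""),
        )
        body = json.dumps({"app_id": app_id, "pushkey": pushkey}).encode("utf8")
        request = Request(url, data=body, headers={"Content-Type": "application/json"})
        with urlopen(request) as response:  # POST, because data is set
            assert response.getcode() == 200

The master-side handler (`_handle_request`) then calls `PusherPool.remove_pusher`, which is why the pool, not the HomeServer, is now the single entry point for deletions.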
""" @@ -908,35 +914,46 @@ class Porter(object): (curr_backward_id + 1,), ) - return await self.postgres_store.db_pool.runInteraction( + await self.postgres_store.db_pool.runInteraction( "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos, ) - async def _setup_device_inbox_seq(self): - """Set the device inbox sequence to the correct value. + async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None: + """Set a sequence to the correct value. """ - curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol( - table="device_inbox", - keyvalues={}, - retcol="COALESCE(MAX(stream_id), 1)", - allow_none=True, - ) + current_stream_ids = [] + for stream_id_table in stream_id_tables: + max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table=stream_id_table, + keyvalues={}, + retcol="COALESCE(MAX(stream_id), 1)", + allow_none=True, + ) + current_stream_ids.append(max_stream_id) - curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol( - table="device_federation_outbox", - keyvalues={}, - retcol="COALESCE(MAX(stream_id), 1)", - allow_none=True, - ) + next_id = max(current_stream_ids) + 1 + + def r(txn): + sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, ) + txn.execute(sql + " %s", (next_id, )) - next_id = max(curr_local_id, curr_federation_id) + 1 + await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r) + + async def _setup_auth_chain_sequence(self) -> None: + curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True + ) def r(txn): txn.execute( - "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,) + "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s", + (curr_chain_id,), ) - return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r) + await self.postgres_store.db_pool.runInteraction( + "_setup_event_auth_chain_id", r, + ) + ############################################## diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index e84f8b42f7..379c78bb83 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -79,7 +79,7 @@ class Databases: # If we're on a process that can persist events also # instantiate a `PersistEventsStore` if hs.get_instance_name() in hs.config.worker.writers.events: - persist_events = PersistEventsStore(hs, database, main) + persist_events = PersistEventsStore(hs, database, main, db_conn) if "state" in database_config.databases: logger.info( diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 287606cb4f..a7a11a5bc0 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -42,6 +42,7 @@ from synapse.logging.utils import log_function from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.search import SearchEntry +from synapse.storage.types import Connection from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import StateMap, get_domain_from_id from synapse.util import json_encoder @@ -90,7 +91,11 @@ class PersistEventsStore: """ def __init__( - self, hs: "HomeServer", db: DatabasePool, main_data_store: "DataStore" + self, + hs: "HomeServer", + db: DatabasePool, + 
main_data_store: "DataStore", + db_conn: Connection, ): self.hs = hs self.db_pool = db @@ -109,6 +114,12 @@ class PersistEventsStore: ) # type: MultiWriterIdGenerator self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator + # The consistency of this cannot be checked when the ID generator is + # created since the database might not yet be up-to-date. + self.db_pool.event_chain_id_gen.check_consistency( + db_conn, "event_auth_chains", "chain_id" # type: ignore + ) + # This should only exist on instances that are configured to write assert ( hs.get_instance_name() in hs.config.worker.writers.events -- cgit 1.5.1 From 713145d3de8699fde9d3baf0cf772dd46be0438e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 23 Feb 2021 13:42:36 +0000 Subject: Add a comment about systemd-python. (#9464) This confused me for a while. --- changelog.d/9464.misc | 1 + synapse/python_dependencies.py | 3 +++ 2 files changed, 4 insertions(+) create mode 100644 changelog.d/9464.misc (limited to 'synapse') diff --git a/changelog.d/9464.misc b/changelog.d/9464.misc new file mode 100644 index 0000000000..39fcf85d40 --- /dev/null +++ b/changelog.d/9464.misc @@ -0,0 +1 @@ +Add a comment about systemd-python. diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 8a2b73b75e..321a333820 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -106,6 +106,9 @@ CONDITIONAL_REQUIREMENTS = { "pysaml2>=4.5.0;python_version>='3.6'", ], "oidc": ["authlib>=0.14.0"], + # systemd-python is necessary for logging to the systemd journal via + # `systemd.journal.JournalHandler`, as is documented in + # `contrib/systemd/log_config.yaml`. "systemd": ["systemd-python>=231"], "url_preview": ["lxml>=3.5.0"], "sentry": ["sentry-sdk>=0.7.2"], -- cgit 1.5.1 From 0b5c967813a8e5f3338b6ecd5f3742e91b8a100b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 Feb 2021 10:13:53 +0000 Subject: Refactor to ensure we call check_consistency (#9470) The idea here is to stop people forgetting to call `check_consistency`. Folks can still just pass in `None` to the new args in `build_sequence_generator`, but hopefully they won't. --- changelog.d/9470.bugfix | 1 + synapse/storage/database.py | 16 ++++----------- synapse/storage/databases/main/events.py | 13 ++++++------ .../storage/databases/main/events_bg_updates.py | 1 + synapse/storage/databases/main/events_worker.py | 16 +++++++++++++++ synapse/storage/databases/main/registration.py | 19 ++++++++++++++--- synapse/storage/databases/state/store.py | 10 +++++---- synapse/storage/util/sequence.py | 24 ++++++++++++++++++++-- 8 files changed, 72 insertions(+), 28 deletions(-) create mode 100644 changelog.d/9470.bugfix (limited to 'synapse') diff --git a/changelog.d/9470.bugfix b/changelog.d/9470.bugfix new file mode 100644 index 0000000000..c1b7dbb17d --- /dev/null +++ b/changelog.d/9470.bugfix @@ -0,0 +1 @@ +Fix missing startup checks for the consistency of certain PostgreSQL sequences. 
From 0b5c967813a8e5f3338b6ecd5f3742e91b8a100b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 24 Feb 2021 10:13:53 +0000
Subject: Refactor to ensure we call check_consistency (#9470)

The idea here is to stop people forgetting to call `check_consistency`.
Folks can still just pass in `None` to the new args in
`build_sequence_generator`, but hopefully they won't.
---
 changelog.d/9470.bugfix                             |  1 +
 synapse/storage/database.py                         | 16 ++++-----------
 synapse/storage/databases/main/events.py            | 13 ++++++------
 synapse/storage/databases/main/events_bg_updates.py |  1 +
 synapse/storage/databases/main/events_worker.py     | 16 +++++++++++++++
 synapse/storage/databases/main/registration.py      | 19 ++++++++++++++---
 synapse/storage/databases/state/store.py            | 10 +++++----
 synapse/storage/util/sequence.py                    | 24 ++++++++++++++++++++--
 8 files changed, 72 insertions(+), 28 deletions(-)
 create mode 100644 changelog.d/9470.bugfix

diff --git a/changelog.d/9470.bugfix b/changelog.d/9470.bugfix
new file mode 100644
index 0000000000..c1b7dbb17d
--- /dev/null
+++ b/changelog.d/9470.bugfix
@@ -0,0 +1 @@
+Fix missing startup checks for the consistency of certain PostgreSQL sequences.

diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 4646926449..f1ba529a2d 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -49,7 +49,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.background_updates import BackgroundUpdater
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.types import Connection, Cursor
-from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import Collection
 
 # python 3 does not have a maximum int value
@@ -381,7 +380,10 @@ class DatabasePool:
     _TXN_ID = 0
 
     def __init__(
-        self, hs, database_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
+        self,
+        hs,
+        database_config: DatabaseConnectionConfig,
+        engine: BaseDatabaseEngine,
     ):
         self.hs = hs
         self._clock = hs.get_clock()
@@ -420,16 +422,6 @@ class DatabasePool:
             self._check_safe_to_upsert,
         )
 
-        # We define this sequence here so that it can be referenced from both
-        # the DataStore and PersistEventStore.
-        def get_chain_id_txn(txn):
-            txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
-            return txn.fetchone()[0]
-
-        self.event_chain_id_gen = build_sequence_generator(
-            engine, get_chain_id_txn, "event_auth_chain_id"
-        )
-
     def is_running(self) -> bool:
         """Is the database pool currently running"""
         return self._db_pool.running

diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a7a11a5bc0..cd1ceac50e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -44,6 +44,7 @@ from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.search import SearchEntry
 from synapse.storage.types import Connection
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
+from synapse.storage.util.sequence import SequenceGenerator
 from synapse.types import StateMap, get_domain_from_id
 from synapse.util import json_encoder
 from synapse.util.iterutils import batch_iter, sorted_topologically
@@ -114,12 +115,6 @@ class PersistEventsStore:
         )  # type: MultiWriterIdGenerator
         self._stream_id_gen = self.store._stream_id_gen  # type: MultiWriterIdGenerator
 
-        # The consistency of this cannot be checked when the ID generator is
-        # created since the database might not yet be up-to-date.
-        self.db_pool.event_chain_id_gen.check_consistency(
-            db_conn, "event_auth_chains", "chain_id"  # type: ignore
-        )
-
         # This should only exist on instances that are configured to write
         assert (
             hs.get_instance_name() in hs.config.worker.writers.events
@@ -485,6 +480,7 @@ class PersistEventsStore:
             self._add_chain_cover_index(
                 txn,
                 self.db_pool,
+                self.store.event_chain_id_gen,
                 event_to_room_id,
                 event_to_types,
                 event_to_auth_chain,
@@ -495,6 +491,7 @@ class PersistEventsStore:
         cls,
         txn,
         db_pool: DatabasePool,
+        event_chain_id_gen: SequenceGenerator,
         event_to_room_id: Dict[str, str],
         event_to_types: Dict[str, Tuple[str, str]],
         event_to_auth_chain: Dict[str, List[str]],
@@ -641,6 +638,7 @@ class PersistEventsStore:
             new_chain_tuples = cls._allocate_chain_ids(
                 txn,
                 db_pool,
+                event_chain_id_gen,
                 event_to_room_id,
                 event_to_types,
                 event_to_auth_chain,
@@ -779,6 +777,7 @@ class PersistEventsStore:
     def _allocate_chain_ids(
         txn,
         db_pool: DatabasePool,
+        event_chain_id_gen: SequenceGenerator,
         event_to_room_id: Dict[str, str],
         event_to_types: Dict[str, Tuple[str, str]],
         event_to_auth_chain: Dict[str, List[str]],
@@ -891,7 +890,7 @@ class PersistEventsStore:
             chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1]
 
         # Generate new chain IDs for all unallocated chain IDs.
-        newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn(
+        newly_allocated_chain_ids = event_chain_id_gen.get_next_mult_txn(
             txn, len(unallocated_chain_ids)
         )

diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 89274e75f7..c1626ccf28 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -917,6 +917,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             PersistEventsStore._add_chain_cover_index(
                 txn,
                 self.db_pool,
+                self.event_chain_id_gen,
                 event_to_room_id,
                 event_to_types,
                 event_to_auth_chain,

diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c8850a4707..edbe42f2bf 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -45,6 +45,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
+from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import Collection, JsonDict, get_domain_from_id
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
@@ -156,6 +157,21 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
 
+        # We define this sequence here so that it can be referenced from both
+        # the DataStore and PersistEventStore.
+        def get_chain_id_txn(txn):
+            txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
+            return txn.fetchone()[0]
+
+        self.event_chain_id_gen = build_sequence_generator(
+            db_conn,
+            database.engine,
+            get_chain_id_txn,
+            "event_auth_chain_id",
+            table="event_auth_chains",
+            id_column="chain_id",
+        )
+
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == EventsStream.NAME:
             self._stream_id_gen.advance(instance_name, token)

diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index d5b5507815..61a7556e56 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -23,7 +23,7 @@ import attr
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.types import Connection, Cursor
@@ -70,7 +70,12 @@ class TokenLookupResult:
 
 
 class RegistrationWorkerStore(CacheInvalidationWorkerStore):
-    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
         super().__init__(database, db_conn, hs)
 
         self.config = hs.config
@@ -79,9 +84,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         # call `find_max_generated_user_id_localpart` each time, which is
         # expensive if there are many entries.
         self._user_id_seq = build_sequence_generator(
+            db_conn,
             database.engine,
             find_max_generated_user_id_localpart,
             "user_id_seq",
+            table=None,
+            id_column=None,
         )
 
         self._account_validity = hs.config.account_validity
@@ -1036,7 +1044,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
 
 
 class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
-    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
         super().__init__(database, db_conn, hs)
 
         self._clock = hs.get_clock()

diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index b16b9905d8..e2240703a7 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -97,10 +97,12 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             return txn.fetchone()[0]
 
         self._state_group_seq_gen = build_sequence_generator(
-            self.database_engine, get_max_state_group_txn, "state_group_id_seq"
-        )
-        self._state_group_seq_gen.check_consistency(
-            db_conn, table="state_groups", id_column="id"
+            db_conn,
+            self.database_engine,
+            get_max_state_group_txn,
+            "state_group_id_seq",
+            table="state_groups",
+            id_column="id",
         )
 
     @cached(max_entries=10000, iterable=True)

diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 3ea637b281..36a67e7019 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -251,9 +251,14 @@ class LocalSequenceGenerator(SequenceGenerator):
 
 
 def build_sequence_generator(
+    db_conn: "LoggingDatabaseConnection",
     database_engine: BaseDatabaseEngine,
     get_first_callback: GetFirstCallbackType,
     sequence_name: str,
+    table: Optional[str],
+    id_column: Optional[str],
+    stream_name: Optional[str] = None,
+    positive: bool = True,
 ) -> SequenceGenerator:
     """Get the best impl of SequenceGenerator available
 
@@ -265,8 +270,23 @@ def build_sequence_generator(
         get_first_callback: a callback which gets the next sequence ID. Used if
             we're on sqlite.
         sequence_name: the name of a postgres sequence to use.
+        table, id_column, stream_name, positive: If set then `check_consistency`
+            is called on the created sequence. See docstring for
+            `check_consistency` details.
     """
     if isinstance(database_engine, PostgresEngine):
-        return PostgresSequenceGenerator(sequence_name)
+        seq = PostgresSequenceGenerator(sequence_name)  # type: SequenceGenerator
     else:
-        return LocalSequenceGenerator(get_first_callback)
+        seq = LocalSequenceGenerator(get_first_callback)
+
+    if table:
+        assert id_column
+        seq.check_consistency(
+            db_conn=db_conn,
+            table=table,
+            id_column=id_column,
+            stream_name=stream_name,
+            positive=positive,
+        )
+
+    return seq
--
cgit 1.5.1
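
The point of forcing `table`/`id_column` through `build_sequence_generator` is that a separate `check_consistency` call is easy to forget. The invariant the check enforces is simple; a toy version (not Synapse's actual implementation) to show why starting up with a lagging sequence is unsafe:

    def assert_sequence_not_behind(
        sequence_name: str, sequence_last_value: int, table_max_id: int
    ) -> None:
        # If rows already exist beyond the sequence's current position, a
        # future nextval() would eventually collide with an existing ID, so
        # it is safer to refuse to start than to hand out duplicates.
        if table_max_id > sequence_last_value:
            raise RuntimeError(
                "Sequence %s is behind its table: max(id)=%d > last_value=%d"
                % (sequence_name, table_max_id, sequence_last_value)
            )

    assert_sequence_not_behind("event_auth_chain_id", 100, 90)  # fine
    # assert_sequence_not_behind("event_auth_chain_id", 90, 100)  # would raise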
From 292792194232e68eb68c5135bb59d037d38b870b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 24 Feb 2021 13:23:18 +0000
Subject: Clean up `ShardedWorkerHandlingConfig` (#9466)

* Split ShardedWorkerHandlingConfig

This is so that we have a type level understanding of when it is safe to
call `get_instance(..)` (as opposed to `should_handle(..)`).

* Remove special cases in ShardedWorkerHandlingConfig.

`ShardedWorkerHandlingConfig` tried to handle the various different ways
it was possible to configure federation senders and pushers. This led to
special cases that weren't hit during testing.

To fix this the handling of the different cases is moved from there and
`generic_worker` into the worker config class. This allows us to have
the logic in one place and allows the rest of the code to ignore the
different cases.
---
 changelog.d/9466.bugfix                           |  1 +
 synapse/app/admin_cmd.py                          |  2 +
 synapse/app/generic_worker.py                     | 32 --------
 synapse/config/_base.py                           | 36 ++++++---
 synapse/config/_base.pyi                          |  2 +
 synapse/config/push.py                            |  5 +-
 synapse/config/server.py                          |  1 -
 synapse/config/workers.py                         | 93 +++++++++++++++++++++--
 synapse/push/pusherpool.py                        |  4 +-
 synapse/server.py                                 |  7 +-
 tests/replication/tcp/streams/test_federation.py  |  2 +-
 tests/replication/test_federation_ack.py          |  2 +-
 tests/replication/test_federation_sender_shard.py |  2 +-
 tests/replication/test_pusher_shard.py            |  2 +-
 14 files changed, 128 insertions(+), 63 deletions(-)
 create mode 100644 changelog.d/9466.bugfix

diff --git a/changelog.d/9466.bugfix b/changelog.d/9466.bugfix
new file mode 100644
index 0000000000..2ab4f315c1
--- /dev/null
+++ b/changelog.d/9466.bugfix
@@ -0,0 +1 @@
+Fix deleting pushers when using sharded pushers.

diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index b4bd4d8e7a..9f99651aa2 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -210,7 +210,9 @@ def start(config_options):
     config.update_user_directory = False
     config.run_background_tasks = False
     config.start_pushers = False
+    config.pusher_shard_config.instances = []
     config.send_federation = False
+    config.federation_shard_config.instances = []
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 6ed405e66b..dc0d3eb725 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -919,22 +919,6 @@ def start(config_options):
         # For other worker types we force this to off.
         config.appservice.notify_appservices = False
 
-    if config.worker_app == "synapse.app.pusher":
-        if config.server.start_pushers:
-            sys.stderr.write(
-                "\nThe pushers must be disabled in the main synapse process"
-                "\nbefore they can be run in a separate worker."
-                "\nPlease add ``start_pushers: false`` to the main config"
-                "\n"
-            )
-            sys.exit(1)
-
-        # Force the pushers to start since they will be disabled in the main config
-        config.server.start_pushers = True
-    else:
-        # For other worker types we force this to off.
-        config.server.start_pushers = False
-
     if config.worker_app == "synapse.app.user_dir":
         if config.server.update_user_directory:
             sys.stderr.write(
@@ -951,22 +935,6 @@ def start(config_options):
         # For other worker types we force this to off.
         config.server.update_user_directory = False
 
-    if config.worker_app == "synapse.app.federation_sender":
-        if config.worker.send_federation:
-            sys.stderr.write(
-                "\nThe send_federation must be disabled in the main synapse process"
-                "\nbefore they can be run in a separate worker."
-                "\nPlease add ``send_federation: false`` to the main config"
-                "\n"
-            )
-            sys.exit(1)
-
-        # Force the pushers to start since they will be disabled in the main config
-        config.worker.send_federation = True
-    else:
-        # For other worker types we force this to off.
-        config.worker.send_federation = False
-
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     hs = GenericWorkerServer(

diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index e89decda34..4026966711 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -844,22 +844,23 @@ class ShardedWorkerHandlingConfig:
 
     def should_handle(self, instance_name: str, key: str) -> bool:
         """Whether this instance is responsible for handling the given key."""
-        # If multiple instances are not defined we always return true
-        if not self.instances or len(self.instances) == 1:
-            return True
+        # If no instances are defined we assume some other worker is handling
+        # this.
+        if not self.instances:
+            return False
 
-        return self.get_instance(key) == instance_name
+        return self._get_instance(key) == instance_name
 
-    def get_instance(self, key: str) -> str:
+    def _get_instance(self, key: str) -> str:
         """Get the instance responsible for handling the given key.
 
-        Note: For things like federation sending the config for which instance
-        is sending is known only to the sender instance if there is only one.
-        Therefore `should_handle` should be used where possible.
+        Note: For federation sending and pushers the config for which instance
+        is sending is known only to the sender instance, so we don't expose this
+        method by default.
         """
 
         if not self.instances:
-            return "master"
+            raise Exception("Unknown worker")
 
         if len(self.instances) == 1:
             return self.instances[0]
@@ -876,4 +877,21 @@ class ShardedWorkerHandlingConfig:
         return self.instances[remainder]
 
 
+@attr.s
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
+    """A version of `ShardedWorkerHandlingConfig` that is used for config
+    options where all instances know which instances are responsible for the
+    sharded work.
+    """
+
+    def __attrs_post_init__(self):
+        # We require that `self.instances` is non-empty.
+        if not self.instances:
+            raise Exception("Got empty list of instances for shard config")
+
+    def get_instance(self, key: str) -> str:
+        """Get the instance responsible for handling the given key."""
+        return self._get_instance(key)
+
+
 __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]

diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 70025b5d60..db16c86f50 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -149,4 +149,6 @@ class ShardedWorkerHandlingConfig:
     instances: List[str]
     def __init__(self, instances: List[str]) -> None: ...
     def should_handle(self, instance_name: str, key: str) -> bool: ...
+
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
     def get_instance(self, key: str) -> str: ...

diff --git a/synapse/config/push.py b/synapse/config/push.py
index 3adbfb73e6..7831a2ef79 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import Config, ShardedWorkerHandlingConfig
+from ._base import Config
 
 
 class PushConfig(Config):
@@ -27,9 +27,6 @@ class PushConfig(Config):
             "group_unread_count_by_room", True
         )
 
-        pusher_instances = config.get("pusher_instances") or []
-        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
-
         # There was a 'redact_content' setting but mistakenly read from the
         # 'email' section. Check for the flag in the 'push' section, and log,
         # but do not honour it to avoid nasty surprises when people upgrade.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 0bfd4398e2..2afca36e7d 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -397,7 +397,6 @@ class ServerConfig(Config):
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != "/":
                 self.public_baseurl += "/"
-        self.start_pushers = config.get("start_pushers", True)
 
         # (undocumented) option for torturing the worker-mode replication a bit,
         # for testing. The value defines the number of milliseconds to pause before

diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 7a0ca16da8..ac92375a85 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -17,9 +17,28 @@ from typing import List, Union
 
 import attr
 
-from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
+from ._base import (
+    Config,
+    ConfigError,
+    RoutableShardedWorkerHandlingConfig,
+    ShardedWorkerHandlingConfig,
+)
 from .server import ListenerConfig, parse_listener_def
 
+_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
+The send_federation config option must be disabled in the main
+synapse process before federation senders can be run in a separate
+worker.
+
+Please add ``send_federation: false`` to the main config.
+"""
+
+_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
+The start_pushers config option must be disabled in the main
+synapse process before pushers can be run in a separate worker.
+
+Please add ``start_pushers: false`` to the main config.
+"""
+
 
 def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
     """Helper for allowing parsing a string or list of strings to a config
@@ -103,6 +122,7 @@ class WorkerConfig(Config):
         self.worker_replication_secret = config.get("worker_replication_secret", None)
 
         self.worker_name = config.get("worker_name", self.worker_app)
+        self.instance_name = self.worker_name or "master"
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
 
@@ -118,12 +138,41 @@ class WorkerConfig(Config):
             )
         )
 
-        # Whether to send federation traffic out in this process. This only
-        # applies to some federation traffic, and so shouldn't be used to
-        # "disable" federation
-        self.send_federation = config.get("send_federation", True)
+        # Handle federation sender configuration.
+        #
+        # There are two ways of configuring which instances handle federation
+        # sending:
+        #   1. The old way where "send_federation" is set to false and running a
+        #      `synapse.app.federation_sender` worker app.
+        #   2. Specifying the workers sending federation in
+        #      `federation_sender_instances`.
+        #
+
+        send_federation = config.get("send_federation", True)
+
+        federation_sender_instances = config.get("federation_sender_instances")
+        if federation_sender_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            federation_sender_instances = []
 
-        federation_sender_instances = config.get("federation_sender_instances") or []
+            # If no federation sender instances are set we check if
+            # `send_federation` is set, which means use master
+            if send_federation:
+                federation_sender_instances = ["master"]
+
+        if self.worker_app == "synapse.app.federation_sender":
+            if send_federation:
+                # If we're running federation senders, and not using
+                # `federation_sender_instances`, then we should have
+                # explicitly set `send_federation` to false.
+                raise ConfigError(
+                    _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
+                )
+
+            federation_sender_instances = [self.worker_name]
+
+        self.send_federation = self.instance_name in federation_sender_instances
 
         self.federation_shard_config = ShardedWorkerHandlingConfig(
             federation_sender_instances
         )
@@ -164,7 +213,37 @@ class WorkerConfig(Config):
                 "Must only specify one instance to handle `receipts` messages."
             )
 
-        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
+        if len(self.writers.events) == 0:
+            raise ConfigError("Must specify at least one instance to handle `events`.")
+
+        self.events_shard_config = RoutableShardedWorkerHandlingConfig(
+            self.writers.events
+        )
+
+        # Handle sharded push
+        start_pushers = config.get("start_pushers", True)
+        pusher_instances = config.get("pusher_instances")
+        if pusher_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            pusher_instances = []
+
+            # If no pusher instances are set we check if `start_pushers` is
+            # set, which means use master
+            if start_pushers:
+                pusher_instances = ["master"]
+
+        if self.worker_app == "synapse.app.pusher":
+            if start_pushers:
+                # If we're running pushers, and not using
+                # `pusher_instances`, then we should have explicitly set
+                # `start_pushers` to false.
+                raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
+
+            pusher_instances = [self.instance_name]
+
+        self.start_pushers = self.instance_name in pusher_instances
+        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
 
         # Whether this worker should run background tasks or not.
         #
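The federation-sender and pusher branches above follow the same shape, so the precedence can be summarised as one small pure function: an explicit instance list wins; otherwise the legacy boolean selects master (or nobody); a dedicated worker app claims the work itself, and combining it with the legacy boolean is a configuration error. A simplified sketch, with illustrative names rather than Synapse API:

    from typing import List, Optional

    def resolve_shard_instances(
        legacy_flag: bool,              # e.g. send_federation / start_pushers
        explicit: Optional[List[str]],  # e.g. federation_sender_instances
        dedicated_worker: bool,         # running the dedicated worker app?
        worker_name: str,
    ) -> List[str]:
        instances = explicit
        if instances is None:
            # No explicit list: the legacy flag decides whether master does
            # the work, or some other, unknown worker does (empty list).
            instances = ["master"] if legacy_flag else []
        if dedicated_worker:
            if legacy_flag:
                raise ValueError("disable the legacy option in the main config first")
            instances = [worker_name]
        return instances

    assert resolve_shard_instances(True, None, False, "master") == ["master"]
    assert resolve_shard_instances(False, None, True, "pusher1") == ["pusher1"]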
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 3936bf8784..21f14f05f0 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -59,7 +59,6 @@ class PusherPool:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.pusher_factory = PusherFactory(hs)
-        self._should_start_pushers = hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
 
@@ -68,6 +67,9 @@ class PusherPool:
         # We shard the handling of push notifications by user ID.
         self._pusher_shard_config = hs.config.push.pusher_shard_config
         self._instance_name = hs.get_instance_name()
+        self._should_start_pushers = (
+            self._instance_name in self._pusher_shard_config.instances
+        )
 
         # We can only delete pushers on master.
         self._remove_pusher_client = None

diff --git a/synapse/server.py b/synapse/server.py
index 5de8782000..4b9ec7f0ae 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -248,7 +248,7 @@ class HomeServer(metaclass=abc.ABCMeta):
         self.start_time = None  # type: Optional[int]
 
         self._instance_id = random_string(5)
-        self._instance_name = config.worker_name or "master"
+        self._instance_name = config.worker.instance_name
 
         self.version_string = version_string
 
@@ -760,7 +760,4 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     def should_send_federation(self) -> bool:
         "Should this server be sending federation traffic directly?"
-        return self.config.send_federation and (
-            not self.config.worker_app
-            or self.config.worker_app == "synapse.app.federation_sender"
-        )
+        return self.config.send_federation

diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
index 2babea4e3e..aa4bf1c7e3 100644
--- a/tests/replication/tcp/streams/test_federation.py
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -24,7 +24,7 @@ class FederationStreamTestCase(BaseStreamTestCase):
         # enable federation sending on the worker
         config = super()._get_worker_hs_config()
         # TODO: make it so we don't need both of these
-        config["send_federation"] = True
+        config["send_federation"] = False
         config["worker_app"] = "synapse.app.federation_sender"
 
         return config

diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
index 1853667558..f235f1bd83 100644
--- a/tests/replication/test_federation_ack.py
+++ b/tests/replication/test_federation_ack.py
@@ -27,7 +27,7 @@ class FederationAckTestCase(HomeserverTestCase):
     def default_config(self) -> dict:
         config = super().default_config()
         config["worker_app"] = "synapse.app.federation_sender"
-        config["send_federation"] = True
+        config["send_federation"] = False
         return config
 
     def make_homeserver(self, reactor, clock):

diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index fffdb742c8..2f2d117858 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -49,7 +49,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
 
         self.make_worker_hs(
             "synapse.app.federation_sender",
-            {"send_federation": True},
+            {"send_federation": False},
             federation_http_client=mock_client,
         )

diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index f118fe32af..ab2988a6ba 100644
--- a/tests/replication/test_pusher_shard.py
+++ b/tests/replication/test_pusher_shard.py
@@ -95,7 +95,7 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
 
         self.make_worker_hs(
             "synapse.app.pusher",
-            {"start_pushers": True},
+            {"start_pushers": False},
             proxied_blacklisted_http_client=http_client_mock,
         )
--
cgit 1.5.1
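
A closing note on how `_get_instance` routes a key once more than one instance is configured: every process hashes the key and takes it modulo the number of instances, so all workers sharing the config agree on the owner without any coordination. A self-contained sketch of that scheme (the hash details shown here are an assumption based on Synapse's use of SHA-256 elsewhere in this class; the important property is simply that the mapping is deterministic and roughly uniform):

    import hashlib
    from typing import List

    def get_instance(instances: List[str], key: str) -> str:
        # Mirrors the shape of ShardedWorkerHandlingConfig._get_instance: a
        # stable hash of the key picks the same instance on every worker.
        if not instances:
            raise Exception("Unknown worker")
        if len(instances) == 1:
            return instances[0]
        dest_hash = hashlib.sha256(key.encode("utf8")).digest()
        dest_int = int.from_bytes(dest_hash, byteorder="little")
        return instances[dest_int % len(instances)]

    workers = ["pusher1", "pusher2"]
    # The same user ID always maps to the same worker.
    assert get_instance(workers, "@alice:example.com") == get_instance(
        workers, "@alice:example.com"
    )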