diff --git a/changelog.d/9449.bugfix b/changelog.d/9449.bugfix
new file mode 100644
index 0000000000..54214a7e4a
--- /dev/null
+++ b/changelog.d/9449.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.26.0 where some sequences were not properly configured when running `synapse_port_db`.
diff --git a/changelog.d/9464.misc b/changelog.d/9464.misc
new file mode 100644
index 0000000000..39fcf85d40
--- /dev/null
+++ b/changelog.d/9464.misc
@@ -0,0 +1 @@
+Add a comment about systemd-python.
diff --git a/changelog.d/9465.bugfix b/changelog.d/9465.bugfix
new file mode 100644
index 0000000000..2ab4f315c1
--- /dev/null
+++ b/changelog.d/9465.bugfix
@@ -0,0 +1 @@
+Fix deleting pushers when using sharded pushers.
diff --git a/changelog.d/9466.bugfix b/changelog.d/9466.bugfix
new file mode 100644
index 0000000000..2ab4f315c1
--- /dev/null
+++ b/changelog.d/9466.bugfix
@@ -0,0 +1 @@
+Fix deleting pushers when using sharded pushers.
diff --git a/changelog.d/9470.bugfix b/changelog.d/9470.bugfix
new file mode 100644
index 0000000000..c1b7dbb17d
--- /dev/null
+++ b/changelog.d/9470.bugfix
@@ -0,0 +1 @@
+Fix missing startup checks for the consistency of certain PostgreSQL sequences.
diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md
index ad145439b4..15df949deb 100644
--- a/docs/tcp_replication.md
+++ b/docs/tcp_replication.md
@@ -220,10 +220,6 @@ Asks the server for the current position of all streams.
Acknowledge receipt of some federation data
-#### REMOVE_PUSHER (C)
-
- Inform the server a pusher should be removed
-
### REMOTE_SERVER_UP (S, C)
Inform other processes that a remote server may have come back online.
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 00d638eb9a..83c53d9887 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -22,7 +22,7 @@ import logging
import sys
import time
import traceback
-from typing import Dict, Optional, Set
+from typing import Dict, Iterable, Optional, Set
import yaml
@@ -631,7 +631,13 @@ class Porter(object):
await self._setup_state_group_id_seq()
await self._setup_user_id_seq()
await self._setup_events_stream_seqs()
- await self._setup_device_inbox_seq()
+ await self._setup_sequence(
+ "device_inbox_sequence", ("device_inbox", "device_federation_outbox")
+ )
+ await self._setup_sequence(
+ "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
+ await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
+ await self._setup_auth_chain_sequence()
# Step 3. Get tables.
self.progress.set_state("Fetching tables")
@@ -856,7 +862,7 @@ class Porter(object):
return done, remaining + done
- async def _setup_state_group_id_seq(self):
+ async def _setup_state_group_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)
@@ -870,7 +876,7 @@ class Porter(object):
await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
- async def _setup_user_id_seq(self):
+ async def _setup_user_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.runInteraction(
"setup_user_id_seq", find_max_generated_user_id_localpart
)
@@ -879,9 +885,9 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
- return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
+ await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
- async def _setup_events_stream_seqs(self):
+ async def _setup_events_stream_seqs(self) -> None:
"""Set the event stream sequences to the correct values.
"""
@@ -910,35 +916,46 @@ class Porter(object):
(curr_backward_id + 1,),
)
- return await self.postgres_store.db_pool.runInteraction(
+ await self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
)
- async def _setup_device_inbox_seq(self):
- """Set the device inbox sequence to the correct value.
+ async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None:
+ """Set a sequence to the correct value.
"""
- curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
- table="device_inbox",
- keyvalues={},
- retcol="COALESCE(MAX(stream_id), 1)",
- allow_none=True,
- )
+ current_stream_ids = []
+ for stream_id_table in stream_id_tables:
+ max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+ table=stream_id_table,
+ keyvalues={},
+ retcol="COALESCE(MAX(stream_id), 1)",
+ allow_none=True,
+ )
+ current_stream_ids.append(max_stream_id)
- curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
- table="device_federation_outbox",
- keyvalues={},
- retcol="COALESCE(MAX(stream_id), 1)",
- allow_none=True,
- )
+ next_id = max(current_stream_ids) + 1
+
+ def r(txn):
+ sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, )
+ txn.execute(sql + " %s", (next_id, ))
- next_id = max(curr_local_id, curr_federation_id) + 1
+ await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r)
+
+ async def _setup_auth_chain_sequence(self) -> None:
+ curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+ table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True
+ )
def r(txn):
txn.execute(
- "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
+ "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
+ (curr_chain_id,),
)
- return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)
+ await self.postgres_store.db_pool.runInteraction(
+ "_setup_event_auth_chain_id", r,
+ )
+
##############################################
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index b4bd4d8e7a..9f99651aa2 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -210,7 +210,9 @@ def start(config_options):
config.update_user_directory = False
config.run_background_tasks = False
config.start_pushers = False
+ config.pusher_shard_config.instances = []
config.send_federation = False
+ config.federation_shard_config.instances = []
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index fe0178dd79..d9423349e1 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -645,9 +645,6 @@ class GenericWorkerServer(HomeServer):
self.get_tcp_replication().start_replication(self)
- async def remove_pusher(self, app_id, push_key, user_id):
- self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
-
@cache_in_self
def get_replication_data_handler(self):
return GenericWorkerReplicationHandler(self)
@@ -922,22 +919,6 @@ def start(config_options):
# For other worker types we force this to off.
config.appservice.notify_appservices = False
- if config.worker_app == "synapse.app.pusher":
- if config.server.start_pushers:
- sys.stderr.write(
- "\nThe pushers must be disabled in the main synapse process"
- "\nbefore they can be run in a separate worker."
- "\nPlease add ``start_pushers: false`` to the main config"
- "\n"
- )
- sys.exit(1)
-
- # Force the pushers to start since they will be disabled in the main config
- config.server.start_pushers = True
- else:
- # For other worker types we force this to off.
- config.server.start_pushers = False
-
if config.worker_app == "synapse.app.user_dir":
if config.server.update_user_directory:
sys.stderr.write(
@@ -954,22 +935,6 @@ def start(config_options):
# For other worker types we force this to off.
config.server.update_user_directory = False
- if config.worker_app == "synapse.app.federation_sender":
- if config.worker.send_federation:
- sys.stderr.write(
- "\nThe send_federation must be disabled in the main synapse process"
- "\nbefore they can be run in a separate worker."
- "\nPlease add ``send_federation: false`` to the main config"
- "\n"
- )
- sys.exit(1)
-
- # Force the pushers to start since they will be disabled in the main config
- config.worker.send_federation = True
- else:
- # For other worker types we force this to off.
- config.worker.send_federation = False
-
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
hs = GenericWorkerServer(
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 57f454fc9f..fb0a615930 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -845,22 +845,23 @@ class ShardedWorkerHandlingConfig:
def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key."""
- # If multiple instances are not defined we always return true
- if not self.instances or len(self.instances) == 1:
- return True
+ # If no instances are defined we assume some other worker is handling
+ # this.
+ if not self.instances:
+ return False
- return self.get_instance(key) == instance_name
+ return self._get_instance(key) == instance_name
- def get_instance(self, key: str) -> str:
+ def _get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key.
- Note: For things like federation sending the config for which instance
- is sending is known only to the sender instance if there is only one.
- Therefore `should_handle` should be used where possible.
+ Note: For federation sending and pushers the config for which instance
+ is sending is known only to the sender instance, so we don't expose this
+ method by default.
"""
if not self.instances:
- return "master"
+ raise Exception("Unknown worker")
if len(self.instances) == 1:
return self.instances[0]
@@ -877,4 +878,21 @@ class ShardedWorkerHandlingConfig:
return self.instances[remainder]
+@attr.s
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
+ """A version of `ShardedWorkerHandlingConfig` that is used for config
+ options where all instances know which instances are responsible for the
+ sharded work.
+ """
+
+ def __attrs_post_init__(self):
+ # We require that `self.instances` is non-empty.
+ if not self.instances:
+ raise Exception("Got empty list of instances for shard config")
+
+ def get_instance(self, key: str) -> str:
+ """Get the instance responsible for handling the given key."""
+ return self._get_instance(key)
+
+
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 0565418e60..5e1c9147a8 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -151,4 +151,6 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...
+
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 3adbfb73e6..7831a2ef79 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import Config, ShardedWorkerHandlingConfig
+from ._base import Config
class PushConfig(Config):
@@ -27,9 +27,6 @@ class PushConfig(Config):
"group_unread_count_by_room", True
)
- pusher_instances = config.get("pusher_instances") or []
- self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
-
# There was a a 'redact_content' setting but mistakenly read from the
# 'email'section'. Check for the flag in the 'push' section, and log,
# but do not honour it to avoid nasty surprises when people upgrade.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 0e8394b149..29ff7718fd 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -397,7 +397,6 @@ class ServerConfig(Config):
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
- self.start_pushers = config.get("start_pushers", True)
# (undocumented) option for torturing the worker-mode replication a bit,
# for testing. The value defines the number of milliseconds to pause before
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 7a0ca16da8..ac92375a85 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -17,9 +17,28 @@ from typing import List, Union
import attr
-from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
+from ._base import (
+ Config,
+ ConfigError,
+ RoutableShardedWorkerHandlingConfig,
+ ShardedWorkerHandlingConfig,
+)
from .server import ListenerConfig, parse_listener_def
+_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
+The send_federation config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``send_federation: false`` to the main config
+"""
+
+_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
+The start_pushers config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``start_pushers: false`` to the main config
+"""
+
def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
@@ -103,6 +122,7 @@ class WorkerConfig(Config):
self.worker_replication_secret = config.get("worker_replication_secret", None)
self.worker_name = config.get("worker_name", self.worker_app)
+ self.instance_name = self.worker_name or "master"
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
@@ -118,12 +138,41 @@ class WorkerConfig(Config):
)
)
- # Whether to send federation traffic out in this process. This only
- # applies to some federation traffic, and so shouldn't be used to
- # "disable" federation
- self.send_federation = config.get("send_federation", True)
+ # Handle federation sender configuration.
+ #
+ # There are two ways of configuring which instances handle federation
+ # sending:
+ # 1. The old way where "send_federation" is set to false and running a
+ # `synapse.app.federation_sender` worker app.
+ # 2. Specifying the workers sending federation in
+ # `federation_sender_instances`.
+ #
+
+ send_federation = config.get("send_federation", True)
+
+ federation_sender_instances = config.get("federation_sender_instances")
+ if federation_sender_instances is None:
+ # Default to an empty list, which means "another, unknown, worker is
+ # responsible for it".
+ federation_sender_instances = []
- federation_sender_instances = config.get("federation_sender_instances") or []
+ # If no federation sender instances are set we check if
+ # `send_federation` is set, which means use master
+ if send_federation:
+ federation_sender_instances = ["master"]
+
+ if self.worker_app == "synapse.app.federation_sender":
+ if send_federation:
+ # If we're running federation senders, and not using
+ # `federation_sender_instances`, then we should have
+ # explicitly set `send_federation` to false.
+ raise ConfigError(
+ _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
+ )
+
+ federation_sender_instances = [self.worker_name]
+
+ self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
@@ -164,7 +213,37 @@ class WorkerConfig(Config):
"Must only specify one instance to handle `receipts` messages."
)
- self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
+ if len(self.writers.events) == 0:
+ raise ConfigError("Must specify at least one instance to handle `events`.")
+
+ self.events_shard_config = RoutableShardedWorkerHandlingConfig(
+ self.writers.events
+ )
+
+ # Handle sharded push
+ start_pushers = config.get("start_pushers", True)
+ pusher_instances = config.get("pusher_instances")
+ if pusher_instances is None:
+ # Default to an empty list, which means "another, unknown, worker is
+ # responsible for it".
+ pusher_instances = []
+
+ # If no pushers instances are set we check if `start_pushers` is
+ # set, which means use master
+ if start_pushers:
+ pusher_instances = ["master"]
+
+ if self.worker_app == "synapse.app.pusher":
+ if start_pushers:
+ # If we're running pushers, and not using
+ # `pusher_instances`, then we should have explicitly set
+ # `start_pushers` to false.
+ raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
+
+ pusher_instances = [self.instance_name]
+
+ self.start_pushers = self.instance_name in pusher_instances
+ self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
# Whether this worker should run background tasks or not.
#
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index b9d3da2e0a..f4d7e199e9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -74,6 +74,7 @@ class HttpPusher(Pusher):
self.timed_call = None
self._is_processing = False
self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room
+ self._pusherpool = hs.get_pusherpool()
self.data = pusher_config.data
if self.data is None:
@@ -299,7 +300,7 @@ class HttpPusher(Pusher):
)
else:
logger.info("Pushkey %s was rejected: removing", pk)
- await self.hs.remove_pusher(self.app_id, pk, self.user_id)
+ await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
return True
async def _build_notification_dict(
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 343ab30bed..1679a6d211 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,6 +25,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
+from synapse.replication.http.push import ReplicationRemovePusherRestServlet
from synapse.types import JsonDict, RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
@@ -58,7 +59,6 @@ class PusherPool:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.pusher_factory = PusherFactory(hs)
- self._should_start_pushers = hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
@@ -67,6 +67,16 @@ class PusherPool:
# We shard the handling of push notifications by user ID.
self._pusher_shard_config = hs.config.push.pusher_shard_config
self._instance_name = hs.get_instance_name()
+ self._should_start_pushers = (
+ self._instance_name in self._pusher_shard_config.instances
+ )
+
+ # We can only delete pushers on master.
+ self._remove_pusher_client = None
+ if hs.config.worker.worker_app:
+ self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
+ hs
+ )
# Record the last stream ID that we were poked about so we can get
# changes since then. We set this to the current max stream ID on
@@ -175,9 +185,6 @@ class PusherPool:
user_id: user to remove pushers for
access_tokens: access token *ids* to remove pushers for
"""
- if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
- return
-
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.access_token in tokens:
@@ -380,6 +387,12 @@ class PusherPool:
synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
- await self.store.delete_pusher_by_app_id_pushkey_user_id(
- app_id, pushkey, user_id
- )
+ # We can only delete pushers on master.
+ if self._remove_pusher_client:
+ await self._remove_pusher_client(
+ app_id=app_id, pushkey=pushkey, user_id=user_id
+ )
+ else:
+ await self.store.delete_pusher_by_app_id_pushkey_user_id(
+ app_id, pushkey, user_id
+ )
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 0857efbe71..fca6151feb 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -106,6 +106,9 @@ CONDITIONAL_REQUIREMENTS = {
"pysaml2>=4.5.0;python_version>='3.6'",
],
"oidc": ["authlib>=0.14.0"],
+ # systemd-python is necessary for logging to the systemd journal via
+ # `systemd.journal.JournalHandler`, as is documented in
+ # `contrib/systemd/log_config.yaml`.
"systemd": ["systemd-python>=231"],
"url_preview": ["lxml>=3.5.0"],
"sentry": ["sentry-sdk>=0.7.2"],
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index dd527e807f..cb4a52dbe9 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -21,6 +21,7 @@ from synapse.replication.http import (
login,
membership,
presence,
+ push,
register,
send_event,
streams,
@@ -42,6 +43,7 @@ class ReplicationRestResource(JsonResource):
membership.register_servlets(hs, self)
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)
+ push.register_servlets(hs, self)
# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
new file mode 100644
index 0000000000..054ed64d34
--- /dev/null
+++ b/synapse/replication/http/push.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
+ """Deletes the given pusher.
+
+ Request format:
+
+ POST /_synapse/replication/remove_pusher/:user_id
+
+ {
+ "app_id": "<some_id>",
+ "pushkey": "<some_key>"
+ }
+
+ """
+
+ NAME = "add_user_account_data"
+ PATH_ARGS = ("user_id",)
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.pusher_pool = hs.get_pusherpool()
+
+ @staticmethod
+ async def _serialize_payload(app_id, pushkey, user_id):
+ payload = {
+ "app_id": app_id,
+ "pushkey": pushkey,
+ }
+
+ return payload
+
+ async def _handle_request(self, request, user_id):
+ content = parse_json_object_from_request(request)
+
+ app_id = content["app_id"]
+ pushkey = content["pushkey"]
+
+ await self.pusher_pool.remove_pusher(app_id, pushkey, user_id)
+
+ return 200, {}
+
+
+def register_servlets(hs, http_server):
+ ReplicationRemovePusherRestServlet(hs).register(http_server)
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 0a9da79c32..bb447f75b4 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -325,31 +325,6 @@ class FederationAckCommand(Command):
return "%s %s" % (self.instance_name, self.token)
-class RemovePusherCommand(Command):
- """Sent by the client to request the master remove the given pusher.
-
- Format::
-
- REMOVE_PUSHER <app_id> <push_key> <user_id>
- """
-
- NAME = "REMOVE_PUSHER"
-
- def __init__(self, app_id, push_key, user_id):
- self.user_id = user_id
- self.app_id = app_id
- self.push_key = push_key
-
- @classmethod
- def from_line(cls, line):
- app_id, push_key, user_id = line.split(" ", 2)
-
- return cls(app_id, push_key, user_id)
-
- def to_line(self):
- return " ".join((self.app_id, self.push_key, self.user_id))
-
-
class UserIpCommand(Command):
"""Sent periodically when a worker sees activity from a client.
@@ -416,7 +391,6 @@ _COMMANDS = (
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
- RemovePusherCommand,
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
@@ -443,7 +417,6 @@ VALID_CLIENT_COMMANDS = (
UserSyncCommand.NAME,
ClearUserSyncsCommand.NAME,
FederationAckCommand.NAME,
- RemovePusherCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index d1d00c3717..a7245da152 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -44,7 +44,6 @@ from synapse.replication.tcp.commands import (
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
- RemovePusherCommand,
ReplicateCommand,
UserIpCommand,
UserSyncCommand,
@@ -373,23 +372,6 @@ class ReplicationCommandHandler:
if self._federation_sender:
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
- def on_REMOVE_PUSHER(
- self, conn: AbstractConnection, cmd: RemovePusherCommand
- ) -> Optional[Awaitable[None]]:
- remove_pusher_counter.inc()
-
- if self._is_master:
- return self._handle_remove_pusher(cmd)
- else:
- return None
-
- async def _handle_remove_pusher(self, cmd: RemovePusherCommand):
- await self._store.delete_pusher_by_app_id_pushkey_user_id(
- app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
- )
-
- self._notifier.on_new_replication_data()
-
def on_USER_IP(
self, conn: AbstractConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
@@ -684,11 +666,6 @@ class ReplicationCommandHandler:
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
)
- def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
- """Poke the master to remove a pusher for a user"""
- cmd = RemovePusherCommand(app_id, push_key, user_id)
- self.send_command(cmd)
-
def send_user_ip(
self,
user_id: str,
diff --git a/synapse/server.py b/synapse/server.py
index 91d59b755a..1d4370e0ba 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -247,7 +247,7 @@ class HomeServer(metaclass=abc.ABCMeta):
self.start_time = None # type: Optional[int]
self._instance_id = random_string(5)
- self._instance_name = config.worker_name or "master"
+ self._instance_name = config.worker.instance_name
self.version_string = version_string
@@ -752,12 +752,6 @@ class HomeServer(metaclass=abc.ABCMeta):
reconnect=True,
)
- async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
- return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
-
def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
- return self.config.send_federation and (
- not self.config.worker_app
- or self.config.worker_app == "synapse.app.federation_sender"
- )
+ return self.config.send_federation
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 4646926449..f1ba529a2d 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -49,7 +49,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Connection, Cursor
-from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import Collection
# python 3 does not have a maximum int value
@@ -381,7 +380,10 @@ class DatabasePool:
_TXN_ID = 0
def __init__(
- self, hs, database_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
+ self,
+ hs,
+ database_config: DatabaseConnectionConfig,
+ engine: BaseDatabaseEngine,
):
self.hs = hs
self._clock = hs.get_clock()
@@ -420,16 +422,6 @@ class DatabasePool:
self._check_safe_to_upsert,
)
- # We define this sequence here so that it can be referenced from both
- # the DataStore and PersistEventStore.
- def get_chain_id_txn(txn):
- txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
- return txn.fetchone()[0]
-
- self.event_chain_id_gen = build_sequence_generator(
- engine, get_chain_id_txn, "event_auth_chain_id"
- )
-
def is_running(self) -> bool:
"""Is the database pool currently running"""
return self._db_pool.running
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index e84f8b42f7..379c78bb83 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -79,7 +79,7 @@ class Databases:
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
- persist_events = PersistEventsStore(hs, database, main)
+ persist_events = PersistEventsStore(hs, database, main, db_conn)
if "state" in database_config.databases:
logger.info(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 287606cb4f..cd1ceac50e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -42,7 +42,9 @@ from synapse.logging.utils import log_function
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
+from synapse.storage.types import Connection
from synapse.storage.util.id_generators import MultiWriterIdGenerator
+from synapse.storage.util.sequence import SequenceGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter, sorted_topologically
@@ -90,7 +92,11 @@ class PersistEventsStore:
"""
def __init__(
- self, hs: "HomeServer", db: DatabasePool, main_data_store: "DataStore"
+ self,
+ hs: "HomeServer",
+ db: DatabasePool,
+ main_data_store: "DataStore",
+ db_conn: Connection,
):
self.hs = hs
self.db_pool = db
@@ -474,6 +480,7 @@ class PersistEventsStore:
self._add_chain_cover_index(
txn,
self.db_pool,
+ self.store.event_chain_id_gen,
event_to_room_id,
event_to_types,
event_to_auth_chain,
@@ -484,6 +491,7 @@ class PersistEventsStore:
cls,
txn,
db_pool: DatabasePool,
+ event_chain_id_gen: SequenceGenerator,
event_to_room_id: Dict[str, str],
event_to_types: Dict[str, Tuple[str, str]],
event_to_auth_chain: Dict[str, List[str]],
@@ -630,6 +638,7 @@ class PersistEventsStore:
new_chain_tuples = cls._allocate_chain_ids(
txn,
db_pool,
+ event_chain_id_gen,
event_to_room_id,
event_to_types,
event_to_auth_chain,
@@ -768,6 +777,7 @@ class PersistEventsStore:
def _allocate_chain_ids(
txn,
db_pool: DatabasePool,
+ event_chain_id_gen: SequenceGenerator,
event_to_room_id: Dict[str, str],
event_to_types: Dict[str, Tuple[str, str]],
event_to_auth_chain: Dict[str, List[str]],
@@ -880,7 +890,7 @@ class PersistEventsStore:
chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1]
# Generate new chain IDs for all unallocated chain IDs.
- newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn(
+ newly_allocated_chain_ids = event_chain_id_gen.get_next_mult_txn(
txn, len(unallocated_chain_ids)
)
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 89274e75f7..c1626ccf28 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -917,6 +917,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
PersistEventsStore._add_chain_cover_index(
txn,
self.db_pool,
+ self.event_chain_id_gen,
event_to_room_id,
event_to_types,
event_to_auth_chain,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 7bab6aa009..34e8ddc62f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -45,6 +45,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
+from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import Collection, JsonDict, get_domain_from_id
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
@@ -156,6 +157,21 @@ class EventsWorkerStore(SQLBaseStore):
self._event_fetch_list = []
self._event_fetch_ongoing = 0
+ # We define this sequence here so that it can be referenced from both
+ # the DataStore and PersistEventStore.
+ def get_chain_id_txn(txn):
+ txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
+ return txn.fetchone()[0]
+
+ self.event_chain_id_gen = build_sequence_generator(
+ db_conn,
+ database.engine,
+ get_chain_id_txn,
+ "event_auth_chain_id",
+ table="event_auth_chains",
+ id_column="chain_id",
+ )
+
def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == EventsStream.NAME:
self._stream_id_gen.advance(instance_name, token)
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index b96ff6a0b6..2cfa3dface 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -23,7 +23,7 @@ import attr
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.types import Connection, Cursor
@@ -70,7 +70,12 @@ class TokenLookupResult:
class RegistrationWorkerStore(CacheInvalidationWorkerStore):
- def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
self.config = hs.config
@@ -79,9 +84,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
# call `find_max_generated_user_id_localpart` each time, which is
# expensive if there are many entries.
self._user_id_seq = build_sequence_generator(
+ db_conn,
database.engine,
find_max_generated_user_id_localpart,
"user_id_seq",
+ table=None,
+ id_column=None,
)
self._account_validity_enabled = hs.config.account_validity_enabled
@@ -1140,7 +1148,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
- def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
self._clock = hs.get_clock()
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index b16b9905d8..e2240703a7 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -97,10 +97,12 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return txn.fetchone()[0]
self._state_group_seq_gen = build_sequence_generator(
- self.database_engine, get_max_state_group_txn, "state_group_id_seq"
- )
- self._state_group_seq_gen.check_consistency(
- db_conn, table="state_groups", id_column="id"
+ db_conn,
+ self.database_engine,
+ get_max_state_group_txn,
+ "state_group_id_seq",
+ table="state_groups",
+ id_column="id",
)
@cached(max_entries=10000, iterable=True)
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 3ea637b281..36a67e7019 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -251,9 +251,14 @@ class LocalSequenceGenerator(SequenceGenerator):
def build_sequence_generator(
+ db_conn: "LoggingDatabaseConnection",
database_engine: BaseDatabaseEngine,
get_first_callback: GetFirstCallbackType,
sequence_name: str,
+ table: Optional[str],
+ id_column: Optional[str],
+ stream_name: Optional[str] = None,
+ positive: bool = True,
) -> SequenceGenerator:
"""Get the best impl of SequenceGenerator available
@@ -265,8 +270,23 @@ def build_sequence_generator(
get_first_callback: a callback which gets the next sequence ID. Used if
we're on sqlite.
sequence_name: the name of a postgres sequence to use.
+ table, id_column, stream_name, positive: If set then `check_consistency`
+ is called on the created sequence. See docstring for
+ `check_consistency` details.
"""
if isinstance(database_engine, PostgresEngine):
- return PostgresSequenceGenerator(sequence_name)
+ seq = PostgresSequenceGenerator(sequence_name) # type: SequenceGenerator
else:
- return LocalSequenceGenerator(get_first_callback)
+ seq = LocalSequenceGenerator(get_first_callback)
+
+ if table:
+ assert id_column
+ seq.check_consistency(
+ db_conn=db_conn,
+ table=table,
+ id_column=id_column,
+ stream_name=stream_name,
+ positive=positive,
+ )
+
+ return seq
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
index 2babea4e3e..aa4bf1c7e3 100644
--- a/tests/replication/tcp/streams/test_federation.py
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -24,7 +24,7 @@ class FederationStreamTestCase(BaseStreamTestCase):
# enable federation sending on the worker
config = super()._get_worker_hs_config()
# TODO: make it so we don't need both of these
- config["send_federation"] = True
+ config["send_federation"] = False
config["worker_app"] = "synapse.app.federation_sender"
return config
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
index 1853667558..f235f1bd83 100644
--- a/tests/replication/test_federation_ack.py
+++ b/tests/replication/test_federation_ack.py
@@ -27,7 +27,7 @@ class FederationAckTestCase(HomeserverTestCase):
def default_config(self) -> dict:
config = super().default_config()
config["worker_app"] = "synapse.app.federation_sender"
- config["send_federation"] = True
+ config["send_federation"] = False
return config
def make_homeserver(self, reactor, clock):
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index fffdb742c8..2f2d117858 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -49,7 +49,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
self.make_worker_hs(
"synapse.app.federation_sender",
- {"send_federation": True},
+ {"send_federation": False},
federation_http_client=mock_client,
)
diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index f118fe32af..ab2988a6ba 100644
--- a/tests/replication/test_pusher_shard.py
+++ b/tests/replication/test_pusher_shard.py
@@ -95,7 +95,7 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
self.make_worker_hs(
"synapse.app.pusher",
- {"start_pushers": True},
+ {"start_pushers": False},
proxied_blacklisted_http_client=http_client_mock,
)
|