diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 51583dc13d..c1e1544536 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -20,7 +20,7 @@
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma-separated list of worker types, as specified in WORKERS_CONFIG
-# below. Leave empty for no workers, or set to '*' for all possible workers.
+# below. Leave empty for no workers.
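+#       e.g. 'client_reader,federation_sender,media_repository'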
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
@@ -39,6 +39,7 @@
# continue to work if so.
import os
+import platform
import subprocess
import sys
from pathlib import Path
@@ -49,13 +50,18 @@ from jinja2 import Environment, FileSystemLoader
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
-
+# Workers with exposed endpoints need either a "client", "federation", or "media"
+# listener in their listener_resources:
+#   Serving /_matrix/client endpoints needs a "client" listener.
+#   Serving /_matrix/federation endpoints needs a "federation" listener.
+#   Serving /_matrix/media and related endpoints needs a "media" listener.
+# Stream writers require both "client" and "replication" listeners because they
+# must attach to the main process via the instance_map and also expose client
+# endpoints.
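+#
+# For example, "client_reader" below serves /_matrix/client endpoints and so
+# lists a "client" listener, while stream writers such as "typing" list both
+# "client" and "replication".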
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"pusher": {
- "app": "synapse.app.pusher",
+ "app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
- "shared_extra_conf": {"start_pushers": False},
+ "shared_extra_conf": {},
"worker_extra_conf": "",
},
"user_dir": {
@@ -78,7 +84,11 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_synapse/admin/v1/media/.*$",
"^/_synapse/admin/v1/quarantine_media/.*$",
],
- "shared_extra_conf": {"enable_media_repo": False},
+ # The first configured media worker will run the media background jobs
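+        # (workers are named "<worker_type><N>" in creation order, so
+        # "media_repository1" always refers to the first one)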
+ "shared_extra_conf": {
+ "enable_media_repo": False,
+ "media_instance_running_background_jobs": "media_repository1",
+ },
"worker_extra_conf": "enable_media_repo: true",
},
"appservice": {
@@ -89,10 +99,10 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"worker_extra_conf": "",
},
"federation_sender": {
- "app": "synapse.app.federation_sender",
+ "app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
- "shared_extra_conf": {"send_federation": False},
+ "shared_extra_conf": {},
"worker_extra_conf": "",
},
"synchrotron": {
@@ -107,6 +117,34 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"shared_extra_conf": {},
"worker_extra_conf": "",
},
+ "client_reader": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client"],
+ "endpoint_patterns": [
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$",
+ "^/_matrix/client/v1/rooms/.*/hierarchy$",
+ "^/_matrix/client/(v1|unstable)/rooms/.*/relations/",
+ "^/_matrix/client/v1/rooms/.*/threads$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
+ "^/_matrix/client/versions$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
+ "^/_matrix/client/(r0|v3|unstable)/register$",
+ "^/_matrix/client/(r0|v3|unstable)/auth/.*/fallback/web$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms",
+ "^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/search",
+ ],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
"federation_reader": {
"app": "synapse.app.generic_worker",
"listener_resources": ["federation"],
@@ -171,14 +209,54 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"worker_extra_conf": "",
},
"frontend_proxy": {
- "app": "synapse.app.frontend_proxy",
+ "app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {},
- "worker_extra_conf": (
- "worker_main_http_uri: http://127.0.0.1:%d"
- % (MAIN_PROCESS_HTTP_LISTENER_PORT,)
- ),
+ "worker_extra_conf": "",
+ },
+ "account_data": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": [
+ "^/_matrix/client/(r0|v3|unstable)/.*/tags",
+ "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
+ ],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
+ "presence": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
+ "receipts": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": [
+ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
+ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
+ ],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
+ "to_device": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
+ "typing": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": [
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
+ ],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
},
}
@@ -201,24 +279,19 @@ upstream {upstream_worker_type} {{
# Utility functions
def log(txt: str) -> None:
- """Log something to the stdout.
-
- Args:
- txt: The text to log.
- """
print(txt)
def error(txt: str) -> NoReturn:
- """Log something and exit with an error code.
-
- Args:
- txt: The text to log in error.
- """
- log(txt)
+ print(txt, file=sys.stderr)
sys.exit(2)
+def flush_buffers() -> None:
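+    """Flush stdout and stderr so no buffered output is lost when this
+    process is later replaced via os.execle."""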
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+
def convert(src: str, dst: str, **template_vars: object) -> None:
"""Generate a file from a template
@@ -247,14 +320,14 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
outfile.write(rendered)
-def add_sharding_to_shared_config(
+def add_worker_roles_to_shared_config(
shared_config: dict,
worker_type: str,
worker_name: str,
worker_port: int,
) -> None:
"""Given a dictionary representing a config file shared across all workers,
- append sharded worker information to it for the current worker_type instance.
+ append appropriate worker information to it for the current worker_type instance.
Args:
shared_config: The config dict that all worker instances share (after being converted to YAML)
@@ -285,9 +358,19 @@ def add_sharding_to_shared_config(
"port": worker_port,
}
- elif worker_type == "media_repository":
- # The first configured media worker will run the media background jobs
- shared_config.setdefault("media_instance_running_background_jobs", worker_name)
+ elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
+        # Update the list of stream writers. Conveniently, each of these worker
+        # types shares its name with the stream it writes to.
+ shared_config.setdefault("stream_writers", {}).setdefault(
+ worker_type, []
+ ).append(worker_name)
+
+        # Map of stream writer instance names to host/port combos.
+        # For now, all stream writers need HTTP replication ports.
+ instance_map[worker_name] = {
+ "host": "localhost",
+ "port": worker_port,
+ }
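+
+        # e.g. a single "account_data" worker named "account_data1" yields:
+        #   stream_writers:
+        #     account_data: ["account_data1"]
+        #   instance_map:
+        #     account_data1: {host: "localhost", port: <worker_port>}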
def generate_base_homeserver_config() -> None:
@@ -299,7 +382,7 @@ def generate_base_homeserver_config() -> None:
# start.py already does this for us, so just call that.
    # note that this script is also copied into the official, monolith Dockerfile
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
- subprocess.check_output(["/usr/local/bin/python", "/start.py", "migrate_config"])
+ subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
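+    # Unlike check_output, subprocess.run does not capture stdout, so any
+    # output from start.py is passed through; check=True still makes a
+    # failure fatal.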
def generate_worker_files(
@@ -373,8 +456,8 @@ def generate_worker_files(
# No workers, just the main process
worker_types = []
else:
- # Split type names by comma
- worker_types = worker_types_env.split(",")
+ # Split type names by comma, ignoring whitespace.
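+        # e.g. "client_reader, federation_sender" -> ["client_reader", "federation_sender"]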
+ worker_types = [x.strip() for x in worker_types_env.split(",")]
# Create the worker configuration directory if it doesn't already exist
os.makedirs("/conf/workers", exist_ok=True)
@@ -393,14 +476,11 @@ def generate_worker_files(
# For each worker type specified by the user, create config values
for worker_type in worker_types:
- worker_type = worker_type.strip()
-
worker_config = WORKERS_CONFIG.get(worker_type)
if worker_config:
worker_config = worker_config.copy()
else:
- log(worker_type + " is an unknown worker type! It will be ignored")
- continue
+            error(worker_type + " is an unknown worker type! Please fix your SYNAPSE_WORKER_TYPES.")
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
worker_type_counter[worker_type] = new_worker_count
@@ -419,11 +499,11 @@ def generate_worker_files(
# Check if more than one instance of this worker type has been specified
worker_type_total_count = worker_types.count(worker_type)
- if worker_type_total_count > 1:
- # Update the shared config with sharding-related options if necessary
- add_sharding_to_shared_config(
- shared_config, worker_type, worker_name, worker_port
- )
+
+        # Update the shared config with the options this worker's role requires
+        # (sharding, stream writer and instance_map entries, and so on).
+ add_worker_roles_to_shared_config(
+ shared_config, worker_type, worker_name, worker_port
+ )
# Enable the worker in supervisord
worker_descriptors.append(worker_config)
@@ -604,14 +684,24 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
with open(mark_filepath, "w") as f:
f.write("")
+ # Lifted right out of start.py
+ jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)
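+    # e.g. "/usr/lib/x86_64-linux-gnu/libjemalloc.so.2" on an x86_64 host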
+
+ if os.path.isfile(jemallocpath):
+ environ["LD_PRELOAD"] = jemallocpath
+ else:
+        log("Could not find %s, will not use it" % (jemallocpath,))
+
# Start supervisord, which will start Synapse, all of the configured worker
# processes, redis, nginx etc. according to the config we created above.
log("Starting supervisord")
- os.execl(
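+    # Flush buffered output first: os.execle replaces this process, and
+    # anything still sitting in Python's buffers would otherwise be lost.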
+ flush_buffers()
+ os.execle(
"/usr/local/bin/supervisord",
"supervisord",
"-c",
"/etc/supervisor/supervisord.conf",
+ environ,
)
|