summary refs log tree commit diff
path: root/docker/configure_workers_and_start.py
blob: 8f1e61930e6559b9d29c0afa3a1a8ed378c10644 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
#!/usr/bin/env python
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script reads environment variables and generates a shared Synapse worker,
# nginx and supervisord configs depending on the workers requested.
#
# The environment variables it reads are:
#   * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
#   * SYNAPSE_REPORT_STATS: Whether to report stats.
#   * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
#         below. Leave empty for no workers, or set to '*' for all possible workers.
#
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
# in the project's README), this script may be run multiple times, and functionality should
# continue to work if so.

import os
import subprocess
import sys
from typing import Any, Dict, Mapping, Set

import jinja2
import yaml

MAIN_PROCESS_HTTP_LISTENER_PORT = 8080


WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
    "pusher": {
        "app": "synapse.app.pusher",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {"start_pushers": False},
        "worker_extra_conf": "",
    },
    "user_dir": {
        "app": "synapse.app.user_dir",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
        ],
        "shared_extra_conf": {"update_user_directory": False},
        "worker_extra_conf": "",
    },
    "media_repository": {
        "app": "synapse.app.media_repository",
        "listener_resources": ["media"],
        "endpoint_patterns": [
            "^/_matrix/media/",
            "^/_synapse/admin/v1/purge_media_cache$",
            "^/_synapse/admin/v1/room/.*/media.*$",
            "^/_synapse/admin/v1/user/.*/media.*$",
            "^/_synapse/admin/v1/media/.*$",
            "^/_synapse/admin/v1/quarantine_media/.*$",
        ],
        "shared_extra_conf": {"enable_media_repo": False},
        "worker_extra_conf": "enable_media_repo: true",
    },
    "appservice": {
        "app": "synapse.app.appservice",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {"notify_appservices": False},
        "worker_extra_conf": "",
    },
    "federation_sender": {
        "app": "synapse.app.federation_sender",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {"send_federation": False},
        "worker_extra_conf": "",
    },
    "synchrotron": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(v2_alpha|r0|v3)/sync$",
            "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
            "^/_matrix/client/(api/v1|r0|v3)/initialSync$",
            "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "federation_reader": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["federation"],
        "endpoint_patterns": [
            "^/_matrix/federation/(v1|v2)/event/",
            "^/_matrix/federation/(v1|v2)/state/",
            "^/_matrix/federation/(v1|v2)/state_ids/",
            "^/_matrix/federation/(v1|v2)/backfill/",
            "^/_matrix/federation/(v1|v2)/get_missing_events/",
            "^/_matrix/federation/(v1|v2)/publicRooms",
            "^/_matrix/federation/(v1|v2)/query/",
            "^/_matrix/federation/(v1|v2)/make_join/",
            "^/_matrix/federation/(v1|v2)/make_leave/",
            "^/_matrix/federation/(v1|v2)/send_join/",
            "^/_matrix/federation/(v1|v2)/send_leave/",
            "^/_matrix/federation/(v1|v2)/invite/",
            "^/_matrix/federation/(v1|v2)/query_auth/",
            "^/_matrix/federation/(v1|v2)/event_auth/",
            "^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
            "^/_matrix/federation/(v1|v2)/user/devices/",
            "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
            "^/_matrix/key/v2/query",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "federation_inbound": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["federation"],
        "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "event_persister": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["replication"],
        "endpoint_patterns": [],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "background_worker": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        # This worker cannot be sharded. Therefore there should only ever be one background
        # worker, and it should be named background_worker1
        "shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
        "worker_extra_conf": "",
    },
    "event_creator": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "frontend_proxy": {
        "app": "synapse.app.frontend_proxy",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
        "shared_extra_conf": {},
        "worker_extra_conf": (
            "worker_main_http_uri: http://127.0.0.1:%d"
            % (MAIN_PROCESS_HTTP_LISTENER_PORT,)
        ),
    },
}

# Templates for sections that may be inserted multiple times in config files
SUPERVISORD_PROCESS_CONFIG_BLOCK = """
[program:synapse_{name}]
command=/usr/local/bin/python -m {app} \
    --config-path="{config_path}" \
    --config-path=/conf/workers/shared.yaml \
    --config-path=/conf/workers/{name}.yaml
autorestart=unexpected
priority=500
exitcodes=0
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
"""

NGINX_LOCATION_CONFIG_BLOCK = """
    location ~* {endpoint} {{
        proxy_pass {upstream};
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Host $host;
    }}
"""

NGINX_UPSTREAM_CONFIG_BLOCK = """
upstream {upstream_worker_type} {{
{body}
}}
"""


# Utility functions
def log(txt: str):
    """Log something to the stdout.

    Args:
        txt: The text to log.
    """
    print(txt)


def error(txt: str):
    """Log something and exit with an error code.

    Args:
        txt: The text to log in error.
    """
    log(txt)
    sys.exit(2)


def convert(src: str, dst: str, **template_vars):
    """Generate a file from a template

    Args:
        src: Path to the input file.
        dst: Path to write to.
        template_vars: The arguments to replace placeholder variables in the template with.
    """
    # Read the template file
    with open(src) as infile:
        template = infile.read()

    # Generate a string from the template. We disable autoescape to prevent template
    # variables from being escaped.
    rendered = jinja2.Template(template, autoescape=False).render(**template_vars)

    # Write the generated contents to a file
    #
    # We use append mode in case the files have already been written to by something else
    # (for instance, as part of the instructions in a dockerfile).
    with open(dst, "a") as outfile:
        # In case the existing file doesn't end with a newline
        outfile.write("\n")

        outfile.write(rendered)


def add_sharding_to_shared_config(
    shared_config: dict,
    worker_type: str,
    worker_name: str,
    worker_port: int,
) -> None:
    """Given a dictionary representing a config file shared across all workers,
    append sharded worker information to it for the current worker_type instance.

    Args:
        shared_config: The config dict that all worker instances share (after being converted to YAML)
        worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
        worker_name: The name of the worker instance.
        worker_port: The HTTP replication port that the worker instance is listening on.
    """
    # The instance_map config field marks the workers that write to various replication streams
    instance_map = shared_config.setdefault("instance_map", {})

    # Worker-type specific sharding config
    if worker_type == "pusher":
        shared_config.setdefault("pusher_instances", []).append(worker_name)

    elif worker_type == "federation_sender":
        shared_config.setdefault("federation_sender_instances", []).append(worker_name)

    elif worker_type == "event_persister":
        # Event persisters write to the events stream, so we need to update
        # the list of event stream writers
        shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
            worker_name
        )

        # Map of stream writer instance names to host/ports combos
        instance_map[worker_name] = {
            "host": "localhost",
            "port": worker_port,
        }

    elif worker_type == "media_repository":
        # The first configured media worker will run the media background jobs
        shared_config.setdefault("media_instance_running_background_jobs", worker_name)


def generate_base_homeserver_config():
    """Starts Synapse and generates a basic homeserver config, which will later be
    modified for worker support.

    Raises: CalledProcessError if calling start.py returned a non-zero exit code.
    """
    # start.py already does this for us, so just call that.
    # note that this script is copied in in the official, monolith dockerfile
    os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
    subprocess.check_output(["/usr/local/bin/python", "/start.py", "migrate_config"])


def generate_worker_files(environ, config_path: str, data_dir: str):
    """Read the desired list of workers from environment variables and generate
    shared homeserver, nginx and supervisord configs.

    Args:
        environ: _Environ[str]
        config_path: The location of the generated Synapse main worker config file.
        data_dir: The location of the synapse data directory. Where log and
            user-facing config files live.
    """
    # Note that yaml cares about indentation, so care should be taken to insert lines
    # into files at the correct indentation below.

    # shared_config is the contents of a Synapse config file that will be shared amongst
    # the main Synapse process as well as all workers.
    # It is intended mainly for disabling functionality when certain workers are spun up,
    # and adding a replication listener.

    # First read the original config file and extract the listeners block. Then we'll add
    # another listener for replication. Later we'll write out the result to the shared
    # config file.
    listeners = [
        {
            "port": 9093,
            "bind_address": "127.0.0.1",
            "type": "http",
            "resources": [{"names": ["replication"]}],
        }
    ]
    with open(config_path) as file_stream:
        original_config = yaml.safe_load(file_stream)
        original_listeners = original_config.get("listeners")
        if original_listeners:
            listeners += original_listeners

    # The shared homeserver config. The contents of which will be inserted into the
    # base shared worker jinja2 template.
    #
    # This config file will be passed to all workers, included Synapse's main process.
    shared_config: Dict[str, Any] = {"listeners": listeners}

    # The supervisord config. The contents of which will be inserted into the
    # base supervisord jinja2 template.
    #
    # Supervisord will be in charge of running everything, from redis to nginx to Synapse
    # and all of its worker processes. Load the config template, which defines a few
    # services that are necessary to run.
    supervisord_config = ""

    # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
    # ports of each worker. For example:
    # {
    #   worker_type: {1234, 1235, ...}}
    # }
    # and will be used to construct 'upstream' nginx directives.
    nginx_upstreams: Dict[str, Set[int]] = {}

    # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
    # placed after the proxy_pass directive. The main benefit to representing this data as a
    # dict over a str is that we can easily deduplicate endpoints across multiple instances
    # of the same worker.
    #
    # An nginx site config that will be amended to depending on the workers that are
    # spun up. To be placed in /etc/nginx/conf.d.
    nginx_locations = {}

    # Read the desired worker configuration from the environment
    worker_types = environ.get("SYNAPSE_WORKER_TYPES")
    if worker_types is None:
        # No workers, just the main process
        worker_types = []
    else:
        # Split type names by comma
        worker_types = worker_types.split(",")

    # Create the worker configuration directory if it doesn't already exist
    os.makedirs("/conf/workers", exist_ok=True)

    # Start worker ports from this arbitrary port
    worker_port = 18009

    # A counter of worker_type -> int. Used for determining the name for a given
    # worker type when generating its config file, as each worker's name is just
    # worker_type + instance #
    worker_type_counter: Dict[str, int] = {}

    # A list of internal endpoints to healthcheck, starting with the main process
    # which exists even if no workers do.
    healthcheck_urls = ["http://localhost:8080/health"]

    # For each worker type specified by the user, create config values
    for worker_type in worker_types:
        worker_type = worker_type.strip()

        worker_config = WORKERS_CONFIG.get(worker_type)
        if worker_config:
            worker_config = worker_config.copy()
        else:
            log(worker_type + " is an unknown worker type! It will be ignored")
            continue

        new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
        worker_type_counter[worker_type] = new_worker_count

        # Name workers by their type concatenated with an incrementing number
        # e.g. federation_reader1
        worker_name = worker_type + str(new_worker_count)
        worker_config.update(
            {"name": worker_name, "port": str(worker_port), "config_path": config_path}
        )

        # Update the shared config with any worker-type specific options
        shared_config.update(worker_config["shared_extra_conf"])

        healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))

        # Check if more than one instance of this worker type has been specified
        worker_type_total_count = worker_types.count(worker_type)
        if worker_type_total_count > 1:
            # Update the shared config with sharding-related options if necessary
            add_sharding_to_shared_config(
                shared_config, worker_type, worker_name, worker_port
            )

        # Enable the worker in supervisord
        supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config)

        # Add nginx location blocks for this worker's endpoints (if any are defined)
        for pattern in worker_config["endpoint_patterns"]:
            # Determine whether we need to load-balance this worker
            if worker_type_total_count > 1:
                # Create or add to a load-balanced upstream for this worker
                nginx_upstreams.setdefault(worker_type, set()).add(worker_port)

                # Upstreams are named after the worker_type
                upstream = "http://" + worker_type
            else:
                upstream = "http://localhost:%d" % (worker_port,)

            # Note that this endpoint should proxy to this upstream
            nginx_locations[pattern] = upstream

        # Write out the worker's logging config file

        log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)

        # Then a worker config file
        convert(
            "/conf/worker.yaml.j2",
            "/conf/workers/{name}.yaml".format(name=worker_name),
            **worker_config,
            worker_log_config_filepath=log_config_filepath,
        )

        worker_port += 1

    # Build the nginx location config blocks
    nginx_location_config = ""
    for endpoint, upstream in nginx_locations.items():
        nginx_location_config += NGINX_LOCATION_CONFIG_BLOCK.format(
            endpoint=endpoint,
            upstream=upstream,
        )

    # Determine the load-balancing upstreams to configure
    nginx_upstream_config = ""

    for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
        body = ""
        for port in upstream_worker_ports:
            body += "    server localhost:%d;\n" % (port,)

        # Add to the list of configured upstreams
        nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
            upstream_worker_type=upstream_worker_type,
            body=body,
        )

    # Finally, we'll write out the config files.

    # log config for the master process
    master_log_config = generate_worker_log_config(environ, "master", data_dir)
    shared_config["log_config"] = master_log_config

    # Shared homeserver config
    convert(
        "/conf/shared.yaml.j2",
        "/conf/workers/shared.yaml",
        shared_worker_config=yaml.dump(shared_config),
    )

    # Nginx config
    convert(
        "/conf/nginx.conf.j2",
        "/etc/nginx/conf.d/matrix-synapse.conf",
        worker_locations=nginx_location_config,
        upstream_directives=nginx_upstream_config,
    )

    # Supervisord config
    convert(
        "/conf/supervisord.conf.j2",
        "/etc/supervisor/conf.d/supervisord.conf",
        main_config_path=config_path,
        worker_config=supervisord_config,
    )

    # healthcheck config
    convert(
        "/conf/healthcheck.sh.j2",
        "/healthcheck.sh",
        healthcheck_urls=healthcheck_urls,
    )

    # Ensure the logging directory exists
    log_dir = data_dir + "/logs"
    if not os.path.exists(log_dir):
        os.mkdir(log_dir)


def generate_worker_log_config(
    environ: Mapping[str, str], worker_name: str, data_dir: str
) -> str:
    """Generate a log.config file for the given worker.

    Returns: the path to the generated file
    """
    # Check whether we should write worker logs to disk, in addition to the console
    extra_log_template_args = {}
    if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
        extra_log_template_args["LOG_FILE_PATH"] = "{dir}/logs/{name}.log".format(
            dir=data_dir, name=worker_name
        )
    # Render and write the file
    log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
    convert(
        "/conf/log.config",
        log_config_filepath,
        worker_name=worker_name,
        **extra_log_template_args,
    )
    return log_config_filepath


def start_supervisord():
    """Starts up supervisord which then starts and monitors all other necessary processes

    Raises: CalledProcessError if calling start.py return a non-zero exit code.
    """
    subprocess.run(["/usr/bin/supervisord"], stdin=subprocess.PIPE)


def main(args, environ):
    config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
    config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
    data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")

    # override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
    # this needs to be handled by a frontend proxy
    environ["SYNAPSE_NO_TLS"] = "yes"

    # Generate the base homeserver config if one does not yet exist
    if not os.path.exists(config_path):
        log("Generating base homeserver config")
        generate_base_homeserver_config()

    # This script may be run multiple times (mostly by Complement, see note at top of file).
    # Don't re-configure workers in this instance.
    mark_filepath = "/conf/workers_have_been_configured"
    if not os.path.exists(mark_filepath):
        # Always regenerate all other config files
        generate_worker_files(environ, config_path, data_dir)

        # Mark workers as being configured
        with open(mark_filepath, "w") as f:
            f.write("")

    # Start supervisord, which will start Synapse, all of the configured worker
    # processes, redis, nginx etc. according to the config we created above.
    start_supervisord()


if __name__ == "__main__":
    main(sys.argv, os.environ)