summary refs log blame commit diff
path: root/docker/configure_workers_and_start.py
blob: 62952e6b26358064e651379d3f0111b129236fc3 (plain) (tree)



















                                                                                







                                                                                                 
                                                                                          

                                                                                     

                                                                                     


                                                                                
                                                                                 




                                                                                            
               
         
                 
                                   
                        









                    
 
           
                                                
                                      

                                            

                                                                 
 


                                                                                        




                                                                                                   
                                             
               
                                            
                                 
                                

                                
                                            
                                         
                                                                             
          

                                                                        

                                
                                            







                                                       

                                                                              
                                                                              
          

                                                       
                                            
                                 

                                                                     

                                
                                            
                                 
                                




                                            


                                                                       


                                









                                                                                
                                                    




                                                                        
                                                                    



                                                                           
                                                               
                                                              
                                                                    
                                                                 
                                                                          
                                                              
                                                               


                                
















                                                               
                                                          
























                                                                        

                                                                                  




                                            


                                                                                                    
                                                              
                                                                



                                
                                            
                                                        
                                                                                      
                                
                                
      









































                                                                                    

                                                                            
                                 
                             


                                                      
      

                                 
                                       
      
  


                   
                          

              
                                
                               

               



                            
                                                                 






                                                                                           


                                                                                      
 
                                               










                                                                                         
                                      
                        
                               


                                                                               
                                                                                     
         


                                                                                      

                                                                                        
                                                                                       
                                                               











                                                                                       
                                                                            
                                               
                                                                                       
                                             





                                                                                       







                                                            









                                                                                        







                                                                

                                  
                                            



                                                                                        
 

























































































































                                                                                        
 
 
                                              






                                                                                  
                                                                                        
 


























































































































                                                                                        
                          


                                                
          
                                                                             
         
                                     
                                                                                   
                                                                           
                                                                                    


                                                                                     
                                                                       

                                                                                     
















                                                               





                                                                                   

                                                                                        
                                                            
 


                                                                               
 
                                                                                   


                                                                
                                             
 




                                                                                        





                                                                           
                                                                                 









                                                                                       
 
























                                                                                        
 
                             
                                                                                       
         


                                                                                        




                                                                                  
 
                                                                             
                                                                     
         
                                          
                                                
 
                                                    
                                                                                        


                                   
                                                
                                                           
                                                  
         


                                                                           










                                                                    
                                                                                    
                 





                                                                

                                                                    
                                                                



                                                


                                                                               









                                                                                   
                                                    
 

                                                                         







                                                              
 



                                                      
                                                          
                                      
                                              






                                                  
                                                         
                                              

                        
                                                 
                                    
                                           
                                     
                                    
                                              
     



                                              
                                                                                      
     





                                          




                                         






                                                                                   
                                                          
                                                         




                                                                                       
                                                                                       
 
                               
                                                                   



                                  

                                                       


                              
                                                                     










                                                                                     
                                                               
                                                                                     
                                                        











                                                                          
                                                  
                                             
                                                                                     


                                           
                                                      
 






                                                                                   
                                                                               
                               
                   
              


                                           
                
     


                              
#!/usr/bin/env python
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script reads environment variables and generates a shared Synapse worker,
# nginx and supervisord configs depending on the workers requested.
#
# The environment variables it reads are:
#   * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
#   * SYNAPSE_REPORT_STATS: Whether to report stats.
#   * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
#         below. Leave empty for no workers. Add a ':' and a number at the end to
#         multiply that worker. Append multiple worker types with '+' to merge the
#         worker types into a single worker. Add a name and a '=' to the front of a
#         worker type to give this instance a name in logs and nginx.
#         Examples:
#         SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
#         SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
#         SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
#   * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
#         will be treated as Application Service registration files.
#   * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
#   * SYNAPSE_TLS_KEY: Path to a TLS key. If this and SYNAPSE_TLS_CERT are specified,
#         Nginx will be configured to serve TLS on port 8448.
#   * SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER: Whether to use the forking launcher,
#         only intended for usage in Complement at the moment.
#         No stability guarantees are provided.
#   * SYNAPSE_LOG_LEVEL: Set this to DEBUG, INFO, WARNING or ERROR to change the
#         log level. INFO is the default.
#   * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
#         regardless of the SYNAPSE_LOG_LEVEL setting.
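#   * SYNAPSE_LOG_TESTING: If set, Synapse will log additional information useful
#         for testing.
#   * SYNAPSE_USE_UNIX_SOCKET: If set to a non-empty value, replication instance_map
#         entries use UNIX socket paths (/run/worker.<port>) instead of localhost
#         host/port pairs.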
#
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
# in the project's README), this script may be run multiple times, and functionality should
# continue to work if so.

import os
import platform
import re
import subprocess
import sys
from collections import defaultdict
from itertools import chain
from pathlib import Path
from typing import (
    Any,
    Dict,
    List,
    Mapping,
    MutableMapping,
    NoReturn,
    Optional,
    Set,
    SupportsIndex,
)

import yaml
from jinja2 import Environment, FileSystemLoader

# Fixed identity of the main (non-worker) Synapse process: its HTTP listener
# port, instance name, loopback address and replication port. These are not used
# in the part of the file shown here; presumably they feed the generated
# shared/worker/nginx configs further down -- confirm at the use sites.
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
MAIN_PROCESS_INSTANCE_NAME = "main"
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
MAIN_PROCESS_REPLICATION_PORT = 9093
# Obviously, these would only be used with the UNIX socket option
# (socket paths for the main process, replacing the TCP address/ports above).
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"

# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
WORKER_PLACEHOLDER_NAME = "placeholder_name"

# Per-worker-type configuration templates, keyed by the worker type name accepted in
# SYNAPSE_WORKER_TYPES. Every template carries the same five keys:
#   * "app": The Synapse application module to run for this worker.
#   * "listener_resources": The listener resources the worker must expose (see below).
#   * "endpoint_patterns": Regexes of the request paths that should be routed to this
#         worker. Every pattern is anchored with '^' so that it only matches from the
#         start of the request path.
#   * "shared_extra_conf": Settings this worker type contributes to the config shared
#         by all processes. Values equal to WORKER_PLACEHOLDER_NAME are stand-ins for
#         the actual worker name.
#   * "worker_extra_conf": Extra YAML text for the worker's own config file.
#
# Workers with exposed endpoints need either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener
# Watching /_matrix/federation needs a "federation" listener
# Watching /_matrix/media and related needs a "media" listener
# Stream Writers require "client" and "replication" listeners because they
#   have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
    "pusher": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "user_dir": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
        ],
        "shared_extra_conf": {
            "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
        },
        "worker_extra_conf": "",
    },
    "media_repository": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["media"],
        "endpoint_patterns": [
            "^/_matrix/media/",
            "^/_synapse/admin/v1/purge_media_cache$",
            "^/_synapse/admin/v1/room/.*/media.*$",
            "^/_synapse/admin/v1/user/.*/media.*$",
            "^/_synapse/admin/v1/media/.*$",
            "^/_synapse/admin/v1/quarantine_media/.*$",
        ],
        # The first configured media worker will run the media background jobs
        "shared_extra_conf": {
            "enable_media_repo": False,
            "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
        },
        "worker_extra_conf": "enable_media_repo: true",
    },
    "appservice": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {
            "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
        },
        "worker_extra_conf": "",
    },
    "federation_sender": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "synchrotron": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(v2_alpha|r0|v3)/sync$",
            "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
            "^/_matrix/client/(api/v1|r0|v3)/initialSync$",
            "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "client_reader": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$",
            "^/_matrix/client/v1/rooms/.*/hierarchy$",
            "^/_matrix/client/(v1|unstable)/rooms/.*/relations/",
            "^/_matrix/client/v1/rooms/.*/threads$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
            "^/_matrix/client/versions$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
            "^/_matrix/client/(r0|v3|unstable)/register$",
            "^/_matrix/client/(r0|v3|unstable)/register/available$",
            "^/_matrix/client/(r0|v3|unstable)/auth/.*/fallback/web$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms",
            "^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
            "^/_matrix/client/v1/rooms/.*/timestamp_to_event$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/search",
            "^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)",
            "^/_matrix/client/(r0|v3|unstable)/password_policy$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
            "^/_matrix/client/(r0|v3|unstable)/capabilities$",
            "^/_matrix/client/(r0|v3|unstable)/notifications$",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "federation_reader": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["federation"],
        "endpoint_patterns": [
            "^/_matrix/federation/(v1|v2)/event/",
            "^/_matrix/federation/(v1|v2)/state/",
            "^/_matrix/federation/(v1|v2)/state_ids/",
            "^/_matrix/federation/(v1|v2)/backfill/",
            "^/_matrix/federation/(v1|v2)/get_missing_events/",
            "^/_matrix/federation/(v1|v2)/publicRooms",
            "^/_matrix/federation/(v1|v2)/query/",
            "^/_matrix/federation/(v1|v2)/make_join/",
            "^/_matrix/federation/(v1|v2)/make_leave/",
            "^/_matrix/federation/(v1|v2)/send_join/",
            "^/_matrix/federation/(v1|v2)/send_leave/",
            "^/_matrix/federation/(v1|v2)/invite/",
            "^/_matrix/federation/(v1|v2)/query_auth/",
            "^/_matrix/federation/(v1|v2)/event_auth/",
            "^/_matrix/federation/v1/timestamp_to_event/",
            "^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
            "^/_matrix/federation/(v1|v2)/user/devices/",
            "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
            "^/_matrix/key/v2/query",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "federation_inbound": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["federation"],
        # Anchored like every other pattern so that only paths *starting* with the
        # federation send prefix are routed here.
        "endpoint_patterns": ["^/_matrix/federation/(v1|v2)/send/"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "event_persister": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["replication"],
        "endpoint_patterns": [],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "background_worker": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        # This worker cannot be sharded. Therefore, there should only ever be one
        # background worker. This is enforced for the safety of your database.
        "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
        "worker_extra_conf": "",
    },
    "event_creator": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "frontend_proxy": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "account_data": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": [
            "^/_matrix/client/(r0|v3|unstable)/.*/tags",
            "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "presence": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "receipts": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": [
            "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
            "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "to_device": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "typing": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
}

# Templates for sections that may be inserted multiple times in config files
#
# Single-brace fields ({endpoint}, {upstream}, ...) are the substitution points.
# The doubled braces look like str.format() escapes for nginx's own literal
# braces; the code that fills these templates is not in this part of the file,
# so confirm against the call sites.

# One nginx `location` block: a case-insensitive regex match (`~*`) on {endpoint}
# that proxies to {upstream} and forwards the client address, scheme and host.
NGINX_LOCATION_CONFIG_BLOCK = """
    location ~* {endpoint} {{
        proxy_pass {upstream};
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Host $host;
    }}
"""

# One nginx `upstream` group named {upstream_worker_base_name}; {body} holds the
# lines placed inside the group.
NGINX_UPSTREAM_CONFIG_BLOCK = """
upstream {upstream_worker_base_name} {{
{body}
}}
"""


# Utility functions
def log(txt: str) -> None:
    """Write an informational message, followed by a newline, to stdout.

    Args:
        txt: The message to print.
    """
    print(txt, file=sys.stdout)


def error(txt: str) -> NoReturn:
    """Report a fatal problem on stderr and terminate the process.

    Args:
        txt: The message to print to stderr before exiting.

    Raises:
        SystemExit: Always, with exit status 2.
    """
    print(txt, file=sys.stderr)
    raise SystemExit(2)


def flush_buffers() -> None:
    """Flush any pending output on stdout, then on stderr."""
    for stream in (sys.stdout, sys.stderr):
        stream.flush()


def convert(src: str, dst: str, **template_vars: object) -> None:
    """Render a Jinja2 template and append the result to a file.

    Args:
        src: Path to the template file to render.
        dst: Path of the file that receives the rendered text.
        template_vars: Values substituted for the template's placeholder variables.
    """
    template_dir, template_name = os.path.split(src)

    # The output is not HTML, so autoescaping stays off to keep the substituted
    # values exactly as given.
    jinja_env = Environment(loader=FileSystemLoader(template_dir), autoescape=False)
    output = jinja_env.get_template(template_name).render(**template_vars)

    # Append rather than overwrite: something else (for instance an instruction in
    # a dockerfile) may already have written to the destination file.
    with open(dst, "a") as target:
        # Existing content may not end with a newline, so always start on a new line.
        target.write("\n")
        target.write(output)


def add_worker_roles_to_shared_config(
    shared_config: dict,
    worker_types_set: Set[str],
    worker_name: str,
    worker_port: int,
) -> None:
    """Record one worker instance's roles in the config shared by all workers.

    The shared config dict is updated in place; nothing is returned.

    Args:
        shared_config: The config dict that all worker instances share (after being
            converted to YAML).
        worker_types_set: The worker types (as defined in WORKERS_CONFIG) that this
            instance fulfils. May hold a single type or several.
        worker_name: The name of the worker instance.
        worker_port: The HTTP replication port that the worker instance is listening on.
    """
    # instance_map tells other processes how to reach the workers that write to
    # replication streams. The key is created even if this worker adds no entry.
    replication_targets = shared_config.setdefault("instance_map", {})

    def register_replication_target() -> None:
        # Point instance_map at this worker, by UNIX socket path or by host/port.
        if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
            replication_targets[worker_name] = {
                "path": f"/run/worker.{worker_port}",
            }
        else:
            replication_targets[worker_name] = {
                "host": "localhost",
                "port": worker_port,
            }

    # Roles that only need the worker's name appended to a sharding list. A single
    # worker may fulfil several roles, so each one is checked.
    simple_sharded_roles = (
        ("pusher", "pusher_instances"),
        ("federation_sender", "federation_sender_instances"),
    )
    for role, config_key in simple_sharded_roles:
        if role in worker_types_set:
            shared_config.setdefault(config_key, []).append(worker_name)

    if "event_persister" in worker_types_set:
        # Event persisters write to the (shardable) events stream, so they join the
        # list of event stream writers and must be reachable over replication.
        stream_writers = shared_config.setdefault("stream_writers", {})
        stream_writers.setdefault("events", []).append(worker_name)
        register_replication_target()

    # Stream writers of which there can be only one. Events can be sharded and is
    # therefore handled separately above.
    singular_stream_writers = (
        "account_data",
        "presence",
        "receipts",
        "to_device",
        "typing",
    )

    # Conveniently, the worker type name matches the name of the stream it writes.
    # Every requested type is examined since a worker may write several streams.
    for role in worker_types_set:
        if role not in singular_stream_writers:
            continue
        stream_writers = shared_config.setdefault("stream_writers", {})
        stream_writers.setdefault(role, []).append(worker_name)
        # For now, all stream writers need http replication ports.
        register_replication_target()


def merge_worker_template_configs(
    existing_dict: Optional[Dict[str, Any]],
    to_be_merged_dict: Dict[str, Any],
) -> Dict[str, Any]:
    """Merge one worker template into an (optionally empty) existing template.

    Neither argument is modified. The merge rules per key are:
      * "endpoint_patterns" / "listener_resources": the two lists are concatenated
        and de-duplicated, keeping the order of first appearance.
      * "shared_extra_conf": the two dicts are combined; on a key clash the value
        from to_be_merged_dict wins.
      * "worker_extra_conf": the two values are concatenated with "+".
      * anything else: the value from to_be_merged_dict is used.
    Keys that only exist in existing_dict are carried over unchanged.

    Args:
        existing_dict: Either an existing worker template or a fresh blank one
            (None or an empty dict).
        to_be_merged_dict: The template from WORKERS_CONFIG to be merged into
            existing_dict.
    Returns: A new dict holding the merged values.
    """
    if not existing_dict:
        # Nothing to merge into yet. A plain dict.copy() would still share the nested
        # lists/dicts with the template, so copy those containers one level down too.
        return {
            key: value.copy() if isinstance(value, (list, dict, set)) else value
            for key, value in to_be_merged_dict.items()
        }

    # Start from what we already have so keys only present on that side survive.
    new_dict: Dict[str, Any] = dict(existing_dict)
    for key, value in to_be_merged_dict.items():
        if key in ("endpoint_patterns", "listener_resources"):
            # Merge the two lists and remove duplicates. dict.fromkeys keeps the
            # first occurrence of each entry, so the result has the same order on
            # every run (a set would not guarantee that for strings).
            combined = list(existing_dict.get(key, [])) + list(value)
            new_dict[key] = list(dict.fromkeys(combined))
        elif key == "shared_extra_conf":
            # Merge the dictionaries, the worker name will be replaced later.
            new_dict[key] = {**existing_dict.get(key, {}), **value}
        elif key == "worker_extra_conf":
            # Per the original author's note, only the media_repo worker type has a
            # 'worker_extra_conf', so plain concatenation is sufficient here.
            new_dict[key] = existing_dict.get(key, "") + value
        else:
            # Everything else should be identical, like "app", which only works
            # because all apps are now generic_workers.
            new_dict[key] = value
    return new_dict


def insert_worker_name_for_worker_config(
    existing_dict: Dict[str, Any], worker_name: str
) -> Dict[str, Any]:
    """Insert a given worker name into the worker's configuration dict.

    Every value in the "shared_extra_conf" sub-dict that equals
    WORKER_PLACEHOLDER_NAME is replaced by worker_name. The input dict and its
    "shared_extra_conf" sub-dict are left untouched.

    Args:
        existing_dict: The worker_config dict that is imported into shared_config.
            Must contain a "shared_extra_conf" dict.
        worker_name: The name of the worker to insert.
    Returns: Copy of the dict with newly inserted worker name
    Raises: KeyError if existing_dict has no "shared_extra_conf" entry.
    """
    dict_to_edit = existing_dict.copy()
    # dict.copy() is shallow, so editing the nested dict in place would also change
    # the caller's dict (and whatever template it was taken from). Build a fresh
    # "shared_extra_conf" instead.
    dict_to_edit["shared_extra_conf"] = {
        key: worker_name if value == WORKER_PLACEHOLDER_NAME else value
        for key, value in existing_dict["shared_extra_conf"].items()
    }
    return dict_to_edit


def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
    """
    Apply multiplier(if found) by returning a new expanded list with some basic error
    checking.

    An entry of the form "worker:N" is replaced by N copies of "worker":
      * N >= 2 creates that many copies,
      * N == 1 behaves as if no number was given,
      * N == 0 drops the entry. This is to allow for scripting and automated
        expansion and is intended behaviour,
      * N that is not an integer, or is negative, is reported through error().
    Entries without a ":" are passed through unchanged.

    Args:
        worker_types: The unprocessed List of requested workers
    Returns:
        A new list with all requested workers expanded.
    """
    new_worker_types = []
    for worker_type in worker_types:
        if ":" not in worker_type:
            # If it's not a real worker_type, it will error out later.
            new_worker_types.append(worker_type)
            continue

        # Should only be 2 components, a type of worker(s) and an integer as a
        # string. Cast the number as an int then it can be used as a counter.
        worker_type_components = split_and_strip_string(worker_type, ":", 1)
        # Default of 0 means nothing is added should error() return instead of
        # stopping the script (NOTE(review): error() is defined elsewhere; confirm).
        worker_count = 0
        try:
            worker_count = int(worker_type_components[1])
        except ValueError:
            error(
                f"Bad number in worker count for '{worker_type}': "
                f"'{worker_type_components[1]}' is not an integer"
            )

        if worker_count < 0:
            # Previously a negative count was silently treated like 0, which hid
            # typos such as "worker:-2".
            error(
                f"Bad number in worker count for '{worker_type}': "
                f"'{worker_type_components[1]}' must not be negative"
            )

        # As long as there are more than 0, add that many copies to the list.
        new_worker_types.extend([worker_type_components[0]] * max(worker_count, 0))

    return new_worker_types


def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
    """Tell whether more than one worker may be given this worker type.

    Args:
        worker_type: The type of worker to check against.
    Returns: False for the worker types listed below, True for everything else.
    """
    # Worker types that must not be requested on more than one worker.
    single_instance_only = (
        "background_worker",
        "account_data",
        "presence",
        "receipts",
        "typing",
        "to_device",
    )
    if worker_type in single_instance_only:
        return False
    return True


def split_and_strip_string(
    given_string: str, split_char: str, max_split: SupportsIndex = -1
) -> List[str]:
    """
    Split a string on a separator and trim surrounding whitespace from every piece.

    Args:
        given_string: The string to split
        split_char: The character to split the string on
        max_split: Passed to str.split() as 'maxsplit'; the default of -1 means no
            limit on the number of splits
    Returns:
        A List of strings
    """
    pieces = given_string.split(split_char, maxsplit=max_split)
    # Whitespace is only removed from the ends of each piece, never from inside it.
    return list(map(str.strip, pieces))


def generate_base_homeserver_config() -> None:
    """Produce the basic homeserver config that is later modified for worker support.

    The work is delegated to /start.py, invoked with the "migrate_config" argument.

    Raises: CalledProcessError if calling start.py returned a non-zero exit code.
    """
    # Exported through the process environment before start.py is launched.
    os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)

    # start.py already knows how to do this, so just call it. Note that this script
    # is copied in by the official, monolith dockerfile.
    migrate_command = ["/usr/local/bin/python", "/start.py", "migrate_config"]
    subprocess.run(migrate_command, check=True)


def parse_worker_types(
    requested_worker_types: List[str],
) -> Dict[str, Set[str]]:
    """Turn the raw list of worker requests into a mapping of worker name to the
        set of worker types that worker should run, validating along the way.

    Each request has one of these shapes (a ':N' multiplier is expanded first):
      - requested_name=type1+type2+type3
      - synchrotron
      - event_creator+event_persister

    Args:
        requested_worker_types: The list formed from the split environment variable
            containing the unprocessed requests for workers.

    Returns: A dict of worker names to set of worker types. Format:
        {'worker_name':
            {'worker_type', 'worker_type2'}
        }
    """
    # worker_base_name -> how many workers use it so far. A worker's final name is
    # its base name followed by this running number.
    worker_base_name_counter: Dict[str, int] = defaultdict(int)

    # worker_type -> how many workers carry it so far. Lets us spot a second worker
    # of a type that must not have more than one (e.g. presence).
    worker_type_shard_counter: Dict[str, int] = defaultdict(int)

    # Accumulates the result: worker name -> set of worker types.
    workers_by_name: Dict[str, Set[str]] = {}

    # Expand any 'type:N' multipliers before looking at the individual requests.
    expanded_requests = apply_requested_multiplier_for_worker(requested_worker_types)

    for worker_type_string in expanded_requests:
        # An explicit name is optional; an empty string means "generate one below".
        worker_base_name: str = ""

        if "=" in worker_type_string:
            name_and_types = split_and_strip_string(worker_type_string, "=")
            if len(name_and_types) > 2:
                error(
                    "There should only be one '=' in the worker type string. "
                    f"Please fix: {worker_type_string}"
                )

            worker_base_name = name_and_types[0]

            # Apply a fairly narrow regex to the worker names. Some characters
            # aren't safe for use in file paths or nginx configurations. A trailing
            # digit is rejected because a number gets appended further down.
            name_is_valid = re.match(
                r"^[a-zA-Z0-9_+-]*[a-zA-Z_+-]$", worker_base_name
            )
            if not name_is_valid:
                error(
                    "Invalid worker name; please choose a name consisting of "
                    "alphanumeric letters, _ + -, but not ending with a digit: "
                    f"{worker_base_name!r}"
                )

            # From here on only the part after the name override is of interest.
            worker_type_string = name_and_types[1]

        # The types are separated by "+"; a set removes duplicates.
        worker_types_set: Set[str] = set(
            split_and_strip_string(worker_type_string, "+")
        )

        if not worker_base_name:
            # No base name specified: derive one deterministically from the types.
            worker_base_name = "+".join(sorted(worker_types_set))

        # Validate every requested type for this worker.
        for worker_type in worker_types_set:
            # Unknown worker types are reported so they can be fixed.
            if worker_type not in WORKERS_CONFIG:
                error(
                    f"{worker_type} is an unknown worker type! Was found in "
                    f"'{worker_type_string}'. Please fix!"
                )

            # A type already counted for an earlier worker may only appear again if
            # that type supports sharding.
            seen_before = worker_type in worker_type_shard_counter
            if seen_before and not is_sharding_allowed_for_worker_type(worker_type):
                error(
                    f"There can be only a single worker with {worker_type} "
                    "type. Please recount and remove."
                )

            worker_type_shard_counter[worker_type] += 1

        # Number the worker within its base name, starting at 1.
        worker_base_name_counter[worker_base_name] += 1
        instance_number = worker_base_name_counter[worker_base_name]
        worker_name = f"{worker_base_name}{instance_number}"

        if instance_number > 1:
            # Every worker sharing a base name must have exactly the same types as
            # the first worker with that base name, otherwise names get confusing.
            first_worker_with_base_name = workers_by_name[f"{worker_base_name}1"]
            if first_worker_with_base_name != worker_types_set:
                error(
                    f"Can not use worker_name: '{worker_name}' for worker_type(s): "
                    f"{worker_types_set!r}. It is already in use by "
                    f"worker_type(s): {first_worker_with_base_name!r}"
                )

        workers_by_name[worker_name] = worker_types_set

    return workers_by_name


def generate_worker_files(
    environ: Mapping[str, str],
    config_path: str,
    data_dir: str,
    requested_worker_types: Dict[str, Set[str]],
) -> None:
    """Read the desired workers(if any) that is passed in and generate shared
        homeserver, nginx and supervisord configs.

    Files written (all through convert() unless noted):
      * /conf/workers/<worker_name>.yaml and its log config, one per worker
      * /conf/workers/shared.yaml
      * /etc/nginx/conf.d/matrix-synapse.conf
      * /etc/supervisor/supervisord.conf and /etc/supervisor/conf.d/synapse.conf
      * /healthcheck.sh
    The directories /conf/workers, /etc/supervisor and <data_dir>/logs are created
    if they do not exist yet.

    Args:
        environ: os.environ instance. All environment lookups made by this function
            go through this mapping.
        config_path: The location of the generated Synapse main worker config file.
        data_dir: The location of the synapse data directory. Where log and
            user-facing config files live.
        requested_worker_types: A Dict containing requested workers in the format of
            {'worker_name1': {'worker_type', ...}}
    """
    # Note that yaml cares about indentation, so care should be taken to insert lines
    # into files at the correct indentation below.

    # Convenience helper for if using unix sockets instead of host:port. Any
    # non-empty value of the variable counts as "enabled".
    using_unix_sockets = environ.get("SYNAPSE_USE_UNIX_SOCKET", False)
    # First read the original config file and extract the listeners block. Then we'll
    # add another listener for replication. Later we'll write out the result to the
    # shared config file.
    listeners: List[Any]
    if using_unix_sockets:
        listeners = [
            {
                "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
                "type": "http",
                "resources": [{"names": ["replication"]}],
            }
        ]
    else:
        listeners = [
            {
                "port": MAIN_PROCESS_REPLICATION_PORT,
                "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
                "type": "http",
                "resources": [{"names": ["replication"]}],
            }
        ]
    with open(config_path) as file_stream:
        original_config = yaml.safe_load(file_stream)
        original_listeners = original_config.get("listeners")
        if original_listeners:
            listeners += original_listeners

    # The shared homeserver config. The contents of which will be inserted into the
    # base shared worker jinja2 template. This config file will be passed to all
    # workers, including Synapse's main process. It is intended mainly for disabling
    # functionality when certain workers are spun up, and adding a replication listener.
    shared_config: Dict[str, Any] = {"listeners": listeners}

    # List of dicts that describe workers.
    # We pass this to the Supervisor template later to generate the appropriate
    # program blocks.
    worker_descriptors: List[Dict[str, Any]] = []

    # Upstreams for load-balancing purposes. This dict takes the form of the worker
    # type to the ports of each worker. For example:
    # {
    #   worker_type: {1234, 1235, ...}
    # }
    # and will be used to construct 'upstream' nginx directives.
    nginx_upstreams: Dict[str, Set[int]] = {}

    # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what
    # will be placed after the proxy_pass directive. The main benefit to representing
    # this data as a dict over a str is that we can easily deduplicate endpoints
    # across multiple instances of the same worker. The final rendering will be combined
    # with nginx_upstreams and placed in /etc/nginx/conf.d.
    nginx_locations: Dict[str, str] = {}

    # Create the worker configuration directory if it doesn't already exist
    os.makedirs("/conf/workers", exist_ok=True)

    # Start worker ports from this arbitrary port
    worker_port = 18009

    # A list of internal endpoints to healthcheck, starting with the main process
    # which exists even if no workers do.
    # This list ends up being part of the command line to curl, (curl added support for
    # Unix sockets in version 7.40).
    if using_unix_sockets:
        healthcheck_urls = [
            f"--unix-socket {MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH} "
            # The scheme and hostname from the following URL are ignored.
            # The only thing that matters is the path `/health`
            "http://localhost/health"
        ]
    else:
        healthcheck_urls = ["http://localhost:8080/health"]

    # Get the set of all worker types that we have configured
    all_worker_types_in_use = set(chain(*requested_worker_types.values()))
    # Map locations to upstreams (corresponding to worker types) in Nginx
    # but only if we use the appropriate worker type
    for worker_type in all_worker_types_in_use:
        for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
            nginx_locations[endpoint_pattern] = f"http://{worker_type}"

    # For each worker type specified by the user, create config values and write its
    # yaml config file
    for worker_name, worker_types_set in requested_worker_types.items():
        # The collected and processed data will live here.
        worker_config: Dict[str, Any] = {}

        # Merge all worker config templates for this worker into a single config
        for worker_type in worker_types_set:
            copy_of_template_config = WORKERS_CONFIG[worker_type].copy()

            # Merge worker type template configuration data. It's a combination of lists
            # and dicts, so use this helper.
            worker_config = merge_worker_template_configs(
                worker_config, copy_of_template_config
            )

        # Replace placeholder names in the config template with the actual worker name.
        worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)

        worker_config.update(
            {"name": worker_name, "port": str(worker_port), "config_path": config_path}
        )

        # Update the shared config with any worker_type specific options. The first of a
        # given worker_type needs to stay assigned and not be replaced: values already
        # in shared_config overwrite this worker's values, and the combined dict then
        # becomes the new shared_config.
        worker_config["shared_extra_conf"].update(shared_config)
        shared_config = worker_config["shared_extra_conf"]
        if using_unix_sockets:
            healthcheck_urls.append(
                f"--unix-socket /run/worker.{worker_port} http://localhost/health"
            )
        else:
            healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))

        # Update the shared config with sharding-related options if necessary
        add_worker_roles_to_shared_config(
            shared_config, worker_types_set, worker_name, worker_port
        )

        # Enable the worker in supervisord
        worker_descriptors.append(worker_config)

        # Write out the worker's logging config file
        log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)

        # Then a worker config file
        convert(
            "/conf/worker.yaml.j2",
            f"/conf/workers/{worker_name}.yaml",
            **worker_config,
            worker_log_config_filepath=log_config_filepath,
            using_unix_sockets=using_unix_sockets,
        )

        # Save this worker's port number to the correct nginx upstreams
        for worker_type in worker_types_set:
            nginx_upstreams.setdefault(worker_type, set()).add(worker_port)

        worker_port += 1

    # Build the nginx location config blocks
    nginx_location_config = ""
    for endpoint, upstream in nginx_locations.items():
        nginx_location_config += NGINX_LOCATION_CONFIG_BLOCK.format(
            endpoint=endpoint,
            upstream=upstream,
        )

    # Determine the load-balancing upstreams to configure
    nginx_upstream_config = ""
    for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
        body = ""
        if using_unix_sockets:
            for port in upstream_worker_ports:
                body += f"    server unix:/run/worker.{port};\n"

        else:
            for port in upstream_worker_ports:
                body += f"    server localhost:{port};\n"

        # Add to the list of configured upstreams
        nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
            upstream_worker_base_name=upstream_worker_base_name,
            body=body,
        )

    # Finally, we'll write out the config files.

    # log config for the master process
    master_log_config = generate_worker_log_config(environ, "master", data_dir)
    shared_config["log_config"] = master_log_config

    # Find application service registrations. Looked up through the `environ`
    # argument like every other setting in this function, not os.environ directly.
    appservice_registrations = None
    appservice_registration_dir = environ.get("SYNAPSE_AS_REGISTRATION_DIR")
    if appservice_registration_dir:
        # Scan for all YAML files that should be application service registrations.
        appservice_registrations = [
            str(reg_path.resolve())
            for reg_path in Path(appservice_registration_dir).iterdir()
            if reg_path.suffix.lower() in (".yaml", ".yml")
        ]

    workers_in_use = len(requested_worker_types) > 0

    # If there are workers, add the main process to the instance_map too.
    if workers_in_use:
        instance_map = shared_config.setdefault("instance_map", {})
        if using_unix_sockets:
            instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
                "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
            }
        else:
            instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
                "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
                "port": MAIN_PROCESS_REPLICATION_PORT,
            }

    # Shared homeserver config
    convert(
        "/conf/shared.yaml.j2",
        "/conf/workers/shared.yaml",
        shared_worker_config=yaml.dump(shared_config),
        appservice_registrations=appservice_registrations,
        enable_redis=workers_in_use,
        workers_in_use=workers_in_use,
        using_unix_sockets=using_unix_sockets,
    )

    # Nginx config
    convert(
        "/conf/nginx.conf.j2",
        "/etc/nginx/conf.d/matrix-synapse.conf",
        worker_locations=nginx_location_config,
        upstream_directives=nginx_upstream_config,
        tls_cert_path=environ.get("SYNAPSE_TLS_CERT"),
        tls_key_path=environ.get("SYNAPSE_TLS_KEY"),
        using_unix_sockets=using_unix_sockets,
    )

    # Supervisord config
    os.makedirs("/etc/supervisor", exist_ok=True)
    convert(
        "/conf/supervisord.conf.j2",
        "/etc/supervisor/supervisord.conf",
        main_config_path=config_path,
        enable_redis=workers_in_use,
        using_unix_sockets=using_unix_sockets,
    )

    convert(
        "/conf/synapse.supervisord.conf.j2",
        "/etc/supervisor/conf.d/synapse.conf",
        workers=worker_descriptors,
        main_config_path=config_path,
        use_forking_launcher=environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"),
    )

    # healthcheck config
    convert(
        "/conf/healthcheck.sh.j2",
        "/healthcheck.sh",
        healthcheck_urls=healthcheck_urls,
    )

    # Ensure the logging directory exists. makedirs with exist_ok avoids the
    # check-then-create race and also copes with a missing parent directory.
    log_dir = data_dir + "/logs"
    os.makedirs(log_dir, exist_ok=True)


def generate_worker_log_config(
    environ: Mapping[str, str], worker_name: str, data_dir: str
) -> str:
    """Render the /conf/log.config template into a log config for one process.

    Args:
        environ: Mapping the logging-related environment settings are read from.
        worker_name: Name used in the output file name and passed to the template.
        data_dir: Directory under which "logs/<worker_name>.log" is placed when
            logging to disk is requested.

    Returns: the path to the generated file
    """
    template_args: Dict[str, Optional[str]] = {}

    # Logging to disk (in addition to the console) is opt-in.
    if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
        template_args["LOG_FILE_PATH"] = f"{data_dir}/logs/{worker_name}.log"

    # These settings are handed to the template as-is; unset ones become None.
    for setting in (
        "SYNAPSE_LOG_LEVEL",
        "SYNAPSE_LOG_SENSITIVE",
        "SYNAPSE_LOG_TESTING",
    ):
        template_args[setting] = environ.get(setting)

    use_forking_launcher = environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER")

    # Render and write the file
    log_config_filepath = f"/conf/workers/{worker_name}.log.config"
    convert(
        "/conf/log.config",
        log_config_filepath,
        worker_name=worker_name,
        **template_args,
        include_worker_name_in_log_line=use_forking_launcher,
    )
    return log_config_filepath


def main(args: List[str], environ: MutableMapping[str, str]) -> None:
    """Generate any missing configuration, then replace this process with supervisord.

    Args:
        args: Command line arguments; not used by this function.
        environ: The environment to read settings from. It is also modified
            (SYNAPSE_NO_TLS, and LD_PRELOAD when jemalloc is found) and handed to
            supervisord.
    """
    config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
    config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
    data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")

    # override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
    # this needs to be handled by a frontend proxy
    environ["SYNAPSE_NO_TLS"] = "yes"

    # The base homeserver config is only generated when it is missing.
    if os.path.exists(config_path):
        log("Base homeserver config exists—not regenerating")
    else:
        log("Generating base homeserver config")
        generate_base_homeserver_config()

    # This script may be run multiple times (mostly by Complement, see note at top of
    # file). The marker file records that workers were already configured, so that
    # they are not re-configured on a later run.
    mark_filepath = "/conf/workers_have_been_configured"
    if os.path.exists(mark_filepath):
        log("Worker config exists—not regenerating")
    else:
        # Read the desired worker configuration from the environment. An empty
        # value means no workers, just the main process.
        worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
        requested_worker_types: Dict[str, Any] = {}
        if worker_types_env:
            # Split type names by comma, ignoring whitespace, then validate them.
            requested_worker_types = parse_worker_types(
                split_and_strip_string(worker_types_env, ",")
            )

        # Always regenerate all other config files
        log("Generating worker config files")
        generate_worker_files(environ, config_path, data_dir, requested_worker_types)

        # Leave an empty marker file behind: workers are now configured.
        with open(mark_filepath, "w"):
            pass

    # Lifted right out of start.py
    jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)

    if not os.path.isfile(jemallocpath):
        log("Could not find %s, will not use" % (jemallocpath,))
    else:
        environ["LD_PRELOAD"] = jemallocpath

    # Start supervisord, which will start Synapse, all of the configured worker
    # processes, redis, nginx etc. according to the config we created above.
    log("Starting supervisord")
    flush_buffers()
    supervisord_argv = ("supervisord", "-c", "/etc/supervisor/supervisord.conf")
    os.execle("/usr/local/bin/supervisord", *supervisord_argv, environ)


# Script entry point: only runs when the file is executed directly, not on import.
# The live process environment is passed in because main() both reads and edits it.
if __name__ == "__main__":
    main(sys.argv, os.environ)