diff options
Diffstat (limited to 'synapse/config')
-rw-r--r-- | synapse/config/_base.py | 21 | ||||
-rw-r--r-- | synapse/config/_base.pyi | 1 | ||||
-rw-r--r-- | synapse/config/workers.py | 37 |
3 files changed, 13 insertions, 46 deletions
diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 73f0717b0d..1417487427 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -832,26 +832,11 @@ class ShardedWorkerHandlingConfig: def should_handle(self, instance_name: str, key: str) -> bool: """Whether this instance is responsible for handling the given key. """ - # If multiple instances are not defined we always return true + + # If multiple instances are not defined we always return true. if not self.instances or len(self.instances) == 1: return True - return self.get_instance(key) == instance_name - - def get_instance(self, key: str) -> str: - """Get the instance responsible for handling the given key. - - Note: For things like federation sending the config for which instance - is sending is known only to the sender instance if there is only one. - Therefore `should_handle` should be used where possible. - """ - - if not self.instances: - return "master" - - if len(self.instances) == 1: - return self.instances[0] - # We shard by taking the hash, modulo it by the number of instances and # then checking whether this instance matches the instance at that # index. @@ -861,7 +846,7 @@ class ShardedWorkerHandlingConfig: dest_hash = sha256(key.encode("utf8")).digest() dest_int = int.from_bytes(dest_hash, byteorder="little") remainder = dest_int % (len(self.instances)) - return self.instances[remainder] + return self.instances[remainder] == instance_name __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"] diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index b8faafa9bd..eb911e8f9f 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -142,4 +142,3 @@ class ShardedWorkerHandlingConfig: instances: List[str] def __init__(self, instances: List[str]) -> None: ... def should_handle(self, instance_name: str, key: str) -> bool: ... - def get_instance(self, key: str) -> str: ... diff --git a/synapse/config/workers.py b/synapse/config/workers.py index f23e42cdf9..c784a71508 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -13,24 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Union - import attr from ._base import Config, ConfigError, ShardedWorkerHandlingConfig from .server import ListenerConfig, parse_listener_def -def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]: - """Helper for allowing parsing a string or list of strings to a config - option expecting a list of strings. - """ - - if isinstance(obj, str): - return [obj] - return obj - - @attr.s class InstanceLocationConfig: """The host and port to talk to an instance via HTTP replication. @@ -45,13 +33,11 @@ class WriterLocations: """Specifies the instances that write various streams. Attributes: - events: The instances that write to the event and backfill streams. - typing: The instance that writes to the typing stream. + events: The instance that writes to the event and backfill streams. + events: The instance that writes to the typing stream. """ - events = attr.ib( - default=["master"], type=List[str], converter=_instance_to_list_converter - ) + events = attr.ib(default="master", type=str) typing = attr.ib(default="master", type=str) @@ -119,18 +105,15 @@ class WorkerConfig(Config): writers = config.get("stream_writers") or {} self.writers = WriterLocations(**writers) - # Check that the configured writers for events and typing also appears in + # Check that the configured writer for events and typing also appears in # `instance_map`. for stream in ("events", "typing"): - instances = _instance_to_list_converter(getattr(self.writers, stream)) - for instance in instances: - if instance != "master" and instance not in self.instance_map: - raise ConfigError( - "Instance %r is configured to write %s but does not appear in `instance_map` config." - % (instance, stream) - ) - - self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) + instance = getattr(self.writers, stream) + if instance != "master" and instance not in self.instance_map: + raise ConfigError( + "Instance %r is configured to write %s but does not appear in `instance_map` config." + % (instance, stream) + ) def generate_config_section(self, config_dir_path, server_name, **kwargs): return """\ |