diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ccfe75eaf3..6567fb6bb0 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -15,7 +15,7 @@
import argparse
import logging
-from typing import Any, Dict, List, Union
+from typing import Any, Dict, List, Optional, Union
import attr
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
@@ -94,7 +94,7 @@ class ConfigModel(BaseModel):
allow_mutation = False
-class InstanceLocationConfig(ConfigModel):
+class InstanceTcpLocationConfig(ConfigModel):
"""The host and port to talk to an instance via HTTP replication."""
host: StrictStr
@@ -110,6 +110,23 @@ class InstanceLocationConfig(ConfigModel):
return f"{self.host}:{self.port}"
+class InstanceUnixLocationConfig(ConfigModel):
+ """The socket file to talk to an instance via HTTP replication."""
+
+ path: StrictStr
+
+ def scheme(self) -> str:
+ """Hardcode a retrievable scheme"""
+ return "unix"
+
+ def netloc(self) -> str:
+ """Nicely format the address location data"""
+ return f"{self.path}"
+
+
+InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig]
+
+
@attr.s
class WriterLocations:
"""Specifies the instances that write various streams.
@@ -154,6 +171,27 @@ class WriterLocations:
)
+@attr.s(auto_attribs=True)
+class OutboundFederationRestrictedTo:
+ """Whether we limit outbound federation to a certain set of instances.
+
+ Attributes:
+ instances: optional list of instances that can make outbound federation
+ requests. If None then all instances can make federation requests.
+ locations: list of instance locations to connect to proxy via.
+ """
+
+ instances: Optional[List[str]]
+ locations: List[InstanceLocationConfig] = attr.Factory(list)
+
+ def __contains__(self, instance: str) -> bool:
+ # It feels a bit dirty to return `True` if `instances` is `None`, but it makes
+ # sense in downstream usage in the sense that if
+ # `outbound_federation_restricted_to` is not configured, then any instance can
+ # talk to federation (no restrictions so always return `True`).
+ return self.instances is None or instance in self.instances
+
+
class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
@@ -270,9 +308,12 @@ class WorkerConfig(Config):
% MAIN_PROCESS_INSTANCE_MAP_NAME
)
+ # type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently
self.instance_map: Dict[
str, InstanceLocationConfig
- ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig)
+ ] = parse_and_validate_mapping(
+ instance_map, InstanceLocationConfig # type: ignore[arg-type]
+ )
# Map from type of streams to source, c.f. WriterLocations.
writers = config.get("stream_writers") or {}
@@ -365,6 +406,28 @@ class WorkerConfig(Config):
new_option_name="update_user_directory_from_worker",
)
+ outbound_federation_restricted_to = config.get(
+ "outbound_federation_restricted_to", None
+ )
+ self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
+ outbound_federation_restricted_to
+ )
+ if outbound_federation_restricted_to:
+ if not self.worker_replication_secret:
+ raise ConfigError(
+ "`worker_replication_secret` must be configured when using `outbound_federation_restricted_to`."
+ )
+
+ for instance in outbound_federation_restricted_to:
+ if instance not in self.instance_map:
+ raise ConfigError(
+ "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
+ % (instance,)
+ )
+ self.outbound_federation_restricted_to.locations.append(
+ self.instance_map[instance]
+ )
+
def _should_this_worker_perform_duty(
self,
config: Dict[str, Any],
|