diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 1391e5fc43..fd137853b1 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -19,9 +19,11 @@ import argparse
import errno
import os
from collections import OrderedDict
+from hashlib import sha256
from textwrap import dedent
-from typing import Any, MutableMapping, Optional
+from typing import Any, List, MutableMapping, Optional
+import attr
import yaml
@@ -717,4 +719,36 @@ def find_config_files(search_paths):
return config_files
-__all__ = ["Config", "RootConfig"]
+@attr.s
+class ShardedWorkerHandlingConfig:
+ """Algorithm for choosing which instance is responsible for handling some
+ sharded work.
+
+ For example, the federation senders use this to determine which instances
+ handles sending stuff to a given destination (which is used as the `key`
+ below).
+ """
+
+ instances = attr.ib(type=List[str])
+
+ def should_handle(self, instance_name: str, key: str) -> bool:
+ """Whether this instance is responsible for handling the given key.
+ """
+
+ # If multiple instances are not defined we always return true.
+ if not self.instances or len(self.instances) == 1:
+ return True
+
+ # We shard by taking the hash, modulo it by the number of instances and
+ # then checking whether this instance matches the instance at that
+ # index.
+ #
+ # (Technically this introduces some bias and is not entirely uniform,
+ # but since the hash is so large the bias is ridiculously small).
+ dest_hash = sha256(key.encode("utf8")).digest()
+ dest_int = int.from_bytes(dest_hash, byteorder="little")
+ remainder = dest_int % (len(self.instances))
+ return self.instances[remainder] == instance_name
+
+
+__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 9e576060d4..eb911e8f9f 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -137,3 +137,8 @@ class Config:
def read_config_files(config_files: List[str]): ...
def find_config_files(search_paths: List[str]): ...
+
+class ShardedWorkerHandlingConfig:
+ instances: List[str]
+ def __init__(self, instances: List[str]) -> None: ...
+ def should_handle(self, instance_name: str, key: str) -> bool: ...
diff --git a/synapse/config/federation.py b/synapse/config/federation.py
index 7782ab4c9d..82ff9664de 100644
--- a/synapse/config/federation.py
+++ b/synapse/config/federation.py
@@ -13,42 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from hashlib import sha256
-from typing import List, Optional
+from typing import Optional
-import attr
from netaddr import IPSet
-from ._base import Config, ConfigError
-
-
-@attr.s
-class ShardedFederationSendingConfig:
- """Algorithm for choosing which federation sender instance is responsible
- for which destionation host.
- """
-
- instances = attr.ib(type=List[str])
-
- def should_send_to(self, instance_name: str, destination: str) -> bool:
- """Whether this instance is responsible for sending transcations for
- the given host.
- """
-
- # If multiple federation senders are not defined we always return true.
- if not self.instances or len(self.instances) == 1:
- return True
-
- # We shard by taking the hash, modulo it by the number of federation
- # senders and then checking whether this instance matches the instance
- # at that index.
- #
- # (Technically this introduces some bias and is not entirely uniform, but
- # since the hash is so large the bias is ridiculously small).
- dest_hash = sha256(destination.encode("utf8")).digest()
- dest_int = int.from_bytes(dest_hash, byteorder="little")
- remainder = dest_int % (len(self.instances))
- return self.instances[remainder] == instance_name
+from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
class FederationConfig(Config):
@@ -61,7 +30,7 @@ class FederationConfig(Config):
self.send_federation = config.get("send_federation", True)
federation_sender_instances = config.get("federation_sender_instances") or []
- self.federation_shard_config = ShardedFederationSendingConfig(
+ self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 6f2b3a7faa..a1f3752c8a 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import Config
+from ._base import Config, ShardedWorkerHandlingConfig
class PushConfig(Config):
@@ -24,6 +24,9 @@ class PushConfig(Config):
push_config = config.get("push", {})
self.push_include_content = push_config.get("include_content", True)
+ pusher_instances = config.get("pusher_instances") or []
+ self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
+
# There was a a 'redact_content' setting but mistakenly read from the
# 'email'section'. Check for the flag in the 'push' section, and log,
# but do not honour it to avoid nasty surprises when people upgrade.
|