summary refs log tree commit diff
path: root/synapse/config
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-16 14:06:28 +0100
committerGitHub <noreply@github.com>2020-07-16 14:06:28 +0100
commit649a7ead5c4bd2d8b7c486ac1a68ce4e41d49767 (patch)
tree2a3ee5836578e99c15a8c342934c6e7d056563a2 /synapse/config
parentMerge pull request #7866 from matrix-org/rav/fix_guest_user_id (diff)
downloadsynapse-649a7ead5c4bd2d8b7c486ac1a68ce4e41d49767.tar.xz
Add ability to run multiple pusher instances (#7855)
This reuses the same scheme as federation sender sharding
Diffstat (limited to 'synapse/config')
-rw-r--r--synapse/config/_base.py38
-rw-r--r--synapse/config/_base.pyi5
-rw-r--r--synapse/config/federation.py37
-rw-r--r--synapse/config/push.py5
4 files changed, 48 insertions, 37 deletions
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 1391e5fc43..fd137853b1 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -19,9 +19,11 @@ import argparse
 import errno
 import os
 from collections import OrderedDict
+from hashlib import sha256
 from textwrap import dedent
-from typing import Any, MutableMapping, Optional
+from typing import Any, List, MutableMapping, Optional
 
+import attr
 import yaml
 
 
@@ -717,4 +719,36 @@ def find_config_files(search_paths):
     return config_files
 
 
-__all__ = ["Config", "RootConfig"]
+@attr.s
+class ShardedWorkerHandlingConfig:
+    """Algorithm for choosing which instance is responsible for handling some
+    sharded work.
+
+    For example, the federation senders use this to determine which instances
+    handles sending stuff to a given destination (which is used as the `key`
+    below).
+    """
+
+    instances = attr.ib(type=List[str])
+
+    def should_handle(self, instance_name: str, key: str) -> bool:
+        """Whether this instance is responsible for handling the given key.
+        """
+
+        # If multiple instances are not defined we always return true.
+        if not self.instances or len(self.instances) == 1:
+            return True
+
+        # We shard by taking the hash, modulo it by the number of instances and
+        # then checking whether this instance matches the instance at that
+        # index.
+        #
+        # (Technically this introduces some bias and is not entirely uniform,
+        # but since the hash is so large the bias is ridiculously small).
+        dest_hash = sha256(key.encode("utf8")).digest()
+        dest_int = int.from_bytes(dest_hash, byteorder="little")
+        remainder = dest_int % (len(self.instances))
+        return self.instances[remainder] == instance_name
+
+
+__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 9e576060d4..eb911e8f9f 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -137,3 +137,8 @@ class Config:
 
 def read_config_files(config_files: List[str]): ...
 def find_config_files(search_paths: List[str]): ...
+
+class ShardedWorkerHandlingConfig:
+    instances: List[str]
+    def __init__(self, instances: List[str]) -> None: ...
+    def should_handle(self, instance_name: str, key: str) -> bool: ...
diff --git a/synapse/config/federation.py b/synapse/config/federation.py
index 7782ab4c9d..82ff9664de 100644
--- a/synapse/config/federation.py
+++ b/synapse/config/federation.py
@@ -13,42 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from hashlib import sha256
-from typing import List, Optional
+from typing import Optional
 
-import attr
 from netaddr import IPSet
 
-from ._base import Config, ConfigError
-
-
-@attr.s
-class ShardedFederationSendingConfig:
-    """Algorithm for choosing which federation sender instance is responsible
-    for which destionation host.
-    """
-
-    instances = attr.ib(type=List[str])
-
-    def should_send_to(self, instance_name: str, destination: str) -> bool:
-        """Whether this instance is responsible for sending transcations for
-        the given host.
-        """
-
-        # If multiple federation senders are not defined we always return true.
-        if not self.instances or len(self.instances) == 1:
-            return True
-
-        # We shard by taking the hash, modulo it by the number of federation
-        # senders and then checking whether this instance matches the instance
-        # at that index.
-        #
-        # (Technically this introduces some bias and is not entirely uniform, but
-        # since the hash is so large the bias is ridiculously small).
-        dest_hash = sha256(destination.encode("utf8")).digest()
-        dest_int = int.from_bytes(dest_hash, byteorder="little")
-        remainder = dest_int % (len(self.instances))
-        return self.instances[remainder] == instance_name
+from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
 
 
 class FederationConfig(Config):
@@ -61,7 +30,7 @@ class FederationConfig(Config):
         self.send_federation = config.get("send_federation", True)
 
         federation_sender_instances = config.get("federation_sender_instances") or []
-        self.federation_shard_config = ShardedFederationSendingConfig(
+        self.federation_shard_config = ShardedWorkerHandlingConfig(
             federation_sender_instances
         )
 
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 6f2b3a7faa..a1f3752c8a 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import Config
+from ._base import Config, ShardedWorkerHandlingConfig
 
 
 class PushConfig(Config):
@@ -24,6 +24,9 @@ class PushConfig(Config):
         push_config = config.get("push", {})
         self.push_include_content = push_config.get("include_content", True)
 
+        pusher_instances = config.get("pusher_instances") or []
+        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
+
         # There was a a 'redact_content' setting but mistakenly read from the
         # 'email'section'. Check for the flag in the 'push' section, and log,
         # but do not honour it to avoid nasty surprises when people upgrade.