Add ability to run multiple pusher instances (#7855)
This reuses the same scheme as federation sender sharding
1 files changed, 3 insertions, 34 deletions
diff --git a/synapse/config/federation.py b/synapse/config/federation.py
index 7782ab4c9d..82ff9664de 100644
--- a/synapse/config/federation.py
+++ b/synapse/config/federation.py
@@ -13,42 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from hashlib import sha256
-from typing import List, Optional
+from typing import Optional
-import attr
from netaddr import IPSet
-from ._base import Config, ConfigError
-
-
-@attr.s
-class ShardedFederationSendingConfig:
- """Algorithm for choosing which federation sender instance is responsible
- for which destionation host.
- """
-
- instances = attr.ib(type=List[str])
-
- def should_send_to(self, instance_name: str, destination: str) -> bool:
- """Whether this instance is responsible for sending transcations for
- the given host.
- """
-
- # If multiple federation senders are not defined we always return true.
- if not self.instances or len(self.instances) == 1:
- return True
-
- # We shard by taking the hash, modulo it by the number of federation
- # senders and then checking whether this instance matches the instance
- # at that index.
- #
- # (Technically this introduces some bias and is not entirely uniform, but
- # since the hash is so large the bias is ridiculously small).
- dest_hash = sha256(destination.encode("utf8")).digest()
- dest_int = int.from_bytes(dest_hash, byteorder="little")
- remainder = dest_int % (len(self.instances))
- return self.instances[remainder] == instance_name
+from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
class FederationConfig(Config):
@@ -61,7 +30,7 @@ class FederationConfig(Config):
self.send_federation = config.get("send_federation", True)
federation_sender_instances = config.get("federation_sender_instances") or []
- self.federation_shard_config = ShardedFederationSendingConfig(
+ self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
|