diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 1417487427..64ca4f0482 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -832,11 +832,28 @@ class ShardedWorkerHandlingConfig:
def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key.
"""
-
- # If multiple instances are not defined we always return true.
+ # If multiple instances are not defined we always return true
if not self.instances or len(self.instances) == 1:
return True
+ return self.get_instance(key) == instance_name
+
+ def get_instance(self, key: str) -> str:
+ """Get the instance responsible for handling the given key.
+
+ Note: For things like federation sending, when only a single instance is
+ configured the config for which instance is sending is known only to that
+ sender instance itself. Therefore `should_handle` should be used where
+ possible.
+ """
+
+ if not self.instances:
+ return "master"
+
+ if len(self.instances) == 1:
+ return self.instances[0]
+
# We shard by taking the hash, modulo it by the number of instances and
# then checking whether this instance matches the instance at that
# index.
@@ -846,7 +863,7 @@ class ShardedWorkerHandlingConfig:
dest_hash = sha256(key.encode("utf8")).digest()
dest_int = int.from_bytes(dest_hash, byteorder="little")
remainder = dest_int % (len(self.instances))
- return self.instances[remainder] == instance_name
+ return self.instances[remainder]
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
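For reference, a minimal standalone sketch of the routing `get_instance` implements, assuming two hypothetical event persisters (the key is typically a room ID or destination; none of these names come from the patch):

from hashlib import sha256

instances = ["event_persister1", "event_persister2"]

def pick_instance(key: str) -> str:
    # Same scheme as above: hash the key, reduce it modulo the number of
    # configured instances, and index into the ordered instance list.
    dest_int = int.from_bytes(sha256(key.encode("utf8")).digest(), byteorder="little")
    return instances[dest_int % len(instances)]

room_id = "!abcdefgh:example.com"
writer = pick_instance(room_id)
# `should_handle(name, key)` then reduces to `pick_instance(key) == name`.
print(writer, pick_instance(room_id) == writer)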
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index eb911e8f9f..b8faafa9bd 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...
+ def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index c784a71508..56e6b96bc1 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -13,12 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import List, Union
+
import attr
from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from .server import ListenerConfig, parse_listener_def
+def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
+ """Helper for allowing parsing a string or list of strings to a config
+ option expecting a list of strings.
+ """
+
+ if isinstance(obj, str):
+ return [obj]
+ return obj
+
+
@attr.s
class InstanceLocationConfig:
"""The host and port to talk to an instance via HTTP replication.
@@ -33,11 +45,13 @@ class WriterLocations:
"""Specifies the instances that write various streams.
Attributes:
- events: The instance that writes to the event and backfill streams.
+ events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
"""
- events = attr.ib(default="master", type=str)
+ events = attr.ib(
+ default=["master"], type=List[str], converter=_instance_to_list_converter
+ )
typing = attr.ib(default="master", type=str)
@@ -108,12 +122,15 @@ class WorkerConfig(Config):
# Check that the configured writer for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing"):
- instance = getattr(self.writers, stream)
- if instance != "master" and instance not in self.instance_map:
- raise ConfigError(
- "Instance %r is configured to write %s but does not appear in `instance_map` config."
- % (instance, stream)
- )
+ instances = _instance_to_list_converter(getattr(self.writers, stream))
+ for instance in instances:
+ if instance != "master" and instance not in self.instance_map:
+ raise ConfigError(
+ "Instance %r is configured to write %s but does not appear in `instance_map` config."
+ % (instance, stream)
+ )
+
+ self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8391728bdb..54e3889a00 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -918,7 +918,8 @@ class FederationHandler(BaseHandler):
)
)
- await self._handle_new_events(dest, ev_infos, backfilled=True)
+ if ev_infos:
+ await self._handle_new_events(dest, ev_infos, backfilled=True)
# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
@@ -1366,7 +1367,9 @@ class FederationHandler(BaseHandler):
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- self.config.worker.writers.events, "events", max_stream_id
+ self.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ max_stream_id,
)
predecessor = None
@@ -2916,9 +2919,13 @@ class FederationHandler(BaseHandler):
backfilled: Whether these events are a result of
backfilling or not
"""
- if self.config.worker.writers.events != self._instance_name:
+ # All of the events in `event_and_contexts` belong to the same room, so
+ # pick the writer instance based on the first event's room ID.
+ instance = self.config.worker.events_shard_config.get_instance(
+ event_and_contexts[0][0].room_id
+ )
+ if instance != self._instance_name:
result = await self._send_events(
- instance_name=self.config.worker.writers.events,
+ instance_name=instance,
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c955a86be0..56dd70c03e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -377,9 +377,8 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
- self._is_event_writer = (
- self.config.worker.writers.events == hs.get_instance_name()
- )
+ self._events_shard_config = self.config.worker.events_shard_config
+ self._instance_name = hs.get_instance_name()
self.room_invite_state_types = self.hs.config.room_invite_state_types
@@ -874,9 +873,10 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
- if not self._is_event_writer:
+ writer_instance = self._events_shard_config.get_instance(event.room_id)
+ if writer_instance != self._instance_name:
result = await self.send_event(
- instance_name=self.config.worker.writers.events,
+ instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
@@ -944,7 +944,9 @@ class EventCreationHandler(object):
This should only be run on the instance in charge of persisting events.
"""
- assert self._is_event_writer
+ assert self._events_shard_config.should_handle(
+ self._instance_name, event.room_id
+ )
if ratelimit:
# We check if this is a room admin redacting an event so that we
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 442cca28e6..c1a5e0115a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -776,7 +776,9 @@ class RoomCreationHandler(BaseHandler):
# Always wait for room creation to propagate before returning
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", last_stream_id
+ self.hs.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ last_stream_id,
)
return result, last_stream_id
@@ -1233,7 +1235,9 @@ class RoomShutdownHandler(object):
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", stream_id
+ self.hs.config.worker.events_shard_config.get_instance(new_room_id),
+ "events",
+ stream_id,
)
else:
new_room_id = None
@@ -1263,7 +1267,9 @@ class RoomShutdownHandler(object):
# Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", stream_id
+ self.hs.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ stream_id,
)
await self.room_member_handler.forget(target_requester.user, room_id)
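The handler changes above follow a common pattern: look up the writer for the room, forward the work to that instance over replication if it is not us, and wait for the returned position on the events stream before returning to the caller. A self-contained sketch of that flow, with all names and stubs below purely illustrative (they are not Synapse APIs):

import asyncio

# Stand-in for events_shard_config.get_instance(room_id) (see _base.py above).
WRITERS = {"!room:example.com": "event_persister2"}
THIS_INSTANCE = "event_persister1"

async def send_to_writer(writer: str, event_id: str) -> int:
    # Stand-in for a replication HTTP call such as `self.send_event(...)`.
    await asyncio.sleep(0)
    return 123  # stream ordering assigned by the writer

async def wait_for_stream_position(instance: str, stream: str, position: int) -> None:
    # Stand-in for `self._replication.wait_for_stream_position(...)`.
    await asyncio.sleep(0)

async def handle(room_id: str, event_id: str) -> int:
    writer = WRITERS[room_id]
    if writer != THIS_INSTANCE:
        stream_id = await send_to_writer(writer, event_id)
    else:
        stream_id = 123  # we are the writer for this room: persist locally
    # Make sure the event has replicated to us before returning.
    await wait_for_stream_position(writer, "events", stream_id)
    return stream_id

print(asyncio.run(handle("!room:example.com", "$someevent")))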
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index aa1ccde211..0d7e12b3a0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -75,13 +75,6 @@ class RoomMemberHandler(object):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles
- self._event_stream_writer_instance = hs.config.worker.writers.events
- self._is_on_event_persistence_instance = (
- self._event_stream_writer_instance == hs.get_instance_name()
- )
- if self._is_on_event_persistence_instance:
- self.persist_event_storage = hs.get_storage().persistence
-
self._join_rate_limiter_local = Ratelimiter(
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1c303f3a46..b323841f73 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ class ReplicationCommandHandler:
if isinstance(stream, (EventsStream, BackfillStream)):
# Only add EventStream and BackfillStream as a source on the
# instance in charge of event persistence.
- if hs.config.worker.writers.events == hs.get_instance_name():
+ if hs.get_instance_name() in hs.config.worker.writers.events:
self._streams_to_replicate.append(stream)
continue
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 0ac854aee2..c73d54fb67 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -68,7 +68,7 @@ class Databases(object):
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
- if hs.config.worker.writers.events == hs.get_instance_name():
+ if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main)
if "state" in database_config.databases:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 4826be630c..16f09fe531 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -440,7 +440,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
"""
if stream_ordering <= self.stream_ordering_month_ago:
- raise StoreError(400, "stream_ordering too old")
+ raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
sql = """
SELECT event_id FROM stream_ordering_to_exterm
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 989372193f..bf2afc3dd7 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -110,7 +110,7 @@ class PersistEventsStore:
# This should only exist on instances that are configured to write
assert (
- hs.config.worker.writers.events == hs.get_instance_name()
+ hs.get_instance_name() in hs.config.worker.writers.events
), "Can only instantiate EventsStore on master"
async def _persist_events_and_state_updates(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8f46e04fa6..bda189b923 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -79,7 +79,7 @@ class EventsWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
- if hs.config.worker.writers.events == hs.get_instance_name():
+ if hs.get_instance_name() in hs.config.worker.writers.events:
# We are the process in charge of generating stream ids for events,
# so instantiate ID generators based on the database
if isinstance(database.engine, PostgresEngine):
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 497f607703..1ed7b4b2f7 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1084,4 +1084,4 @@ class StreamStore(StreamWorkerStore):
return self._stream_id_gen.get_current_token()
def get_room_min_stream_ordering(self) -> int:
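+ # Backfilled events are stored with negative stream orderings, so the
+ # backfill ID generator's current token is already the (negative) minimum
+ # room stream ordering.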
return self._backfill_id_gen.get_current_token()