author     Erik Johnston <erik@matrix.org>    2020-08-20 15:35:23 +0100
committer  Erik Johnston <erik@matrix.org>    2020-08-24 15:57:15 +0100
commit     e894f67509827088f6bdc6fa1d530568a6fc4875
tree       76a899af8a96069df1d39dc75d5a6d715809536f
parent     Fix remote join predecessor race
download   synapse-e894f67509827088f6bdc6fa1d530568a6fc4875.tar.xz
Implement config and routing for multiple event writers
-rw-r--r--  synapse/config/_base.py                              23
-rw-r--r--  synapse/config/_base.pyi                              1
-rw-r--r--  synapse/config/workers.py                            33
-rw-r--r--  synapse/handlers/federation.py                       15
-rw-r--r--  synapse/handlers/message.py                          14
-rw-r--r--  synapse/handlers/room.py                             12
-rw-r--r--  synapse/handlers/room_member.py                       7
-rw-r--r--  synapse/replication/tcp/handler.py                   2
-rw-r--r--  synapse/storage/databases/__init__.py                2
-rw-r--r--  synapse/storage/databases/main/event_federation.py   2
-rw-r--r--  synapse/storage/databases/main/events.py             2
-rw-r--r--  synapse/storage/databases/main/events_worker.py      2
-rw-r--r--  synapse/storage/databases/main/stream.py             2
13 files changed, 80 insertions, 37 deletions
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 1417487427..64ca4f0482 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -832,11 +832,28 @@ class ShardedWorkerHandlingConfig:
     def should_handle(self, instance_name: str, key: str) -> bool:
         """Whether this instance is responsible for handling the given key.
         """
-
-        # If multiple instances are not defined we always return true.
+        # If multiple instances are not defined we always return true
         if not self.instances or len(self.instances) == 1:
             return True
 
+        return self.get_instance(key) == instance_name
+
+    def get_instance(self, key: str) -> str:
+        """Get the instance responsible for handling the given key.
+
+        Note: For things like federation sending, when only one instance is
+        configured the config does not say which instance that is; only the
+        sender itself knows. Therefore `should_handle` should be used where possible.
+        """
+
+        # Note: for things like federation sending, when only one instance is
+        # configured only the sender instance itself knows which one it is.
+        if not self.instances:
+            return "master"
+
+        if len(self.instances) == 1:
+            return self.instances[0]
+
         # We shard by taking the hash, modulo it by the number of instances and
         # then checking whether this instance matches the instance at that
         # index.
@@ -846,7 +863,7 @@ class ShardedWorkerHandlingConfig:
         dest_hash = sha256(key.encode("utf8")).digest()
         dest_int = int.from_bytes(dest_hash, byteorder="little")
         remainder = dest_int % (len(self.instances))
-        return self.instances[remainder] == instance_name
+        return self.instances[remainder]
 
 
 __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index eb911e8f9f..b8faafa9bd 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
     instances: List[str]
     def __init__(self, instances: List[str]) -> None: ...
     def should_handle(self, instance_name: str, key: str) -> bool: ...
+    def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index c784a71508..56e6b96bc1 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -13,12 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Union
+
 import attr
 
 from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
 from .server import ListenerConfig, parse_listener_def
 
 
+def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
+    """Helper for allowing parsing a string or list of strings to a config
+    option expecting a list of strings.
+    """
+
+    if isinstance(obj, str):
+        return [obj]
+    return obj
+
+
 @attr.s
 class InstanceLocationConfig:
     """The host and port to talk to an instance via HTTP replication.
@@ -33,11 +45,13 @@ class WriterLocations:
     """Specifies the instances that write various streams.
 
     Attributes:
-        events: The instance that writes to the event and backfill streams.
+        events: The instances that write to the event and backfill streams.
        typing: The instance that writes to the typing stream.
     """
 
-    events = attr.ib(default="master", type=str)
+    events = attr.ib(
+        default=["master"], type=List[str], converter=_instance_to_list_converter
+    )
     typing = attr.ib(default="master", type=str)
 
 
@@ -108,12 +122,15 @@ class WorkerConfig(Config):
         # Check that the configured writer for events and typing also appears in
         # `instance_map`.
         for stream in ("events", "typing"):
-            instance = getattr(self.writers, stream)
-            if instance != "master" and instance not in self.instance_map:
-                raise ConfigError(
-                    "Instance %r is configured to write %s but does not appear in `instance_map` config."
-                    % (instance, stream)
-                )
+            instances = _instance_to_list_converter(getattr(self.writers, stream))
+            for instance in instances:
+                if instance != "master" and instance not in self.instance_map:
+                    raise ConfigError(
+                        "Instance %r is configured to write %s but does not appear in `instance_map` config."
+                        % (instance, stream)
+                    )
+
+        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """\
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8391728bdb..54e3889a00 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -918,7 +918,8 @@ class FederationHandler(BaseHandler):
                 )
             )
 
-        await self._handle_new_events(dest, ev_infos, backfilled=True)
+        if ev_infos:
+            await self._handle_new_events(dest, ev_infos, backfilled=True)
 
         # Step 2: Persist the rest of the events in the chunk one by one
         events.sort(key=lambda e: e.depth)
@@ -1366,7 +1367,9 @@ class FederationHandler(BaseHandler):
             #
             # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.config.worker.writers.events, "events", max_stream_id
+                self.config.worker.events_shard_config.get_instance(room_id),
+                "events",
+                max_stream_id,
             )
 
             predecessor = None
@@ -2916,9 +2919,13 @@ class FederationHandler(BaseHandler):
             backfilled: Whether these events are a result of
                 backfilling or not
         """
-        if self.config.worker.writers.events != self._instance_name:
+        # FIXME:
+        instance = self.config.worker.events_shard_config.get_instance(
+            event_and_contexts[0][0].room_id
+        )
+        if instance != self._instance_name:
             result = await self._send_events(
-                instance_name=self.config.worker.writers.events,
+                instance_name=instance,
                 store=self.store,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled,
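
Note that `_handle_new_events` above routes the whole batch by the room of its first
event, which the FIXME flags. Purely as an illustration (not part of the commit), a
batch could instead be grouped per room before choosing a writer for each group:

    from collections import defaultdict

    def group_by_room(event_and_contexts):
        # Split a batch of (event, context) pairs by room so that each group
        # can be routed to the writer responsible for that room.
        by_room = defaultdict(list)
        for event, context in event_and_contexts:
            by_room[event.room_id].append((event, context))
        return dict(by_room)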
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c955a86be0..56dd70c03e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -377,9 +377,8 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
         self.require_membership_for_aliases = hs.config.require_membership_for_aliases
-        self._is_event_writer = (
-            self.config.worker.writers.events == hs.get_instance_name()
-        )
+        self._events_shard_config = self.config.worker.events_shard_config
+        self._instance_name = hs.get_instance_name()
 
         self.room_invite_state_types = self.hs.config.room_invite_state_types
 
@@ -874,9 +873,10 @@ class EventCreationHandler(object):
 
         try:
             # If we're a worker we need to hit out to the master.
-            if not self._is_event_writer:
+            writer_instance = self._events_shard_config.get_instance(event.room_id)
+            if writer_instance != self._instance_name:
                 result = await self.send_event(
-                    instance_name=self.config.worker.writers.events,
+                    instance_name=writer_instance,
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -944,7 +944,9 @@ class EventCreationHandler(object):
 
         This should only be run on the instance in charge of persisting events.
         """
-        assert self._is_event_writer
+        assert self._events_shard_config.should_handle(
+            self._instance_name, event.room_id
+        )
 
         if ratelimit:
             # We check if this is a room admin redacting an event so that we
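
The pattern above, of resolving the per-room writer and either forwarding or persisting
locally, can be summarised in a small sketch. The callables are stand-ins for the
replication `send_event` endpoint and the local persistence path, and are assumptions
for illustration only:

    async def route_event(shard_config, local_instance_name, event,
                          send_to_writer, persist_locally):
        # Which instance owns the events stream for this event's room?
        writer = shard_config.get_instance(event.room_id)
        if writer != local_instance_name:
            # Another worker is the writer for this room: forward the event
            # over replication HTTP and return its result.
            return await send_to_writer(writer, event)
        # This process is the writer for the room: persist the event locally.
        return await persist_locally(event)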
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 442cca28e6..c1a5e0115a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -776,7 +776,9 @@ class RoomCreationHandler(BaseHandler):
 
        # Always wait for room creation to propagate before returning
         await self._replication.wait_for_stream_position(
-            self.hs.config.worker.writers.events, "events", last_stream_id
+            self.hs.config.worker.events_shard_config.get_instance(room_id),
+            "events",
+            last_stream_id,
         )
 
         return result, last_stream_id
@@ -1233,7 +1235,9 @@ class RoomShutdownHandler(object):
             #
             # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.hs.config.worker.writers.events, "events", stream_id
+                self.hs.config.worker.events_shard_config.get_instance(new_room_id),
+                "events",
+                stream_id,
             )
         else:
             new_room_id = None
@@ -1263,7 +1267,9 @@ class RoomShutdownHandler(object):
 
                 # Wait for leave to come in over replication before trying to forget.
                 await self._replication.wait_for_stream_position(
-                    self.hs.config.worker.writers.events, "events", stream_id
+                    self.hs.config.worker.events_shard_config.get_instance(room_id),
+                    "events",
+                    stream_id,
                 )
 
                 await self.room_member_handler.forget(target_requester.user, room_id)
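
Each of the `wait_for_stream_position` calls above now names the writer for the
relevant room rather than a single fixed instance. A hedged sketch of the call pattern,
with the replication client and shard config passed in as assumptions:

    async def wait_for_room_events(replication, shard_config, room_id, stream_id):
        # Wait until this process has seen the events stream, as written by
        # whichever instance is responsible for room_id, reach stream_id.
        writer = shard_config.get_instance(room_id)
        await replication.wait_for_stream_position(writer, "events", stream_id)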
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index aa1ccde211..0d7e12b3a0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -75,13 +75,6 @@ class RoomMemberHandler(object):
         self._enable_lookup = hs.config.enable_3pid_lookup
         self.allow_per_room_profiles = self.config.allow_per_room_profiles
 
-        self._event_stream_writer_instance = hs.config.worker.writers.events
-        self._is_on_event_persistence_instance = (
-            self._event_stream_writer_instance == hs.get_instance_name()
-        )
-        if self._is_on_event_persistence_instance:
-            self.persist_event_storage = hs.get_storage().persistence
-
         self._join_rate_limiter_local = Ratelimiter(
             clock=self.clock,
             rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1c303f3a46..b323841f73 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ class ReplicationCommandHandler:
             if isinstance(stream, (EventsStream, BackfillStream)):
                 # Only add EventStream and BackfillStream as a source on the
                 # instance in charge of event persistence.
-                if hs.config.worker.writers.events == hs.get_instance_name():
+                if hs.get_instance_name() in hs.config.worker.writers.events:
                     self._streams_to_replicate.append(stream)
 
                 continue
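
Because `writers.events` is now a list, checks like the one above (and the storage-layer
checks later in this commit) become membership tests rather than string comparisons. A
trivial sketch with made-up worker names:

    def is_event_writer(instance_name: str, event_writers: list) -> bool:
        # True if this process is one of the configured event writers and should
        # therefore register the events/backfill streams and the persistence stores.
        return instance_name in event_writers

    # is_event_writer("event_writer1", ["event_writer1", "event_writer2"])  -> True
    # is_event_writer("federation_sender", ["event_writer1"])               -> False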
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 0ac854aee2..c73d54fb67 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -68,7 +68,7 @@ class Databases(object):
 
                     # If we're on a process that can persist events also
                     # instantiate a `PersistEventsStore`
-                    if hs.config.worker.writers.events == hs.get_instance_name():
+                    if hs.get_instance_name() in hs.config.worker.writers.events:
                         persist_events = PersistEventsStore(hs, database, main)
 
                 if "state" in database_config.databases:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 4826be630c..16f09fe531 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -440,7 +440,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         """
 
         if stream_ordering <= self.stream_ordering_month_ago:
-            raise StoreError(400, "stream_ordering too old")
+            raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
 
         sql = """
                 SELECT event_id FROM stream_ordering_to_exterm
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 989372193f..bf2afc3dd7 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -110,7 +110,7 @@ class PersistEventsStore:
 
         # This should only exist on instances that are configured to write
         assert (
-            hs.config.worker.writers.events == hs.get_instance_name()
+            hs.get_instance_name() in hs.config.worker.writers.events
         ), "Can only instantiate EventsStore on master"
 
     async def _persist_events_and_state_updates(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8f46e04fa6..bda189b923 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -79,7 +79,7 @@ class EventsWorkerStore(SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super(EventsWorkerStore, self).__init__(database, db_conn, hs)
 
-        if hs.config.worker.writers.events == hs.get_instance_name():
+        if hs.get_instance_name() in hs.config.worker.writers.events:
             # We are the process in charge of generating stream ids for events,
             # so instantiate ID generators based on the database
             if isinstance(database.engine, PostgresEngine):
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 497f607703..1ed7b4b2f7 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1084,4 +1084,4 @@ class StreamStore(StreamWorkerStore):
         return self._stream_id_gen.get_current_token()
 
     def get_room_min_stream_ordering(self) -> int:
-        return self._backfill_id_gen.get_current_token()
+        return -self._backfill_id_gen.get_current_token()