summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py44
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/room.py14
-rw-r--r--synapse/handlers/room_member.py7
4 files changed, 32 insertions, 47 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 310c7f7138..43f2986f89 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -923,8 +923,7 @@ class FederationHandler(BaseHandler):
                 )
             )
 
-        if ev_infos:
-            await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
+        await self._handle_new_events(dest, ev_infos, backfilled=True)
 
         # Step 2: Persist the rest of the events in the chunk one by one
         events.sort(key=lambda e: e.depth)
@@ -1217,7 +1216,7 @@ class FederationHandler(BaseHandler):
             event_infos.append(_NewEventInfo(event, None, auth))
 
         await self._handle_new_events(
-            destination, room_id, event_infos,
+            destination, event_infos,
         )
 
     def _sanity_check_event(self, ev):
@@ -1364,15 +1363,15 @@ class FederationHandler(BaseHandler):
             )
 
             max_stream_id = await self._persist_auth_tree(
-                origin, room_id, auth_chain, state, event, room_version_obj
+                origin, auth_chain, state, event, room_version_obj
             )
 
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
+            #
+            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.config.worker.events_shard_config.get_instance(room_id),
-                "events",
-                max_stream_id,
+                self.config.worker.writers.events, "events", max_stream_id
             )
 
             # Check whether this room is the result of an upgrade of a room we already know
@@ -1626,7 +1625,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = await self.state_handler.compute_event_context(event)
-        await self.persist_events_and_notify(event.room_id, [(event, context)])
+        await self.persist_events_and_notify([(event, context)])
 
         return event
 
@@ -1653,9 +1652,7 @@ class FederationHandler(BaseHandler):
         await self.federation_client.send_leave(host_list, event)
 
         context = await self.state_handler.compute_event_context(event)
-        stream_id = await self.persist_events_and_notify(
-            event.room_id, [(event, context)]
-        )
+        stream_id = await self.persist_events_and_notify([(event, context)])
 
         return event, stream_id
 
@@ -1903,7 +1900,7 @@ class FederationHandler(BaseHandler):
                 )
 
             await self.persist_events_and_notify(
-                event.room_id, [(event, context)], backfilled=backfilled
+                [(event, context)], backfilled=backfilled
             )
         except Exception:
             run_in_background(
@@ -1916,7 +1913,6 @@ class FederationHandler(BaseHandler):
     async def _handle_new_events(
         self,
         origin: str,
-        room_id: str,
         event_infos: Iterable[_NewEventInfo],
         backfilled: bool = False,
     ) -> None:
@@ -1948,7 +1944,6 @@ class FederationHandler(BaseHandler):
         )
 
         await self.persist_events_and_notify(
-            room_id,
             [
                 (ev_info.event, context)
                 for ev_info, context in zip(event_infos, contexts)
@@ -1959,7 +1954,6 @@ class FederationHandler(BaseHandler):
     async def _persist_auth_tree(
         self,
         origin: str,
-        room_id: str,
         auth_events: List[EventBase],
         state: List[EventBase],
         event: EventBase,
@@ -1974,7 +1968,6 @@ class FederationHandler(BaseHandler):
 
         Args:
             origin: Where the events came from
-            room_id,
             auth_events
             state
             event
@@ -2049,20 +2042,17 @@ class FederationHandler(BaseHandler):
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
         await self.persist_events_and_notify(
-            room_id,
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
-            ],
+            ]
         )
 
         new_event_context = await self.state_handler.compute_event_context(
             event, old_state=state
         )
 
-        return await self.persist_events_and_notify(
-            room_id, [(event, new_event_context)]
-        )
+        return await self.persist_events_and_notify([(event, new_event_context)])
 
     async def _prep_event(
         self,
@@ -2913,7 +2903,6 @@ class FederationHandler(BaseHandler):
 
     async def persist_events_and_notify(
         self,
-        room_id: str,
         event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
     ) -> int:
@@ -2921,19 +2910,14 @@ class FederationHandler(BaseHandler):
         necessary.
 
         Args:
-            room_id: The room ID of events being persisted.
-            event_and_contexts: Sequence of events with their associated
-                context that should be persisted. All events must belong to
-                the same room.
+            event_and_contexts:
             backfilled: Whether these events are a result of
                 backfilling or not
         """
-        instance = self.config.worker.events_shard_config.get_instance(room_id)
-        if instance != self._instance_name:
+        if self.config.worker.writers.events != self._instance_name:
             result = await self._send_events(
-                instance_name=instance,
+                instance_name=self.config.worker.writers.events,
                 store=self.store,
-                room_id=room_id,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled,
             )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6398774c02..72bb638167 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -376,8 +376,9 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
         self.require_membership_for_aliases = hs.config.require_membership_for_aliases
-        self._events_shard_config = self.config.worker.events_shard_config
-        self._instance_name = hs.get_instance_name()
+        self._is_event_writer = (
+            self.config.worker.writers.events == hs.get_instance_name()
+        )
 
         self.room_invite_state_types = self.hs.config.room_invite_state_types
 
@@ -903,10 +904,9 @@ class EventCreationHandler(object):
 
         try:
             # If we're a worker we need to hit out to the master.
-            writer_instance = self._events_shard_config.get_instance(event.room_id)
-            if writer_instance != self._instance_name:
+            if not self._is_event_writer:
                 result = await self.send_event(
-                    instance_name=writer_instance,
+                    instance_name=self.config.worker.writers.events,
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -974,9 +974,7 @@ class EventCreationHandler(object):
 
         This should only be run on the instance in charge of persisting events.
         """
-        assert self._events_shard_config.should_handle(
-            self._instance_name, event.room_id
-        )
+        assert self._is_event_writer
 
         if ratelimit:
             # We check if this is a room admin redacting an event so that we
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 55794c3057..9d5b1828df 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -804,9 +804,7 @@ class RoomCreationHandler(BaseHandler):
 
         # Always wait for room creation to progate before returning
         await self._replication.wait_for_stream_position(
-            self.hs.config.worker.events_shard_config.get_instance(room_id),
-            "events",
-            last_stream_id,
+            self.hs.config.worker.writers.events, "events", last_stream_id
         )
 
         return result, last_stream_id
@@ -1262,10 +1260,10 @@ class RoomShutdownHandler(object):
             # We now wait for the create room to come back in via replication so
             # that we can assume that all the joins/invites have propogated before
             # we try and auto join below.
+            #
+            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.hs.config.worker.events_shard_config.get_instance(new_room_id),
-                "events",
-                stream_id,
+                self.hs.config.worker.writers.events, "events", stream_id
             )
         else:
             new_room_id = None
@@ -1295,9 +1293,7 @@ class RoomShutdownHandler(object):
 
                 # Wait for leave to come in over replication before trying to forget.
                 await self._replication.wait_for_stream_position(
-                    self.hs.config.worker.events_shard_config.get_instance(room_id),
-                    "events",
-                    stream_id,
+                    self.hs.config.worker.writers.events, "events", stream_id
                 )
 
                 await self.room_member_handler.forget(target_requester.user, room_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index e5cb3c2d5c..a7962b0ada 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -82,6 +82,13 @@ class RoomMemberHandler(object):
         self._enable_lookup = hs.config.enable_3pid_lookup
         self.allow_per_room_profiles = self.config.allow_per_room_profiles
 
+        self._event_stream_writer_instance = hs.config.worker.writers.events
+        self._is_on_event_persistence_instance = (
+            self._event_stream_writer_instance == hs.get_instance_name()
+        )
+        if self._is_on_event_persistence_instance:
+            self.persist_event_storage = hs.get_storage().persistence
+
         self._join_rate_limiter_local = Ratelimiter(
             clock=self.clock,
             rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,