diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e354c803db..75ec90d267 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -126,11 +126,10 @@ class FederationHandler(BaseHandler):
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
+ self._instance_name = hs.get_instance_name()
self._replication = hs.get_replication_data_handler()
- self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client(
- hs
- )
+ self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
hs
)
@@ -1243,6 +1242,10 @@ class FederationHandler(BaseHandler):
content: The event content to use for the join event.
"""
+ # TODO: We should be able to call this on workers, but the upgrading of
+ # room stuff after join currently doesn't work on workers.
+ assert self.config.worker.worker_app is None
+
logger.debug("Joining %s to %s", joinee, room_id)
origin, event, room_version_obj = await self._make_and_verify_event(
@@ -1314,7 +1317,7 @@ class FederationHandler(BaseHandler):
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- "master", "events", max_stream_id
+ self.config.worker.writers.events, "events", max_stream_id
)
# Check whether this room is the result of an upgrade of a room we already know
@@ -2854,8 +2857,9 @@ class FederationHandler(BaseHandler):
backfilled: Whether these events are a result of
backfilling or not
"""
- if self.config.worker_app:
- result = await self._send_events_to_master(
+ if self.config.worker.writers.events != self._instance_name:
+ result = await self._send_events(
+ instance_name=self.config.worker.writers.events,
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
|