diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 383edf07ad..c265bc0803 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -604,7 +604,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
self.pusher_pool = hs.get_pusherpool()
if hs.config.send_federation:
- self.send_handler = FederationSenderHandler(hs, self)
+ self.send_handler = FederationSenderHandler(hs)
else:
self.send_handler = None
@@ -624,7 +624,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
async def process_and_notify(self, stream_name, instance_name, token, rows):
try:
if self.send_handler:
- self.send_handler.process_replication_rows(stream_name, token, rows)
+ self.send_handler.process_replication_rows(
+ stream_name, instance_name, token, rows
+ )
if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
@@ -724,13 +726,14 @@ class FederationSenderHandler(object):
to the federation sender.
"""
- def __init__(self, hs: GenericWorkerServer, replication_client):
+ def __init__(self, hs: GenericWorkerServer):
+ self.hs = hs
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
- self.replication_client = replication_client
+ # self.replication_client = hs.get_tcp_replication()
- self.federation_position = self.store.federation_out_pos_startup
+ self.federation_position = {"master": self.store.federation_out_pos_startup}
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
self._last_ack = self.federation_position
@@ -749,14 +752,14 @@ class FederationSenderHandler(object):
self.federation_sender.wake_destination(server)
def stream_positions(self):
- return {"federation": {"master": self.federation_position}}
+ return {"federation": self.federation_position}
- def process_replication_rows(self, stream_name, token, rows):
+ def process_replication_rows(self, stream_name, instance_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
- run_in_background(self.update_token, token)
+ run_in_background(self.update_token, instance_name, token)
# We also need to poke the federation sender when new events happen
elif stream_name == "events":
@@ -804,9 +807,12 @@ class FederationSenderHandler(object):
)
await self.federation_sender.send_read_receipt(receipt_info)
- async def update_token(self, token):
+ async def update_token(self, instance_name, token):
try:
- self.federation_position = token
+ self.federation_position[instance_name] = token
+ return
+
+ # FIXME
# We linearize here to ensure we don't have races updating the token
with (await self._fed_position_linearizer.queue(None)):
@@ -817,7 +823,7 @@ class FederationSenderHandler(object):
# We ACK this token over replication so that the master can drop
# its in memory queues
- self.replication_client.send_federation_ack(
+ self.hs.get_tcp_replication().send_federation_ack(
self.federation_position
)
self._last_ack = self.federation_position
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index fef72ed974..43bdf039e7 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -48,6 +48,8 @@ class WorkerConfig(Config):
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+ self.instance_http_map = config.get("instance_http_map", {})
+
# This option is really only here to support `--manhole` command line
# argument.
manhole = config.get("worker_manhole")
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 89d521bc31..6fb65b5dfb 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -819,7 +819,16 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
edu_type, origin, content
)
- return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
+ if edu_type == "m.typing":
+ instance_name = "synapse.app.client_reader"
+ else:
+ instance_name = "master"
+ return await self._send_edu(
+ instance_name=instance_name,
+ edu_type=edu_type,
+ origin=origin,
+ content=content,
+ )
async def on_query(self, query_type, args):
"""Overrides FederationHandlerRegistry
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index f8a347fa85..3e1eb27d55 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -56,7 +56,7 @@ class TypingHandler(object):
self.clock = hs.get_clock()
self.wheel_timer = WheelTimer(bucket_size=5000)
- # self.federation = hs.get_federation_sender()
+ self.federation = hs.get_federation_sender()
hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
@@ -203,16 +203,16 @@ class TypingHandler(object):
for domain in {get_domain_from_id(u) for u in users}:
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
- # self.federation.build_and_send_edu(
- # destination=domain,
- # edu_type="m.typing",
- # content={
- # "room_id": member.room_id,
- # "user_id": member.user_id,
- # "typing": typing,
- # },
- # key=member,
- # )
+ self.federation.build_and_send_edu(
+ destination=domain,
+ edu_type="m.typing",
+ content={
+ "room_id": member.room_id,
+ "user_id": member.user_id,
+ "typing": typing,
+ },
+ key=member,
+ )
except Exception:
logger.exception("Error pushing typing notif to remotes")
@@ -309,7 +309,9 @@ class TypingSlaveHandler(object):
# We must update this typing token from the response of the previous
# sync. In particular, the stream id may "reset" back to zero/a low
# value which we *must* use for the next replication request.
- return {"typing": {"master": self._latest_room_serial}}
+ return {
+ "typing": {"synapse.app.client_reader": self._latest_room_serial}
+ } # FIXME
def process_replication_rows(self, stream_name, token, rows):
if stream_name != TypingStream.NAME:
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 4613b2538c..f43a4ed009 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -33,10 +33,12 @@ class ReplicationRestResource(JsonResource):
self.register_servlets(hs)
def register_servlets(self, hs):
- send_event.register_servlets(hs, self)
- membership.register_servlets(hs, self)
- federation.register_servlets(hs, self)
- login.register_servlets(hs, self)
- register.register_servlets(hs, self)
- devices.register_servlets(hs, self)
+ if hs.config.worker_app is None:
+ send_event.register_servlets(hs, self)
+ membership.register_servlets(hs, self)
+ login.register_servlets(hs, self)
+ register.register_servlets(hs, self)
+ devices.register_servlets(hs, self)
+
streams.register_servlets(hs, self)
+ federation.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index a9ad601132..cdbc406c97 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -128,15 +128,23 @@ class ReplicationEndpoint(object):
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
- host = hs.config.worker_replication_host
- port = hs.config.worker_replication_http_port
+ master_host = hs.config.worker_replication_host
+ master_port = hs.config.worker_replication_http_port
+
+ instance_http_map = hs.config.instance_http_map
client = hs.get_simple_http_client()
@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(instance_name="master", **kwargs):
- if instance_name != "master":
+ if instance_name == "master":
+ host = master_host
+ port = master_port
+ elif instance_name in instance_http_map:
+ host = instance_http_map[instance_name]["host"]
+ port = instance_http_map[instance_name]["port"]
+ else:
raise Exception("Unknown instance")
data = yield cls._serialize_payload(**kwargs)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7e23b565b9..0a11743845 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -277,8 +277,10 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
def register_servlets(hs, http_server):
- ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+ if hs.config.worker_app is None:
+ ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+ ReplicationGetQueryRestServlet(hs).register(http_server)
+ ReplicationCleanRoomRestServlet(hs).register(http_server)
+ ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
+
ReplicationFederationSendEduRestServlet(hs).register(http_server)
- ReplicationGetQueryRestServlet(hs).register(http_server)
- ReplicationCleanRoomRestServlet(hs).register(http_server)
- ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 9c1fc9fb25..11a0270699 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -51,6 +51,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
# them ourselves we end up in an import loop).
self.streams = hs.get_replication_streamer().get_streams()
+ self.instance_name = hs.config.worker_name or "master"
+
@staticmethod
def _serialize_payload(stream_name, from_token, upto_token, limit):
return {"from_token": from_token, "upto_token": upto_token, "limit": limit}
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cfba255897..ac4d6d1dd1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -63,6 +63,8 @@ class ReplicationClientHandler:
self.presence_handler = hs.get_presence_handler()
self.instance_id = hs.get_instance_id()
+ self.instance_name = hs.config.worker.worker_name or "master"
+
self.connections = [] # type: List[Any]
self.streams = {
@@ -134,7 +136,9 @@ class ReplicationClientHandler:
for stream_name, stream in self.streams.items():
current_token = stream.current_token()
- self.send_command(PositionCommand(stream_name, "master", current_token))
+ self.send_command(
+ PositionCommand(stream_name, self.instance_name, current_token)
+ )
async def on_USER_SYNC(self, cmd: UserSyncCommand):
user_sync_counter.inc()
@@ -232,17 +236,17 @@ class ReplicationClientHandler:
return
# Find where we previously streamed up to.
- current_token = (
- self.replication_data_handler.get_streams_to_replicate()
- .get(cmd.stream_name, {})
- .get(cmd.instance_name)
+ current_tokens = self.replication_data_handler.get_streams_to_replicate().get(
+ cmd.stream_name
)
- if current_token is None:
+ if current_tokens is None:
logger.debug(
"Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
)
return
+ current_token = current_tokens.get(cmd.instance_name, 0)
+
# Fetch all updates between then and now.
limited = cmd.token != current_token
while limited:
@@ -335,7 +339,7 @@ class ReplicationClientHandler:
We need to check if the client is interested in the stream or not
"""
- self.send_command(RdataCommand(stream_name, "master", token, data))
+ self.send_command(RdataCommand(stream_name, self.instance_name, token, data))
class ReplicationDataHandler:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index bbd2c6ec41..d421cc477f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -74,9 +74,7 @@ class ReplicationStreamer(object):
self.streams = [] # type: List[Stream]
if hs.config.worker_app is None:
for stream in STREAMS_MAP.values():
- if stream == FederationStream and hs.config.send_federation:
- # We only support federation stream if federation sending
- # hase been disabled on the master.
+ if stream == FederationStream:
continue
if stream == TypingStream:
@@ -87,6 +85,9 @@ class ReplicationStreamer(object):
if hs.config.server.handle_typing:
self.streams.append(TypingStream(hs))
+ # We always add federation stream
+ self.streams.append(FederationStream(hs))
+
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
self.notifier.add_replication_callback(self.on_notifier_poke)
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 87ab631fd4..e68b220956 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -256,7 +256,7 @@ class TypingStream(Stream):
self.current_token = typing_handler.get_current_token # type: ignore
- if hs.config.worker_app is None:
+ if hs.config.handle_typing:
self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore
else:
# Query master process
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 48c1d45718..40951e81c9 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,8 +15,6 @@
# limitations under the License.
from collections import namedtuple
-from twisted.internet import defer
-
from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
@@ -41,12 +39,8 @@ class FederationStream(Stream):
# Not all synapse instances will have a federation sender instance,
# whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
# so we stub the stream out when that is the case.
- if hs.config.worker_app is None or hs.should_send_federation():
- federation_sender = hs.get_federation_sender()
- self.current_token = federation_sender.get_current_token # type: ignore
- self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
- else:
- self.current_token = lambda: 0 # type: ignore
- self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore
+ federation_sender = hs.get_federation_sender()
+ self.current_token = federation_sender.get_current_token # type: ignore
+ self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
super(FederationStream, self).__init__(hs)
diff --git a/synapse/server.py b/synapse/server.py
index 8955c32cc4..2136061f1c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -457,10 +457,8 @@ class HomeServer(object):
def build_federation_sender(self):
if self.should_send_federation():
return FederationSender(self)
- elif not self.config.worker_app:
- return FederationRemoteSendQueue(self)
else:
- raise Exception("Workers cannot send federation traffic")
+ return FederationRemoteSendQueue(self)
def build_receipts_handler(self):
return ReceiptsHandler(self)
|