diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 383edf07ad..c265bc0803 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -604,7 +604,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
self.pusher_pool = hs.get_pusherpool()
if hs.config.send_federation:
- self.send_handler = FederationSenderHandler(hs, self)
+ self.send_handler = FederationSenderHandler(hs)
else:
self.send_handler = None
@@ -624,7 +624,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
async def process_and_notify(self, stream_name, instance_name, token, rows):
try:
if self.send_handler:
- self.send_handler.process_replication_rows(stream_name, token, rows)
+ self.send_handler.process_replication_rows(
+ stream_name, instance_name, token, rows
+ )
if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
@@ -724,13 +726,14 @@ class FederationSenderHandler(object):
to the federation sender.
"""
- def __init__(self, hs: GenericWorkerServer, replication_client):
+ def __init__(self, hs: GenericWorkerServer):
+ self.hs = hs
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
- self.replication_client = replication_client
+ # self.replication_client = hs.get_tcp_replication()
- self.federation_position = self.store.federation_out_pos_startup
+ self.federation_position = {"master": self.store.federation_out_pos_startup}
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
self._last_ack = self.federation_position
@@ -749,14 +752,14 @@ class FederationSenderHandler(object):
self.federation_sender.wake_destination(server)
def stream_positions(self):
- return {"federation": {"master": self.federation_position}}
+ return {"federation": self.federation_position}
- def process_replication_rows(self, stream_name, token, rows):
+ def process_replication_rows(self, stream_name, instance_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
- run_in_background(self.update_token, token)
+ run_in_background(self.update_token, instance_name, token)
# We also need to poke the federation sender when new events happen
elif stream_name == "events":
@@ -804,9 +807,12 @@ class FederationSenderHandler(object):
)
await self.federation_sender.send_read_receipt(receipt_info)
- async def update_token(self, token):
+ async def update_token(self, instance_name, token):
try:
- self.federation_position = token
+ self.federation_position[instance_name] = token
+ return
+
+ # FIXME
# We linearize here to ensure we don't have races updating the token
with (await self._fed_position_linearizer.queue(None)):
@@ -817,7 +823,7 @@ class FederationSenderHandler(object):
# We ACK this token over replication so that the master can drop
# its in memory queues
- self.replication_client.send_federation_ack(
+ self.hs.get_tcp_replication().send_federation_ack(
self.federation_position
)
self._last_ack = self.federation_position
|