summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py28
1 files changed, 17 insertions, 11 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 383edf07ad..c265bc0803 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -604,7 +604,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         self.pusher_pool = hs.get_pusherpool()
 
         if hs.config.send_federation:
-            self.send_handler = FederationSenderHandler(hs, self)
+            self.send_handler = FederationSenderHandler(hs)
         else:
             self.send_handler = None
 
@@ -624,7 +624,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
     async def process_and_notify(self, stream_name, instance_name, token, rows):
         try:
             if self.send_handler:
-                self.send_handler.process_replication_rows(stream_name, token, rows)
+                self.send_handler.process_replication_rows(
+                    stream_name, instance_name, token, rows
+                )
 
             if stream_name == EventsStream.NAME:
                 # We shouldn't get multiple rows per token for events stream, so
@@ -724,13 +726,14 @@ class FederationSenderHandler(object):
     to the federation sender.
     """
 
-    def __init__(self, hs: GenericWorkerServer, replication_client):
+    def __init__(self, hs: GenericWorkerServer):
+        self.hs = hs
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
-        self.replication_client = replication_client
+        # self.replication_client = hs.get_tcp_replication()
 
-        self.federation_position = self.store.federation_out_pos_startup
+        self.federation_position = {"master": self.store.federation_out_pos_startup}
         self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
 
         self._last_ack = self.federation_position
@@ -749,14 +752,14 @@ class FederationSenderHandler(object):
         self.federation_sender.wake_destination(server)
 
     def stream_positions(self):
-        return {"federation": {"master": self.federation_position}}
+        return {"federation": self.federation_position}
 
-    def process_replication_rows(self, stream_name, token, rows):
+    def process_replication_rows(self, stream_name, instance_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            run_in_background(self.update_token, token)
+            run_in_background(self.update_token, instance_name, token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
@@ -804,9 +807,12 @@ class FederationSenderHandler(object):
             )
             await self.federation_sender.send_read_receipt(receipt_info)
 
-    async def update_token(self, token):
+    async def update_token(self, instance_name, token):
         try:
-            self.federation_position = token
+            self.federation_position[instance_name] = token
+            return
+
+            # FIXME
 
             # We linearize here to ensure we don't have races updating the token
             with (await self._fed_position_linearizer.queue(None)):
@@ -817,7 +823,7 @@ class FederationSenderHandler(object):
 
                     # We ACK this token over replication so that the master can drop
                     # its in memory queues
-                    self.replication_client.send_federation_ack(
+                    self.hs.get_tcp_replication().send_federation_ack(
                         self.federation_position
                     )
                     self._last_ack = self.federation_position