diff options
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r-- | synapse/app/generic_worker.py | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 383edf07ad..c265bc0803 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -604,7 +604,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): self.pusher_pool = hs.get_pusherpool() if hs.config.send_federation: - self.send_handler = FederationSenderHandler(hs, self) + self.send_handler = FederationSenderHandler(hs) else: self.send_handler = None @@ -624,7 +624,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): async def process_and_notify(self, stream_name, instance_name, token, rows): try: if self.send_handler: - self.send_handler.process_replication_rows(stream_name, token, rows) + self.send_handler.process_replication_rows( + stream_name, instance_name, token, rows + ) if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so @@ -724,13 +726,14 @@ class FederationSenderHandler(object): to the federation sender. """ - def __init__(self, hs: GenericWorkerServer, replication_client): + def __init__(self, hs: GenericWorkerServer): + self.hs = hs self.store = hs.get_datastore() self._is_mine_id = hs.is_mine_id self.federation_sender = hs.get_federation_sender() - self.replication_client = replication_client + # self.replication_client = hs.get_tcp_replication() - self.federation_position = self.store.federation_out_pos_startup + self.federation_position = {"master": self.store.federation_out_pos_startup} self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") self._last_ack = self.federation_position @@ -749,14 +752,14 @@ class FederationSenderHandler(object): self.federation_sender.wake_destination(server) def stream_positions(self): - return {"federation": {"master": self.federation_position}} + return {"federation": self.federation_position} - def process_replication_rows(self, stream_name, token, rows): + def process_replication_rows(self, stream_name, instance_name, token, rows): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - run_in_background(self.update_token, token) + run_in_background(self.update_token, instance_name, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": @@ -804,9 +807,12 @@ class FederationSenderHandler(object): ) await self.federation_sender.send_read_receipt(receipt_info) - async def update_token(self, token): + async def update_token(self, instance_name, token): try: - self.federation_position = token + self.federation_position[instance_name] = token + return + + # FIXME # We linearize here to ensure we don't have races updating the token with (await self._fed_position_linearizer.queue(None)): @@ -817,7 +823,7 @@ class FederationSenderHandler(object): # We ACK this token over replication so that the master can drop # its in memory queues - self.replication_client.send_federation_ack( + self.hs.get_tcp_replication().send_federation_ack( self.federation_position ) self._last_ack = self.federation_position |