diff options
author | Erik Johnston <erik@matrix.org> | 2017-04-04 10:29:29 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-04-04 11:02:44 +0100 |
commit | 6ce6bbedcb8721fc99200ecace94eeedb51da6f2 (patch) | |
tree | 9a3592597976757db9da486bb8dfb13961ea9ac0 | |
parent | Update all the workers and master to use TCP replication (diff) | |
download | synapse-6ce6bbedcb8721fc99200ecace94eeedb51da6f2.tar.xz |
Move where we ack federation
Diffstat (limited to '')
-rw-r--r-- | synapse/app/federation_sender.py | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 8994891aeb..705fe5e3df 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -153,15 +153,13 @@ class FederationSenderServer(HomeServer): class FederationSenderReplicationHandler(ReplicationClientHandler): def __init__(self, hs): super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) - self.send_handler = FederationSenderHandler(hs) + self.send_handler = FederationSenderHandler(hs, self) def on_rdata(self, stream_name, token, rows): super(FederationSenderReplicationHandler, self).on_rdata( stream_name, token, rows ) self.send_handler.process_replication_rows(stream_name, token, rows) - if stream_name == "federation": - self.send_federation_ack(token) def get_streams_to_replicate(self): args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() @@ -248,13 +246,16 @@ class FederationSenderHandler(object): """Processes the replication stream and forwards the appropriate entries to the federation sender. """ - def __init__(self, hs): + def __init__(self, hs, replication_client): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() + self.replication_client = replication_client self.federation_position = self.store.federation_out_pos_startup self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + self._last_ack = self.federation_position + self._room_serials = {} self._room_typing = {} @@ -345,10 +346,18 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): self.federation_position = token + + # We linearize here to ensure we don't have races updating the token with (yield self._fed_position_linearizer.queue(None)): - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) + + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position if __name__ == '__main__': |