diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 43 |
1 files changed, 19 insertions, 24 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 99f09669f0..9d17eff714 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -199,33 +199,28 @@ class ReplicationStreamer: # The token has advanced but there is no data to # send, so we send a `POSITION` to inform other # workers of the updated position. - if stream.NAME == EventsStream.NAME: - # XXX: We only do this for the EventStream as it - # turns out that e.g. account data streams share - # their "current token" with each other, meaning - # that it is *not* safe to send a POSITION. - - # Note: `last_token` may not *actually* be the - # last token we sent out in a RDATA or POSITION. - # This can happen if we sent out an RDATA for - # position X when our current token was say X+1. - # Other workers will see RDATA for X and then a - # POSITION with last token of X+1, which will - # cause them to check if there were any missing - # updates between X and X+1. - logger.info( - "Sending position: %s -> %s", + + # Note: `last_token` may not *actually* be the + # last token we sent out in a RDATA or POSITION. + # This can happen if we sent out an RDATA for + # position X when our current token was say X+1. + # Other workers will see RDATA for X and then a + # POSITION with last token of X+1, which will + # cause them to check if there were any missing + # updates between X and X+1. + logger.info( + "Sending position: %s -> %s", + stream.NAME, + current_token, + ) + self.command_handler.send_command( + PositionCommand( stream.NAME, + self._instance_name, + last_token, current_token, ) - self.command_handler.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - last_token, - current_token, - ) - ) + ) continue # Some streams return multiple rows with the same stream IDs, |