summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py12
-rw-r--r--synapse/replication/tcp/redis.py1
-rw-r--r--synapse/replication/tcp/resource.py18
-rw-r--r--synapse/replication/tcp/streams/events.py1
4 files changed, 28 insertions, 4 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index cc0528bd8e..424854efbe 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -370,15 +370,23 @@ class ReplicationDataHandler: # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"): logger.info( - "Waiting for repl stream %r to reach %s (%s)", + "Waiting for repl stream %r to reach %s (%s); currently at: %s", stream_name, position, instance_name, + current_position, ) try: await make_deferred_yieldable(deferred) except defer.TimeoutError: - logger.error("Timed out waiting for stream %s", stream_name) + logger.error( + "Timed out waiting for repl stream %r to reach %s (%s)" + "; currently at: %s", + stream_name, + position, + instance_name, + self._streams[stream_name].current_token(instance_name), + ) return logger.info( diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index fd1c0ec6af..dfc061eb5e 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py
@@ -328,7 +328,6 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): outbound_redis_connection: txredisapi.ConnectionHandler, channel_names: List[str], ): - super().__init__( hs, uuid="subscriber", diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 9d17eff714..347467d863 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -238,6 +238,24 @@ class ReplicationStreamer: except Exception: logger.exception("Failed to replicate") + # The last token we send may not match the current + # token, in which case we want to send out a `POSITION` + # to tell other workers the actual current position. + if updates[-1][0] < current_token: + logger.info( + "Sending position: %s -> %s", + stream.NAME, + current_token, + ) + self.command_handler.send_command( + PositionCommand( + stream.NAME, + self._instance_name, + updates[-1][0], + current_token, + ) + ) + logger.debug("No more pending updates, breaking poke loop") finally: self.pending_updates = False diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 14b6705862..ad9b760713 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -139,7 +139,6 @@ class EventsStream(Stream): current_token: Token, target_row_count: int, ) -> StreamUpdateResult: - # the events stream merges together three separate sources: # * new events # * current_state changes