summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2023-11-09 09:48:24 -0500
committerPatrick Cloke <patrickc@matrix.org>2023-11-09 09:48:24 -0500
commit396fa974a10a393be068ff2ab87134e89567b808 (patch)
treefa9cc3cd57a77b71b68dd5f22afe952220ed936a /synapse/replication/tcp/handler.py
parentDon't use separate copy_read method. (diff)
parentBump pyicu from 2.11 to 2.12 (#16603) (diff)
downloadsynapse-396fa974a10a393be068ff2ab87134e89567b808.tar.xz
Merge remote-tracking branch 'origin/develop' into clokep/psycopg3
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py16
1 files changed, 9 insertions, 7 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py

index b668bb5da1..afd03137f0 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -611,10 +611,14 @@ class ReplicationCommandHandler: # Find where we previously streamed up to. current_token = stream.current_token(cmd.instance_name) - # If the position token matches our current token then we're up to - # date and there's nothing to do. Otherwise, fetch all updates - # between then and now. - missing_updates = cmd.prev_token != current_token + # If the incoming previous position is less than our current position + # then we're up to date and there's nothing to do. Otherwise, fetch + # all updates between then and now. + # + # Note: We also have to check that `current_token` is at most the + # new position, to handle the case where the stream gets "reset" + # (e.g. for `caches` and `typing` after the writer's restart). + missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token) while missing_updates: # Note: There may very well not be any new updates, but we check to # make sure. This can particularly happen for the event stream where @@ -644,7 +648,7 @@ class ReplicationCommandHandler: [stream.parse_row(row) for row in rows], ) - logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token) + logger.info("Caught up with stream '%s' to %i", stream_name, current_token) # We've now caught up to position sent to us, notify handler. await self._replication_data_handler.on_position( @@ -657,8 +661,6 @@ class ReplicationCommandHandler: self, conn: IReplicationConnection, cmd: RemoteServerUpCommand ) -> None: """Called when get a new REMOTE_SERVER_UP command.""" - self._replication_data_handler.on_remote_server_up(cmd.data) - self._notifier.notify_remote_server_up(cmd.data) def on_LOCK_RELEASED(