diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 4374e99e32..8b6067e20d 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -251,14 +251,19 @@ class ReplicationStreamer(object):
self.federation_sender.federation_ack(token)
@measure_func("repl.on_user_sync")
- async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
+ async def on_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
await self.presence_handler.update_external_syncs_row(
- conn_id, user_id, is_syncing, last_sync_ms
+ instance_id, user_id, is_syncing, last_sync_ms
)
+ async def on_clear_user_syncs(self, instance_id):
+ """A replication client wants us to drop all their UserSync data.
+ """
+ await self.presence_handler.update_external_syncs_clear(instance_id)
+
@measure_func("repl.on_remove_pusher")
async def on_remove_pusher(self, app_id, push_key, user_id):
"""A client has asked us to remove a pusher
@@ -321,14 +326,6 @@ class ReplicationStreamer(object):
except ValueError:
pass
- # We need to tell the presence handler that the connection has been
- # lost so that it can handle any ongoing syncs on that connection.
- run_as_background_process(
- "update_external_syncs_clear",
- self.presence_handler.update_external_syncs_clear,
- connection.conn_id,
- )
-
def _batch_updates(updates):
"""Takes a list of updates of form [(token, row)] and sets the token to
|