diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 68165cf2dc..84d2a2272a 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -189,29 +189,34 @@ class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or
stopped syncing. Used to calculate presence on the master.
+ Includes a timestamp of when the last user sync was.
+
Format::
- USER_SYNC <user_id> <state>
+ USER_SYNC <user_id> <state> <last_sync_ms>
Where <state> is either "start" or "stop"
"""
NAME = "USER_SYNC"
- def __init__(self, user_id, is_syncing):
+ def __init__(self, user_id, is_syncing, last_sync_ms):
self.user_id = user_id
self.is_syncing = is_syncing
+ self.last_sync_ms = last_sync_ms
@classmethod
def from_line(cls, line):
- user_id, state = line.split(" ", 1)
+ user_id, state, last_sync_ms = line.split(" ", 2)
if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))
- return cls(user_id, state == "start")
+ return cls(user_id, state == "start", int(last_sync_ms))
def to_line(self):
- return " ".join((self.user_id, "start" if self.is_syncing else "end"))
+ return " ".join((
+ self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
+ ))
class FederationAckCommand(Command):
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index c1dc91bdb7..80f732b455 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -368,7 +368,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.name = cmd.data
def on_USER_SYNC(self, cmd):
- self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing)
+ self.streamer.on_user_sync(
+ self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
+ )
def on_REPLICATE(self, cmd):
stream_name = cmd.stream_name
@@ -481,8 +483,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Tell the server if we have any users currently syncing (should only
# happen on synchrotrons)
currently_syncing = self.handler.get_currently_syncing_users()
+ now = self.clock.time_msec()
for user_id in currently_syncing:
- self.send_command(UserSyncCommand(user_id, True))
+ self.send_command(UserSyncCommand(user_id, True, now))
# We've now finished connecting to so inform the client handler
self.handler.update_connection(self)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index d5da0496a8..243a81d488 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -220,12 +220,12 @@ class ReplicationStreamer(object):
self.federation_sender.federation_ack(token)
@measure_func("repl.on_user_sync")
- def on_user_sync(self, conn_id, user_id, is_syncing):
+ def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
self.presence_handler.update_external_syncs_row(
- conn_id, user_id, is_syncing
+ conn_id, user_id, is_syncing, last_sync_ms,
)
@measure_func("repl.on_remove_pusher")
|