diff options
-rw-r--r-- | synapse/handlers/presence.py | 14 | ||||
-rw-r--r-- | synapse/replication/tcp/commands.py | 15 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 7 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 4 |
4 files changed, 24 insertions, 16 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e14760659..53baf3e79a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -511,7 +511,7 @@ class PresenceHandler(object): self.external_process_to_current_syncs[process_id] = syncing_user_ids @defer.inlineCallbacks - def update_external_syncs_row(self, process_id, user_id, is_syncing): + def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): """Update the syncing users for an external process as a delta. Args: @@ -520,6 +520,7 @@ class PresenceHandler(object): as user start and stop syncing against a given process. user_id (str): The user who has started or stopped syncing is_syncing (bool): Whether or not the user is now syncing + sync_time_msec(int): Time in ms when the user was last syncing """ with (yield self.external_sync_linearizer.queue(process_id)): prev_state = yield self.current_state_for_user(user_id) @@ -527,24 +528,23 @@ class PresenceHandler(object): process_presence = self.external_process_to_current_syncs.setdefault( process_id, set() ) - time_now_ms = self.clock.time_msec() updates = [] if is_syncing and user_id not in process_presence: if prev_state.state == PresenceState.OFFLINE: updates.append(prev_state.copy_and_replace( state=PresenceState.ONLINE, - last_active_ts=time_now_ms, - last_user_sync_ts=time_now_ms, + last_active_ts=sync_time_msec, + last_user_sync_ts=sync_time_msec, )) else: updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, + last_user_sync_ts=sync_time_msec, )) process_presence.add(user_id) elif user_id in process_presence: updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, + last_user_sync_ts=sync_time_msec, )) if not is_syncing: @@ -553,7 +553,7 @@ class PresenceHandler(object): if updates: yield self._update_states(updates) - self.external_process_last_updated_ms[process_id] = time_now_ms + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() @defer.inlineCallbacks def update_external_syncs_clear(self, process_id): diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 68165cf2dc..84d2a2272a 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -189,29 +189,34 @@ class UserSyncCommand(Command): """Sent by the client to inform the server that a user has started or stopped syncing. Used to calculate presence on the master. + Includes a timestamp of when the last user sync was. + Format:: - USER_SYNC <user_id> <state> + USER_SYNC <user_id> <state> <last_sync_ms> Where <state> is either "start" or "stop" """ NAME = "USER_SYNC" - def __init__(self, user_id, is_syncing): + def __init__(self, user_id, is_syncing, last_sync_ms): self.user_id = user_id self.is_syncing = is_syncing + self.last_sync_ms = last_sync_ms @classmethod def from_line(cls, line): - user_id, state = line.split(" ", 1) + user_id, state, last_sync_ms = line.split(" ", 2) if state not in ("start", "end"): raise Exception("Invalid USER_SYNC state %r" % (state,)) - return cls(user_id, state == "start") + return cls(user_id, state == "start", int(last_sync_ms)) def to_line(self): - return " ".join((self.user_id, "start" if self.is_syncing else "end")) + return " ".join(( + self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms), + )) class FederationAckCommand(Command): diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index c1dc91bdb7..80f732b455 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -368,7 +368,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): self.name = cmd.data def on_USER_SYNC(self, cmd): - self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing) + self.streamer.on_user_sync( + self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms, + ) def on_REPLICATE(self, cmd): stream_name = cmd.stream_name @@ -481,8 +483,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # Tell the server if we have any users currently syncing (should only # happen on synchrotrons) currently_syncing = self.handler.get_currently_syncing_users() + now = self.clock.time_msec() for user_id in currently_syncing: - self.send_command(UserSyncCommand(user_id, True)) + self.send_command(UserSyncCommand(user_id, True, now)) # We've now finished connecting to so inform the client handler self.handler.update_connection(self) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index d5da0496a8..243a81d488 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -220,12 +220,12 @@ class ReplicationStreamer(object): self.federation_sender.federation_ack(token) @measure_func("repl.on_user_sync") - def on_user_sync(self, conn_id, user_id, is_syncing): + def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms): """A client has started/stopped syncing on a worker. """ user_sync_counter.inc() self.presence_handler.update_external_syncs_row( - conn_id, user_id, is_syncing + conn_id, user_id, is_syncing, last_sync_ms, ) @measure_func("repl.on_remove_pusher") |