summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-31 11:46:20 +0100
committerErik Johnston <erik@matrix.org>2017-03-31 15:42:22 +0100
commit36d2b66f90dcdfae843c4b0ee7cba1dea23486f4 (patch)
treeed3497a65596f9daf3158330db858de02c47ea16
parentFix up presence (diff)
downloadsynapse-36d2b66f90dcdfae843c4b0ee7cba1dea23486f4.tar.xz
Add a timestamp to USER_SYNC command
This timestamp is used to indicate when the user last sync'd
-rw-r--r--synapse/handlers/presence.py14
-rw-r--r--synapse/replication/tcp/commands.py15
-rw-r--r--synapse/replication/tcp/protocol.py7
-rw-r--r--synapse/replication/tcp/resource.py4
4 files changed, 24 insertions, 16 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9e14760659..53baf3e79a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -511,7 +511,7 @@ class PresenceHandler(object):
         self.external_process_to_current_syncs[process_id] = syncing_user_ids
 
     @defer.inlineCallbacks
-    def update_external_syncs_row(self, process_id, user_id, is_syncing):
+    def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
         """Update the syncing users for an external process as a delta.
 
         Args:
@@ -520,6 +520,7 @@ class PresenceHandler(object):
                 as user start and stop syncing against a given process.
             user_id (str): The user who has started or stopped syncing
             is_syncing (bool): Whether or not the user is now syncing
+            sync_time_msec(int): Time in ms when the user was last syncing
         """
         with (yield self.external_sync_linearizer.queue(process_id)):
             prev_state = yield self.current_state_for_user(user_id)
@@ -527,24 +528,23 @@ class PresenceHandler(object):
             process_presence = self.external_process_to_current_syncs.setdefault(
                 process_id, set()
             )
-            time_now_ms = self.clock.time_msec()
 
             updates = []
             if is_syncing and user_id not in process_presence:
                 if prev_state.state == PresenceState.OFFLINE:
                     updates.append(prev_state.copy_and_replace(
                         state=PresenceState.ONLINE,
-                        last_active_ts=time_now_ms,
-                        last_user_sync_ts=time_now_ms,
+                        last_active_ts=sync_time_msec,
+                        last_user_sync_ts=sync_time_msec,
                     ))
                 else:
                     updates.append(prev_state.copy_and_replace(
-                        last_user_sync_ts=time_now_ms,
+                        last_user_sync_ts=sync_time_msec,
                     ))
                 process_presence.add(user_id)
             elif user_id in process_presence:
                 updates.append(prev_state.copy_and_replace(
-                    last_user_sync_ts=time_now_ms,
+                    last_user_sync_ts=sync_time_msec,
                 ))
 
             if not is_syncing:
@@ -553,7 +553,7 @@ class PresenceHandler(object):
             if updates:
                 yield self._update_states(updates)
 
-            self.external_process_last_updated_ms[process_id] = time_now_ms
+            self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
 
     @defer.inlineCallbacks
     def update_external_syncs_clear(self, process_id):
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 68165cf2dc..84d2a2272a 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -189,29 +189,34 @@ class UserSyncCommand(Command):
     """Sent by the client to inform the server that a user has started or
     stopped syncing. Used to calculate presence on the master.
 
+    Includes a timestamp of when the last user sync was.
+
     Format::
 
-        USER_SYNC <user_id> <state>
+        USER_SYNC <user_id> <state> <last_sync_ms>
 
     Where <state> is either "start" or "stop"
     """
     NAME = "USER_SYNC"
 
-    def __init__(self, user_id, is_syncing):
+    def __init__(self, user_id, is_syncing, last_sync_ms):
         self.user_id = user_id
         self.is_syncing = is_syncing
+        self.last_sync_ms = last_sync_ms
 
     @classmethod
     def from_line(cls, line):
-        user_id, state = line.split(" ", 1)
+        user_id, state, last_sync_ms = line.split(" ", 2)
 
         if state not in ("start", "end"):
             raise Exception("Invalid USER_SYNC state %r" % (state,))
 
-        return cls(user_id, state == "start")
+        return cls(user_id, state == "start", int(last_sync_ms))
 
     def to_line(self):
-        return " ".join((self.user_id, "start" if self.is_syncing else "end"))
+        return " ".join((
+            self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
+        ))
 
 
 class FederationAckCommand(Command):
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index c1dc91bdb7..80f732b455 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -368,7 +368,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         self.name = cmd.data
 
     def on_USER_SYNC(self, cmd):
-        self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing)
+        self.streamer.on_user_sync(
+            self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
+        )
 
     def on_REPLICATE(self, cmd):
         stream_name = cmd.stream_name
@@ -481,8 +483,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         # Tell the server if we have any users currently syncing (should only
         # happen on synchrotrons)
         currently_syncing = self.handler.get_currently_syncing_users()
+        now = self.clock.time_msec()
         for user_id in currently_syncing:
-            self.send_command(UserSyncCommand(user_id, True))
+            self.send_command(UserSyncCommand(user_id, True, now))
 
         # We've now finished connecting to so inform the client handler
         self.handler.update_connection(self)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index d5da0496a8..243a81d488 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -220,12 +220,12 @@ class ReplicationStreamer(object):
             self.federation_sender.federation_ack(token)
 
     @measure_func("repl.on_user_sync")
-    def on_user_sync(self, conn_id, user_id, is_syncing):
+    def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
         """A client has started/stopped syncing on a worker.
         """
         user_sync_counter.inc()
         self.presence_handler.update_external_syncs_row(
-            conn_id, user_id, is_syncing
+            conn_id, user_id, is_syncing, last_sync_ms,
         )
 
     @measure_func("repl.on_remove_pusher")