From 24d35ab47bdec651706a221974424409d9ab036b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:03:38 +0100 Subject: Add new storage functions for new replication The new replication protocol will keep all the streams separate, rather than muxing multiple streams into one. --- synapse/handlers/typing.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0eea7f8f9c..d6809862e0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -293,6 +293,9 @@ class TypingHandler(object): rows.sort() return rows + def get_current_token(self): + return self._latest_room_serial + class TypingNotificationEventSource(object): def __init__(self, hs): -- cgit 1.5.1 From e9dd8370b0ae8ecc1ac3bf13857a230ae9f72199 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 15:56:25 +0100 Subject: Add functions to presence to support remote syncs The TCP replication protocol streams deltas of who has started or stopped syncing. This is different from the HTTP API which periodically sends the full list of users who are syncing. This commit adds support for the new TCP style of sending deltas. --- synapse/handlers/presence.py | 66 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1ede117c79..9f9257029e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.async import Linearizer from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -187,6 +188,7 @@ class PresenceHandler(object): # process_id to millisecond timestamp last updated. self.external_process_to_current_syncs = {} self.external_process_last_updated_ms = {} + self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to @@ -508,6 +510,70 @@ class PresenceHandler(object): self.external_process_last_updated_ms[process_id] = self.clock.time_msec() self.external_process_to_current_syncs[process_id] = syncing_user_ids + @defer.inlineCallbacks + def update_external_syncs_row(self, process_id, user_id, is_syncing): + """Update the syncing users for an external process as a delta. + + Args: + process_id (str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + user_id (str): The user who has started or stopped syncing + is_syncing (bool): Whether or not the user is now syncing + """ + with (yield self.external_sync_linearizer.queue(process_id)): + prev_state = yield self.current_state_for_user(user_id) + + process_presence = self.external_process_to_current_syncs.setdefault( + process_id, set() + ) + time_now_ms = self.clock.time_msec() + + updates = [] + if is_syncing and user_id not in process_presence: + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=time_now_ms, + last_user_sync_ts=time_now_ms, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + process_presence.add(user_id) + elif user_id in process_presence: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + if updates: + yield self._update_states(updates) + + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + + @defer.inlineCallbacks + def update_external_syncs_clear(self, process_id): + """Marks all users that had been marked as syncing by a given process + as offline. + + Used when the process has stopped/disappeared. + """ + with (yield self.external_sync_linearizer.queue(process_id)): + process_presence = self.external_process_to_current_syncs.pop( + process_id, set() + ) + prev_states = yield self.current_state_for_users(process_presence) + time_now_ms = self.clock.time_msec() + + yield self._update_states([ + prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + ) + for prev_state in prev_states.itervalues() + ]) + self.external_process_last_updated_ms.pop(process_id, None) + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. -- cgit 1.5.1 From 63fcc42990765563867536de550d18412f172626 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Mar 2017 14:26:08 +0100 Subject: Remove user from process_presence when stops syncing --- synapse/handlers/presence.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9f9257029e..ac6d1107bf 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -546,6 +546,7 @@ class PresenceHandler(object): updates.append(prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, )) + process_presence.discard(user_id) if updates: yield self._update_states(updates) -- cgit 1.5.1 From 9d0170ac6cb54f5fe86cf1a5121d688772a544f0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:36:32 +0100 Subject: Fix up presence --- synapse/handlers/presence.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ac6d1107bf..9e14760659 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -546,12 +546,14 @@ class PresenceHandler(object): updates.append(prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, )) + + if not is_syncing: process_presence.discard(user_id) if updates: yield self._update_states(updates) - self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + self.external_process_last_updated_ms[process_id] = time_now_ms @defer.inlineCallbacks def update_external_syncs_clear(self, process_id): -- cgit 1.5.1 From 36d2b66f90dcdfae843c4b0ee7cba1dea23486f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:46:20 +0100 Subject: Add a timestamp to USER_SYNC command This timestamp is used to indicate when the user last sync'd --- synapse/handlers/presence.py | 14 +++++++------- synapse/replication/tcp/commands.py | 15 ++++++++++----- synapse/replication/tcp/protocol.py | 7 +++++-- synapse/replication/tcp/resource.py | 4 ++-- 4 files changed, 24 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e14760659..53baf3e79a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -511,7 +511,7 @@ class PresenceHandler(object): self.external_process_to_current_syncs[process_id] = syncing_user_ids @defer.inlineCallbacks - def update_external_syncs_row(self, process_id, user_id, is_syncing): + def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): """Update the syncing users for an external process as a delta. Args: @@ -520,6 +520,7 @@ class PresenceHandler(object): as user start and stop syncing against a given process. user_id (str): The user who has started or stopped syncing is_syncing (bool): Whether or not the user is now syncing + sync_time_msec(int): Time in ms when the user was last syncing """ with (yield self.external_sync_linearizer.queue(process_id)): prev_state = yield self.current_state_for_user(user_id) @@ -527,24 +528,23 @@ class PresenceHandler(object): process_presence = self.external_process_to_current_syncs.setdefault( process_id, set() ) - time_now_ms = self.clock.time_msec() updates = [] if is_syncing and user_id not in process_presence: if prev_state.state == PresenceState.OFFLINE: updates.append(prev_state.copy_and_replace( state=PresenceState.ONLINE, - last_active_ts=time_now_ms, - last_user_sync_ts=time_now_ms, + last_active_ts=sync_time_msec, + last_user_sync_ts=sync_time_msec, )) else: updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, + last_user_sync_ts=sync_time_msec, )) process_presence.add(user_id) elif user_id in process_presence: updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, + last_user_sync_ts=sync_time_msec, )) if not is_syncing: @@ -553,7 +553,7 @@ class PresenceHandler(object): if updates: yield self._update_states(updates) - self.external_process_last_updated_ms[process_id] = time_now_ms + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() @defer.inlineCallbacks def update_external_syncs_clear(self, process_id): diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 68165cf2dc..84d2a2272a 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -189,29 +189,34 @@ class UserSyncCommand(Command): """Sent by the client to inform the server that a user has started or stopped syncing. Used to calculate presence on the master. + Includes a timestamp of when the last user sync was. + Format:: - USER_SYNC + USER_SYNC Where is either "start" or "stop" """ NAME = "USER_SYNC" - def __init__(self, user_id, is_syncing): + def __init__(self, user_id, is_syncing, last_sync_ms): self.user_id = user_id self.is_syncing = is_syncing + self.last_sync_ms = last_sync_ms @classmethod def from_line(cls, line): - user_id, state = line.split(" ", 1) + user_id, state, last_sync_ms = line.split(" ", 2) if state not in ("start", "end"): raise Exception("Invalid USER_SYNC state %r" % (state,)) - return cls(user_id, state == "start") + return cls(user_id, state == "start", int(last_sync_ms)) def to_line(self): - return " ".join((self.user_id, "start" if self.is_syncing else "end")) + return " ".join(( + self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms), + )) class FederationAckCommand(Command): diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index c1dc91bdb7..80f732b455 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -368,7 +368,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): self.name = cmd.data def on_USER_SYNC(self, cmd): - self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing) + self.streamer.on_user_sync( + self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms, + ) def on_REPLICATE(self, cmd): stream_name = cmd.stream_name @@ -481,8 +483,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # Tell the server if we have any users currently syncing (should only # happen on synchrotrons) currently_syncing = self.handler.get_currently_syncing_users() + now = self.clock.time_msec() for user_id in currently_syncing: - self.send_command(UserSyncCommand(user_id, True)) + self.send_command(UserSyncCommand(user_id, True, now)) # We've now finished connecting to so inform the client handler self.handler.update_connection(self) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index d5da0496a8..243a81d488 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -220,12 +220,12 @@ class ReplicationStreamer(object): self.federation_sender.federation_ack(token) @measure_func("repl.on_user_sync") - def on_user_sync(self, conn_id, user_id, is_syncing): + def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms): """A client has started/stopped syncing on a worker. """ user_sync_counter.inc() self.presence_handler.update_external_syncs_row( - conn_id, user_id, is_syncing + conn_id, user_id, is_syncing, last_sync_ms, ) @measure_func("repl.on_remove_pusher") -- cgit 1.5.1