From 24d35ab47bdec651706a221974424409d9ab036b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:03:38 +0100 Subject: Add new storage functions for new replication The new replication protocol will keep all the streams separate, rather than muxing multiple streams into one. --- synapse/handlers/typing.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0eea7f8f9c..d6809862e0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -293,6 +293,9 @@ class TypingHandler(object): rows.sort() return rows + def get_current_token(self): + return self._latest_room_serial + class TypingNotificationEventSource(object): def __init__(self, hs): -- cgit 1.5.1 From e9dd8370b0ae8ecc1ac3bf13857a230ae9f72199 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 15:56:25 +0100 Subject: Add functions to presence to support remote syncs The TCP replication protocol streams deltas of who has started or stopped syncing. This is different from the HTTP API which periodically sends the full list of users who are syncing. This commit adds support for the new TCP style of sending deltas. --- synapse/handlers/presence.py | 66 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1ede117c79..9f9257029e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.async import Linearizer from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -187,6 +188,7 @@ class PresenceHandler(object): # process_id to millisecond timestamp last updated. self.external_process_to_current_syncs = {} self.external_process_last_updated_ms = {} + self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to @@ -508,6 +510,70 @@ class PresenceHandler(object): self.external_process_last_updated_ms[process_id] = self.clock.time_msec() self.external_process_to_current_syncs[process_id] = syncing_user_ids + @defer.inlineCallbacks + def update_external_syncs_row(self, process_id, user_id, is_syncing): + """Update the syncing users for an external process as a delta. + + Args: + process_id (str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + user_id (str): The user who has started or stopped syncing + is_syncing (bool): Whether or not the user is now syncing + """ + with (yield self.external_sync_linearizer.queue(process_id)): + prev_state = yield self.current_state_for_user(user_id) + + process_presence = self.external_process_to_current_syncs.setdefault( + process_id, set() + ) + time_now_ms = self.clock.time_msec() + + updates = [] + if is_syncing and user_id not in process_presence: + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=time_now_ms, + last_user_sync_ts=time_now_ms, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + process_presence.add(user_id) + elif user_id in process_presence: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + if updates: + yield self._update_states(updates) + + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + + @defer.inlineCallbacks + def update_external_syncs_clear(self, process_id): + """Marks all users that had been marked as syncing by a given process + as offline. + + Used when the process has stopped/disappeared. + """ + with (yield self.external_sync_linearizer.queue(process_id)): + process_presence = self.external_process_to_current_syncs.pop( + process_id, set() + ) + prev_states = yield self.current_state_for_users(process_presence) + time_now_ms = self.clock.time_msec() + + yield self._update_states([ + prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + ) + for prev_state in prev_states.itervalues() + ]) + self.external_process_last_updated_ms.pop(process_id, None) + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. -- cgit 1.5.1 From 63fcc42990765563867536de550d18412f172626 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Mar 2017 14:26:08 +0100 Subject: Remove user from process_presence when stops syncing --- synapse/handlers/presence.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9f9257029e..ac6d1107bf 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -546,6 +546,7 @@ class PresenceHandler(object): updates.append(prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, )) + process_presence.discard(user_id) if updates: yield self._update_states(updates) -- cgit 1.5.1