diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/synchrotron.py | 53 |
1 files changed, 43 insertions, 10 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f592ad352e..7b45c87a96 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,7 +16,7 @@ import synapse -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig @@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -135,6 +135,8 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) +UPDATE_SYNCING_USERS_MS = 10 * 1000 + class SynchrotronPresence(object): def __init__(self, hs): @@ -153,6 +155,13 @@ class SynchrotronPresence(object): self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) + self._sending_sync = False + self._need_to_send_sync = False + self.clock.looping_call( + self._send_syncing_users_regularly, + UPDATE_SYNCING_USERS_MS, + ) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -165,12 +174,10 @@ class SynchrotronPresence(object): if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - # TODO: Send this less frequently. - # TODO: Make sure this doesn't race. Currently we can lose updates - # if two users come online in quick sucession and the second http - # to the master completes before the first. - # TODO: Don't block the sync request on this HTTP hit. - yield self._send_syncing_users() + prev_states = yield self.current_state_for_users([user_id]) + if prev_states[user_id].state == PresenceState.OFFLINE: + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users_now() def _end(): if affect_presence: @@ -185,8 +192,24 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) - def _send_syncing_users(self): - return self.http_client.post_json_get_json(self.syncing_users_url, { + def _send_syncing_users_regularly(self): + # Only send an update if we aren't in the middle of sending one. + if not self._sending_sync: + preserve_fn(self._send_syncing_users_now)() + + @defer.inlineCallbacks + def _send_syncing_users_now(self): + if self._sending_sync: + # We don't want to race with sending another update. + # Instead we wait for that update to finish and send another + # update afterwards. + self._need_to_send_sync = True + return + + # Flag that we are sending an update. + self._sending_sync = True + + yield self.http_client.post_json_get_json(self.syncing_users_url, { "process_id": self.process_id, "syncing_users": [ user_id for user_id, count in self.user_to_num_current_syncs.items() @@ -194,6 +217,16 @@ class SynchrotronPresence(object): ], }) + # Unset the flag as we are no longer sending an update. + self._sending_sync = False + if self._need_to_send_sync: + # If something happened while we were sending the update then + # we might need to send another update. + # TODO: Check if the update that was sent matches the current state + # as we only need to send an update if they are different. + self._need_to_send_sync = False + yield self._send_syncing_users_now() + def process_replication(self, result): stream = result.get("presence", {"rows": []}) for row in stream["rows"]: |