diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-06-02 11:55:42 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-06-02 11:56:11 +0100 |
commit | 40c7c81da930fddafb5a1c1a7f0bb06751e350a0 (patch) | |
tree | b350d538eee96868315679716c7ee3add9952377 | |
parent | Merge branch 'markjh/external_presence' into markjh/synchrotron (diff) | |
download | synapse-40c7c81da930fddafb5a1c1a7f0bb06751e350a0.tar.xz |
Prod the main synapse with the list of syncing users
-rw-r--r-- | synapse/app/synchrotron.py | 50 |
1 files changed, 45 insertions, 5 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 3875d0de23..5ea157a4ef 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -41,6 +41,7 @@ from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit +from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer @@ -137,19 +138,56 @@ class SynchrotronSlavedStore( class SynchrotronPresence(object): + def __init__(self, hs): + self.http_client = hs.get_simple_http_client() + self.user_to_num_current_syncs = {} + self.process_id = random_string(16) + self.syncing_users_url = hs.config.replication_url + "/syncing_users" + logger.info("Presence process_id is %r", self.process_id) + def set_state(self, user, state): pass def get_states(self, user_ids, as_event=False): return {} - @contextlib.contextmanager - def user_syncing(self, user, affect_presence): - yield - def current_state_for_users(self, user_ids): return {} + @defer.inlineCallbacks + def user_syncing(self, user_id, affect_presence): + if affect_presence: + curr_sync = self.user_to_num_current_syncs.get(user_id, 0) + self.user_to_num_current_syncs[user_id] = curr_sync + 1 + # TODO: Send this less frequently. + # TODO: Make sure this doesn't race. Currently we can lose updates + # if two users come online in quick sucession and the second http + # to the master completes before the first. + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users() + + def _end(): + if affect_presence: + self.user_to_num_current_syncs[user_id] -= 1 + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + defer.returnValue(_user_syncing()) + + def _send_syncing_users(self): + return self.http_client.post_json_get_json(self.syncing_users_url, { + "process_id": self.process_id, + "syncing_users": [ + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count > 0 + ], + }) + class SynchrotronTyping(object): _latest_room_serial = 0 @@ -274,6 +312,9 @@ class SynchrotronServer(HomeServer): logger.exception("Error replicating from %r", replication_url) sleep(5) + def build_presence_handler(self): + return SynchrotronPresence(self) + def setup(config_options): try: @@ -297,7 +338,6 @@ def setup(config_options): config=config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, - presence_handler=SynchrotronPresence(), typing_handler=SynchrotronTyping(), application_service_handler=SynchrotronApplicationService(), ) |