summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-06-02 11:55:42 +0100
committerMark Haines <mark.haines@matrix.org>2016-06-02 11:56:11 +0100
commit40c7c81da930fddafb5a1c1a7f0bb06751e350a0 (patch)
treeb350d538eee96868315679716c7ee3add9952377
parentMerge branch 'markjh/external_presence' into markjh/synchrotron (diff)
downloadsynapse-40c7c81da930fddafb5a1c1a7f0bb06751e350a0.tar.xz
Prod the main synapse with the list of syncing users
-rw-r--r--synapse/app/synchrotron.py50
1 files changed, 45 insertions, 5 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 3875d0de23..5ea157a4ef 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -41,6 +41,7 @@ from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
+from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
 
 from twisted.internet import reactor, defer
@@ -137,19 +138,56 @@ class SynchrotronSlavedStore(
 
 
 class SynchrotronPresence(object):
+    def __init__(self, hs):
+        self.http_client = hs.get_simple_http_client()
+        self.user_to_num_current_syncs = {}
+        self.process_id = random_string(16)
+        self.syncing_users_url = hs.config.replication_url + "/syncing_users"
+        logger.info("Presence process_id is %r", self.process_id)
+
     def set_state(self, user, state):
         pass
 
     def get_states(self, user_ids, as_event=False):
         return {}
 
-    @contextlib.contextmanager
-    def user_syncing(self, user, affect_presence):
-        yield
-
     def current_state_for_users(self, user_ids):
         return {}
 
+    @defer.inlineCallbacks
+    def user_syncing(self, user_id, affect_presence):
+        if affect_presence:
+            curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+            self.user_to_num_current_syncs[user_id] = curr_sync + 1
+            # TODO: Send this less frequently.
+            # TODO: Make sure this doesn't race. Currently we can lose updates
+            # if two users come online in quick sucession and the second http
+            # to the master completes before the first.
+            # TODO: Don't block the sync request on this HTTP hit.
+            yield self._send_syncing_users()
+
+        def _end():
+            if affect_presence:
+                self.user_to_num_current_syncs[user_id] -= 1
+
+        @contextlib.contextmanager
+        def _user_syncing():
+            try:
+                yield
+            finally:
+                _end()
+
+        defer.returnValue(_user_syncing())
+
+    def _send_syncing_users(self):
+        return self.http_client.post_json_get_json(self.syncing_users_url, {
+            "process_id": self.process_id,
+            "syncing_users": [
+                user_id for user_id, count in self.user_to_num_current_syncs.items()
+                if count > 0
+            ],
+        })
+
 
 class SynchrotronTyping(object):
     _latest_room_serial = 0
@@ -274,6 +312,9 @@ class SynchrotronServer(HomeServer):
                 logger.exception("Error replicating from %r", replication_url)
                 sleep(5)
 
+    def build_presence_handler(self):
+        return SynchrotronPresence(self)
+
 
 def setup(config_options):
     try:
@@ -297,7 +338,6 @@ def setup(config_options):
         config=config,
         version_string=get_version_string("Synapse", synapse),
         database_engine=database_engine,
-        presence_handler=SynchrotronPresence(),
         typing_handler=SynchrotronTyping(),
         application_service_handler=SynchrotronApplicationService(),
     )