summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-06-02 13:59:24 +0100
committerMark Haines <mark.haines@matrix.org>2016-06-02 13:59:24 +0100
commit08e60476d5b71d0c13a250a84f12cb1ea7960a41 (patch)
treeb892ab25f7deae1c3e0ebda1165a35da450bf2b6
parentMerge branch 'markjh/external_presence' into markjh/synchrotron (diff)
downloadsynapse-08e60476d5b71d0c13a250a84f12cb1ea7960a41.tar.xz
Replicate the presence into the synchrotron
-rw-r--r--synapse/app/synchrotron.py44
-rw-r--r--synapse/replication/slave/storage/account_data.py2
-rw-r--r--synapse/replication/slave/storage/presence.py59
-rw-r--r--synapse/storage/__init__.py6
4 files changed, 97 insertions, 14 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 5ea157a4ef..c1338e8e36 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -22,6 +22,7 @@ from synapse.config.database import DatabaseConfig
 from synapse.config.logger import LoggingConfig
 from synapse.config.appservice import AppServiceConfig
 from synapse.events import FrozenEvent
+from synapse.handlers.presence import PresenceHandler
 from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
@@ -33,8 +34,10 @@ from synapse.replication.slave.storage.appservice import SlavedApplicationServic
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.filtering import SlavedFilteringStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
@@ -115,12 +118,8 @@ class SynchrotronSlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedFilteringStore,
+    SlavedPresenceStore,
 ):
-    def get_current_presence_token(self):
-        return 0
-
-    presence_stream_cache = ()
-
     def get_presence_list_accepted(self, user_localpart):
         return ()
 
@@ -140,19 +139,26 @@ class SynchrotronSlavedStore(
 class SynchrotronPresence(object):
     def __init__(self, hs):
         self.http_client = hs.get_simple_http_client()
+        self.store = hs.get_datastore()
         self.user_to_num_current_syncs = {}
-        self.process_id = random_string(16)
         self.syncing_users_url = hs.config.replication_url + "/syncing_users"
+        self.clock = hs.get_clock()
+
+        active_presence = self.store.take_presence_startup_info()
+        self.user_to_current_state = {
+            state.user_id: state
+            for state in active_presence
+        }
+
+        self.process_id = random_string(16)
         logger.info("Presence process_id is %r", self.process_id)
 
     def set_state(self, user, state):
+        # TODO Hows this supposed to work?
         pass
 
-    def get_states(self, user_ids, as_event=False):
-        return {}
-
-    def current_state_for_users(self, user_ids):
-        return {}
+    get_states = PresenceHandler.get_states.__func__
+    current_state_for_users = PresenceHandler.current_state_for_users.__func__
 
     @defer.inlineCallbacks
     def user_syncing(self, user_id, affect_presence):
@@ -188,6 +194,20 @@ class SynchrotronPresence(object):
             ],
         })
 
+    def process_replication(self, result):
+        stream = result.get("presence", {"rows": []})
+        for row in stream["rows"]:
+            (
+                position, user_id, state, last_active_ts,
+                last_federation_update_ts, last_user_sync_ts, status_msg,
+                currently_active
+            ) = row
+            self.user_to_current_state[user_id] = UserPresenceState(
+                user_id, state, last_active_ts,
+                last_federation_update_ts, last_user_sync_ts, status_msg,
+                currently_active
+            )
+
 
 class SynchrotronTyping(object):
     _latest_room_serial = 0
@@ -273,6 +293,7 @@ class SynchrotronServer(HomeServer):
         replication_url = self.config.replication_url
         clock = self.get_clock()
         notifier = self.get_notifier()
+        presence_handler = self.get_presence_handler()
 
         def expire_broken_caches():
             store.who_forgot_in_room.invalidate_all()
@@ -307,6 +328,7 @@ class SynchrotronServer(HomeServer):
                         now_ms + store.BROKEN_CACHE_EXPIRY_MS
                     )
                 yield store.process_replication(result)
+                presence_handler.process_replication(result)
                 notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 5a44d314a3..735c03c7eb 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -96,3 +96,5 @@ class SlavedAccountDataStore(BaseSlavedStore):
                 self._account_data_stream_cache.entity_has_changed(
                     user_id, position
                 )
+
+        return super(SlavedAccountDataStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
new file mode 100644
index 0000000000..703f4a49bf
--- /dev/null
+++ b/synapse/replication/slave/storage/presence.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage import DataStore
+
+
+class SlavedPresenceStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedPresenceStore, self).__init__(db_conn, hs)
+        self._presence_id_gen = SlavedIdTracker(
+            db_conn, "presence_stream", "stream_id",
+        )
+
+        self._presence_on_startup = self._get_active_presence(db_conn)
+
+        self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
+            "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
+        )
+
+    _get_active_presence = DataStore._get_active_presence.__func__
+    take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+    get_presence_for_users = DataStore.get_presence_for_users.__func__
+
+    def get_current_presence_token(self):
+        return self._presence_id_gen.get_current_token()
+
+    def stream_positions(self):
+        result = super(SlavedPresenceStore, self).stream_positions()
+        position = self._presence_id_gen.get_current_token()
+        result["presence"] = position
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("presence")
+        if stream:
+            self._presence_id_gen.advance(int(stream["position"]))
+            for row in stream["rows"]:
+                position, user_id = row[:2]
+                self.presence_stream_cache.entity_has_changed(
+                    user_id, position
+                )
+
+        return super(SlavedPresenceStore, self).process_replication(result)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 8581796b7e..6928a213e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -149,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore,
             "AccountDataAndTagsChangeCache", account_max,
         )
 
-        self.__presence_on_startup = self._get_active_presence(db_conn)
+        self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self._get_cache_dict(
             db_conn, "presence_stream",
@@ -190,8 +190,8 @@ class DataStore(RoomMemberStore, RoomStore,
         super(DataStore, self).__init__(hs)
 
     def take_presence_startup_info(self):
-        active_on_startup = self.__presence_on_startup
-        self.__presence_on_startup = None
+        active_on_startup = self._presence_on_startup
+        self._presence_on_startup = None
         return active_on_startup
 
     def _get_active_presence(self, db_conn):