summary refs log tree commit diff
path: root/synapse/app/synchrotron.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r--synapse/app/synchrotron.py54
1 files changed, 34 insertions, 20 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 215ccfd522..e3173533e2 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -26,6 +26,7 @@ from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.rest.client.v2_alpha import sync
+from synapse.rest.client.v1 import events
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
@@ -74,11 +75,6 @@ class SynchrotronSlavedStore(
     BaseSlavedStore,
     ClientIpStore,  # After BaseSlavedStore because the constructor is different
 ):
-    # XXX: This is a bit broken because we don't persist forgotten rooms
-    # in a way that they can be streamed. This means that we don't have a
-    # way to invalidate the forgotten rooms cache correctly.
-    # For now we expire the cache every 10 minutes.
-    BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
     who_forgot_in_room = (
         RoomMemberStore.__dict__["who_forgot_in_room"]
     )
@@ -89,17 +85,23 @@ class SynchrotronSlavedStore(
     get_presence_list_accepted = PresenceStore.__dict__[
         "get_presence_list_accepted"
     ]
+    get_presence_list_observers_accepted = PresenceStore.__dict__[
+        "get_presence_list_observers_accepted"
+    ]
+
 
 UPDATE_SYNCING_USERS_MS = 10 * 1000
 
 
 class SynchrotronPresence(object):
     def __init__(self, hs):
+        self.is_mine_id = hs.is_mine_id
         self.http_client = hs.get_simple_http_client()
         self.store = hs.get_datastore()
         self.user_to_num_current_syncs = {}
         self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
         self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
 
         active_presence = self.store.take_presence_startup_info()
         self.user_to_current_state = {
@@ -119,11 +121,13 @@ class SynchrotronPresence(object):
 
         reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
 
-    def set_state(self, user, state):
+    def set_state(self, user, state, ignore_status_msg=False):
         # TODO Hows this supposed to work?
         pass
 
     get_states = PresenceHandler.get_states.__func__
+    get_state = PresenceHandler.get_state.__func__
+    _get_interested_parties = PresenceHandler._get_interested_parties.__func__
     current_state_for_users = PresenceHandler.current_state_for_users.__func__
 
     @defer.inlineCallbacks
@@ -194,19 +198,39 @@ class SynchrotronPresence(object):
             self._need_to_send_sync = False
             yield self._send_syncing_users_now()
 
+    @defer.inlineCallbacks
+    def notify_from_replication(self, states, stream_id):
+        parties = yield self._get_interested_parties(
+            states, calculate_remote_hosts=False
+        )
+        room_ids_to_states, users_to_states, _ = parties
+
+        self.notifier.on_new_event(
+            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+            users=users_to_states.keys()
+        )
+
+    @defer.inlineCallbacks
     def process_replication(self, result):
         stream = result.get("presence", {"rows": []})
+        states = []
         for row in stream["rows"]:
             (
                 position, user_id, state, last_active_ts,
                 last_federation_update_ts, last_user_sync_ts, status_msg,
                 currently_active
             ) = row
-            self.user_to_current_state[user_id] = UserPresenceState(
+            state = UserPresenceState(
                 user_id, state, last_active_ts,
                 last_federation_update_ts, last_user_sync_ts, status_msg,
                 currently_active
             )
+            self.user_to_current_state[user_id] = state
+            states.append(state)
+
+        if states and "position" in stream:
+            stream_id = int(stream["position"])
+            yield self.notify_from_replication(states, stream_id)
 
 
 class SynchrotronTyping(object):
@@ -266,10 +290,12 @@ class SynchrotronServer(HomeServer):
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
                     sync.register_servlets(self, resource)
+                    events.register_servlets(self, resource)
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
                         "/_matrix/client/v2_alpha": resource,
+                        "/_matrix/client/api/v1": resource,
                     })
 
         root_resource = create_resource_tree(resources, Resource())
@@ -307,15 +333,10 @@ class SynchrotronServer(HomeServer):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
         replication_url = self.config.worker_replication_url
-        clock = self.get_clock()
         notifier = self.get_notifier()
         presence_handler = self.get_presence_handler()
         typing_handler = self.get_typing_handler()
 
-        def expire_broken_caches():
-            store.who_forgot_in_room.invalidate_all()
-            store.get_presence_list_accepted.invalidate_all()
-
         def notify_from_stream(
             result, stream_name, stream_key, room=None, user=None
         ):
@@ -377,22 +398,15 @@ class SynchrotronServer(HomeServer):
                 result, "typing", "typing_key", room="room_id"
             )
 
-        next_expire_broken_caches_ms = 0
         while True:
             try:
                 args = store.stream_positions()
                 args.update(typing_handler.stream_positions())
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
-                now_ms = clock.time_msec()
-                if now_ms > next_expire_broken_caches_ms:
-                    expire_broken_caches()
-                    next_expire_broken_caches_ms = (
-                        now_ms + store.BROKEN_CACHE_EXPIRY_MS
-                    )
                 yield store.process_replication(result)
                 typing_handler.process_replication(result)
-                presence_handler.process_replication(result)
+                yield presence_handler.process_replication(result)
                 notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)