summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/synchrotron.py36
-rw-r--r--synapse/handlers/__init__.py3
-rw-r--r--synapse/handlers/presence.py27
-rw-r--r--synapse/rest/client/v1/events.py9
-rw-r--r--synapse/server.py9
5 files changed, 66 insertions, 18 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 48bc97636c..3dca1c37a0 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -26,6 +26,7 @@ from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.rest.client.v2_alpha import sync
+from synapse.rest.client.v1 import events
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
@@ -89,17 +90,23 @@ class SynchrotronSlavedStore(
     get_presence_list_accepted = PresenceStore.__dict__[
         "get_presence_list_accepted"
     ]
+    get_presence_list_observers_accepted = PresenceStore.__dict__[
+        "get_presence_list_observers_accepted"
+    ]
+
 
 UPDATE_SYNCING_USERS_MS = 10 * 1000
 
 
 class SynchrotronPresence(object):
     def __init__(self, hs):
+        self.is_mine_id = hs.is_mine_id
         self.http_client = hs.get_simple_http_client()
         self.store = hs.get_datastore()
         self.user_to_num_current_syncs = {}
         self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
         self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
 
         active_presence = self.store.take_presence_startup_info()
         self.user_to_current_state = {
@@ -124,6 +131,8 @@ class SynchrotronPresence(object):
         pass
 
     get_states = PresenceHandler.get_states.__func__
+    get_state = PresenceHandler.get_state.__func__
+    _get_interested_parties = PresenceHandler._get_interested_parties.__func__
     current_state_for_users = PresenceHandler.current_state_for_users.__func__
 
     @defer.inlineCallbacks
@@ -194,19 +203,39 @@ class SynchrotronPresence(object):
             self._need_to_send_sync = False
             yield self._send_syncing_users_now()
 
+    @defer.inlineCallbacks
+    def notify_from_replication(self, states, stream_id):
+        parties = yield self._get_interested_parties(
+            states, calculate_remote_hosts=False
+        )
+        room_ids_to_states, users_to_states, _ = parties
+
+        self.notifier.on_new_event(
+            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+            users=users_to_states.keys()
+        )
+
+    @defer.inlineCallbacks
     def process_replication(self, result):
         stream = result.get("presence", {"rows": []})
+        states = []
         for row in stream["rows"]:
             (
                 position, user_id, state, last_active_ts,
                 last_federation_update_ts, last_user_sync_ts, status_msg,
                 currently_active
             ) = row
-            self.user_to_current_state[user_id] = UserPresenceState(
+            state = UserPresenceState(
                 user_id, state, last_active_ts,
                 last_federation_update_ts, last_user_sync_ts, status_msg,
                 currently_active
             )
+            self.user_to_current_state[user_id] = state
+            states.append(state)
+
+        if states and "position" in stream:
+            stream_id = int(stream["position"])
+            yield self.notify_from_replication(states, stream_id)
 
 
 class SynchrotronTyping(object):
@@ -266,10 +295,12 @@ class SynchrotronServer(HomeServer):
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
                     sync.register_servlets(self, resource)
+                    events.register_servlets(self, resource)
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
                         "/_matrix/client/v2_alpha": resource,
+                        "/_matrix/client/api/v1": resource,
                     })
 
         root_resource = create_resource_tree(resources, Resource())
@@ -315,6 +346,7 @@ class SynchrotronServer(HomeServer):
         def expire_broken_caches():
             store.who_forgot_in_room.invalidate_all()
             store.get_presence_list_accepted.invalidate_all()
+            store.get_presence_list_observers_accepted.invalidate_all()
 
         def notify_from_stream(
             result, stream_name, stream_key, room=None, user=None
@@ -392,7 +424,7 @@ class SynchrotronServer(HomeServer):
                     )
                 yield store.process_replication(result)
                 typing_handler.process_replication(result)
-                presence_handler.process_replication(result)
+                yield presence_handler.process_replication(result)
                 notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 1a50a2ec98..63d05f2531 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -19,7 +19,6 @@ from .room import (
 )
 from .room_member import RoomMemberHandler
 from .message import MessageHandler
-from .events import EventStreamHandler, EventHandler
 from .federation import FederationHandler
 from .profile import ProfileHandler
 from .directory import DirectoryHandler
@@ -53,8 +52,6 @@ class Handlers(object):
         self.message_handler = MessageHandler(hs)
         self.room_creation_handler = RoomCreationHandler(hs)
         self.room_member_handler = RoomMemberHandler(hs)
-        self.event_stream_handler = EventStreamHandler(hs)
-        self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2293b5fdf7..6a1fe76c88 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -503,7 +503,7 @@ class PresenceHandler(object):
         defer.returnValue(states)
 
     @defer.inlineCallbacks
-    def _get_interested_parties(self, states):
+    def _get_interested_parties(self, states, calculate_remote_hosts=True):
         """Given a list of states return which entities (rooms, users, servers)
         are interested in the given states.
 
@@ -526,14 +526,15 @@ class PresenceHandler(object):
             users_to_states.setdefault(state.user_id, []).append(state)
 
         hosts_to_states = {}
-        for room_id, states in room_ids_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
+        if calculate_remote_hosts:
+            for room_id, states in room_ids_to_states.items():
+                local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+                if not local_states:
+                    continue
 
-            hosts = yield self.store.get_joined_hosts_for_room(room_id)
-            for host in hosts:
-                hosts_to_states.setdefault(host, []).extend(local_states)
+                hosts = yield self.store.get_joined_hosts_for_room(room_id)
+                for host in hosts:
+                    hosts_to_states.setdefault(host, []).extend(local_states)
 
         for user_id, states in users_to_states.items():
             local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
@@ -565,6 +566,16 @@ class PresenceHandler(object):
 
         self._push_to_remotes(hosts_to_states)
 
+    @defer.inlineCallbacks
+    def notify_for_states(self, state, stream_id):
+        parties = yield self._get_interested_parties([state])
+        room_ids_to_states, users_to_states, hosts_to_states = parties
+
+        self.notifier.on_new_event(
+            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+            users=[UserID.from_string(u) for u in users_to_states.keys()]
+        )
+
     def _push_to_remotes(self, hosts_to_states):
         """Sends state updates to remote servers.
 
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index 998b115bb9..701b6f549b 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -34,7 +34,7 @@ class EventStreamRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(EventStreamRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.event_stream_handler = hs.get_event_stream_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
@@ -50,7 +50,6 @@ class EventStreamRestServlet(ClientV1RestServlet):
         if "room_id" in request.args:
             room_id = request.args["room_id"][0]
 
-        handler = self.handlers.event_stream_handler
         pagin_config = PaginationConfig.from_request(request)
         timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
         if "timeout" in request.args:
@@ -61,7 +60,7 @@ class EventStreamRestServlet(ClientV1RestServlet):
 
         as_client_event = "raw" not in request.args
 
-        chunk = yield handler.get_stream(
+        chunk = yield self.event_stream_handler.get_stream(
             requester.user.to_string(),
             pagin_config,
             timeout=timeout,
@@ -84,12 +83,12 @@ class EventRestServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(EventRestServlet, self).__init__(hs)
         self.clock = hs.get_clock()
+        self.event_handler = hs.get_event_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, event_id):
         requester = yield self.auth.get_user_by_req(request)
-        handler = self.handlers.event_handler
-        event = yield handler.get_event(requester.user, event_id)
+        event = yield self.event_handler.get_event(requester.user, event_id)
 
         time_now = self.clock.time_msec()
         if event:
diff --git a/synapse/server.py b/synapse/server.py
index 6bb4988309..af3246504b 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -41,6 +41,7 @@ from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.room import RoomListHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
+from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
@@ -94,6 +95,8 @@ class HomeServer(object):
         'auth_handler',
         'device_handler',
         'e2e_keys_handler',
+        'event_handler',
+        'event_stream_handler',
         'application_service_api',
         'application_service_scheduler',
         'application_service_handler',
@@ -214,6 +217,12 @@ class HomeServer(object):
     def build_application_service_handler(self):
         return ApplicationServicesHandler(self)
 
+    def build_event_handler(self):
+        return EventHandler(self)
+
+    def build_event_stream_handler(self):
+        return EventStreamHandler(self)
+
     def build_event_sources(self):
         return EventSources(self)