summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-11-23 16:00:32 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2019-02-13 14:23:21 +0000
commit3223f415e2e088ea1f8eff3af4d3415dc9e5531a (patch)
tree571a66900647eb4190077f7411b811eb4bc9606a
parentMissing file (diff)
downloadsynapse-3223f415e2e088ea1f8eff3af4d3415dc9e5531a.tar.xz
Add server health apis and server presence
-rw-r--r--synapse/event_auth.py3
-rw-r--r--synapse/handlers/sync.py25
-rw-r--r--synapse/rest/client/v1/admin.py87
-rw-r--r--synapse/storage/registration.py9
-rw-r--r--synapse/storage/roommember.py17
5 files changed, 131 insertions, 10 deletions
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index c81d8e6729..2f5f8819c1 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -103,6 +103,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
             "No create event in auth events",
         )
 
+    if event.type == "org.matrix.server_presence":
+        return
+
     creating_domain = get_domain_from_id(event.room_id)
     originating_domain = get_domain_from_id(event.sender)
     if creating_domain != originating_domain:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 09739f2862..675e6c8e02 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -35,6 +35,7 @@ from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
+from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
@@ -211,6 +212,7 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
+        self.builder_factory = hs.get_event_builder_factory()
 
         # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
         self.lazy_loaded_members_cache = ExpiringCache(
@@ -709,7 +711,6 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
-
             members_to_fetch = None
 
             lazy_load_members = sync_config.filter_collection.lazy_load_members()
@@ -858,6 +859,28 @@ class SyncHandler(object):
         if state_ids:
             state = yield self.store.get_events(list(state_ids.values()))
 
+        hosts_in_room = yield self.store.get_hosts_in_room(room_id)
+        destination_states = yield self.store.get_destination_states()
+
+        for host in hosts_in_room:
+            if host not in destination_states:
+                continue
+
+            if ("org.matrix.server_presence", host) in timeline_state:
+                continue
+
+            state[("org.matrix.server_presence", host)] = self.builder_factory.new({
+                "type": "org.matrix.server_presence",
+                "content": {
+                    "state": "connected" if destination_states[host] else "disconnected",
+                },
+                "state_key": host,
+                "event_id": random_string(24),
+                "origin_server_ts": self.clock.time_msec(),
+                "sender": "@server:server",
+                "room_id": room_id,
+            })
+
         defer.returnValue({
             (e.type, e.state_key): e
             for e in sync_config.filter_collection.filter_room_state(list(state.values()))
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 41534b8c2a..c729cdbd30 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -32,6 +32,7 @@ from synapse.http.servlet import (
     parse_string,
 )
 from synapse.types import UserID, create_requester
+from synapse.util.stringutils import random_string
 
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -740,6 +741,91 @@ class SearchUsersRestServlet(ClientV1RestServlet):
         defer.returnValue((200, ret))
 
 
+class ServerHealth(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/admin/server_health")
+
+    def __init__(self, hs):
+        super(ServerHealth, self).__init__(hs)
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
+        self.builder_factory = hs.get_event_builder_factory()
+        self.clock = hs.get_clock()
+
+    def on_GET(self, request):
+        return self.do_update()
+
+    def on_POST(self, request):
+        return self.do_update()
+
+    @defer.inlineCallbacks
+    def do_update(self):
+        hosts = yield self.store.get_all_destination_healths()
+
+        up_servers = set(h for h, c in hosts.items() if c is not None)
+        down_servers = set(h for h, c in hosts.items() if c is None)
+
+        rooms_to_hosts = yield self.store.get_all_hosts_and_room()
+
+        requester = create_requester(UserID("server", "server")),
+
+        state = yield self.store.get_destination_states()
+
+        new_up = set()
+        new_down = set()
+
+        for host in up_servers:
+            if state.get(host, True):
+                continue
+            new_up.add(host)
+
+            yield self.store.store_destination_state(host, True)
+
+        for host in down_servers:
+            if not state.get(host, True):
+                continue
+            new_down.add(host)
+
+            yield self.store.store_destination_state(host, False)
+
+        for room_id, hosts in rooms_to_hosts.items():
+            for host in hosts:
+                if host in new_up:
+                    new_state = "connected"
+                elif host in new_down:
+                    new_state = "disconnected"
+                else:
+                    continue
+
+                logger.info("Marking %s as %r", host, new_state)
+
+                builder = self.builder_factory.new({
+                    "type": "org.matrix.server_presence",
+                    "content": {
+                        "state": new_state,
+                    },
+                    "state_key": host,
+                    "event_id": random_string(24),
+                    "origin_server_ts": self.clock.time_msec(),
+                    "sender": "@server:server",
+                    "room_id": room_id,
+                })
+
+                event, context = yield self.event_creation_handler.create_new_client_event(
+                    builder=builder,
+                )
+                event.internal_metadata.internal_event = True
+                yield self.event_creation_handler.handle_new_client_event(
+                    requester,
+                    event,
+                    context,
+                    ratelimit=False,
+                    extra_users=[],
+                    do_auth=False,
+                )
+
+        defer.returnValue((200, {}))
+
+
 def register_servlets(hs, http_server):
     WhoisRestServlet(hs).register(http_server)
     PurgeMediaCacheRestServlet(hs).register(http_server)
@@ -754,3 +840,4 @@ def register_servlets(hs, http_server):
     QuarantineMediaInRoom(hs).register(http_server)
     ListMediaInRoom(hs).register(http_server)
     UserRegisterServlet(hs).register(http_server)
+    ServerHealth(hs).register(http_server)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 80d76bf9d7..ac5e1656fe 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -406,11 +406,6 @@ class RegistrationStore(RegistrationWorkerStore,
             )
             tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
 
-            for token, _, _ in tokens_and_devices:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
-
             txn.execute(
                 "DELETE FROM access_tokens WHERE %s" % where_clause,
                 values
@@ -432,10 +427,6 @@ class RegistrationStore(RegistrationWorkerStore,
                 },
             )
 
-            self._invalidate_cache_and_stream(
-                txn, self.get_user_by_access_token, (access_token,)
-            )
-
         return self.runInteraction("delete_access_token", f)
 
     @cachedInlineCallbacks()
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0707f9a86a..d0db8528ee 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -88,6 +88,23 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             return [to_ascii(r[0]) for r in txn]
         return self.runInteraction("get_users_in_room", f)
 
+    def get_all_hosts_and_room(self):
+        def f(txn):
+            sql = """
+                SELECT DISTINCT room_id, regexp_replace(state_key, '^[^:]*:', '') AS host
+                FROM current_state_events
+                INNER JOIN room_memberships USING (event_id, room_id)
+                WHERE
+                    type = 'm.room.member' AND membership = 'join'
+            """
+
+            txn.execute(sql)
+            results = {}
+            for r in txn:
+                results.setdefault(to_ascii(r[0]), set()).add(to_ascii(r[1]))
+            return results
+        return self.runInteraction("get_users_in_room", f)
+
     @cached(max_entries=100000)
     def get_room_summary(self, room_id):
         """ Get the details of a room roughly suitable for use by the room