summary refs log tree commit diff
path: root/synapse/storage/roommember.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-09-19 20:32:25 +0100
committerRichard van der Hoff <richard@matrix.org>2019-09-19 20:32:25 +0100
commit284e1cb027c2c5e8376a5b83d21d626e18dcbc33 (patch)
tree39adfb627d806891d95ffabb0bd7490f1fc737eb /synapse/storage/roommember.py
parentFix a bug with saml attribute maps. (diff)
parentfix sample config (diff)
downloadsynapse-284e1cb027c2c5e8376a5b83d21d626e18dcbc33.tar.xz
Merge branch 'develop' into rav/fix_attribute_mapping
Diffstat (limited to 'synapse/storage/roommember.py')
-rw-r--r--synapse/storage/roommember.py103
1 files changed, 82 insertions, 21 deletions
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index eecb276465..4df8ebdacd 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,8 +24,10 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction
+from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -74,6 +76,63 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         self._check_safe_current_state_events_membership_updated_txn(txn)
         txn.close()
 
+        if self.hs.config.metrics_flags.known_servers:
+            self._known_servers_count = 1
+            self.hs.get_clock().looping_call(
+                run_as_background_process,
+                60 * 1000,
+                "_count_known_servers",
+                self._count_known_servers,
+            )
+            self.hs.get_clock().call_later(
+                1000,
+                run_as_background_process,
+                "_count_known_servers",
+                self._count_known_servers,
+            )
+            LaterGauge(
+                "synapse_federation_known_servers",
+                "",
+                [],
+                lambda: self._known_servers_count,
+            )
+
+    @defer.inlineCallbacks
+    def _count_known_servers(self):
+        """
+        Count the servers that this server knows about.
+
+        The statistic is stored on the class for the
+        `synapse_federation_known_servers` LaterGauge to collect.
+        """
+
+        def _transact(txn):
+            if isinstance(self.database_engine, Sqlite3Engine):
+                query = """
+                    SELECT COUNT(DISTINCT substr(out.user_id, pos+1))
+                    FROM (
+                        SELECT rm.user_id as user_id, instr(rm.user_id, ':')
+                            AS pos FROM room_memberships as rm
+                        INNER JOIN current_state_events as c ON rm.event_id = c.event_id
+                        WHERE c.type = 'm.room.member'
+                    ) as out
+                """
+            else:
+                query = """
+                    SELECT COUNT(DISTINCT split_part(state_key, ':', 2))
+                    FROM current_state_events
+                    WHERE type = 'm.room.member' AND membership = 'join';
+                """
+            txn.execute(query)
+            return list(txn)[0][0]
+
+        count = yield self.runInteraction("get_known_servers", _transact)
+
+        # We always know about ourselves, even if we have nothing in
+        # room_memberships (for example, the server is new).
+        self._known_servers_count = max([count, 1])
+        return self._known_servers_count
+
     def _check_safe_current_state_events_membership_updated_txn(self, txn):
         """Checks if it is safe to assume the new current_state_events
         membership column is up to date
@@ -112,29 +171,31 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     @cached(max_entries=100000, iterable=True)
     def get_users_in_room(self, room_id):
-        def f(txn):
-            # If we can assume current_state_events.membership is up to date
-            # then we can avoid a join, which is a Very Good Thing given how
-            # frequently this function gets called.
-            if self._current_state_events_membership_up_to_date:
-                sql = """
-                    SELECT state_key FROM current_state_events
-                    WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
-                """
-            else:
-                sql = """
-                    SELECT state_key FROM room_memberships as m
-                    INNER JOIN current_state_events as c
-                    ON m.event_id = c.event_id
-                    AND m.room_id = c.room_id
-                    AND m.user_id = c.state_key
-                    WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
-                """
+        return self.runInteraction(
+            "get_users_in_room", self.get_users_in_room_txn, room_id
+        )
 
-            txn.execute(sql, (room_id, Membership.JOIN))
-            return [to_ascii(r[0]) for r in txn]
+    def get_users_in_room_txn(self, txn, room_id):
+        # If we can assume current_state_events.membership is up to date
+        # then we can avoid a join, which is a Very Good Thing given how
+        # frequently this function gets called.
+        if self._current_state_events_membership_up_to_date:
+            sql = """
+                SELECT state_key FROM current_state_events
+                WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+            """
+        else:
+            sql = """
+                SELECT state_key FROM room_memberships as m
+                INNER JOIN current_state_events as c
+                ON m.event_id = c.event_id
+                AND m.room_id = c.room_id
+                AND m.user_id = c.state_key
+                WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+            """
 
-        return self.runInteraction("get_users_in_room", f)
+        txn.execute(sql, (room_id, Membership.JOIN))
+        return [to_ascii(r[0]) for r in txn]
 
     @cached(max_entries=100000)
     def get_room_summary(self, room_id):