summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorHubert Chathi <hubert@uhoreg.ca>2019-09-07 13:20:34 -0400
committerHubert Chathi <hubert@uhoreg.ca>2019-09-07 13:20:34 -0400
commit8e86f5b65c2c2ef4606e415d710f1d847a462a6d (patch)
tree2f97e3d7416fe9fbabe2432055ffd625ccb6da82 /synapse/storage
parentMerge pull request #5769 from matrix-org/uhoreg/e2e_cross-signing2-part1 (diff)
parentServers-known-about statistic (#5981) (diff)
downloadsynapse-8e86f5b65c2c2ef4606e415d710f1d847a462a6d.tar.xz
Merge branch 'develop' into uhoreg/e2e_cross-signing_merged
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/devices.py2
-rw-r--r--synapse/storage/registration.py158
-rw-r--r--synapse/storage/roommember.py59
-rw-r--r--synapse/storage/stats.py4
4 files changed, 142 insertions, 81 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 7b4213f20b..2db3ef4004 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -933,7 +933,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
                     "ts": now,
                     "opentracing_context": json.dumps(context)
                     if whitelisted_homeserver(destination)
-                    else None,
+                    else "{}",
                 }
                 for destination in hosts
                 for device_id in device_ids
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 2d3c7e2dc9..5138792a5f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -614,6 +614,85 @@ class RegistrationWorkerStore(SQLBaseStore):
         # Convert the integer into a boolean.
         return res == 1
 
+    def get_threepid_validation_session(
+        self, medium, client_secret, address=None, sid=None, validated=True
+    ):
+        """Gets a session_id and last_send_attempt (if available) for a
+        client_secret/medium/(address|session_id) combo
+
+        Args:
+            medium (str|None): The medium of the 3PID
+            address (str|None): The address of the 3PID
+            sid (str|None): The ID of the validation session
+            client_secret (str|None): A unique string provided by the client to
+                help identify this validation attempt
+            validated (bool|None): Whether sessions should be filtered by
+                whether they have been validated already or not. None to
+                perform no filtering
+
+        Returns:
+            deferred {str, int}|None: A dict containing the
+                latest session_id and send_attempt count for this 3PID.
+                Otherwise None if there hasn't been a previous attempt
+        """
+        keyvalues = {"medium": medium, "client_secret": client_secret}
+        if address:
+            keyvalues["address"] = address
+        if sid:
+            keyvalues["session_id"] = sid
+
+        assert address or sid
+
+        def get_threepid_validation_session_txn(txn):
+            sql = """
+                SELECT address, session_id, medium, client_secret,
+                last_send_attempt, validated_at
+                FROM threepid_validation_session WHERE %s
+                """ % (
+                " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+            )
+
+            if validated is not None:
+                sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
+
+            sql += " LIMIT 1"
+
+            txn.execute(sql, list(keyvalues.values()))
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+
+            return rows[0]
+
+        return self.runInteraction(
+            "get_threepid_validation_session", get_threepid_validation_session_txn
+        )
+
+    def delete_threepid_session(self, session_id):
+        """Removes a threepid validation session from the database. This can
+        be done after validation has been performed and whatever action was
+        waiting on it has been carried out
+
+        Args:
+            session_id (str): The ID of the session to delete
+        """
+
+        def delete_threepid_session_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="threepid_validation_token",
+                keyvalues={"session_id": session_id},
+            )
+            self._simple_delete_txn(
+                txn,
+                table="threepid_validation_session",
+                keyvalues={"session_id": session_id},
+            )
+
+        return self.runInteraction(
+            "delete_threepid_session", delete_threepid_session_txn
+        )
+
 
 class RegistrationStore(
     RegistrationWorkerStore, background_updates.BackgroundUpdateStore
@@ -1082,60 +1161,6 @@ class RegistrationStore(
 
         return 1
 
-    def get_threepid_validation_session(
-        self, medium, client_secret, address=None, sid=None, validated=True
-    ):
-        """Gets a session_id and last_send_attempt (if available) for a
-        client_secret/medium/(address|session_id) combo
-
-        Args:
-            medium (str|None): The medium of the 3PID
-            address (str|None): The address of the 3PID
-            sid (str|None): The ID of the validation session
-            client_secret (str|None): A unique string provided by the client to
-                help identify this validation attempt
-            validated (bool|None): Whether sessions should be filtered by
-                whether they have been validated already or not. None to
-                perform no filtering
-
-        Returns:
-            deferred {str, int}|None: A dict containing the
-                latest session_id and send_attempt count for this 3PID.
-                Otherwise None if there hasn't been a previous attempt
-        """
-        keyvalues = {"medium": medium, "client_secret": client_secret}
-        if address:
-            keyvalues["address"] = address
-        if sid:
-            keyvalues["session_id"] = sid
-
-        assert address or sid
-
-        def get_threepid_validation_session_txn(txn):
-            sql = """
-                SELECT address, session_id, medium, client_secret,
-                last_send_attempt, validated_at
-                FROM threepid_validation_session WHERE %s
-                """ % (
-                " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
-            )
-
-            if validated is not None:
-                sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
-
-            sql += " LIMIT 1"
-
-            txn.execute(sql, list(keyvalues.values()))
-            rows = self.cursor_to_dict(txn)
-            if not rows:
-                return None
-
-            return rows[0]
-
-        return self.runInteraction(
-            "get_threepid_validation_session", get_threepid_validation_session_txn
-        )
-
     def validate_threepid_session(self, session_id, client_secret, token, current_ts):
         """Attempt to validate a threepid session using a token
 
@@ -1323,31 +1348,6 @@ class RegistrationStore(
             self.clock.time_msec(),
         )
 
-    def delete_threepid_session(self, session_id):
-        """Removes a threepid validation session from the database. This can
-        be done after validation has been performed and whatever action was
-        waiting on it has been carried out
-
-        Args:
-            session_id (str): The ID of the session to delete
-        """
-
-        def delete_threepid_session_txn(txn):
-            self._simple_delete_txn(
-                txn,
-                table="threepid_validation_token",
-                keyvalues={"session_id": session_id},
-            )
-            self._simple_delete_txn(
-                txn,
-                table="threepid_validation_session",
-                keyvalues={"session_id": session_id},
-            )
-
-        return self.runInteraction(
-            "delete_threepid_session", delete_threepid_session_txn
-        )
-
     def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
         self._simple_update_one_txn(
             txn=txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index f8b682ebd9..4df8ebdacd 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,8 +24,10 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction
+from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -74,6 +76,63 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         self._check_safe_current_state_events_membership_updated_txn(txn)
         txn.close()
 
+        if self.hs.config.metrics_flags.known_servers:
+            self._known_servers_count = 1
+            self.hs.get_clock().looping_call(
+                run_as_background_process,
+                60 * 1000,
+                "_count_known_servers",
+                self._count_known_servers,
+            )
+            self.hs.get_clock().call_later(
+                1000,
+                run_as_background_process,
+                "_count_known_servers",
+                self._count_known_servers,
+            )
+            LaterGauge(
+                "synapse_federation_known_servers",
+                "",
+                [],
+                lambda: self._known_servers_count,
+            )
+
+    @defer.inlineCallbacks
+    def _count_known_servers(self):
+        """
+        Count the servers that this server knows about.
+
+        The statistic is stored on the class for the
+        `synapse_federation_known_servers` LaterGauge to collect.
+        """
+
+        def _transact(txn):
+            if isinstance(self.database_engine, Sqlite3Engine):
+                query = """
+                    SELECT COUNT(DISTINCT substr(out.user_id, pos+1))
+                    FROM (
+                        SELECT rm.user_id as user_id, instr(rm.user_id, ':')
+                            AS pos FROM room_memberships as rm
+                        INNER JOIN current_state_events as c ON rm.event_id = c.event_id
+                        WHERE c.type = 'm.room.member'
+                    ) as out
+                """
+            else:
+                query = """
+                    SELECT COUNT(DISTINCT split_part(state_key, ':', 2))
+                    FROM current_state_events
+                    WHERE type = 'm.room.member' AND membership = 'join';
+                """
+            txn.execute(query)
+            return list(txn)[0][0]
+
+        count = yield self.runInteraction("get_known_servers", _transact)
+
+        # We always know about ourselves, even if we have nothing in
+        # room_memberships (for example, the server is new).
+        self._known_servers_count = max([count, 1])
+        return self._known_servers_count
+
     def _check_safe_current_state_events_membership_updated_txn(self, txn):
         """Checks if it is safe to assume the new current_state_events
         membership column is up to date
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 6560173c08..09190d684e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -823,7 +823,9 @@ class StatsStore(StateDeltasStore):
             elif event.type == EventTypes.CanonicalAlias:
                 room_state["canonical_alias"] = event.content.get("alias")
             elif event.type == EventTypes.Create:
-                room_state["is_federatable"] = event.content.get("m.federate", True)
+                room_state["is_federatable"] = (
+                    event.content.get("m.federate", True) is True
+                )
 
         yield self.update_room_state(room_id, room_state)