diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 7b4213f20b..2db3ef4004 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -933,7 +933,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"ts": now,
"opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
- else None,
+ else "{}",
}
for destination in hosts
for device_id in device_ids
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 2d3c7e2dc9..5138792a5f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -614,6 +614,85 @@ class RegistrationWorkerStore(SQLBaseStore):
# Convert the integer into a boolean.
return res == 1
+ def get_threepid_validation_session(
+ self, medium, client_secret, address=None, sid=None, validated=True
+ ):
+ """Gets a session_id and last_send_attempt (if available) for a
+ client_secret/medium/(address|session_id) combo
+
+ Args:
+ medium (str|None): The medium of the 3PID
+ address (str|None): The address of the 3PID
+ sid (str|None): The ID of the validation session
+ client_secret (str|None): A unique string provided by the client to
+ help identify this validation attempt
+ validated (bool|None): Whether sessions should be filtered by
+ whether they have been validated already or not. None to
+ perform no filtering
+
+ Returns:
+ deferred {str, int}|None: A dict containing the
+ latest session_id and send_attempt count for this 3PID.
+ Otherwise None if there hasn't been a previous attempt
+ """
+ keyvalues = {"medium": medium, "client_secret": client_secret}
+ if address:
+ keyvalues["address"] = address
+ if sid:
+ keyvalues["session_id"] = sid
+
+ assert address or sid
+
+ def get_threepid_validation_session_txn(txn):
+ sql = """
+ SELECT address, session_id, medium, client_secret,
+ last_send_attempt, validated_at
+ FROM threepid_validation_session WHERE %s
+ """ % (
+ " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+ )
+
+ if validated is not None:
+ sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
+
+ sql += " LIMIT 1"
+
+ txn.execute(sql, list(keyvalues.values()))
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ return None
+
+ return rows[0]
+
+ return self.runInteraction(
+ "get_threepid_validation_session", get_threepid_validation_session_txn
+ )
+
+ def delete_threepid_session(self, session_id):
+ """Removes a threepid validation session from the database. This can
+ be done after validation has been performed and whatever action was
+ waiting on it has been carried out
+
+ Args:
+ session_id (str): The ID of the session to delete
+ """
+
+ def delete_threepid_session_txn(txn):
+ self._simple_delete_txn(
+ txn,
+ table="threepid_validation_token",
+ keyvalues={"session_id": session_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="threepid_validation_session",
+ keyvalues={"session_id": session_id},
+ )
+
+ return self.runInteraction(
+ "delete_threepid_session", delete_threepid_session_txn
+ )
+
class RegistrationStore(
RegistrationWorkerStore, background_updates.BackgroundUpdateStore
@@ -1082,60 +1161,6 @@ class RegistrationStore(
return 1
- def get_threepid_validation_session(
- self, medium, client_secret, address=None, sid=None, validated=True
- ):
- """Gets a session_id and last_send_attempt (if available) for a
- client_secret/medium/(address|session_id) combo
-
- Args:
- medium (str|None): The medium of the 3PID
- address (str|None): The address of the 3PID
- sid (str|None): The ID of the validation session
- client_secret (str|None): A unique string provided by the client to
- help identify this validation attempt
- validated (bool|None): Whether sessions should be filtered by
- whether they have been validated already or not. None to
- perform no filtering
-
- Returns:
- deferred {str, int}|None: A dict containing the
- latest session_id and send_attempt count for this 3PID.
- Otherwise None if there hasn't been a previous attempt
- """
- keyvalues = {"medium": medium, "client_secret": client_secret}
- if address:
- keyvalues["address"] = address
- if sid:
- keyvalues["session_id"] = sid
-
- assert address or sid
-
- def get_threepid_validation_session_txn(txn):
- sql = """
- SELECT address, session_id, medium, client_secret,
- last_send_attempt, validated_at
- FROM threepid_validation_session WHERE %s
- """ % (
- " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
- )
-
- if validated is not None:
- sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
-
- sql += " LIMIT 1"
-
- txn.execute(sql, list(keyvalues.values()))
- rows = self.cursor_to_dict(txn)
- if not rows:
- return None
-
- return rows[0]
-
- return self.runInteraction(
- "get_threepid_validation_session", get_threepid_validation_session_txn
- )
-
def validate_threepid_session(self, session_id, client_secret, token, current_ts):
"""Attempt to validate a threepid session using a token
@@ -1323,31 +1348,6 @@ class RegistrationStore(
self.clock.time_msec(),
)
- def delete_threepid_session(self, session_id):
- """Removes a threepid validation session from the database. This can
- be done after validation has been performed and whatever action was
- waiting on it has been carried out
-
- Args:
- session_id (str): The ID of the session to delete
- """
-
- def delete_threepid_session_txn(txn):
- self._simple_delete_txn(
- txn,
- table="threepid_validation_token",
- keyvalues={"session_id": session_id},
- )
- self._simple_delete_txn(
- txn,
- table="threepid_validation_session",
- keyvalues={"session_id": session_id},
- )
-
- return self.runInteraction(
- "delete_threepid_session", delete_threepid_session_txn
- )
-
def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
self._simple_update_one_txn(
txn=txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index f8b682ebd9..4df8ebdacd 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,8 +24,10 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction
+from synapse.storage.engines import Sqlite3Engine
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -74,6 +76,63 @@ class RoomMemberWorkerStore(EventsWorkerStore):
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()
+ if self.hs.config.metrics_flags.known_servers:
+ self._known_servers_count = 1
+ self.hs.get_clock().looping_call(
+ run_as_background_process,
+ 60 * 1000,
+ "_count_known_servers",
+ self._count_known_servers,
+ )
+ self.hs.get_clock().call_later(
+ 1000,
+ run_as_background_process,
+ "_count_known_servers",
+ self._count_known_servers,
+ )
+ LaterGauge(
+ "synapse_federation_known_servers",
+ "",
+ [],
+ lambda: self._known_servers_count,
+ )
+
+ @defer.inlineCallbacks
+ def _count_known_servers(self):
+ """
+ Count the servers that this server knows about.
+
+ The statistic is stored on the class for the
+ `synapse_federation_known_servers` LaterGauge to collect.
+ """
+
+ def _transact(txn):
+ if isinstance(self.database_engine, Sqlite3Engine):
+ query = """
+ SELECT COUNT(DISTINCT substr(out.user_id, pos+1))
+ FROM (
+ SELECT rm.user_id as user_id, instr(rm.user_id, ':')
+ AS pos FROM room_memberships as rm
+ INNER JOIN current_state_events as c ON rm.event_id = c.event_id
+ WHERE c.type = 'm.room.member'
+ ) as out
+ """
+ else:
+ query = """
+ SELECT COUNT(DISTINCT split_part(state_key, ':', 2))
+ FROM current_state_events
+ WHERE type = 'm.room.member' AND membership = 'join';
+ """
+ txn.execute(query)
+ return list(txn)[0][0]
+
+ count = yield self.runInteraction("get_known_servers", _transact)
+
+ # We always know about ourselves, even if we have nothing in
+ # room_memberships (for example, the server is new).
+ self._known_servers_count = max([count, 1])
+ return self._known_servers_count
+
def _check_safe_current_state_events_membership_updated_txn(self, txn):
"""Checks if it is safe to assume the new current_state_events
membership column is up to date
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 6560173c08..09190d684e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -823,7 +823,9 @@ class StatsStore(StateDeltasStore):
elif event.type == EventTypes.CanonicalAlias:
room_state["canonical_alias"] = event.content.get("alias")
elif event.type == EventTypes.Create:
- room_state["is_federatable"] = event.content.get("m.federate", True)
+ room_state["is_federatable"] = (
+ event.content.get("m.federate", True) is True
+ )
yield self.update_room_state(room_id, room_state)
|