diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index eecb276465..4df8ebdacd 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,8 +24,10 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction
+from synapse.storage.engines import Sqlite3Engine
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -74,6 +76,63 @@ class RoomMemberWorkerStore(EventsWorkerStore):
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()
+ if self.hs.config.metrics_flags.known_servers:
+ self._known_servers_count = 1
+ self.hs.get_clock().looping_call(
+ run_as_background_process,
+ 60 * 1000,
+ "_count_known_servers",
+ self._count_known_servers,
+ )
+ self.hs.get_clock().call_later(
+ 1000,
+ run_as_background_process,
+ "_count_known_servers",
+ self._count_known_servers,
+ )
+ LaterGauge(
+ "synapse_federation_known_servers",
+ "",
+ [],
+ lambda: self._known_servers_count,
+ )
+
+ @defer.inlineCallbacks
+ def _count_known_servers(self):
+ """
+ Count the servers that this server knows about.
+
+ The statistic is stored on the class for the
+ `synapse_federation_known_servers` LaterGauge to collect.
+ """
+
+ def _transact(txn):
+ if isinstance(self.database_engine, Sqlite3Engine):
+ query = """
+ SELECT COUNT(DISTINCT substr(out.user_id, pos+1))
+ FROM (
+ SELECT rm.user_id as user_id, instr(rm.user_id, ':')
+ AS pos FROM room_memberships as rm
+ INNER JOIN current_state_events as c ON rm.event_id = c.event_id
+ WHERE c.type = 'm.room.member'
+ ) as out
+ """
+ else:
+ query = """
+ SELECT COUNT(DISTINCT split_part(state_key, ':', 2))
+ FROM current_state_events
+ WHERE type = 'm.room.member' AND membership = 'join';
+ """
+ txn.execute(query)
+ return list(txn)[0][0]
+
+ count = yield self.runInteraction("get_known_servers", _transact)
+
+ # We always know about ourselves, even if we have nothing in
+ # room_memberships (for example, the server is new).
+ self._known_servers_count = max([count, 1])
+ return self._known_servers_count
+
def _check_safe_current_state_events_membership_updated_txn(self, txn):
"""Checks if it is safe to assume the new current_state_events
membership column is up to date
@@ -112,29 +171,31 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
- def f(txn):
- # If we can assume current_state_events.membership is up to date
- # then we can avoid a join, which is a Very Good Thing given how
- # frequently this function gets called.
- if self._current_state_events_membership_up_to_date:
- sql = """
- SELECT state_key FROM current_state_events
- WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
- """
- else:
- sql = """
- SELECT state_key FROM room_memberships as m
- INNER JOIN current_state_events as c
- ON m.event_id = c.event_id
- AND m.room_id = c.room_id
- AND m.user_id = c.state_key
- WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
- """
+ return self.runInteraction(
+ "get_users_in_room", self.get_users_in_room_txn, room_id
+ )
- txn.execute(sql, (room_id, Membership.JOIN))
- return [to_ascii(r[0]) for r in txn]
+ def get_users_in_room_txn(self, txn, room_id):
+ # If we can assume current_state_events.membership is up to date
+ # then we can avoid a join, which is a Very Good Thing given how
+ # frequently this function gets called.
+ if self._current_state_events_membership_up_to_date:
+ sql = """
+ SELECT state_key FROM current_state_events
+ WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+ """
+ else:
+ sql = """
+ SELECT state_key FROM room_memberships as m
+ INNER JOIN current_state_events as c
+ ON m.event_id = c.event_id
+ AND m.room_id = c.room_id
+ AND m.user_id = c.state_key
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+ """
- return self.runInteraction("get_users_in_room", f)
+ txn.execute(sql, (room_id, Membership.JOIN))
+ return [to_ascii(r[0]) for r in txn]
@cached(max_entries=100000)
def get_room_summary(self, room_id):
|