diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/devices.py | 2 | ||||
-rw-r--r-- | synapse/storage/registration.py | 158 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 59 | ||||
-rw-r--r-- | synapse/storage/stats.py | 4 |
4 files changed, 142 insertions, 81 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 7b4213f20b..2db3ef4004 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -933,7 +933,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): "ts": now, "opentracing_context": json.dumps(context) if whitelisted_homeserver(destination) - else None, + else "{}", } for destination in hosts for device_id in device_ids diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 2d3c7e2dc9..5138792a5f 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -614,6 +614,85 @@ class RegistrationWorkerStore(SQLBaseStore): # Convert the integer into a boolean. return res == 1 + def get_threepid_validation_session( + self, medium, client_secret, address=None, sid=None, validated=True + ): + """Gets a session_id and last_send_attempt (if available) for a + client_secret/medium/(address|session_id) combo + + Args: + medium (str|None): The medium of the 3PID + address (str|None): The address of the 3PID + sid (str|None): The ID of the validation session + client_secret (str|None): A unique string provided by the client to + help identify this validation attempt + validated (bool|None): Whether sessions should be filtered by + whether they have been validated already or not. None to + perform no filtering + + Returns: + deferred {str, int}|None: A dict containing the + latest session_id and send_attempt count for this 3PID. + Otherwise None if there hasn't been a previous attempt + """ + keyvalues = {"medium": medium, "client_secret": client_secret} + if address: + keyvalues["address"] = address + if sid: + keyvalues["session_id"] = sid + + assert address or sid + + def get_threepid_validation_session_txn(txn): + sql = """ + SELECT address, session_id, medium, client_secret, + last_send_attempt, validated_at + FROM threepid_validation_session WHERE %s + """ % ( + " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)), + ) + + if validated is not None: + sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL") + + sql += " LIMIT 1" + + txn.execute(sql, list(keyvalues.values())) + rows = self.cursor_to_dict(txn) + if not rows: + return None + + return rows[0] + + return self.runInteraction( + "get_threepid_validation_session", get_threepid_validation_session_txn + ) + + def delete_threepid_session(self, session_id): + """Removes a threepid validation session from the database. This can + be done after validation has been performed and whatever action was + waiting on it has been carried out + + Args: + session_id (str): The ID of the session to delete + """ + + def delete_threepid_session_txn(txn): + self._simple_delete_txn( + txn, + table="threepid_validation_token", + keyvalues={"session_id": session_id}, + ) + self._simple_delete_txn( + txn, + table="threepid_validation_session", + keyvalues={"session_id": session_id}, + ) + + return self.runInteraction( + "delete_threepid_session", delete_threepid_session_txn + ) + class RegistrationStore( RegistrationWorkerStore, background_updates.BackgroundUpdateStore @@ -1082,60 +1161,6 @@ class RegistrationStore( return 1 - def get_threepid_validation_session( - self, medium, client_secret, address=None, sid=None, validated=True - ): - """Gets a session_id and last_send_attempt (if available) for a - client_secret/medium/(address|session_id) combo - - Args: - medium (str|None): The medium of the 3PID - address (str|None): The address of the 3PID - sid (str|None): The ID of the validation session - client_secret (str|None): A unique string provided by the client to - help identify this validation attempt - validated (bool|None): Whether sessions should be filtered by - whether they have been validated already or not. None to - perform no filtering - - Returns: - deferred {str, int}|None: A dict containing the - latest session_id and send_attempt count for this 3PID. - Otherwise None if there hasn't been a previous attempt - """ - keyvalues = {"medium": medium, "client_secret": client_secret} - if address: - keyvalues["address"] = address - if sid: - keyvalues["session_id"] = sid - - assert address or sid - - def get_threepid_validation_session_txn(txn): - sql = """ - SELECT address, session_id, medium, client_secret, - last_send_attempt, validated_at - FROM threepid_validation_session WHERE %s - """ % ( - " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)), - ) - - if validated is not None: - sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL") - - sql += " LIMIT 1" - - txn.execute(sql, list(keyvalues.values())) - rows = self.cursor_to_dict(txn) - if not rows: - return None - - return rows[0] - - return self.runInteraction( - "get_threepid_validation_session", get_threepid_validation_session_txn - ) - def validate_threepid_session(self, session_id, client_secret, token, current_ts): """Attempt to validate a threepid session using a token @@ -1323,31 +1348,6 @@ class RegistrationStore( self.clock.time_msec(), ) - def delete_threepid_session(self, session_id): - """Removes a threepid validation session from the database. This can - be done after validation has been performed and whatever action was - waiting on it has been carried out - - Args: - session_id (str): The ID of the session to delete - """ - - def delete_threepid_session_txn(txn): - self._simple_delete_txn( - txn, - table="threepid_validation_token", - keyvalues={"session_id": session_id}, - ) - self._simple_delete_txn( - txn, - table="threepid_validation_session", - keyvalues={"session_id": session_id}, - ) - - return self.runInteraction( - "delete_threepid_session", delete_threepid_session_txn - ) - def set_user_deactivated_status_txn(self, txn, user_id, deactivated): self._simple_update_one_txn( txn=txn, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index f8b682ebd9..4df8ebdacd 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -24,8 +24,10 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction +from synapse.storage.engines import Sqlite3Engine from synapse.storage.events_worker import EventsWorkerStore from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer @@ -74,6 +76,63 @@ class RoomMemberWorkerStore(EventsWorkerStore): self._check_safe_current_state_events_membership_updated_txn(txn) txn.close() + if self.hs.config.metrics_flags.known_servers: + self._known_servers_count = 1 + self.hs.get_clock().looping_call( + run_as_background_process, + 60 * 1000, + "_count_known_servers", + self._count_known_servers, + ) + self.hs.get_clock().call_later( + 1000, + run_as_background_process, + "_count_known_servers", + self._count_known_servers, + ) + LaterGauge( + "synapse_federation_known_servers", + "", + [], + lambda: self._known_servers_count, + ) + + @defer.inlineCallbacks + def _count_known_servers(self): + """ + Count the servers that this server knows about. + + The statistic is stored on the class for the + `synapse_federation_known_servers` LaterGauge to collect. + """ + + def _transact(txn): + if isinstance(self.database_engine, Sqlite3Engine): + query = """ + SELECT COUNT(DISTINCT substr(out.user_id, pos+1)) + FROM ( + SELECT rm.user_id as user_id, instr(rm.user_id, ':') + AS pos FROM room_memberships as rm + INNER JOIN current_state_events as c ON rm.event_id = c.event_id + WHERE c.type = 'm.room.member' + ) as out + """ + else: + query = """ + SELECT COUNT(DISTINCT split_part(state_key, ':', 2)) + FROM current_state_events + WHERE type = 'm.room.member' AND membership = 'join'; + """ + txn.execute(query) + return list(txn)[0][0] + + count = yield self.runInteraction("get_known_servers", _transact) + + # We always know about ourselves, even if we have nothing in + # room_memberships (for example, the server is new). + self._known_servers_count = max([count, 1]) + return self._known_servers_count + def _check_safe_current_state_events_membership_updated_txn(self, txn): """Checks if it is safe to assume the new current_state_events membership column is up to date diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 6560173c08..09190d684e 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -823,7 +823,9 @@ class StatsStore(StateDeltasStore): elif event.type == EventTypes.CanonicalAlias: room_state["canonical_alias"] = event.content.get("alias") elif event.type == EventTypes.Create: - room_state["is_federatable"] = event.content.get("m.federate", True) + room_state["is_federatable"] = ( + event.content.get("m.federate", True) is True + ) yield self.update_room_state(room_id, room_state) |