From 50d2a3059db29e7c9c7ccc9b005cec8497827e4b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Apr 2019 14:36:40 +0100 Subject: Fix schema upgrade when dropping tables We need to drop tables in the correct order due to foreign table constraints (on `application_services`), otherwise the DROP TABLE command will fail. Introduced in #4992. --- synapse/storage/schema/delta/54/drop_legacy_tables.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/54/drop_legacy_tables.sql b/synapse/storage/schema/delta/54/drop_legacy_tables.sql index ecca005d9b..dbbe682697 100644 --- a/synapse/storage/schema/delta/54/drop_legacy_tables.sql +++ b/synapse/storage/schema/delta/54/drop_legacy_tables.sql @@ -13,8 +13,10 @@ * limitations under the License. */ -DROP TABLE IF EXISTS application_services; +-- we need to do this first due to foreign constraints DROP TABLE IF EXISTS application_services_regex; + +DROP TABLE IF EXISTS application_services; DROP TABLE IF EXISTS transaction_id_to_pdu; DROP TABLE IF EXISTS stats_reporting; DROP TABLE IF EXISTS current_state_resets; -- cgit 1.5.1 From 20f0617e87924c929f0db0c06d30de0c8d15081c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Apr 2019 17:58:47 +0100 Subject: Send out emails with links to extend an account's validity period --- changelog.d/5047.feature | 1 + docs/sample_config.yaml | 29 ++- synapse/api/auth.py | 5 +- synapse/config/emailconfig.py | 7 +- synapse/config/registration.py | 52 ++++- synapse/handlers/account_validity.py | 228 +++++++++++++++++++++ synapse/push/mailer.py | 14 +- synapse/push/pusher.py | 6 +- synapse/res/templates/mail-expiry.css | 4 + synapse/res/templates/notice_expiry.html | 43 ++++ synapse/res/templates/notice_expiry.txt | 7 + synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/account_validity.py | 62 ++++++ synapse/server.py | 5 + synapse/storage/registration.py | 168 +++++++++++++-- .../storage/schema/delta/54/account_validity.sql | 9 +- tests/rest/client/v2_alpha/test_register.py | 100 ++++++++- 17 files changed, 699 insertions(+), 43 deletions(-) create mode 100644 changelog.d/5047.feature create mode 100644 synapse/handlers/account_validity.py create mode 100644 synapse/res/templates/mail-expiry.css create mode 100644 synapse/res/templates/notice_expiry.html create mode 100644 synapse/res/templates/notice_expiry.txt create mode 100644 synapse/rest/client/v2_alpha/account_validity.py (limited to 'synapse/storage/schema') diff --git a/changelog.d/5047.feature b/changelog.d/5047.feature new file mode 100644 index 0000000000..12766a82a7 --- /dev/null +++ b/changelog.d/5047.feature @@ -0,0 +1 @@ +Add time-based account expiration. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 5594c8b9af..8bbd437239 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -643,11 +643,31 @@ uploads_path: "DATADIR/uploads" # #enable_registration: false -# Optional account validity parameter. This allows for, e.g., accounts to -# be denied any request after a given period. +# Optional account validity configuration. This allows for accounts to be denied +# any request after a given period. +# +# ``enabled`` defines whether the account validity feature is enabled. Defaults +# to False. +# +# ``period`` allows setting the period after which an account is valid +# after its registration. When renewing the account, its validity period +# will be extended by this amount of time. This parameter is required when using +# the account validity feature. +# +# ``renew_at`` is the amount of time before an account's expiry date at which +# Synapse will send an email to the account's email address with a renewal link. +# This needs the ``email`` and ``public_baseurl`` configuration sections to be +# filled. +# +# ``renew_email_subject`` is the subject of the email sent out with the renewal +# link. ``%(app)s`` can be used as a placeholder for the ``app_name`` parameter +# from the ``email`` section. # #account_validity: +# enabled: True # period: 6w +# renew_at: 1w +# renew_email_subject: "Renew your %(app)s account" # The user must provide all of the below types of 3PID when registering. # @@ -890,7 +910,7 @@ password_config: -# Enable sending emails for notification events +# Enable sending emails for notification events or expiry notices # Defining a custom URL for Riot is only needed if email notifications # should contain links to a self-hosted installation of Riot; when set # the "app_name" setting is ignored. @@ -912,6 +932,9 @@ password_config: # #template_dir: res/templates # notif_template_html: notif_mail.html # notif_template_text: notif_mail.txt +# # Templates for account expiry notices. +# expiry_template_html: notice_expiry.html +# expiry_template_text: notice_expiry.txt # notif_for_new_users: True # riot_base_url: "http://localhost/riot" diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 976e0dd18b..4482962510 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -230,8 +230,9 @@ class Auth(object): # Deny the request if the user account has expired. if self._account_validity.enabled: - expiration_ts = yield self.store.get_expiration_ts_for_user(user) - if self.clock.time_msec() >= expiration_ts: + user_id = user.to_string() + expiration_ts = yield self.store.get_expiration_ts_for_user(user_id) + if expiration_ts and self.clock.time_msec() >= expiration_ts: raise AuthError( 403, "User account has expired", diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 93d70cff14..60827be72f 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -71,6 +71,8 @@ class EmailConfig(Config): self.email_notif_from = email_config["notif_from"] self.email_notif_template_html = email_config["notif_template_html"] self.email_notif_template_text = email_config["notif_template_text"] + self.email_expiry_template_html = email_config["expiry_template_html"] + self.email_expiry_template_text = email_config["expiry_template_text"] template_dir = email_config.get("template_dir") # we need an absolute path, because we change directory after starting (and @@ -120,7 +122,7 @@ class EmailConfig(Config): def default_config(self, config_dir_path, server_name, **kwargs): return """ - # Enable sending emails for notification events + # Enable sending emails for notification events or expiry notices # Defining a custom URL for Riot is only needed if email notifications # should contain links to a self-hosted installation of Riot; when set # the "app_name" setting is ignored. @@ -142,6 +144,9 @@ class EmailConfig(Config): # #template_dir: res/templates # notif_template_html: notif_mail.html # notif_template_text: notif_mail.txt + # # Templates for account expiry notices. + # expiry_template_html: notice_expiry.html + # expiry_template_text: notice_expiry.txt # notif_for_new_users: True # riot_base_url: "http://localhost/riot" """ diff --git a/synapse/config/registration.py b/synapse/config/registration.py index b7a7b4f1cf..129c208204 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -21,12 +21,26 @@ from synapse.util.stringutils import random_string_with_symbols class AccountValidityConfig(Config): - def __init__(self, config): - self.enabled = (len(config) > 0) + def __init__(self, config, synapse_config): + self.enabled = config.get("enabled", False) + self.renew_by_email_enabled = ("renew_at" in config) - period = config.get("period", None) - if period: - self.period = self.parse_duration(period) + if self.enabled: + if "period" in config: + self.period = self.parse_duration(config["period"]) + else: + raise ConfigError("'period' is required when using account validity") + + if "renew_at" in config: + self.renew_at = self.parse_duration(config["renew_at"]) + + if "renew_email_subject" in config: + self.renew_email_subject = config["renew_email_subject"] + else: + self.renew_email_subject = "Renew your %(app)s account" + + if self.renew_by_email_enabled and "public_baseurl" not in synapse_config: + raise ConfigError("Can't send renewal emails without 'public_baseurl'") class RegistrationConfig(Config): @@ -40,7 +54,9 @@ class RegistrationConfig(Config): strtobool(str(config["disable_registration"])) ) - self.account_validity = AccountValidityConfig(config.get("account_validity", {})) + self.account_validity = AccountValidityConfig( + config.get("account_validity", {}), config, + ) self.registrations_require_3pid = config.get("registrations_require_3pid", []) self.allowed_local_3pids = config.get("allowed_local_3pids", []) @@ -86,11 +102,31 @@ class RegistrationConfig(Config): # #enable_registration: false - # Optional account validity parameter. This allows for, e.g., accounts to - # be denied any request after a given period. + # Optional account validity configuration. This allows for accounts to be denied + # any request after a given period. + # + # ``enabled`` defines whether the account validity feature is enabled. Defaults + # to False. + # + # ``period`` allows setting the period after which an account is valid + # after its registration. When renewing the account, its validity period + # will be extended by this amount of time. This parameter is required when using + # the account validity feature. + # + # ``renew_at`` is the amount of time before an account's expiry date at which + # Synapse will send an email to the account's email address with a renewal link. + # This needs the ``email`` and ``public_baseurl`` configuration sections to be + # filled. + # + # ``renew_email_subject`` is the subject of the email sent out with the renewal + # link. ``%%(app)s`` can be used as a placeholder for the ``app_name`` parameter + # from the ``email`` section. # #account_validity: + # enabled: True # period: 6w + # renew_at: 1w + # renew_email_subject: "Renew your %%(app)s account" # The user must provide all of the below types of 3PID when registering. # diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py new file mode 100644 index 0000000000..e82049e42d --- /dev/null +++ b/synapse/handlers/account_validity.py @@ -0,0 +1,228 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import email.mime.multipart +import email.utils +import logging +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +from twisted.internet import defer + +from synapse.api.errors import StoreError +from synapse.types import UserID +from synapse.util import stringutils +from synapse.util.logcontext import make_deferred_yieldable + +try: + from synapse.push.mailer import load_jinja2_templates +except ImportError: + load_jinja2_templates = None + +logger = logging.getLogger(__name__) + + +class AccountValidityHandler(object): + def __init__(self, hs): + self.hs = hs + self.store = self.hs.get_datastore() + self.sendmail = self.hs.get_sendmail() + self.clock = self.hs.get_clock() + + self._account_validity = self.hs.config.account_validity + + if self._account_validity.renew_by_email_enabled and load_jinja2_templates: + # Don't do email-specific configuration if renewal by email is disabled. + try: + app_name = self.hs.config.email_app_name + + self._subject = self._account_validity.renew_email_subject % { + "app": app_name, + } + + self._from_string = self.hs.config.email_notif_from % { + "app": app_name, + } + except Exception: + # If substitution failed, fall back to the bare strings. + self._subject = self._account_validity.renew_email_subject + self._from_string = self.hs.config.email_notif_from + + self._raw_from = email.utils.parseaddr(self._from_string)[1] + + self._template_html, self._template_text = load_jinja2_templates( + config=self.hs.config, + template_html_name=self.hs.config.email_expiry_template_html, + template_text_name=self.hs.config.email_expiry_template_text, + ) + + # Check the renewal emails to send and send them every 30min. + self.clock.looping_call( + self.send_renewal_emails, + 30 * 60 * 1000, + ) + + @defer.inlineCallbacks + def send_renewal_emails(self): + """Gets the list of users whose account is expiring in the amount of time + configured in the ``renew_at`` parameter from the ``account_validity`` + configuration, and sends renewal emails to all of these users as long as they + have an email 3PID attached to their account. + """ + expiring_users = yield self.store.get_users_expiring_soon() + + if expiring_users: + for user in expiring_users: + yield self._send_renewal_email( + user_id=user["user_id"], + expiration_ts=user["expiration_ts_ms"], + ) + + @defer.inlineCallbacks + def _send_renewal_email(self, user_id, expiration_ts): + """Sends out a renewal email to every email address attached to the given user + with a unique link allowing them to renew their account. + + Args: + user_id (str): ID of the user to send email(s) to. + expiration_ts (int): Timestamp in milliseconds for the expiration date of + this user's account (used in the email templates). + """ + addresses = yield self._get_email_addresses_for_user(user_id) + + # Stop right here if the user doesn't have at least one email address. + # In this case, they will have to ask their server admin to renew their + # account manually. + if not addresses: + return + + try: + user_display_name = yield self.store.get_profile_displayname( + UserID.from_string(user_id).localpart + ) + if user_display_name is None: + user_display_name = user_id + except StoreError: + user_display_name = user_id + + renewal_token = yield self._get_renewal_token(user_id) + url = "%s_matrix/client/unstable/account_validity/renew?token=%s" % ( + self.hs.config.public_baseurl, + renewal_token, + ) + + template_vars = { + "display_name": user_display_name, + "expiration_ts": expiration_ts, + "url": url, + } + + html_text = self._template_html.render(**template_vars) + html_part = MIMEText(html_text, "html", "utf8") + + plain_text = self._template_text.render(**template_vars) + text_part = MIMEText(plain_text, "plain", "utf8") + + for address in addresses: + raw_to = email.utils.parseaddr(address)[1] + + multipart_msg = MIMEMultipart('alternative') + multipart_msg['Subject'] = self._subject + multipart_msg['From'] = self._from_string + multipart_msg['To'] = address + multipart_msg['Date'] = email.utils.formatdate() + multipart_msg['Message-ID'] = email.utils.make_msgid() + multipart_msg.attach(text_part) + multipart_msg.attach(html_part) + + logger.info("Sending renewal email to %s", address) + + yield make_deferred_yieldable(self.sendmail( + self.hs.config.email_smtp_host, + self._raw_from, raw_to, multipart_msg.as_string().encode('utf8'), + reactor=self.hs.get_reactor(), + port=self.hs.config.email_smtp_port, + requireAuthentication=self.hs.config.email_smtp_user is not None, + username=self.hs.config.email_smtp_user, + password=self.hs.config.email_smtp_pass, + requireTransportSecurity=self.hs.config.require_transport_security + )) + + yield self.store.set_renewal_mail_status( + user_id=user_id, + email_sent=True, + ) + + @defer.inlineCallbacks + def _get_email_addresses_for_user(self, user_id): + """Retrieve the list of email addresses attached to a user's account. + + Args: + user_id (str): ID of the user to lookup email addresses for. + + Returns: + defer.Deferred[list[str]]: Email addresses for this account. + """ + threepids = yield self.store.user_get_threepids(user_id) + + addresses = [] + for threepid in threepids: + if threepid["medium"] == "email": + addresses.append(threepid["address"]) + + defer.returnValue(addresses) + + @defer.inlineCallbacks + def _get_renewal_token(self, user_id): + """Generates a 32-byte long random string that will be inserted into the + user's renewal email's unique link, then saves it into the database. + + Args: + user_id (str): ID of the user to generate a string for. + + Returns: + defer.Deferred[str]: The generated string. + + Raises: + StoreError(500): Couldn't generate a unique string after 5 attempts. + """ + attempts = 0 + while attempts < 5: + try: + renewal_token = stringutils.random_string(32) + yield self.store.set_renewal_token_for_user(user_id, renewal_token) + defer.returnValue(renewal_token) + except StoreError: + attempts += 1 + raise StoreError(500, "Couldn't generate a unique string as refresh string.") + + @defer.inlineCallbacks + def renew_account(self, renewal_token): + """Renews the account attached to a given renewal token by pushing back the + expiration date by the current validity period in the server's configuration. + + Args: + renewal_token (str): Token sent with the renewal request. + """ + user_id = yield self.store.get_user_from_renewal_token(renewal_token) + + logger.debug("Renewing an account for user %s", user_id) + + new_expiration_date = self.clock.time_msec() + self._account_validity.period + + yield self.store.renew_account_for_user( + user_id=user_id, + new_expiration_ts=new_expiration_date, + ) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 1eb5be0957..c269bcf4a4 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -521,11 +521,11 @@ def format_ts_filter(value, format): return time.strftime(format, time.localtime(value / 1000)) -def load_jinja2_templates(config): +def load_jinja2_templates(config, template_html_name, template_text_name): """Load the jinja2 email templates from disk Returns: - (notif_template_html, notif_template_text) + (template_html, template_text) """ logger.info("loading email templates from '%s'", config.email_template_dir) loader = jinja2.FileSystemLoader(config.email_template_dir) @@ -533,14 +533,10 @@ def load_jinja2_templates(config): env.filters["format_ts"] = format_ts_filter env.filters["mxc_to_http"] = _create_mxc_to_http_filter(config) - notif_template_html = env.get_template( - config.email_notif_template_html - ) - notif_template_text = env.get_template( - config.email_notif_template_text - ) + template_html = env.get_template(template_html_name) + template_text = env.get_template(template_text_name) - return notif_template_html, notif_template_text + return template_html, template_text def _create_mxc_to_http_filter(config): diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index b33f2a357b..14bc7823cf 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -44,7 +44,11 @@ class PusherFactory(object): if hs.config.email_enable_notifs: self.mailers = {} # app_name -> Mailer - templates = load_jinja2_templates(hs.config) + templates = load_jinja2_templates( + config=hs.config, + template_html_name=hs.config.email_notif_template_html, + template_text_name=hs.config.email_notif_template_text, + ) self.notif_template_html, self.notif_template_text = templates self.pusher_types["email"] = self._create_email_pusher diff --git a/synapse/res/templates/mail-expiry.css b/synapse/res/templates/mail-expiry.css new file mode 100644 index 0000000000..3dea486467 --- /dev/null +++ b/synapse/res/templates/mail-expiry.css @@ -0,0 +1,4 @@ +.noticetext { + margin-top: 10px; + margin-bottom: 10px; +} diff --git a/synapse/res/templates/notice_expiry.html b/synapse/res/templates/notice_expiry.html new file mode 100644 index 0000000000..f0d7c66e1b --- /dev/null +++ b/synapse/res/templates/notice_expiry.html @@ -0,0 +1,43 @@ + + + + + + + + + + + + +
+ + + + + + + + +
+
Hi {{ display_name }},
+
+
Your account will expire on {{ expiration_ts|format_ts("%d-%m-%Y") }}. This means that you will lose access to your account after this date.
+
To extend the validity of your account, please click on the link bellow (or copy and paste it into a new browser tab):
+ +
+
+ + diff --git a/synapse/res/templates/notice_expiry.txt b/synapse/res/templates/notice_expiry.txt new file mode 100644 index 0000000000..41f1c4279c --- /dev/null +++ b/synapse/res/templates/notice_expiry.txt @@ -0,0 +1,7 @@ +Hi {{ display_name }}, + +Your account will expire on {{ expiration_ts|format_ts("%d-%m-%Y") }}. This means that you will lose access to your account after this date. + +To extend the validity of your account, please click on the link bellow (or copy and paste it to a new browser tab): + +{{ url }} diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 91f5247d52..a66885d349 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -33,6 +33,7 @@ from synapse.rest.client.v1 import ( from synapse.rest.client.v2_alpha import ( account, account_data, + account_validity, auth, capabilities, devices, @@ -109,3 +110,4 @@ class ClientRestResource(JsonResource): groups.register_servlets(hs, client_resource) room_upgrade_rest_servlet.register_servlets(hs, client_resource) capabilities.register_servlets(hs, client_resource) + account_validity.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/account_validity.py b/synapse/rest/client/v2_alpha/account_validity.py new file mode 100644 index 0000000000..1ff6a6b638 --- /dev/null +++ b/synapse/rest/client/v2_alpha/account_validity.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.http.server import finish_request +from synapse.http.servlet import RestServlet + +from ._base import client_v2_patterns + +logger = logging.getLogger(__name__) + + +class AccountValidityRenewServlet(RestServlet): + PATTERNS = client_v2_patterns("/account_validity/renew$") + SUCCESS_HTML = b"Your account has been successfully renewed." + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(AccountValidityRenewServlet, self).__init__() + + self.hs = hs + self.account_activity_handler = hs.get_account_validity_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + if b"token" not in request.args: + raise SynapseError(400, "Missing renewal token") + renewal_token = request.args[b"token"][0] + + yield self.account_activity_handler.renew_account(renewal_token.decode('utf8')) + + request.setResponseCode(200) + request.setHeader(b"Content-Type", b"text/html; charset=utf-8") + request.setHeader(b"Content-Length", b"%d" % ( + len(AccountValidityRenewServlet.SUCCESS_HTML), + )) + request.write(AccountValidityRenewServlet.SUCCESS_HTML) + finish_request(request) + defer.returnValue(None) + + +def register_servlets(hs, http_server): + AccountValidityRenewServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index dc8f1ccb8c..8c30ac2fa5 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -47,6 +47,7 @@ from synapse.federation.transport.client import TransportLayerClient from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer from synapse.groups.groups_server import GroupsServerHandler from synapse.handlers import Handlers +from synapse.handlers.account_validity import AccountValidityHandler from synapse.handlers.acme import AcmeHandler from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler, MacaroonGenerator @@ -183,6 +184,7 @@ class HomeServer(object): 'room_context_handler', 'sendmail', 'registration_handler', + 'account_validity_handler', ] REQUIRED_ON_MASTER_STARTUP = [ @@ -506,6 +508,9 @@ class HomeServer(object): def build_registration_handler(self): return RegistrationHandler(self) + def build_account_validity_handler(self): + return AccountValidityHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index eede8ae4d2..a78850259f 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -32,6 +32,7 @@ class RegistrationWorkerStore(SQLBaseStore): super(RegistrationWorkerStore, self).__init__(db_conn, hs) self.config = hs.config + self.clock = hs.get_clock() @cached() def get_user_by_id(self, user_id): @@ -87,25 +88,156 @@ class RegistrationWorkerStore(SQLBaseStore): ) @cachedInlineCallbacks() - def get_expiration_ts_for_user(self, user): + def get_expiration_ts_for_user(self, user_id): """Get the expiration timestamp for the account bearing a given user ID. Args: - user (str): The ID of the user. + user_id (str): The ID of the user. Returns: defer.Deferred: None, if the account has no expiration timestamp, - otherwise int representation of the timestamp (as a number of - milliseconds since epoch). + otherwise int representation of the timestamp (as a number of + milliseconds since epoch). """ res = yield self._simple_select_one_onecol( table="account_validity", - keyvalues={"user_id": user.to_string()}, + keyvalues={"user_id": user_id}, retcol="expiration_ts_ms", allow_none=True, - desc="get_expiration_date_for_user", + desc="get_expiration_ts_for_user", + ) + defer.returnValue(res) + + @defer.inlineCallbacks + def renew_account_for_user(self, user_id, new_expiration_ts): + """Updates the account validity table with a new timestamp for a given + user, removes the existing renewal token from this user, and unsets the + flag indicating that an email has been sent for renewing this account. + + Args: + user_id (str): ID of the user whose account validity to renew. + new_expiration_ts: New expiration date, as a timestamp in milliseconds + since epoch. + """ + def renew_account_for_user_txn(txn): + self._simple_update_txn( + txn=txn, + table="account_validity", + keyvalues={"user_id": user_id}, + updatevalues={ + "expiration_ts_ms": new_expiration_ts, + "email_sent": False, + "renewal_token": None, + }, + ) + self._invalidate_cache_and_stream( + txn, self.get_expiration_ts_for_user, (user_id,), + ) + + yield self.runInteraction( + "renew_account_for_user", + renew_account_for_user_txn, + ) + + @defer.inlineCallbacks + def set_renewal_token_for_user(self, user_id, renewal_token): + """Defines a renewal token for a given user. + + Args: + user_id (str): ID of the user to set the renewal token for. + renewal_token (str): Random unique string that will be used to renew the + user's account. + + Raises: + StoreError: The provided token is already set for another user. + """ + yield self._simple_update_one( + table="account_validity", + keyvalues={"user_id": user_id}, + updatevalues={"renewal_token": renewal_token}, + desc="set_renewal_token_for_user", + ) + + @defer.inlineCallbacks + def get_user_from_renewal_token(self, renewal_token): + """Get a user ID from a renewal token. + + Args: + renewal_token (str): The renewal token to perform the lookup with. + + Returns: + defer.Deferred[str]: The ID of the user to which the token belongs. + """ + res = yield self._simple_select_one_onecol( + table="account_validity", + keyvalues={"renewal_token": renewal_token}, + retcol="user_id", + desc="get_user_from_renewal_token", + ) + + defer.returnValue(res) + + @defer.inlineCallbacks + def get_renewal_token_for_user(self, user_id): + """Get the renewal token associated with a given user ID. + + Args: + user_id (str): The user ID to lookup a token for. + + Returns: + defer.Deferred[str]: The renewal token associated with this user ID. + """ + res = yield self._simple_select_one_onecol( + table="account_validity", + keyvalues={"user_id": user_id}, + retcol="renewal_token", + desc="get_renewal_token_for_user", ) + defer.returnValue(res) + @defer.inlineCallbacks + def get_users_expiring_soon(self): + """Selects users whose account will expire in the [now, now + renew_at] time + window (see configuration for account_validity for information on what renew_at + refers to). + + Returns: + Deferred: Resolves to a list[dict[user_id (str), expiration_ts_ms (int)]] + """ + def select_users_txn(txn, now_ms, renew_at): + sql = ( + "SELECT user_id, expiration_ts_ms FROM account_validity" + " WHERE email_sent = ? AND (expiration_ts_ms - ?) <= ?" + ) + values = [False, now_ms, renew_at] + txn.execute(sql, values) + return self.cursor_to_dict(txn) + + res = yield self.runInteraction( + "get_users_expiring_soon", + select_users_txn, + self.clock.time_msec(), self.config.account_validity.renew_at, + ) + + defer.returnValue(res) + + @defer.inlineCallbacks + def set_renewal_mail_status(self, user_id, email_sent): + """Sets or unsets the flag that indicates whether a renewal email has been sent + to the user (and the user hasn't renewed their account yet). + + Args: + user_id (str): ID of the user to set/unset the flag for. + email_sent (bool): Flag which indicates whether a renewal email has been sent + to this user. + """ + yield self._simple_update_one( + table="account_validity", + keyvalues={"user_id": user_id}, + updatevalues={"email_sent": email_sent}, + desc="set_renewal_mail_status", + ) + @defer.inlineCallbacks def is_server_admin(self, user): res = yield self._simple_select_one_onecol( @@ -508,22 +640,24 @@ class RegistrationStore(RegistrationWorkerStore, } ) - if self._account_validity.enabled: - now_ms = self.clock.time_msec() - expiration_ts = now_ms + self._account_validity.period - self._simple_insert_txn( - txn, - "account_validity", - values={ - "user_id": user_id, - "expiration_ts_ms": expiration_ts, - } - ) except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE ) + if self._account_validity.enabled: + now_ms = self.clock.time_msec() + expiration_ts = now_ms + self._account_validity.period + self._simple_insert_txn( + txn, + "account_validity", + values={ + "user_id": user_id, + "expiration_ts_ms": expiration_ts, + "email_sent": False, + } + ) + if token: # it's possible for this to get a conflict, but only for a single user # since tokens are namespaced based on their user ID diff --git a/synapse/storage/schema/delta/54/account_validity.sql b/synapse/storage/schema/delta/54/account_validity.sql index 57249262d7..2357626000 100644 --- a/synapse/storage/schema/delta/54/account_validity.sql +++ b/synapse/storage/schema/delta/54/account_validity.sql @@ -13,8 +13,15 @@ * limitations under the License. */ +DROP TABLE IF EXISTS account_validity; + -- Track what users are in public rooms. CREATE TABLE IF NOT EXISTS account_validity ( user_id TEXT PRIMARY KEY, - expiration_ts_ms BIGINT NOT NULL + expiration_ts_ms BIGINT NOT NULL, + email_sent BOOLEAN NOT NULL, + renewal_token TEXT ); + +CREATE INDEX account_validity_email_sent_idx ON account_validity(email_sent, expiration_ts_ms) +CREATE UNIQUE INDEX account_validity_renewal_string_idx ON account_validity(renewal_token) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index d3611ed21f..8fb5140a05 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -1,14 +1,22 @@ import datetime import json +import os + +import pkg_resources from synapse.api.constants import LoginType from synapse.api.errors import Codes from synapse.appservice import ApplicationService from synapse.rest.client.v1 import admin, login -from synapse.rest.client.v2_alpha import register, sync +from synapse.rest.client.v2_alpha import account_validity, register, sync from tests import unittest +try: + from synapse.push.mailer import load_jinja2_templates +except ImportError: + load_jinja2_templates = None + class RegisterRestServletTestCase(unittest.HomeserverTestCase): @@ -197,6 +205,7 @@ class AccountValidityTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): config = self.default_config() + # Test for account expiring after a week. config.enable_registration = True config.account_validity.enabled = True config.account_validity.period = 604800000 # Time in ms for 1 week @@ -228,3 +237,92 @@ class AccountValidityTestCase(unittest.HomeserverTestCase): self.assertEquals( channel.json_body["errcode"], Codes.EXPIRED_ACCOUNT, channel.result, ) + + +class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase): + + skip = "No Jinja installed" if not load_jinja2_templates else None + servlets = [ + register.register_servlets, + admin.register_servlets, + login.register_servlets, + sync.register_servlets, + account_validity.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + config = self.default_config() + # Test for account expiring after a week and renewal emails being sent 2 + # days before expiry. + config.enable_registration = True + config.account_validity.enabled = True + config.account_validity.renew_by_email_enabled = True + config.account_validity.period = 604800000 # Time in ms for 1 week + config.account_validity.renew_at = 172800000 # Time in ms for 2 days + config.account_validity.renew_email_subject = "Renew your account" + + # Email config. + self.email_attempts = [] + + def sendmail(*args, **kwargs): + self.email_attempts.append((args, kwargs)) + return + + config.email_template_dir = os.path.abspath( + pkg_resources.resource_filename('synapse', 'res/templates') + ) + config.email_expiry_template_html = "notice_expiry.html" + config.email_expiry_template_text = "notice_expiry.txt" + config.email_smtp_host = "127.0.0.1" + config.email_smtp_port = 20 + config.require_transport_security = False + config.email_smtp_user = None + config.email_smtp_pass = None + config.email_notif_from = "test@example.com" + + self.hs = self.setup_test_homeserver(config=config, sendmail=sendmail) + + self.store = self.hs.get_datastore() + + return self.hs + + def test_renewal_email(self): + user_id = self.register_user("kermit", "monkey") + tok = self.login("kermit", "monkey") + # We need to manually add an email address otherwise the handler will do + # nothing. + now = self.hs.clock.time_msec() + self.get_success(self.store.user_add_threepid( + user_id=user_id, medium="email", address="kermit@example.com", + validated_at=now, added_at=now, + )) + + # The specific endpoint doesn't matter, all we need is an authenticated + # endpoint. + request, channel = self.make_request( + b"GET", "/sync", access_token=tok, + ) + self.render(request) + self.assertEquals(channel.result["code"], b"200", channel.result) + + # Move 6 days forward. This should trigger a renewal email to be sent. + self.reactor.advance(datetime.timedelta(days=6).total_seconds()) + self.assertEqual(len(self.email_attempts), 1) + + # Retrieving the URL from the email is too much pain for now, so we + # retrieve the token from the DB. + renewal_token = self.get_success(self.store.get_renewal_token_for_user(user_id)) + url = "/_matrix/client/unstable/account_validity/renew?token=%s" % renewal_token + request, channel = self.make_request(b"GET", url) + self.render(request) + self.assertEquals(channel.result["code"], b"200", channel.result) + + # Move 3 days forward. If the renewal failed, every authed request with + # our access token should be denied from now, otherwise they should + # succeed. + self.reactor.advance(datetime.timedelta(days=3).total_seconds()) + request, channel = self.make_request( + b"GET", "/sync", access_token=tok, + ) + self.render(request) + self.assertEquals(channel.result["code"], b"200", channel.result) -- cgit 1.5.1 From efe3c7977a3dc9c4308388f71e82b837df7d09b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2019 16:59:21 +0100 Subject: Add simple send_relation API and track in DB --- synapse/api/constants.py | 8 ++ synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/relations.py | 110 ++++++++++++++++++++++++++ synapse/storage/__init__.py | 2 + synapse/storage/events.py | 2 + synapse/storage/relations.py | 62 +++++++++++++++ synapse/storage/schema/delta/54/relations.sql | 27 +++++++ tests/rest/client/v2_alpha/test_relations.py | 98 +++++++++++++++++++++++ 8 files changed, 311 insertions(+) create mode 100644 synapse/rest/client/v2_alpha/relations.py create mode 100644 synapse/storage/relations.py create mode 100644 synapse/storage/schema/delta/54/relations.sql create mode 100644 tests/rest/client/v2_alpha/test_relations.py (limited to 'synapse/storage/schema') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 8547a63535..30bebd749f 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -116,3 +116,11 @@ class UserTypes(object): """ SUPPORT = "support" ALL_USER_TYPES = (SUPPORT,) + + +class RelationTypes(object): + """The types of relations known to this server. + """ + ANNOTATION = "m.annotation" + REPLACES = "m.replaces" + REFERENCES = "m.references" diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 3a24d31d1b..e6110ad9b1 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -44,6 +44,7 @@ from synapse.rest.client.v2_alpha import ( read_marker, receipts, register, + relations, report_event, room_keys, room_upgrade_rest_servlet, @@ -115,6 +116,7 @@ class ClientRestResource(JsonResource): room_upgrade_rest_servlet.register_servlets(hs, client_resource) capabilities.register_servlets(hs, client_resource) account_validity.register_servlets(hs, client_resource) + relations.register_servlets(hs, client_resource) # moving to /_synapse/admin synapse.rest.admin.register_servlets_for_client_rest_resource( diff --git a/synapse/rest/client/v2_alpha/relations.py b/synapse/rest/client/v2_alpha/relations.py new file mode 100644 index 0000000000..b504b4a8be --- /dev/null +++ b/synapse/rest/client/v2_alpha/relations.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This class implements the proposed relation APIs from MSC 1849. + +Since the MSC has not been approved all APIs here are unstable and may change at +any time to reflect changes in the MSC. +""" + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes +from synapse.api.errors import SynapseError +from synapse.http.servlet import ( + RestServlet, + parse_json_object_from_request, + parse_string, +) + +from ._base import client_v2_patterns + +logger = logging.getLogger(__name__) + + +class RelationSendServlet(RestServlet): + """Helper API for sending events that have relation data. + + Example API shape to send a 👍 reaction to a room: + + POST /rooms/!foo/send_relation/$bar/m.annotation/m.reaction?key=%F0%9F%91%8D + {} + + { + "event_id": "$foobar" + } + """ + + PATTERN = ( + "/rooms/(?P[^/]*)/send_relation" + "/(?P[^/]*)/(?P[^/]*)/(?P[^/]*)" + ) + + def __init__(self, hs): + super(RelationSendServlet, self).__init__() + self.auth = hs.get_auth() + self.event_creation_handler = hs.get_event_creation_handler() + + def register(self, http_server): + http_server.register_paths( + "POST", + client_v2_patterns(self.PATTERN + "$", releases=()), + self.on_PUT_or_POST, + ) + http_server.register_paths( + "PUT", + client_v2_patterns(self.PATTERN + "/(?P[^/]*)$", releases=()), + self.on_PUT_or_POST, + ) + + @defer.inlineCallbacks + def on_PUT_or_POST( + self, request, room_id, parent_id, relation_type, event_type, txn_id=None + ): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + + if event_type == EventTypes.Member: + # Add relations to a membership is meaningless, so we just deny it + # at the CS API rather than trying to handle it correctly. + raise SynapseError(400, "Cannot send member events with relations") + + content = parse_json_object_from_request(request) + + aggregation_key = parse_string(request, "key", encoding="utf-8") + + content["m.relates_to"] = { + "event_id": parent_id, + "key": aggregation_key, + "rel_type": relation_type, + } + + event_dict = { + "type": event_type, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + } + + event = yield self.event_creation_handler.create_and_send_nonmember_event( + requester, event_dict=event_dict, txn_id=txn_id + ) + + defer.returnValue((200, {"event_id": event.event_id})) + + +def register_servlets(hs, http_server): + RelationSendServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c432041b4e..7522d3fd57 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -49,6 +49,7 @@ from .pusher import PusherStore from .receipts import ReceiptsStore from .registration import RegistrationStore from .rejections import RejectionsStore +from .relations import RelationsStore from .room import RoomStore from .roommember import RoomMemberStore from .search import SearchStore @@ -99,6 +100,7 @@ class DataStore( GroupServerStore, UserErasureStore, MonthlyActiveUsersStore, + RelationsStore, ): def __init__(self, db_conn, hs): self.hs = hs diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7a7f841c6c..6802bf42ce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1351,6 +1351,8 @@ class EventsStore( # Insert into the event_search table. self._store_guest_access_txn(txn, event) + self._handle_event_relations(txn, event) + # Insert into the room_memberships table. self._store_room_members_txn( txn, diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py new file mode 100644 index 0000000000..a4905162e0 --- /dev/null +++ b/synapse/storage/relations.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.api.constants import RelationTypes +from synapse.storage._base import SQLBaseStore + +logger = logging.getLogger(__name__) + + +class RelationsStore(SQLBaseStore): + def _handle_event_relations(self, txn, event): + """Handles inserting relation data during peristence of events + + Args: + txn + event (EventBase) + """ + relation = event.content.get("m.relates_to") + if not relation: + # No relations + return + + rel_type = relation.get("rel_type") + if rel_type not in ( + RelationTypes.ANNOTATION, + RelationTypes.REFERENCES, + RelationTypes.REPLACES, + ): + # Unknown relation type + return + + parent_id = relation.get("event_id") + if not parent_id: + # Invalid relation + return + + aggregation_key = relation.get("key") + + self._simple_insert_txn( + txn, + table="event_relations", + values={ + "event_id": event.event_id, + "relates_to_id": parent_id, + "relation_type": rel_type, + "aggregation_key": aggregation_key, + }, + ) diff --git a/synapse/storage/schema/delta/54/relations.sql b/synapse/storage/schema/delta/54/relations.sql new file mode 100644 index 0000000000..134862b870 --- /dev/null +++ b/synapse/storage/schema/delta/54/relations.sql @@ -0,0 +1,27 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Tracks related events, like reactions, replies, edits, etc. Note that things +-- in this table are not necessarily "valid", e.g. it may contain edits from +-- people who don't have power to edit other peoples events. +CREATE TABLE IF NOT EXISTS event_relations ( + event_id TEXT NOT NULL, + relates_to_id TEXT NOT NULL, + relation_type TEXT NOT NULL, + aggregation_key TEXT +); + +CREATE UNIQUE INDEX event_relations_id ON event_relations(event_id); +CREATE INDEX event_relations_relates ON event_relations(relates_to_id, relation_type, aggregation_key); diff --git a/tests/rest/client/v2_alpha/test_relations.py b/tests/rest/client/v2_alpha/test_relations.py new file mode 100644 index 0000000000..61163d5b26 --- /dev/null +++ b/tests/rest/client/v2_alpha/test_relations.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from synapse.api.constants import EventTypes, RelationTypes +from synapse.rest.client.v1 import login, room +from synapse.rest.client.v2_alpha import relations + +from tests import unittest + + +class RelationsTestCase(unittest.HomeserverTestCase): + user_id = "@alice:test" + servlets = [ + relations.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.room = self.helper.create_room_as(self.user_id) + res = self.helper.send(self.room, body="Hi!") + self.parent_id = res["event_id"] + + def test_send_relation(self): + """Tests that sending a relation using the new /send_relation works + creates the right shape of event. + """ + + channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍") + self.assertEquals(200, channel.code, channel.json_body) + + event_id = channel.json_body["event_id"] + + request, channel = self.make_request( + "GET", "/rooms/%s/event/%s" % (self.room, event_id) + ) + self.render(request) + self.assertEquals(200, channel.code, channel.json_body) + + self.assert_dict( + { + "type": "m.reaction", + "sender": self.user_id, + "content": { + "m.relates_to": { + "event_id": self.parent_id, + "key": u"👍", + "rel_type": RelationTypes.ANNOTATION, + } + }, + }, + channel.json_body, + ) + + def test_deny_membership(self): + """Test that we deny relations on membership events + """ + channel = self._send_relation(RelationTypes.ANNOTATION, EventTypes.Member) + self.assertEquals(400, channel.code, channel.json_body) + + def _send_relation(self, relation_type, event_type, key=None): + """Helper function to send a relation pointing at `self.parent_id` + + Args: + relation_type (str): One of `RelationTypes` + event_type (str): The type of the event to create + key (str|None): The aggregation key used for m.annotation relation + type. + + Returns: + FakeChannel + """ + query = "" + if key: + query = "?key=" + six.moves.urllib.parse.quote_plus(key) + + request, channel = self.make_request( + "POST", + "/_matrix/client/unstable/rooms/%s/send_relation/%s/%s/%s%s" + % (self.room, self.parent_id, relation_type, event_type, query), + b"{}", + ) + self.render(request) + return channel -- cgit 1.5.1 From 4a30e4acb4ef14431914bd42ad09a51bd81d6c3e Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 21 May 2019 11:36:50 -0500 Subject: Room Statistics (#4338) --- changelog.d/4338.feature | 1 + docs/sample_config.yaml | 16 ++ synapse/api/constants.py | 1 + synapse/config/homeserver.py | 42 ++- synapse/config/stats.py | 60 ++++ synapse/handlers/stats.py | 325 +++++++++++++++++++++ synapse/server.py | 6 + synapse/storage/__init__.py | 2 + synapse/storage/events_worker.py | 24 ++ synapse/storage/roommember.py | 32 +++ synapse/storage/schema/delta/54/stats.sql | 80 ++++++ synapse/storage/state_deltas.py | 12 +- synapse/storage/stats.py | 450 ++++++++++++++++++++++++++++++ tests/handlers/test_stats.py | 251 +++++++++++++++++ tests/rest/client/v1/utils.py | 17 ++ 15 files changed, 1306 insertions(+), 13 deletions(-) create mode 100644 changelog.d/4338.feature create mode 100644 synapse/config/stats.py create mode 100644 synapse/handlers/stats.py create mode 100644 synapse/storage/schema/delta/54/stats.sql create mode 100644 synapse/storage/stats.py create mode 100644 tests/handlers/test_stats.py (limited to 'synapse/storage/schema') diff --git a/changelog.d/4338.feature b/changelog.d/4338.feature new file mode 100644 index 0000000000..01285e965c --- /dev/null +++ b/changelog.d/4338.feature @@ -0,0 +1 @@ +Synapse now more efficiently collates room statistics. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index f658ec8ecd..559fbcdd01 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1153,6 +1153,22 @@ password_config: # + +# Local statistics collection. Used in populating the room directory. +# +# 'bucket_size' controls how large each statistics timeslice is. It can +# be defined in a human readable short form -- e.g. "1d", "1y". +# +# 'retention' controls how long historical statistics will be kept for. +# It can be defined in a human readable short form -- e.g. "1d", "1y". +# +# +#stats: +# enabled: true +# bucket_size: 1d +# retention: 1y + + # Server Notices room configuration # # Uncomment this section to enable a room which can be used to send notices diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 6b347b1749..ee129c8689 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -79,6 +79,7 @@ class EventTypes(object): RoomHistoryVisibility = "m.room.history_visibility" CanonicalAlias = "m.room.canonical_alias" + Encryption = "m.room.encryption" RoomAvatar = "m.room.avatar" RoomEncryption = "m.room.encryption" GuestAccess = "m.room.guest_access" diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 727fdc54d8..5c4fc8ff21 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from .api import ApiConfig from .appservice import AppServiceConfig from .captcha import CaptchaConfig @@ -36,20 +37,41 @@ from .saml2_config import SAML2Config from .server import ServerConfig from .server_notices_config import ServerNoticesConfig from .spam_checker import SpamCheckerConfig +from .stats import StatsConfig from .tls import TlsConfig from .user_directory import UserDirectoryConfig from .voip import VoipConfig from .workers import WorkerConfig -class HomeServerConfig(ServerConfig, TlsConfig, DatabaseConfig, LoggingConfig, - RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, - VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, - AppServiceConfig, KeyConfig, SAML2Config, CasConfig, - JWTConfig, PasswordConfig, EmailConfig, - WorkerConfig, PasswordAuthProviderConfig, PushConfig, - SpamCheckerConfig, GroupsConfig, UserDirectoryConfig, - ConsentConfig, - ServerNoticesConfig, RoomDirectoryConfig, - ): +class HomeServerConfig( + ServerConfig, + TlsConfig, + DatabaseConfig, + LoggingConfig, + RatelimitConfig, + ContentRepositoryConfig, + CaptchaConfig, + VoipConfig, + RegistrationConfig, + MetricsConfig, + ApiConfig, + AppServiceConfig, + KeyConfig, + SAML2Config, + CasConfig, + JWTConfig, + PasswordConfig, + EmailConfig, + WorkerConfig, + PasswordAuthProviderConfig, + PushConfig, + SpamCheckerConfig, + GroupsConfig, + UserDirectoryConfig, + ConsentConfig, + StatsConfig, + ServerNoticesConfig, + RoomDirectoryConfig, +): pass diff --git a/synapse/config/stats.py b/synapse/config/stats.py new file mode 100644 index 0000000000..80fc1b9dd0 --- /dev/null +++ b/synapse/config/stats.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import sys + +from ._base import Config + + +class StatsConfig(Config): + """Stats Configuration + Configuration for the behaviour of synapse's stats engine + """ + + def read_config(self, config): + self.stats_enabled = True + self.stats_bucket_size = 86400 + self.stats_retention = sys.maxsize + stats_config = config.get("stats", None) + if stats_config: + self.stats_enabled = stats_config.get("enabled", self.stats_enabled) + self.stats_bucket_size = ( + self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + ) + self.stats_retention = ( + self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) + ) + / 1000 + ) + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # Local statistics collection. Used in populating the room directory. + # + # 'bucket_size' controls how large each statistics timeslice is. It can + # be defined in a human readable short form -- e.g. "1d", "1y". + # + # 'retention' controls how long historical statistics will be kept for. + # It can be defined in a human readable short form -- e.g. "1d", "1y". + # + # + #stats: + # enabled: true + # bucket_size: 1d + # retention: 1y + """ diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py new file mode 100644 index 0000000000..0e92b405ba --- /dev/null +++ b/synapse/handlers/stats.py @@ -0,0 +1,325 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.handlers.state_deltas import StateDeltasHandler +from synapse.metrics import event_processing_positions +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import UserID +from synapse.util.metrics import Measure + +logger = logging.getLogger(__name__) + + +class StatsHandler(StateDeltasHandler): + """Handles keeping the *_stats tables updated with a simple time-series of + information about the users, rooms and media on the server, such that admins + have some idea of who is consuming their resources. + + Heavily derived from UserDirectoryHandler + """ + + def __init__(self, hs): + super(StatsHandler, self).__init__(hs) + self.hs = hs + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.server_name = hs.hostname + self.clock = hs.get_clock() + self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id + self.stats_bucket_size = hs.config.stats_bucket_size + + # The current position in the current_state_delta stream + self.pos = None + + # Guard to ensure we only process deltas one at a time + self._is_processing = False + + if hs.config.stats_enabled: + self.notifier.add_replication_callback(self.notify_new_event) + + # We kick this off so that we don't have to wait for a change before + # we start populating stats + self.clock.call_later(0, self.notify_new_event) + + def notify_new_event(self): + """Called when there may be more deltas to process + """ + if not self.hs.config.stats_enabled: + return + + if self._is_processing: + return + + @defer.inlineCallbacks + def process(): + try: + yield self._unsafe_process() + finally: + self._is_processing = False + + self._is_processing = True + run_as_background_process("stats.notify_new_event", process) + + @defer.inlineCallbacks + def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB + if self.pos is None: + self.pos = yield self.store.get_stats_stream_pos() + + # If still None then the initial background update hasn't happened yet + if self.pos is None: + defer.returnValue(None) + + # Loop round handling deltas until we're up to date + while True: + with Measure(self.clock, "stats_delta"): + deltas = yield self.store.get_current_state_deltas(self.pos) + if not deltas: + return + + logger.info("Handling %d state deltas", len(deltas)) + yield self._handle_deltas(deltas) + + self.pos = deltas[-1]["stream_id"] + yield self.store.update_stats_stream_pos(self.pos) + + event_processing_positions.labels("stats").set(self.pos) + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + """ + Called with the state deltas to process + """ + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + stream_id = delta["stream_id"] + prev_event_id = delta["prev_event_id"] + + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + + token = yield self.store.get_earliest_token_for_room_stats(room_id) + + # If the earliest token to begin from is larger than our current + # stream ID, skip processing this delta. + if token is not None and token >= stream_id: + logger.debug( + "Ignoring: %s as earlier than this room's initial ingestion event", + event_id, + ) + continue + + if event_id is None and prev_event_id is None: + # Errr... + continue + + event_content = {} + + if event_id is not None: + event_content = (yield self.store.get_event(event_id)).content or {} + + # quantise time to the nearest bucket + now = yield self.store.get_received_ts(event_id) + now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size + + if typ == EventTypes.Member: + # we could use _get_key_change here but it's a bit inefficient + # given we're not testing for a specific result; might as well + # just grab the prev_membership and membership strings and + # compare them. + prev_event_content = {} + if prev_event_id is not None: + prev_event_content = ( + yield self.store.get_event(prev_event_id) + ).content + + membership = event_content.get("membership", Membership.LEAVE) + prev_membership = prev_event_content.get("membership", Membership.LEAVE) + + if prev_membership == membership: + continue + + if prev_membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, "room", room_id, "joined_members", -1 + ) + elif prev_membership == Membership.INVITE: + yield self.store.update_stats_delta( + now, "room", room_id, "invited_members", -1 + ) + elif prev_membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, "room", room_id, "left_members", -1 + ) + elif prev_membership == Membership.BAN: + yield self.store.update_stats_delta( + now, "room", room_id, "banned_members", -1 + ) + else: + err = "%s is not a valid prev_membership" % (repr(prev_membership),) + logger.error(err) + raise ValueError(err) + + if membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, "room", room_id, "joined_members", +1 + ) + elif membership == Membership.INVITE: + yield self.store.update_stats_delta( + now, "room", room_id, "invited_members", +1 + ) + elif membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, "room", room_id, "left_members", +1 + ) + elif membership == Membership.BAN: + yield self.store.update_stats_delta( + now, "room", room_id, "banned_members", +1 + ) + else: + err = "%s is not a valid membership" % (repr(membership),) + logger.error(err) + raise ValueError(err) + + user_id = state_key + if self.is_mine_id(user_id): + # update user_stats as it's one of our users + public = yield self._is_public_room(room_id) + + if membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, + "user", + user_id, + "public_rooms" if public else "private_rooms", + -1, + ) + elif membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, + "user", + user_id, + "public_rooms" if public else "private_rooms", + +1, + ) + + elif typ == EventTypes.Create: + # Newly created room. Add it with all blank portions. + yield self.store.update_room_state( + room_id, + { + "join_rules": None, + "history_visibility": None, + "encryption": None, + "name": None, + "topic": None, + "avatar": None, + "canonical_alias": None, + }, + ) + + elif typ == EventTypes.JoinRules: + yield self.store.update_room_state( + room_id, {"join_rules": event_content.get("join_rule")} + ) + + is_public = yield self._get_key_change( + prev_event_id, event_id, "join_rule", JoinRules.PUBLIC + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) + + elif typ == EventTypes.RoomHistoryVisibility: + yield self.store.update_room_state( + room_id, + {"history_visibility": event_content.get("history_visibility")}, + ) + + is_public = yield self._get_key_change( + prev_event_id, event_id, "history_visibility", "world_readable" + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) + + elif typ == EventTypes.Encryption: + yield self.store.update_room_state( + room_id, {"encryption": event_content.get("algorithm")} + ) + elif typ == EventTypes.Name: + yield self.store.update_room_state( + room_id, {"name": event_content.get("name")} + ) + elif typ == EventTypes.Topic: + yield self.store.update_room_state( + room_id, {"topic": event_content.get("topic")} + ) + elif typ == EventTypes.RoomAvatar: + yield self.store.update_room_state( + room_id, {"avatar": event_content.get("url")} + ) + elif typ == EventTypes.CanonicalAlias: + yield self.store.update_room_state( + room_id, {"canonical_alias": event_content.get("alias")} + ) + + @defer.inlineCallbacks + def update_public_room_stats(self, ts, room_id, is_public): + """ + Increment/decrement a user's number of public rooms when a room they are + in changes to/from public visibility. + + Args: + ts (int): Timestamp in seconds + room_id (str) + is_public (bool) + """ + # For now, blindly iterate over all local users in the room so that + # we can handle the whole problem of copying buckets over as needed + user_ids = yield self.store.get_users_in_room(room_id) + + for user_id in user_ids: + if self.hs.is_mine(UserID.from_string(user_id)): + yield self.store.update_stats_delta( + ts, "user", user_id, "public_rooms", +1 if is_public else -1 + ) + yield self.store.update_stats_delta( + ts, "user", user_id, "private_rooms", -1 if is_public else +1 + ) + + @defer.inlineCallbacks + def _is_public_room(self, room_id): + join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules) + history_visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility + ) + + if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or ( + ( + history_visibility + and history_visibility.content.get("history_visibility") + == "world_readable" + ) + ): + defer.returnValue(True) + else: + defer.returnValue(False) diff --git a/synapse/server.py b/synapse/server.py index 80d40b9272..9229a68a8d 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -72,6 +72,7 @@ from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.set_password import SetPasswordHandler +from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler from synapse.handlers.user_directory import UserDirectoryHandler @@ -139,6 +140,7 @@ class HomeServer(object): 'acme_handler', 'auth_handler', 'device_handler', + 'stats_handler', 'e2e_keys_handler', 'e2e_room_keys_handler', 'event_handler', @@ -191,6 +193,7 @@ class HomeServer(object): REQUIRED_ON_MASTER_STARTUP = [ "user_directory_handler", + "stats_handler" ] # This is overridden in derived application classes @@ -474,6 +477,9 @@ class HomeServer(object): def build_secrets(self): return Secrets() + def build_stats_handler(self): + return StatsHandler(self) + def build_spam_checker(self): return SpamChecker(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7522d3fd57..66675d08ae 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -55,6 +55,7 @@ from .roommember import RoomMemberStore from .search import SearchStore from .signatures import SignatureStore from .state import StateStore +from .stats import StatsStore from .stream import StreamStore from .tags import TagsStore from .transactions import TransactionStore @@ -100,6 +101,7 @@ class DataStore( GroupServerStore, UserErasureStore, MonthlyActiveUsersStore, + StatsStore, RelationsStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index adc6cf26b5..83ffae2132 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -611,3 +611,27 @@ class EventsWorkerStore(SQLBaseStore): return res return self.runInteraction("get_rejection_reasons", f) + + def _get_total_state_event_counts_txn(self, txn, room_id): + """ + See get_state_event_counts. + """ + sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?" + txn.execute(sql, (room_id,)) + row = txn.fetchone() + return row[0] if row else 0 + + def get_total_state_event_counts(self, room_id): + """ + Gets the total number of state events in a room. + + Args: + room_id (str) + + Returns: + Deferred[int] + """ + return self.runInteraction( + "get_total_state_event_counts", + self._get_total_state_event_counts_txn, room_id + ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 57df17bcc2..4bd1669458 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -142,6 +142,38 @@ class RoomMemberWorkerStore(EventsWorkerStore): return self.runInteraction("get_room_summary", _get_room_summary_txn) + def _get_user_count_in_room_txn(self, txn, room_id, membership): + """ + See get_user_count_in_room. + """ + sql = ( + "SELECT count(*) FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id " + " AND m.room_id = c.room_id " + " AND m.user_id = c.state_key" + " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" + ) + + txn.execute(sql, (room_id, membership)) + row = txn.fetchone() + return row[0] + + def get_user_count_in_room(self, room_id, membership): + """ + Get the user count in a room with a particular membership. + + Args: + room_id (str) + membership (Membership) + + Returns: + Deferred[int] + """ + return self.runInteraction( + "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership + ) + @cached() def get_invited_rooms_for_user(self, user_id): """ Get all the rooms the user is invited to diff --git a/synapse/storage/schema/delta/54/stats.sql b/synapse/storage/schema/delta/54/stats.sql new file mode 100644 index 0000000000..652e58308e --- /dev/null +++ b/synapse/storage/schema/delta/54/stats.sql @@ -0,0 +1,80 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE stats_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT, + CHECK (Lock='X') +); + +INSERT INTO stats_stream_pos (stream_id) VALUES (null); + +CREATE TABLE user_stats ( + user_id TEXT NOT NULL, + ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + public_rooms INT NOT NULL, + private_rooms INT NOT NULL +); + +CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts); + +CREATE TABLE room_stats ( + room_id TEXT NOT NULL, + ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + current_state_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + state_events INT NOT NULL +); + +CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts); + +-- cache of current room state; useful for the publicRooms list +CREATE TABLE room_state ( + room_id TEXT NOT NULL, + join_rules TEXT, + history_visibility TEXT, + encryption TEXT, + name TEXT, + topic TEXT, + avatar TEXT, + canonical_alias TEXT + -- get aliases straight from the right table +); + +CREATE UNIQUE INDEX room_state_room ON room_state(room_id); + +CREATE TABLE room_stats_earliest_token ( + room_id TEXT NOT NULL, + token BIGINT NOT NULL +); + +CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id); + +-- Set up staging tables +INSERT INTO background_updates (update_name, progress_json) VALUES + ('populate_stats_createtables', '{}'); + +-- Run through each room and update stats +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_rooms', '{}', 'populate_stats_createtables'); + +-- Clean up staging tables +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_cleanup', '{}', 'populate_stats_process_rooms'); diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py index 31a0279b18..5fdb442104 100644 --- a/synapse/storage/state_deltas.py +++ b/synapse/storage/state_deltas.py @@ -84,10 +84,16 @@ class StateDeltasStore(SQLBaseStore): "get_current_state_deltas", get_current_state_deltas_txn ) - def get_max_stream_id_in_current_state_deltas(self): - return self._simple_select_one_onecol( + def _get_max_stream_id_in_current_state_deltas_txn(self, txn): + return self._simple_select_one_onecol_txn( + txn, table="current_state_delta_stream", keyvalues={}, retcol="COALESCE(MAX(stream_id), -1)", - desc="get_max_stream_id_in_current_state_deltas", + ) + + def get_max_stream_id_in_current_state_deltas(self): + return self.runInteraction( + "get_max_stream_id_in_current_state_deltas", + self._get_max_stream_id_in_current_state_deltas_txn, ) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py new file mode 100644 index 0000000000..71b80a891d --- /dev/null +++ b/synapse/storage/stats.py @@ -0,0 +1,450 @@ +# -*- coding: utf-8 -*- +# Copyright 2018, 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.storage.state_deltas import StateDeltasStore +from synapse.util.caches.descriptors import cached + +logger = logging.getLogger(__name__) + +# these fields track absolutes (e.g. total number of rooms on the server) +ABSOLUTE_STATS_FIELDS = { + "room": ( + "current_state_events", + "joined_members", + "invited_members", + "left_members", + "banned_members", + "state_events", + ), + "user": ("public_rooms", "private_rooms"), +} + +TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} + +TEMP_TABLE = "_temp_populate_stats" + + +class StatsStore(StateDeltasStore): + def __init__(self, db_conn, hs): + super(StatsStore, self).__init__(db_conn, hs) + + self.server_name = hs.hostname + self.clock = self.hs.get_clock() + self.stats_enabled = hs.config.stats_enabled + self.stats_bucket_size = hs.config.stats_bucket_size + + self.register_background_update_handler( + "populate_stats_createtables", self._populate_stats_createtables + ) + self.register_background_update_handler( + "populate_stats_process_rooms", self._populate_stats_process_rooms + ) + self.register_background_update_handler( + "populate_stats_cleanup", self._populate_stats_cleanup + ) + + @defer.inlineCallbacks + def _populate_stats_createtables(self, progress, batch_size): + + if not self.stats_enabled: + yield self._end_background_update("populate_stats_createtables") + defer.returnValue(1) + + # Get all the rooms that we want to process. + def _make_staging_area(txn): + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" + ) + txn.execute(sql) + + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_position(position TEXT NOT NULL)" + ) + txn.execute(sql) + + # Get rooms we want to process from the database + sql = """ + SELECT room_id, count(*) FROM current_state_events + GROUP BY room_id + """ + txn.execute(sql) + rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()] + self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) + del rooms + + new_pos = yield self.get_max_stream_id_in_current_state_deltas() + yield self.runInteraction("populate_stats_temp_build", _make_staging_area) + yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) + self.get_earliest_token_for_room_stats.invalidate_all() + + yield self._end_background_update("populate_stats_createtables") + defer.returnValue(1) + + @defer.inlineCallbacks + def _populate_stats_cleanup(self, progress, batch_size): + """ + Update the user directory stream position, then clean up the old tables. + """ + if not self.stats_enabled: + yield self._end_background_update("populate_stats_cleanup") + defer.returnValue(1) + + position = yield self._simple_select_one_onecol( + TEMP_TABLE + "_position", None, "position" + ) + yield self.update_stats_stream_pos(position) + + def _delete_staging_area(txn): + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") + + yield self.runInteraction("populate_stats_cleanup", _delete_staging_area) + + yield self._end_background_update("populate_stats_cleanup") + defer.returnValue(1) + + @defer.inlineCallbacks + def _populate_stats_process_rooms(self, progress, batch_size): + + if not self.stats_enabled: + yield self._end_background_update("populate_stats_process_rooms") + defer.returnValue(1) + + # If we don't have progress filed, delete everything. + if not progress: + yield self.delete_all_stats() + + def _get_next_batch(txn): + # Only fetch 250 rooms, so we don't fetch too many at once, even + # if those 250 rooms have less than batch_size state events. + sql = """ + SELECT room_id, events FROM %s_rooms + ORDER BY events DESC + LIMIT 250 + """ % ( + TEMP_TABLE, + ) + txn.execute(sql) + rooms_to_work_on = txn.fetchall() + + if not rooms_to_work_on: + return None + + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") + progress["remaining"] = txn.fetchone()[0] + + return rooms_to_work_on + + rooms_to_work_on = yield self.runInteraction( + "populate_stats_temp_read", _get_next_batch + ) + + # No more rooms -- complete the transaction. + if not rooms_to_work_on: + yield self._end_background_update("populate_stats_process_rooms") + defer.returnValue(1) + + logger.info( + "Processing the next %d rooms of %d remaining", + (len(rooms_to_work_on), progress["remaining"]), + ) + + # Number of state events we've processed by going through each room + processed_event_count = 0 + + for room_id, event_count in rooms_to_work_on: + + current_state_ids = yield self.get_current_state_ids(room_id) + + join_rules = yield self.get_event( + current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True + ) + history_visibility = yield self.get_event( + current_state_ids.get((EventTypes.RoomHistoryVisibility, "")), + allow_none=True, + ) + encryption = yield self.get_event( + current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True + ) + name = yield self.get_event( + current_state_ids.get((EventTypes.Name, "")), allow_none=True + ) + topic = yield self.get_event( + current_state_ids.get((EventTypes.Topic, "")), allow_none=True + ) + avatar = yield self.get_event( + current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True + ) + canonical_alias = yield self.get_event( + current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True + ) + + def _or_none(x, arg): + if x: + return x.content.get(arg) + return None + + yield self.update_room_state( + room_id, + { + "join_rules": _or_none(join_rules, "join_rule"), + "history_visibility": _or_none( + history_visibility, "history_visibility" + ), + "encryption": _or_none(encryption, "algorithm"), + "name": _or_none(name, "name"), + "topic": _or_none(topic, "topic"), + "avatar": _or_none(avatar, "url"), + "canonical_alias": _or_none(canonical_alias, "alias"), + }, + ) + + now = self.hs.get_reactor().seconds() + + # quantise time to the nearest bucket + now = (now // self.stats_bucket_size) * self.stats_bucket_size + + def _fetch_data(txn): + + # Get the current token of the room + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + + current_state_events = len(current_state_ids) + joined_members = self._get_user_count_in_room_txn( + txn, room_id, Membership.JOIN + ) + invited_members = self._get_user_count_in_room_txn( + txn, room_id, Membership.INVITE + ) + left_members = self._get_user_count_in_room_txn( + txn, room_id, Membership.LEAVE + ) + banned_members = self._get_user_count_in_room_txn( + txn, room_id, Membership.BAN + ) + total_state_events = self._get_total_state_event_counts_txn( + txn, room_id + ) + + self._update_stats_txn( + txn, + "room", + room_id, + now, + { + "bucket_size": self.stats_bucket_size, + "current_state_events": current_state_events, + "joined_members": joined_members, + "invited_members": invited_members, + "left_members": left_members, + "banned_members": banned_members, + "state_events": total_state_events, + }, + ) + self._simple_insert_txn( + txn, + "room_stats_earliest_token", + {"room_id": room_id, "token": current_token}, + ) + + yield self.runInteraction("update_room_stats", _fetch_data) + + # We've finished a room. Delete it from the table. + yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id}) + # Update the remaining counter. + progress["remaining"] -= 1 + yield self.runInteraction( + "populate_stats", + self._background_update_progress_txn, + "populate_stats_process_rooms", + progress, + ) + + processed_event_count += event_count + + if processed_event_count > batch_size: + # Don't process any more rooms, we've hit our batch size. + defer.returnValue(processed_event_count) + + defer.returnValue(processed_event_count) + + def delete_all_stats(self): + """ + Delete all statistics records. + """ + + def _delete_all_stats_txn(txn): + txn.execute("DELETE FROM room_state") + txn.execute("DELETE FROM room_stats") + txn.execute("DELETE FROM room_stats_earliest_token") + txn.execute("DELETE FROM user_stats") + + return self.runInteraction("delete_all_stats", _delete_all_stats_txn) + + def get_stats_stream_pos(self): + return self._simple_select_one_onecol( + table="stats_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="stats_stream_pos", + ) + + def update_stats_stream_pos(self, stream_id): + return self._simple_update_one( + table="stats_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_stats_stream_pos", + ) + + def update_room_state(self, room_id, fields): + """ + Args: + room_id (str) + fields (dict[str:Any]) + """ + return self._simple_upsert( + table="room_state", + keyvalues={"room_id": room_id}, + values=fields, + desc="update_room_state", + ) + + def get_deltas_for_room(self, room_id, start, size=100): + """ + Get statistics deltas for a given room. + + Args: + room_id (str) + start (int): Pagination start. Number of entries, not timestamp. + size (int): How many entries to return. + + Returns: + Deferred[list[dict]], where the dict has the keys of + ABSOLUTE_STATS_FIELDS["room"] and "ts". + """ + return self._simple_select_list_paginate( + "room_stats", + {"room_id": room_id}, + "ts", + start, + size, + retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]), + order_direction="DESC", + ) + + def get_all_room_state(self): + return self._simple_select_list( + "room_state", None, retcols=("name", "topic", "canonical_alias") + ) + + @cached() + def get_earliest_token_for_room_stats(self, room_id): + """ + Fetch the "earliest token". This is used by the room stats delta + processor to ignore deltas that have been processed between the + start of the background task and any particular room's stats + being calculated. + + Returns: + Deferred[int] + """ + return self._simple_select_one_onecol( + "room_stats_earliest_token", + {"room_id": room_id}, + retcol="token", + allow_none=True, + ) + + def update_stats(self, stats_type, stats_id, ts, fields): + table, id_col = TYPE_TO_ROOM[stats_type] + return self._simple_upsert( + table=table, + keyvalues={id_col: stats_id, "ts": ts}, + values=fields, + desc="update_stats", + ) + + def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields): + table, id_col = TYPE_TO_ROOM[stats_type] + return self._simple_upsert_txn( + txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields + ) + + def update_stats_delta(self, ts, stats_type, stats_id, field, value): + def _update_stats_delta(txn): + table, id_col = TYPE_TO_ROOM[stats_type] + + sql = ( + "SELECT * FROM %s" + " WHERE %s=? and ts=(" + " SELECT MAX(ts) FROM %s" + " WHERE %s=?" + ")" + ) % (table, id_col, table, id_col) + txn.execute(sql, (stats_id, stats_id)) + rows = self.cursor_to_dict(txn) + if len(rows) == 0: + # silently skip as we don't have anything to apply a delta to yet. + # this tries to minimise any race between the initial sync and + # subsequent deltas arriving. + return + + current_ts = ts + latest_ts = rows[0]["ts"] + if current_ts < latest_ts: + # This one is in the past, but we're just encountering it now. + # Mark it as part of the current bucket. + current_ts = latest_ts + elif ts != latest_ts: + # we have to copy our absolute counters over to the new entry. + values = { + key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] + } + values[id_col] = stats_id + values["ts"] = ts + values["bucket_size"] = self.stats_bucket_size + + self._simple_insert_txn(txn, table=table, values=values) + + # actually update the new value + if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: + self._simple_update_txn( + txn, + table=table, + keyvalues={id_col: stats_id, "ts": current_ts}, + updatevalues={field: value}, + ) + else: + sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( + table, + field, + field, + id_col, + ) + txn.execute(sql, (value, stats_id, current_ts)) + + return self.runInteraction("update_stats_delta", _update_stats_delta) diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py new file mode 100644 index 0000000000..249aba3d59 --- /dev/null +++ b/tests/handlers/test_stats.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.rest import admin +from synapse.rest.client.v1 import login, room + +from tests import unittest + + +class StatsRoomTests(unittest.HomeserverTestCase): + + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + + self.store = hs.get_datastore() + self.handler = self.hs.get_stats_handler() + + def _add_background_updates(self): + """ + Add the background updates we need to run. + """ + # Ugh, have to reset this flag + self.store._all_done = False + + self.get_success( + self.store._simple_insert( + "background_updates", + {"update_name": "populate_stats_createtables", "progress_json": "{}"}, + ) + ) + self.get_success( + self.store._simple_insert( + "background_updates", + { + "update_name": "populate_stats_process_rooms", + "progress_json": "{}", + "depends_on": "populate_stats_createtables", + }, + ) + ) + self.get_success( + self.store._simple_insert( + "background_updates", + { + "update_name": "populate_stats_cleanup", + "progress_json": "{}", + "depends_on": "populate_stats_process_rooms", + }, + ) + ) + + def test_initial_room(self): + """ + The background updates will build the table from scratch. + """ + r = self.get_success(self.store.get_all_room_state()) + self.assertEqual(len(r), 0) + + # Disable stats + self.hs.config.stats_enabled = False + self.handler.stats_enabled = False + + u1 = self.register_user("u1", "pass") + u1_token = self.login("u1", "pass") + + room_1 = self.helper.create_room_as(u1, tok=u1_token) + self.helper.send_state( + room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token + ) + + # Stats disabled, shouldn't have done anything + r = self.get_success(self.store.get_all_room_state()) + self.assertEqual(len(r), 0) + + # Enable stats + self.hs.config.stats_enabled = True + self.handler.stats_enabled = True + + # Do the initial population of the user directory via the background update + self._add_background_updates() + + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + r = self.get_success(self.store.get_all_room_state()) + + self.assertEqual(len(r), 1) + self.assertEqual(r[0]["topic"], "foo") + + def test_initial_earliest_token(self): + """ + Ingestion via notify_new_event will ignore tokens that the background + update have already processed. + """ + self.reactor.advance(86401) + + self.hs.config.stats_enabled = False + self.handler.stats_enabled = False + + u1 = self.register_user("u1", "pass") + u1_token = self.login("u1", "pass") + + u2 = self.register_user("u2", "pass") + u2_token = self.login("u2", "pass") + + u3 = self.register_user("u3", "pass") + u3_token = self.login("u3", "pass") + + room_1 = self.helper.create_room_as(u1, tok=u1_token) + self.helper.send_state( + room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token + ) + + # Begin the ingestion by creating the temp tables. This will also store + # the position that the deltas should begin at, once they take over. + self.hs.config.stats_enabled = True + self.handler.stats_enabled = True + self.store._all_done = False + self.get_success(self.store.update_stats_stream_pos(None)) + + self.get_success( + self.store._simple_insert( + "background_updates", + {"update_name": "populate_stats_createtables", "progress_json": "{}"}, + ) + ) + + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + # Now, before the table is actually ingested, add some more events. + self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token) + self.helper.join(room=room_1, user=u2, tok=u2_token) + + # Now do the initial ingestion. + self.get_success( + self.store._simple_insert( + "background_updates", + {"update_name": "populate_stats_process_rooms", "progress_json": "{}"}, + ) + ) + self.get_success( + self.store._simple_insert( + "background_updates", + { + "update_name": "populate_stats_cleanup", + "progress_json": "{}", + "depends_on": "populate_stats_process_rooms", + }, + ) + ) + + self.store._all_done = False + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + self.reactor.advance(86401) + + # Now add some more events, triggering ingestion. Because of the stream + # position being set to before the events sent in the middle, a simpler + # implementation would reprocess those events, and say there were four + # users, not three. + self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token) + self.helper.join(room=room_1, user=u3, tok=u3_token) + + # Get the deltas! There should be two -- day 1, and day 2. + r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) + + # The oldest has 2 joined members + self.assertEqual(r[-1]["joined_members"], 2) + + # The newest has 3 + self.assertEqual(r[0]["joined_members"], 3) + + def test_incorrect_state_transition(self): + """ + If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to + (JOIN, INVITE, LEAVE, BAN), an error is raised. + """ + events = { + "a1": {"membership": Membership.LEAVE}, + "a2": {"membership": "not a real thing"}, + } + + def get_event(event_id): + m = Mock() + m.content = events[event_id] + d = defer.Deferred() + self.reactor.callLater(0.0, d.callback, m) + return d + + def get_received_ts(event_id): + return defer.succeed(1) + + self.store.get_received_ts = get_received_ts + self.store.get_event = get_event + + deltas = [ + { + "type": EventTypes.Member, + "state_key": "some_user", + "room_id": "room", + "event_id": "a1", + "prev_event_id": "a2", + "stream_id": "bleb", + } + ] + + f = self.get_failure(self.handler._handle_deltas(deltas), ValueError) + self.assertEqual( + f.value.args[0], "'not a real thing' is not a valid prev_membership" + ) + + # And the other way... + deltas = [ + { + "type": EventTypes.Member, + "state_key": "some_user", + "room_id": "room", + "event_id": "a2", + "prev_event_id": "a1", + "stream_id": "bleb", + } + ] + + f = self.get_failure(self.handler._handle_deltas(deltas), ValueError) + self.assertEqual( + f.value.args[0], "'not a real thing' is not a valid membership" + ) diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 05b0143c42..f7133fc12e 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -127,3 +127,20 @@ class RestHelper(object): ) return channel.json_body + + def send_state(self, room_id, event_type, body, tok, expect_code=200): + path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type) + if tok: + path = path + "?access_token=%s" % tok + + request, channel = make_request( + self.hs.get_reactor(), "PUT", path, json.dumps(body).encode('utf8') + ) + render(request, self.resource, self.hs.get_reactor()) + + assert int(channel.result["code"]) == expect_code, ( + "Expected: %d, got: %d, resp: %r" + % (expect_code, int(channel.result["code"]), channel.result["body"]) + ) + + return channel.json_body -- cgit 1.5.1 From b75537beaf841089f9f07c9dbed04a7a420a8b1f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 3 Apr 2019 18:10:24 +0100 Subject: Store key validity time in the storage layer This is a first step to checking that the key is valid at the required moment. The idea here is that, rather than passing VerifyKey objects in and out of the storage layer, we instead pass FetchKeyResult objects, which simply wrap the VerifyKey and add a valid_until_ts field. --- changelog.d/5237.misc | 1 + synapse/crypto/keyring.py | 47 +++++++++++++++------- synapse/storage/keys.py | 31 +++++++++----- .../delta/54/add_validity_to_server_keys.sql | 23 +++++++++++ tests/crypto/test_keyring.py | 22 ++++++---- tests/storage/test_keys.py | 44 +++++++++++++------- 6 files changed, 122 insertions(+), 46 deletions(-) create mode 100644 changelog.d/5237.misc create mode 100644 synapse/storage/schema/delta/54/add_validity_to_server_keys.sql (limited to 'synapse/storage/schema') diff --git a/changelog.d/5237.misc b/changelog.d/5237.misc new file mode 100644 index 0000000000..f4fe3b821b --- /dev/null +++ b/changelog.d/5237.misc @@ -0,0 +1 @@ +Store key validity time in the storage layer. diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 9d629b2238..14a27288fd 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -20,7 +20,6 @@ from collections import namedtuple from six import raise_from from six.moves import urllib -import nacl.signing from signedjson.key import ( decode_verify_key_bytes, encode_verify_key_base64, @@ -43,6 +42,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) +from synapse.storage.keys import FetchKeyResult from synapse.util import logcontext, unwrapFirstError from synapse.util.logcontext import ( LoggingContext, @@ -307,11 +307,15 @@ class Keyring(object): # complete this VerifyKeyRequest. result_keys = results.get(server_name, {}) for key_id in verify_request.key_ids: - key = result_keys.get(key_id) - if key: + fetch_key_result = result_keys.get(key_id) + if fetch_key_result: with PreserveLoggingContext(): verify_request.deferred.callback( - (server_name, key_id, key) + ( + server_name, + key_id, + fetch_key_result.verify_key, + ) ) break else: @@ -348,12 +352,12 @@ class Keyring(object): def get_keys_from_store(self, server_name_and_key_ids): """ Args: - server_name_and_key_ids (iterable(Tuple[str, iterable[str]]): + server_name_and_key_ids (iterable[Tuple[str, iterable[str]]]): list of (server_name, iterable[key_id]) tuples to fetch keys for Returns: - Deferred: resolves to dict[str, dict[str, VerifyKey|None]]: map from - server_name -> key_id -> VerifyKey + Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]: + map from server_name -> key_id -> FetchKeyResult """ keys_to_fetch = ( (server_name, key_id) @@ -430,6 +434,18 @@ class Keyring(object): def get_server_verify_key_v2_indirect( self, server_names_and_key_ids, perspective_name, perspective_keys ): + """ + Args: + server_names_and_key_ids (iterable[Tuple[str, iterable[str]]]): + list of (server_name, iterable[key_id]) tuples to fetch keys for + perspective_name (str): name of the notary server to query for the keys + perspective_keys (dict[str, VerifyKey]): map of key_id->key for the + notary server + + Returns: + Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map + from server_name -> key_id -> FetchKeyResult + """ # TODO(mark): Set the minimum_valid_until_ts to that needed by # the events being validated or the current time if validating # an incoming request. @@ -506,7 +522,7 @@ class Keyring(object): @defer.inlineCallbacks def get_server_verify_key_v2_direct(self, server_name, key_ids): - keys = {} # type: dict[str, nacl.signing.VerifyKey] + keys = {} # type: dict[str, FetchKeyResult] for requested_key_id in key_ids: if requested_key_id in keys: @@ -583,9 +599,9 @@ class Keyring(object): actually in the response Returns: - Deferred[dict[str, nacl.signing.VerifyKey]]: - map from key_id to key object + Deferred[dict[str, FetchKeyResult]]: map from key_id to result object """ + ts_valid_until_ms = response_json[u"valid_until_ts"] # start by extracting the keys from the response, since they may be required # to validate the signature on the response. @@ -595,7 +611,9 @@ class Keyring(object): key_base64 = key_data["key"] key_bytes = decode_base64(key_base64) verify_key = decode_verify_key_bytes(key_id, key_bytes) - verify_keys[key_id] = verify_key + verify_keys[key_id] = FetchKeyResult( + verify_key=verify_key, valid_until_ts=ts_valid_until_ms + ) # TODO: improve this signature checking server_name = response_json["server_name"] @@ -606,7 +624,7 @@ class Keyring(object): ) verify_signed_json( - response_json, server_name, verify_keys[key_id] + response_json, server_name, verify_keys[key_id].verify_key ) for key_id, key_data in response_json["old_verify_keys"].items(): @@ -614,7 +632,9 @@ class Keyring(object): key_base64 = key_data["key"] key_bytes = decode_base64(key_base64) verify_key = decode_verify_key_bytes(key_id, key_bytes) - verify_keys[key_id] = verify_key + verify_keys[key_id] = FetchKeyResult( + verify_key=verify_key, valid_until_ts=key_data["expired_ts"] + ) # re-sign the json with our own key, so that it is ready if we are asked to # give it out as a notary server @@ -623,7 +643,6 @@ class Keyring(object): ) signed_key_json_bytes = encode_canonical_json(signed_key_json) - ts_valid_until_ms = signed_key_json[u"valid_until_ts"] # for reasons I don't quite understand, we store this json for the key ids we # requested, as well as those we got. diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 3c5f52009b..5300720dbb 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -19,6 +19,7 @@ import logging import six +import attr from signedjson.key import decode_verify_key_bytes from synapse.util import batch_iter @@ -36,6 +37,12 @@ else: db_binary_type = memoryview +@attr.s(slots=True, frozen=True) +class FetchKeyResult(object): + verify_key = attr.ib() # VerifyKey: the key itself + valid_until_ts = attr.ib() # int: how long we can use this key for + + class KeyStore(SQLBaseStore): """Persistence for signature verification keys """ @@ -54,8 +61,8 @@ class KeyStore(SQLBaseStore): iterable of (server_name, key-id) tuples to fetch keys for Returns: - Deferred: resolves to dict[Tuple[str, str], VerifyKey|None]: - map from (server_name, key_id) -> VerifyKey, or None if the key is + Deferred: resolves to dict[Tuple[str, str], FetchKeyResult|None]: + map from (server_name, key_id) -> FetchKeyResult, or None if the key is unknown """ keys = {} @@ -65,17 +72,19 @@ class KeyStore(SQLBaseStore): # batch_iter always returns tuples so it's safe to do len(batch) sql = ( - "SELECT server_name, key_id, verify_key FROM server_signature_keys " - "WHERE 1=0" + "SELECT server_name, key_id, verify_key, ts_valid_until_ms " + "FROM server_signature_keys WHERE 1=0" ) + " OR (server_name=? AND key_id=?)" * len(batch) txn.execute(sql, tuple(itertools.chain.from_iterable(batch))) for row in txn: - server_name, key_id, key_bytes = row - keys[(server_name, key_id)] = decode_verify_key_bytes( - key_id, bytes(key_bytes) + server_name, key_id, key_bytes, ts_valid_until_ms = row + res = FetchKeyResult( + verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)), + valid_until_ts=ts_valid_until_ms, ) + keys[(server_name, key_id)] = res def _txn(txn): for batch in batch_iter(server_name_and_key_ids, 50): @@ -89,20 +98,21 @@ class KeyStore(SQLBaseStore): Args: from_server (str): Where the verification keys were looked up ts_added_ms (int): The time to record that the key was added - verify_keys (iterable[tuple[str, str, nacl.signing.VerifyKey]]): + verify_keys (iterable[tuple[str, str, FetchKeyResult]]): keys to be stored. Each entry is a triplet of (server_name, key_id, key). """ key_values = [] value_values = [] invalidations = [] - for server_name, key_id, verify_key in verify_keys: + for server_name, key_id, fetch_result in verify_keys: key_values.append((server_name, key_id)) value_values.append( ( from_server, ts_added_ms, - db_binary_type(verify_key.encode()), + fetch_result.valid_until_ts, + db_binary_type(fetch_result.verify_key.encode()), ) ) # invalidate takes a tuple corresponding to the params of @@ -125,6 +135,7 @@ class KeyStore(SQLBaseStore): value_names=( "from_server", "ts_added_ms", + "ts_valid_until_ms", "verify_key", ), value_values=value_values, diff --git a/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql b/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql new file mode 100644 index 0000000000..c01aa9d2d9 --- /dev/null +++ b/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql @@ -0,0 +1,23 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* When we can use this key until, before we have to refresh it. */ +ALTER TABLE server_signature_keys ADD COLUMN ts_valid_until_ms BIGINT; + +UPDATE server_signature_keys SET ts_valid_until_ms = ( + SELECT MAX(ts_valid_until_ms) FROM server_keys_json skj WHERE + skj.server_name = server_signature_keys.server_name AND + skj.key_id = server_signature_keys.key_id +); diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index bcffe53a91..83de32b05d 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -25,6 +25,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.crypto import keyring from synapse.crypto.keyring import KeyLookupError +from synapse.storage.keys import FetchKeyResult from synapse.util import logcontext from synapse.util.logcontext import LoggingContext @@ -201,7 +202,7 @@ class KeyringTestCase(unittest.HomeserverTestCase): ( "server9", key1_id, - signedjson.key.get_verify_key(key1), + FetchKeyResult(signedjson.key.get_verify_key(key1), 1000), ), ], ) @@ -251,9 +252,10 @@ class KeyringTestCase(unittest.HomeserverTestCase): server_name_and_key_ids = [(SERVER_NAME, ("key1",))] keys = self.get_success(kr.get_keys_from_server(server_name_and_key_ids)) k = keys[SERVER_NAME][testverifykey_id] - self.assertEqual(k, testverifykey) - self.assertEqual(k.alg, "ed25519") - self.assertEqual(k.version, "ver1") + self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS) + self.assertEqual(k.verify_key, testverifykey) + self.assertEqual(k.verify_key.alg, "ed25519") + self.assertEqual(k.verify_key.version, "ver1") # check that the perspectives store is correctly updated lookup_triplet = (SERVER_NAME, testverifykey_id, None) @@ -321,9 +323,10 @@ class KeyringTestCase(unittest.HomeserverTestCase): keys = self.get_success(kr.get_keys_from_perspectives(server_name_and_key_ids)) self.assertIn(SERVER_NAME, keys) k = keys[SERVER_NAME][testverifykey_id] - self.assertEqual(k, testverifykey) - self.assertEqual(k.alg, "ed25519") - self.assertEqual(k.version, "ver1") + self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS) + self.assertEqual(k.verify_key, testverifykey) + self.assertEqual(k.verify_key.alg, "ed25519") + self.assertEqual(k.verify_key.version, "ver1") # check that the perspectives store is correctly updated lookup_triplet = (SERVER_NAME, testverifykey_id, None) @@ -346,7 +349,10 @@ class KeyringTestCase(unittest.HomeserverTestCase): @defer.inlineCallbacks def run_in_context(f, *args, **kwargs): - with LoggingContext("testctx"): + with LoggingContext("testctx") as ctx: + # we set the "request" prop to make it easier to follow what's going on in the + # logs. + ctx.request = "testctx" rv = yield f(*args, **kwargs) defer.returnValue(rv) diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py index 71ad7aee32..e07ff01201 100644 --- a/tests/storage/test_keys.py +++ b/tests/storage/test_keys.py @@ -17,6 +17,8 @@ import signedjson.key from twisted.internet.defer import Deferred +from synapse.storage.keys import FetchKeyResult + import tests.unittest KEY_1 = signedjson.key.decode_verify_key_base64( @@ -37,8 +39,8 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase): "from_server", 10, [ - ("server1", key_id_1, KEY_1), - ("server1", key_id_2, KEY_2), + ("server1", key_id_1, FetchKeyResult(KEY_1, 100)), + ("server1", key_id_2, FetchKeyResult(KEY_2, 200)), ], ) self.get_success(d) @@ -50,13 +52,15 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase): self.assertEqual(len(res.keys()), 3) res1 = res[("server1", key_id_1)] - self.assertEqual(res1, KEY_1) - self.assertEqual(res1.version, "key1") + self.assertEqual(res1.verify_key, KEY_1) + self.assertEqual(res1.verify_key.version, "key1") + self.assertEqual(res1.valid_until_ts, 100) res2 = res[("server1", key_id_2)] - self.assertEqual(res2, KEY_2) + self.assertEqual(res2.verify_key, KEY_2) # version comes from the ID it was stored with - self.assertEqual(res2.version, "KEY_ID_2") + self.assertEqual(res2.verify_key.version, "KEY_ID_2") + self.assertEqual(res2.valid_until_ts, 200) # non-existent result gives None self.assertIsNone(res[("server1", "ed25519:key3")]) @@ -73,8 +77,8 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase): "from_server", 0, [ - ("srv1", key_id_1, KEY_1), - ("srv1", key_id_2, KEY_2), + ("srv1", key_id_1, FetchKeyResult(KEY_1, 100)), + ("srv1", key_id_2, FetchKeyResult(KEY_2, 200)), ], ) self.get_success(d) @@ -82,26 +86,38 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase): d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)]) res = self.get_success(d) self.assertEqual(len(res.keys()), 2) - self.assertEqual(res[("srv1", key_id_1)], KEY_1) - self.assertEqual(res[("srv1", key_id_2)], KEY_2) + + res1 = res[("srv1", key_id_1)] + self.assertEqual(res1.verify_key, KEY_1) + self.assertEqual(res1.valid_until_ts, 100) + + res2 = res[("srv1", key_id_2)] + self.assertEqual(res2.verify_key, KEY_2) + self.assertEqual(res2.valid_until_ts, 200) # we should be able to look up the same thing again without a db hit res = store.get_server_verify_keys([("srv1", key_id_1)]) if isinstance(res, Deferred): res = self.successResultOf(res) self.assertEqual(len(res.keys()), 1) - self.assertEqual(res[("srv1", key_id_1)], KEY_1) + self.assertEqual(res[("srv1", key_id_1)].verify_key, KEY_1) new_key_2 = signedjson.key.get_verify_key( signedjson.key.generate_signing_key("key2") ) d = store.store_server_verify_keys( - "from_server", 10, [("srv1", key_id_2, new_key_2)] + "from_server", 10, [("srv1", key_id_2, FetchKeyResult(new_key_2, 300))] ) self.get_success(d) d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)]) res = self.get_success(d) self.assertEqual(len(res.keys()), 2) - self.assertEqual(res[("srv1", key_id_1)], KEY_1) - self.assertEqual(res[("srv1", key_id_2)], new_key_2) + + res1 = res[("srv1", key_id_1)] + self.assertEqual(res1.verify_key, KEY_1) + self.assertEqual(res1.valid_until_ts, 100) + + res2 = res[("srv1", key_id_2)] + self.assertEqual(res2.verify_key, new_key_2) + self.assertEqual(res2.valid_until_ts, 300) -- cgit 1.5.1 From ba17de7fbc29700163b23363ae0e03f8a01ef274 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 28 May 2019 10:11:38 +0100 Subject: Fix schema update for account validity --- .../storage/schema/delta/54/account_validity.sql | 27 ------------------- .../delta/54/account_validity_with_renewal.sql | 30 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 27 deletions(-) delete mode 100644 synapse/storage/schema/delta/54/account_validity.sql create mode 100644 synapse/storage/schema/delta/54/account_validity_with_renewal.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/54/account_validity.sql b/synapse/storage/schema/delta/54/account_validity.sql deleted file mode 100644 index 2357626000..0000000000 --- a/synapse/storage/schema/delta/54/account_validity.sql +++ /dev/null @@ -1,27 +0,0 @@ -/* Copyright 2019 New Vector Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -DROP TABLE IF EXISTS account_validity; - --- Track what users are in public rooms. -CREATE TABLE IF NOT EXISTS account_validity ( - user_id TEXT PRIMARY KEY, - expiration_ts_ms BIGINT NOT NULL, - email_sent BOOLEAN NOT NULL, - renewal_token TEXT -); - -CREATE INDEX account_validity_email_sent_idx ON account_validity(email_sent, expiration_ts_ms) -CREATE UNIQUE INDEX account_validity_renewal_string_idx ON account_validity(renewal_token) diff --git a/synapse/storage/schema/delta/54/account_validity_with_renewal.sql b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql new file mode 100644 index 0000000000..0adb2ad55e --- /dev/null +++ b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql @@ -0,0 +1,30 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- We previously changed the schema for this table without renaming the file, which means +-- that some databases might still be using the old schema. This ensures Synapse uses the +-- right schema for the table. +DROP TABLE IF EXISTS account_validity; + +-- Track what users are in public rooms. +CREATE TABLE IF NOT EXISTS account_validity ( + user_id TEXT PRIMARY KEY, + expiration_ts_ms BIGINT NOT NULL, + email_sent BOOLEAN NOT NULL, + renewal_token TEXT +); + +CREATE INDEX account_validity_email_sent_idx ON account_validity(email_sent, expiration_ts_ms) +CREATE UNIQUE INDEX account_validity_renewal_string_idx ON account_validity(renewal_token) -- cgit 1.5.1 From d79c9994f416ee5dab27a277fa729ffa5ee74ccc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 May 2019 18:52:41 +0100 Subject: Add DB bg update to cleanup extremities. Due to #5269 we may have extremities in our DB that we shouldn't have, so lets add a cleanup task such to remove those. --- synapse/storage/events.py | 186 +++++++++++++++++++++ .../schema/delta/54/delete_forward_extremities.sql | 19 +++ 2 files changed, 205 insertions(+) create mode 100644 synapse/storage/schema/delta/54/delete_forward_extremities.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6e9f3d1dc0..a9be143bd5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -221,6 +221,7 @@ class EventsStore( ): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" + EVENT_DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" def __init__(self, db_conn, hs): super(EventsStore, self).__init__(db_conn, hs) @@ -252,6 +253,11 @@ class EventsStore( psql_only=True, ) + self.register_background_update_handler( + self.EVENT_DELETE_SOFT_FAILED_EXTREMITIES, + self._cleanup_extremities_bg_update, + ) + self._event_persist_queue = _EventPeristenceQueue() self._state_resolution_handler = hs.get_state_resolution_handler() @@ -2341,6 +2347,186 @@ class EventsStore( get_all_updated_current_state_deltas_txn, ) + @defer.inlineCallbacks + def _cleanup_extremities_bg_update(self, progress, batch_size): + """Background update to clean out extremities that should have been + deleted previously. + + Mainly used to deal with the aftermath of #5269. + """ + + # This works by first copying all existing forward extremities into the + # `_extremities_to_check` table at start up, and then checking each + # event in that table whether we have any descendants that are not + # soft-failed/rejected. If that is the case then we delete that event + # from the forward extremities table. + # + # For efficiency, we do this in batches by recursively pulling out all + # descendants of a batch until we find the non soft-failed/rejected + # events, i.e. the set of descendants whose chain of prev events back + # to the batch of extremities are all soft-failed or rejected. + # Typically, we won't find any such events as extremities will rarely + # have any descendants, but if they do then we should delete those + # extremities. + + def _cleanup_extremities_bg_update_txn(txn): + # The set of extremity event IDs that we're checking this round + original_set = set() + + # A dict[str, set[str]] of event ID to their prev events. + graph = {} + + # The set of descendants of the original set that are not rejected + # nor soft-failed. Ancestors of these events should be removed + # from the forward extremities table. + non_rejected_leaves = set() + + # Set of event IDs that have been soft failed, and for which we + # should check if they have descendants which haven't been soft + # failed. + soft_failed_events_to_lookup = set() + + # First, we get `batch_size` events from the table, pulling out + # their prev events, if any, and their prev events rejection status. + txn.execute( + """SELECT prev_event_id, event_id, internal_metadata, + rejections.event_id IS NOT NULL, events.outlier + FROM ( + SELECT event_id AS prev_event_id + FROM _extremities_to_check + LIMIT ? + ) AS f + LEFT JOIN event_edges USING (prev_event_id) + LEFT JOIN events USING (event_id) + LEFT JOIN event_json USING (event_id) + LEFT JOIN rejections USING (event_id) + """, (batch_size,) + ) + + for prev_event_id, event_id, metadata, rejected, outlier in txn: + original_set.add(prev_event_id) + + if not event_id or outlier: + # Common case where the forward extremity doesn't have any + # descendants. + continue + + graph.setdefault(event_id, set()).add(prev_event_id) + + soft_failed = False + if metadata: + soft_failed = json.loads(metadata).get("soft_failed") + + if soft_failed or rejected: + soft_failed_events_to_lookup.add(event_id) + else: + non_rejected_leaves.add(event_id) + + # Now we recursively check all the soft-failed descendants we + # found above in the same way, until we have nothing left to + # check. + while soft_failed_events_to_lookup: + # We only want to do 100 at a time, so we split given list + # into two. + batch = list(soft_failed_events_to_lookup) + to_check, to_defer = batch[:100], batch[100:] + soft_failed_events_to_lookup = set(to_defer) + + sql = """SELECT prev_event_id, event_id, internal_metadata, + rejections.event_id IS NOT NULL + FROM event_edges + INNER JOIN events USING (event_id) + INNER JOIN event_json USING (event_id) + LEFT JOIN rejections USING (event_id) + WHERE + prev_event_id IN (%s) + AND NOT events.outlier + """ % ( + ",".join("?" for _ in to_check), + ) + txn.execute(sql, to_check) + + for prev_event_id, event_id, metadata, rejected in txn: + if event_id in graph: + # Already handled this event previously, but we still + # want to record the edge. + graph.setdefault(event_id, set()).add(prev_event_id) + logger.info("Already handled") + continue + + graph.setdefault(event_id, set()).add(prev_event_id) + + soft_failed = json.loads(metadata).get("soft_failed") + if soft_failed or rejected: + soft_failed_events_to_lookup.add(event_id) + else: + non_rejected_leaves.add(event_id) + + # We have a set of non-soft-failed descendants, so we recurse up + # the graph to find all ancestors and add them to the set of event + # IDs that we can delete from forward extremities table. + to_delete = set() + while non_rejected_leaves: + event_id = non_rejected_leaves.pop() + prev_event_ids = graph.get(event_id, set()) + non_rejected_leaves.update(prev_event_ids) + to_delete.update(prev_event_ids) + + to_delete.intersection_update(original_set) + + logger.info("Deleting up to %d forward extremities", len(to_delete)) + + self._simple_delete_many_txn( + txn=txn, + table="event_forward_extremities", + column="event_id", + iterable=to_delete, + keyvalues={}, + ) + + if to_delete: + # We now need to invalidate the caches of these rooms + rows = self._simple_select_many_txn( + txn, + table="events", + column="event_id", + iterable=to_delete, + keyvalues={}, + retcols=("room_id",) + ) + for row in rows: + txn.call_after( + self.get_latest_event_ids_in_room.invalidate, + (row["room_id"],) + ) + + self._simple_delete_many_txn( + txn=txn, + table="_extremities_to_check", + column="event_id", + iterable=original_set, + keyvalues={}, + ) + + return len(original_set) + + num_handled = yield self.runInteraction( + "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn, + ) + + if not num_handled: + yield self._end_background_update(self.EVENT_DELETE_SOFT_FAILED_EXTREMITIES) + + def _drop_table_txn(txn): + txn.execute("DROP TABLE _extremities_to_check") + + yield self.runInteraction( + "_cleanup_extremities_bg_update_drop_table", + _drop_table_txn, + ) + + defer.returnValue(num_handled) + AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/schema/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/delta/54/delete_forward_extremities.sql new file mode 100644 index 0000000000..7056bd1d00 --- /dev/null +++ b/synapse/storage/schema/delta/54/delete_forward_extremities.sql @@ -0,0 +1,19 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('delete_soft_failed_extremities', '{}'); + +CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities; -- cgit 1.5.1 From 640fcbb07f8dc7d89465734f009d8e0a458c2b17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 May 2019 10:55:55 +0100 Subject: Fixup comments and logging --- synapse/storage/events.py | 21 ++++++++++++--------- .../schema/delta/54/delete_forward_extremities.sql | 3 +++ 2 files changed, 15 insertions(+), 9 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a9664928ca..418d88b8dc 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2387,7 +2387,8 @@ class EventsStore( soft_failed_events_to_lookup = set() # First, we get `batch_size` events from the table, pulling out - # their prev events, if any, and their prev events rejection status. + # their successor events, if any, and their successor events + # rejection status. txn.execute( """SELECT prev_event_id, event_id, internal_metadata, rejections.event_id IS NOT NULL, events.outlier @@ -2450,11 +2451,10 @@ class EventsStore( if event_id in graph: # Already handled this event previously, but we still # want to record the edge. - graph.setdefault(event_id, set()).add(prev_event_id) - logger.info("Already handled") + graph[event_id].add(prev_event_id) continue - graph.setdefault(event_id, set()).add(prev_event_id) + graph[event_id] = {prev_event_id} soft_failed = json.loads(metadata).get("soft_failed") if soft_failed or rejected: @@ -2474,8 +2474,6 @@ class EventsStore( to_delete.intersection_update(original_set) - logger.info("Deleting up to %d forward extremities", len(to_delete)) - deleted = self._simple_delete_many_txn( txn=txn, table="event_forward_extremities", @@ -2484,7 +2482,11 @@ class EventsStore( keyvalues={}, ) - logger.info("Deleted %d forward extremities", deleted) + logger.info( + "Deleted %d forward extremities of %d checked, to clean up #5269", + deleted, + len(original_set), + ) if deleted: # We now need to invalidate the caches of these rooms @@ -2496,10 +2498,11 @@ class EventsStore( keyvalues={}, retcols=("room_id",) ) - for row in rows: + room_ids = set(row["room_id"] for row in rows) + for room_id in room_ids: txn.call_after( self.get_latest_event_ids_in_room.invalidate, - (row["room_id"],) + (room_id,) ) self._simple_delete_many_txn( diff --git a/synapse/storage/schema/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/delta/54/delete_forward_extremities.sql index 7056bd1d00..aa40f13da7 100644 --- a/synapse/storage/schema/delta/54/delete_forward_extremities.sql +++ b/synapse/storage/schema/delta/54/delete_forward_extremities.sql @@ -13,7 +13,10 @@ * limitations under the License. */ +-- Start a background job to cleanup extremities that were incorrectly added +-- by bug #5269. INSERT INTO background_updates (update_name, progress_json) VALUES ('delete_soft_failed_extremities', '{}'); +DROP TABLE IF EXISTS _extremities_to_check; -- To make this delta schema file idempotent. CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities; -- cgit 1.5.1 From 6cdfb0207e2de72286a7a8d3b3c417c2808e90dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 May 2019 14:54:56 +0100 Subject: Add index to temp table --- synapse/storage/schema/delta/54/delete_forward_extremities.sql | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/delta/54/delete_forward_extremities.sql index aa40f13da7..b062ec840c 100644 --- a/synapse/storage/schema/delta/54/delete_forward_extremities.sql +++ b/synapse/storage/schema/delta/54/delete_forward_extremities.sql @@ -20,3 +20,4 @@ INSERT INTO background_updates (update_name, progress_json) VALUES DROP TABLE IF EXISTS _extremities_to_check; -- To make this delta schema file idempotent. CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities; +CREATE INDEX _extremities_to_check_id ON _extremities_to_check(event_id); -- cgit 1.5.1 From 5037326d6624d1d1780a0536d19ff79e275f8735 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 May 2019 15:37:57 +0100 Subject: Add indices. Remove room_ids accidentally added We have to do this by re-inserting a background update and recreating tables, as the tables only get created during a background update and will later be deleted. We also make sure that we remove any entries that should have been removed but weren't due to a race that has been fixed in a previous commit. --- synapse/storage/schema/delta/54/stats2.sql | 28 ++++++++++++++++++++ synapse/storage/stats.py | 41 ++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 13 deletions(-) create mode 100644 synapse/storage/schema/delta/54/stats2.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/54/stats2.sql b/synapse/storage/schema/delta/54/stats2.sql new file mode 100644 index 0000000000..3b2d48447f --- /dev/null +++ b/synapse/storage/schema/delta/54/stats2.sql @@ -0,0 +1,28 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This delta file gets run after `54/stats.sql` delta. + +-- We want to add some indices to the temporary stats table, so we re-insert +-- 'populate_stats_createtables' if we are still processing the rooms update. +INSERT INTO background_updates (update_name, progress_json) + SELECT 'populate_stats_createtables', '{}' + WHERE + 'populate_stats_process_rooms' IN ( + SELECT update_name FROM background_updates + ) + AND 'populate_stats_createtables' NOT IN ( -- don't insert if already exists + SELECT update_name FROM background_updates + ); diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index a99637d4b4..1c0b183a56 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.storage.prepare_database import get_statements from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -69,12 +70,25 @@ class StatsStore(StateDeltasStore): # Get all the rooms that we want to process. def _make_staging_area(txn): - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" - ) - txn.execute(sql) + # Create the temporary tables + stmts = get_statements(""" + -- We just recreate the table, we'll be reinserting the + -- correct entries again later anyway. + DROP TABLE IF EXISTS {temp}_rooms; + + CREATE TABLE IF NOT EXISTS {temp}_rooms( + room_id TEXT NOT NULL, + events BIGINT NOT NULL + ); + + CREATE INDEX {temp}_rooms_events + ON {temp}_rooms(events); + CREATE INDEX {temp}_rooms_id + ON {temp}_rooms(room_id); + """.format(temp=TEMP_TABLE).splitlines()) + + for statement in stmts: + txn.execute(statement) sql = ( "CREATE TABLE IF NOT EXISTS " @@ -83,15 +97,16 @@ class StatsStore(StateDeltasStore): ) txn.execute(sql) - # Get rooms we want to process from the database + # Get rooms we want to process from the database, only adding + # those that we haven't (i.e. those not in room_stats_earliest_token) sql = """ - SELECT room_id, count(*) FROM current_state_events - GROUP BY room_id - """ + INSERT INTO %s_rooms (room_id, events) + SELECT c.room_id, count(*) FROM current_state_events AS c + LEFT JOIN room_stats_earliest_token AS t USING (room_id) + WHERE t.room_id IS NULL + GROUP BY c.room_id + """ % (TEMP_TABLE,) txn.execute(sql) - rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()] - self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) - del rooms new_pos = yield self.get_max_stream_id_in_current_state_deltas() yield self.runInteraction("populate_stats_temp_build", _make_staging_area) -- cgit 1.5.1 From dc72b90cd674f69fea1d27a1c1dab60a60d5ab9d Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 3 Jun 2019 22:03:28 +1000 Subject: full schema --- .../schema/full_schemas/54/full.sql.postgres | 2040 ++++++++++++++++++++ .../storage/schema/full_schemas/54/full.sql.sqlite | 261 +++ synapse/storage/schema/full_schemas/README.txt | 14 + 3 files changed, 2315 insertions(+) create mode 100644 synapse/storage/schema/full_schemas/54/full.sql.postgres create mode 100644 synapse/storage/schema/full_schemas/54/full.sql.sqlite create mode 100644 synapse/storage/schema/full_schemas/README.txt (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/full_schemas/54/full.sql.postgres b/synapse/storage/schema/full_schemas/54/full.sql.postgres new file mode 100644 index 0000000000..5fb54cfe77 --- /dev/null +++ b/synapse/storage/schema/full_schemas/54/full.sql.postgres @@ -0,0 +1,2040 @@ + + + + + +CREATE TABLE _extremities_to_check ( + event_id text +); + + + +CREATE TABLE access_tokens ( + id bigint NOT NULL, + user_id text NOT NULL, + device_id text, + token text NOT NULL, + last_used bigint +); + + + +CREATE TABLE account_data ( + user_id text NOT NULL, + account_data_type text NOT NULL, + stream_id bigint NOT NULL, + content text NOT NULL +); + + + +CREATE TABLE account_data_max_stream_id ( + lock character(1) DEFAULT 'X'::bpchar NOT NULL, + stream_id bigint NOT NULL, + CONSTRAINT private_user_data_max_stream_id_lock_check CHECK ((lock = 'X'::bpchar)) +); + + + +CREATE TABLE account_validity ( + user_id text NOT NULL, + expiration_ts_ms bigint NOT NULL, + email_sent boolean NOT NULL, + renewal_token text +); + + + +CREATE TABLE application_services_state ( + as_id text NOT NULL, + state character varying(5), + last_txn integer +); + + + +CREATE TABLE application_services_txns ( + as_id text NOT NULL, + txn_id integer NOT NULL, + event_ids text NOT NULL +); + + + +CREATE TABLE applied_module_schemas ( + module_name text NOT NULL, + file text NOT NULL +); + + + +CREATE TABLE applied_schema_deltas ( + version integer NOT NULL, + file text NOT NULL +); + + + +CREATE TABLE appservice_room_list ( + appservice_id text NOT NULL, + network_id text NOT NULL, + room_id text NOT NULL +); + + + +CREATE TABLE appservice_stream_position ( + lock character(1) DEFAULT 'X'::bpchar NOT NULL, + stream_ordering bigint, + CONSTRAINT appservice_stream_position_lock_check CHECK ((lock = 'X'::bpchar)) +); + + + +CREATE TABLE background_updates ( + update_name text NOT NULL, + progress_json text NOT NULL, + depends_on text +); + + + +CREATE TABLE blocked_rooms ( + room_id text NOT NULL, + user_id text NOT NULL +); + + + +CREATE TABLE cache_invalidation_stream ( + stream_id bigint, + cache_func text, + keys text[], + invalidation_ts bigint +); + + + +CREATE TABLE current_state_delta_stream ( + stream_id bigint NOT NULL, + room_id text NOT NULL, + type text NOT NULL, + state_key text NOT NULL, + event_id text, + prev_event_id text +); + + + +CREATE TABLE current_state_events ( + event_id text NOT NULL, + room_id text NOT NULL, + type text NOT NULL, + state_key text NOT NULL +); + + + +CREATE TABLE deleted_pushers ( + stream_id bigint NOT NULL, + app_id text NOT NULL, + pushkey text NOT NULL, + user_id text NOT NULL +); + + + +CREATE TABLE destinations ( + destination text NOT NULL, + retry_last_ts bigint, + retry_interval integer +); + + + +CREATE TABLE device_federation_inbox ( + origin text NOT NULL, + message_id text NOT NULL, + received_ts bigint NOT NULL +); + + + +CREATE TABLE device_federation_outbox ( + destination text NOT NULL, + stream_id bigint NOT NULL, + queued_ts bigint NOT NULL, + messages_json text NOT NULL +); + + + +CREATE TABLE device_inbox ( + user_id text NOT NULL, + device_id text NOT NULL, + stream_id bigint NOT NULL, + message_json text NOT NULL +); + + + +CREATE TABLE device_lists_outbound_last_success ( + destination text NOT NULL, + user_id text NOT NULL, + stream_id bigint NOT NULL +); + + + +CREATE TABLE device_lists_outbound_pokes ( + destination text NOT NULL, + stream_id bigint NOT NULL, + user_id text NOT NULL, + device_id text NOT NULL, + sent boolean NOT NULL, + ts bigint NOT NULL +); + + + +CREATE TABLE device_lists_remote_cache ( + user_id text NOT NULL, + device_id text NOT NULL, + content text NOT NULL +); + + + +CREATE TABLE device_lists_remote_extremeties ( + user_id text NOT NULL, + stream_id text NOT NULL +); + + + +CREATE TABLE device_lists_stream ( + stream_id bigint NOT NULL, + user_id text NOT NULL, + device_id text NOT NULL +); + + + +CREATE TABLE device_max_stream_id ( + stream_id bigint NOT NULL +); + + + +CREATE TABLE devices ( + user_id text NOT NULL, + device_id text NOT NULL, + display_name text +); + + + +CREATE TABLE e2e_device_keys_json ( + user_id text NOT NULL, + device_id text NOT NULL, + ts_added_ms bigint NOT NULL, + key_json text NOT NULL +); + + + +CREATE TABLE e2e_one_time_keys_json ( + user_id text NOT NULL, + device_id text NOT NULL, + algorithm text NOT NULL, + key_id text NOT NULL, + ts_added_ms bigint NOT NULL, + key_json text NOT NULL +); + + + +CREATE TABLE e2e_room_keys ( + user_id text NOT NULL, + room_id text NOT NULL, + session_id text NOT NULL, + version bigint NOT NULL, + first_message_index integer, + forwarded_count integer, + is_verified boolean, + session_data text NOT NULL +); + + + +CREATE TABLE e2e_room_keys_versions ( + user_id text NOT NULL, + version bigint NOT NULL, + algorithm text NOT NULL, + auth_data text NOT NULL, + deleted smallint DEFAULT 0 NOT NULL +); + + + +CREATE TABLE erased_users ( + user_id text NOT NULL +); + + + +CREATE TABLE event_auth ( + event_id text NOT NULL, + auth_id text NOT NULL, + room_id text NOT NULL +); + + + +CREATE TABLE event_backward_extremities ( + event_id text NOT NULL, + room_id text NOT NULL +); + + + +CREATE TABLE event_edges ( + event_id text NOT NULL, + prev_event_id text NOT NULL, + room_id text NOT NULL, + is_state boolean NOT NULL +); + + + +CREATE TABLE event_forward_extremities ( + event_id text NOT NULL, + room_id text NOT NULL +); + + + +CREATE TABLE event_json ( + event_id text NOT NULL, + room_id text NOT NULL, + internal_metadata text NOT NULL, + json text NOT NULL, + format_version integer +); + + + +CREATE TABLE event_push_actions ( + room_id text NOT NULL, + event_id text NOT NULL, + user_id text NOT NULL, + profile_tag character varying(32), + actions text NOT NULL, + topological_ordering bigint, + stream_ordering bigint, + notif smallint, + highlight smallint +); + + + +CREATE TABLE event_push_actions_staging ( + event_id text NOT NULL, + user_id text NOT NULL, + actions text NOT NULL, + notif smallint NOT NULL, + highlight smallint NOT NULL +); + + + +CREATE TABLE event_push_summary ( + user_id text NOT NULL, + room_id text NOT NULL, + notif_count bigint NOT NULL, + stream_ordering bigint NOT NULL +); + + + +CREATE TABLE event_push_summary_stream_ordering ( + lock character(1) DEFAULT 'X'::bpchar NOT NULL, + stream_ordering bigint NOT NULL, + CONSTRAINT event_push_summary_stream_ordering_lock_check CHECK ((lock = 'X'::bpchar)) +); + + + +CREATE TABLE event_reference_hashes ( + event_id text, + algorithm text, + hash bytea +); + + + +CREATE TABLE event_relations ( + event_id text NOT NULL, + relates_to_id text NOT NULL, + relation_type text NOT NULL, + aggregation_key text +); + + + +CREATE TABLE event_reports ( + id bigint NOT NULL, + received_ts bigint NOT NULL, + room_id text NOT NULL, + event_id text NOT NULL, + user_id text NOT NULL, + reason text, + content text +); + + + +CREATE TABLE event_search ( + event_id text, + room_id text, + sender text, + key text, + vector tsvector, + origin_server_ts bigint, + stream_ordering bigint +); + + + +CREATE TABLE event_to_state_groups ( + event_id text NOT NULL, + state_group bigint NOT NULL +); + + + +CREATE TABLE events ( + stream_ordering integer NOT NULL, + topological_ordering bigint NOT NULL, + event_id text NOT NULL, + type text NOT NULL, + room_id text NOT NULL, + content text, + unrecognized_keys text, + processed boolean NOT NULL, + outlier boolean NOT NULL, + depth bigint DEFAULT 0 NOT NULL, + origin_server_ts bigint, + received_ts bigint, + sender text, + contains_url boolean +); + + + +CREATE TABLE ex_outlier_stream ( + event_stream_ordering bigint NOT NULL, + event_id text NOT NULL, + state_group bigint NOT NULL +); + + + +CREATE TABLE federation_stream_position ( + type text NOT NULL, + stream_id integer NOT NULL +); + + + +CREATE TABLE group_attestations_remote ( + group_id text NOT NULL, + user_id text NOT NULL, + valid_until_ms bigint NOT NULL, + attestation_json text NOT NULL +); + + + +CREATE TABLE group_attestations_renewals ( + group_id text NOT NULL, + user_id text NOT NULL, + valid_until_ms bigint NOT NULL +); + + + +CREATE TABLE group_invites ( + group_id text NOT NULL, + user_id text NOT NULL +); + + + +CREATE TABLE group_roles ( + group_id text NOT NULL, + role_id text NOT NULL, + profile text NOT NULL, + is_boolean NOT NULL +); + + + +CREATE TABLE group_room_categories ( + group_id text NOT NULL, + category_id text NOT NULL, + profile text NOT NULL, + is_boolean NOT NULL +); + + + +CREATE TABLE group_rooms ( + group_id text NOT NULL, + room_id text NOT NULL, + is_boolean NOT NULL +); + + + +CREATE TABLE group_summary_roles ( + group_id text NOT NULL, + role_id text NOT NULL, + role_order bigint NOT NULL, + CONSTRAINT group_summary_roles_role_order_check CHECK ((role_order > 0)) +); + + + +CREATE TABLE group_summary_room_categories ( + group_id text NOT NULL, + category_id text NOT NULL, + cat_order bigint NOT NULL, + CONSTRAINT group_summary_room_categories_cat_order_check CHECK ((cat_order > 0)) +); + + + +CREATE TABLE group_summary_rooms ( + group_id text NOT NULL, + room_id text NOT NULL, + category_id text NOT NULL, + room_order bigint NOT NULL, + is_boolean NOT NULL, + CONSTRAINT group_summary_rooms_room_order_check CHECK ((room_order > 0)) +); + + + +CREATE TABLE group_summary_users ( + group_id text NOT NULL, + user_id text NOT NULL, + role_id text NOT NULL, + user_order bigint NOT NULL, + is_boolean NOT NULL +); + + + +CREATE TABLE group_users ( + group_id text NOT NULL, + user_id text NOT NULL, + is_admin boolean NOT NULL, + is_boolean NOT NULL +); + + + +CREATE TABLE groups ( + group_id text NOT NULL, + name text, + avatar_url text, + short_description text, + long_description text, + is_boolean NOT NULL, + join_policy text DEFAULT 'invite'::text NOT NULL +); + + + +CREATE TABLE guest_access ( + event_id text NOT NULL, + room_id text NOT NULL, + guest_access text NOT NULL +); + + + +CREATE TABLE history_visibility ( + event_id text NOT NULL, + room_id text NOT NULL, + history_visibility text NOT NULL +); + + + +CREATE TABLE local_group_membership ( + group_id text NOT NULL, + user_id text NOT NULL, + is_admin boolean NOT NULL, + membership text NOT NULL, + is_sed boolean NOT NULL, + content text NOT NULL +); + + + +CREATE TABLE local_group_updates ( + stream_id bigint NOT NULL, + group_id text NOT NULL, + user_id text NOT NULL, + type text NOT NULL, + content text NOT NULL +); + + + +CREATE TABLE local_invites ( + stream_id bigint NOT NULL, + inviter text NOT NULL, + invitee text NOT NULL, + event_id text NOT NULL, + room_id text NOT NULL, + locally_rejected text, + replaced_by text +); + + + +CREATE TABLE local_media_repository ( + media_id text, + media_type text, + media_length integer, + created_ts bigint, + upload_name text, + user_id text, + quarantined_by text, + url_cache text, + last_access_ts bigint +); + + + +CREATE TABLE local_media_repository_thumbnails ( + media_id text, + thumbnail_width integer, + thumbnail_height integer, + thumbnail_type text, + thumbnail_method text, + thumbnail_length integer +); + + + +CREATE TABLE local_media_repository_url_cache ( + url text, + response_code integer, + etag text, + expires_ts bigint, + og text, + media_id text, + download_ts bigint +); + + + +CREATE TABLE monthly_active_users ( + user_id text NOT NULL, + "timestamp" bigint NOT NULL +); + + + +CREATE TABLE open_id_tokens ( + token text NOT NULL, + ts_valid_until_ms bigint NOT NULL, + user_id text NOT NULL +); + + + +CREATE TABLE presence ( + user_id text NOT NULL, + state character varying(20), + status_msg text, + mtime bigint +); + + + +CREATE TABLE presence_allow_inbound ( + observed_user_id text NOT NULL, + observer_user_id text NOT NULL +); + + + +CREATE TABLE presence_stream ( + stream_id bigint, + user_id text, + state text, + last_active_ts bigint, + last_federation_update_ts bigint, + last_user_sync_ts bigint, + status_msg text, + currently_active boolean +); + + + +CREATE TABLE profiles ( + user_id text NOT NULL, + displayname text, + avatar_url text +); + + + +CREATE TABLE room_list_stream ( + stream_id bigint NOT NULL, + room_id text NOT NULL, + visibility boolean NOT NULL, + appservice_id text, + network_id text +); + + + +CREATE TABLE push_rules ( + id bigint NOT NULL, + user_name text NOT NULL, + rule_id text NOT NULL, + priority_class smallint NOT NULL, + priority integer DEFAULT 0 NOT NULL, + conditions text NOT NULL, + actions text NOT NULL +); + + + +CREATE TABLE push_rules_enable ( + id bigint NOT NULL, + user_name text NOT NULL, + rule_id text NOT NULL, + enabled smallint +); + + + +CREATE TABLE push_rules_stream ( + stream_id bigint NOT NULL, + event_stream_ordering bigint NOT NULL, + user_id text NOT NULL, + rule_id text NOT NULL, + op text NOT NULL, + priority_class smallint, + priority integer, + conditions text, + actions text +); + + + +CREATE TABLE pusher_throttle ( + pusher bigint NOT NULL, + room_id text NOT NULL, + last_sent_ts bigint, + throttle_ms bigint +); + + + +CREATE TABLE pushers ( + id bigint NOT NULL, + user_name text NOT NULL, + access_token bigint, + profile_tag text NOT NULL, + kind text NOT NULL, + app_id text NOT NULL, + app_display_name text NOT NULL, + device_display_name text NOT NULL, + pushkey text NOT NULL, + ts bigint NOT NULL, + lang text, + data text, + last_stream_ordering integer, + last_success bigint, + failing_since bigint +); + + + +CREATE TABLE ratelimit_override ( + user_id text NOT NULL, + messages_per_second bigint, + burst_count bigint +); + + + +CREATE TABLE receipts_graph ( + room_id text NOT NULL, + receipt_type text NOT NULL, + user_id text NOT NULL, + event_ids text NOT NULL, + data text NOT NULL +); + + + +CREATE TABLE receipts_linearized ( + stream_id bigint NOT NULL, + room_id text NOT NULL, + receipt_type text NOT NULL, + user_id text NOT NULL, + event_id text NOT NULL, + data text NOT NULL +); + + + +CREATE TABLE received_transactions ( + transaction_id text, + origin text, + ts bigint, + response_code integer, + response_json bytea, + has_been_referenced smallint DEFAULT 0 +); + + + +CREATE TABLE redactions ( + event_id text NOT NULL, + redacts text NOT NULL +); + + + +CREATE TABLE rejections ( + event_id text NOT NULL, + reason text NOT NULL, + last_check text NOT NULL +); + + + +CREATE TABLE remote_media_cache ( + media_origin text, + media_id text, + media_type text, + created_ts bigint, + upload_name text, + media_length integer, + filesystem_id text, + last_access_ts bigint, + quarantined_by text +); + + + +CREATE TABLE remote_media_cache_thumbnails ( + media_origin text, + media_id text, + thumbnail_width integer, + thumbnail_height integer, + thumbnail_method text, + thumbnail_type text, + thumbnail_length integer, + filesystem_id text +); + + + +CREATE TABLE remote_profile_cache ( + user_id text NOT NULL, + displayname text, + avatar_url text, + last_check bigint NOT NULL +); + + + +CREATE TABLE room_account_data ( + user_id text NOT NULL, + room_id text NOT NULL, + account_data_type text NOT NULL, + stream_id bigint NOT NULL, + content text NOT NULL +); + + + +CREATE TABLE room_alias_servers ( + room_alias text NOT NULL, + server text NOT NULL +); + + + +CREATE TABLE room_aliases ( + room_alias text NOT NULL, + room_id text NOT NULL, + creator text +); + + + +CREATE TABLE room_depth ( + room_id text NOT NULL, + min_depth integer NOT NULL +); + + + +CREATE TABLE room_memberships ( + event_id text NOT NULL, + user_id text NOT NULL, + sender text NOT NULL, + room_id text NOT NULL, + membership text NOT NULL, + forgotten integer DEFAULT 0, + display_name text, + avatar_url text +); + + + +CREATE TABLE room_names ( + event_id text NOT NULL, + room_id text NOT NULL, + name text NOT NULL +); + + + +CREATE TABLE room_state ( + room_id text NOT NULL, + join_rules text, + history_visibility text, + encryption text, + name text, + topic text, + avatar text, + canonical_alias text +); + + + +CREATE TABLE room_stats ( + room_id text NOT NULL, + ts bigint NOT NULL, + bucket_size integer NOT NULL, + current_state_events integer NOT NULL, + joined_members integer NOT NULL, + invited_members integer NOT NULL, + left_members integer NOT NULL, + banned_members integer NOT NULL, + state_events integer NOT NULL +); + + + +CREATE TABLE room_stats_earliest_token ( + room_id text NOT NULL, + token bigint NOT NULL +); + + + +CREATE TABLE room_tags ( + user_id text NOT NULL, + room_id text NOT NULL, + tag text NOT NULL, + content text NOT NULL +); + + + +CREATE TABLE room_tags_revisions ( + user_id text NOT NULL, + room_id text NOT NULL, + stream_id bigint NOT NULL +); + + + +CREATE TABLE rooms ( + room_id text NOT NULL, + is_boolean, + creator text +); + + + +CREATE TABLE schema_version ( + lock character(1) DEFAULT 'X'::bpchar NOT NULL, + version integer NOT NULL, + upgraded boolean NOT NULL, + CONSTRAINT schema_version_lock_check CHECK ((lock = 'X'::bpchar)) +); + + + +CREATE TABLE server_keys_json ( + server_name text NOT NULL, + key_id text NOT NULL, + from_server text NOT NULL, + ts_added_ms bigint NOT NULL, + ts_valid_until_ms bigint NOT NULL, + key_json bytea NOT NULL +); + + + +CREATE TABLE server_signature_keys ( + server_name text, + key_id text, + from_server text, + ts_added_ms bigint, + verify_key bytea, + ts_valid_until_ms bigint +); + + + +CREATE TABLE state_events ( + event_id text NOT NULL, + room_id text NOT NULL, + type text NOT NULL, + state_key text NOT NULL, + prev_state text +); + + + +CREATE TABLE state_group_edges ( + state_group bigint NOT NULL, + prev_state_group bigint NOT NULL +); + + + +CREATE SEQUENCE state_group_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + + +CREATE TABLE state_groups ( + id bigint NOT NULL, + room_id text NOT NULL, + event_id text NOT NULL +); + + + +CREATE TABLE state_groups_state ( + state_group bigint NOT NULL, + room_id text NOT NULL, + type text NOT NULL, + state_key text NOT NULL, + event_id text NOT NULL +); + + + +CREATE TABLE stats_stream_pos ( + lock character(1) DEFAULT 'X'::bpchar NOT NULL, + stream_id bigint, + CONSTRAINT stats_stream_pos_lock_check CHECK ((lock = 'X'::bpchar)) +); + + + +CREATE TABLE stream_ordering_to_exterm ( + stream_ordering bigint NOT NULL, + room_id text NOT NULL, + event_id text NOT NULL +); + + + +CREATE TABLE threepid_guest_access_tokens ( + medium text, + address text, + guest_access_token text, + first_inviter text +); + + + +CREATE TABLE topics ( + event_id text NOT NULL, + room_id text NOT NULL, + topic text NOT NULL +); + + + +CREATE TABLE user_daily_visits ( + user_id text NOT NULL, + device_id text, + "timestamp" bigint NOT NULL +); + + + +CREATE TABLE user_directory ( + user_id text NOT NULL, + room_id text, + display_name text, + avatar_url text +); + + + +CREATE TABLE user_directory_search ( + user_id text NOT NULL, + vector tsvector +); + + + +CREATE TABLE user_directory_stream_pos ( + lock character(1) DEFAULT 'X'::bpchar NOT NULL, + stream_id bigint, + CONSTRAINT user_directory_stream_pos_lock_check CHECK ((lock = 'X'::bpchar)) +); + + + +CREATE TABLE user_filters ( + user_id text, + filter_id bigint, + filter_json bytea +); + + + +CREATE TABLE user_ips ( + user_id text NOT NULL, + access_token text NOT NULL, + device_id text, + ip text NOT NULL, + user_agent text NOT NULL, + last_seen bigint NOT NULL +); + + + +CREATE TABLE user_stats ( + user_id text NOT NULL, + ts bigint NOT NULL, + bucket_size integer NOT NULL, + rooms integer NOT NULL, + private_rooms integer NOT NULL +); + + + +CREATE TABLE user_threepid_id_server ( + user_id text NOT NULL, + medium text NOT NULL, + address text NOT NULL, + id_server text NOT NULL +); + + + +CREATE TABLE user_threepids ( + user_id text NOT NULL, + medium text NOT NULL, + address text NOT NULL, + validated_at bigint NOT NULL, + added_at bigint NOT NULL +); + + + +CREATE TABLE users ( + name text, + password_hash text, + creation_ts bigint, + admin smallint DEFAULT 0 NOT NULL, + upgrade_ts bigint, + is_guest smallint DEFAULT 0 NOT NULL, + appservice_id text, + consent_version text, + consent_server_notice_sent text, + user_type text +); + + + +CREATE TABLE users_in_rooms ( + user_id text NOT NULL, + room_id text NOT NULL +); + + + +CREATE TABLE users_pending_deactivation ( + user_id text NOT NULL +); + + + +CREATE TABLE users_who_share_private_rooms ( + user_id text NOT NULL, + other_user_id text NOT NULL, + room_id text NOT NULL +); + + + +ALTER TABLE ONLY access_tokens + ADD CONSTRAINT access_tokens_pkey PRIMARY KEY (id); + + + +ALTER TABLE ONLY access_tokens + ADD CONSTRAINT access_tokens_token_key UNIQUE (token); + + + +ALTER TABLE ONLY account_data + ADD CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type); + + + +ALTER TABLE ONLY account_validity + ADD CONSTRAINT account_validity_pkey PRIMARY KEY (user_id); + + + +ALTER TABLE ONLY application_services_state + ADD CONSTRAINT application_services_state_pkey PRIMARY KEY (as_id); + + + +ALTER TABLE ONLY application_services_txns + ADD CONSTRAINT application_services_txns_as_id_txn_id_key UNIQUE (as_id, txn_id); + + + +ALTER TABLE ONLY applied_module_schemas + ADD CONSTRAINT applied_module_schemas_module_name_file_key UNIQUE (module_name, file); + + + +ALTER TABLE ONLY applied_schema_deltas + ADD CONSTRAINT applied_schema_deltas_version_file_key UNIQUE (version, file); + + + +ALTER TABLE ONLY appservice_stream_position + ADD CONSTRAINT appservice_stream_position_lock_key UNIQUE (lock); + + + +ALTER TABLE ONLY background_updates + ADD CONSTRAINT background_updates_uniqueness UNIQUE (update_name); + + + +ALTER TABLE ONLY current_state_events + ADD CONSTRAINT current_state_events_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY current_state_events + ADD CONSTRAINT current_state_events_room_id_type_state_key_key UNIQUE (room_id, type, state_key); + + + +ALTER TABLE ONLY destinations + ADD CONSTRAINT destinations_pkey PRIMARY KEY (destination); + + + +ALTER TABLE ONLY devices + ADD CONSTRAINT device_uniqueness UNIQUE (user_id, device_id); + + + +ALTER TABLE ONLY e2e_device_keys_json + ADD CONSTRAINT e2e_device_keys_json_uniqueness UNIQUE (user_id, device_id); + + + +ALTER TABLE ONLY e2e_one_time_keys_json + ADD CONSTRAINT e2e_one_time_keys_json_uniqueness UNIQUE (user_id, device_id, algorithm, key_id); + + + +ALTER TABLE ONLY event_backward_extremities + ADD CONSTRAINT event_backward_extremities_event_id_room_id_key UNIQUE (event_id, room_id); + + + +ALTER TABLE ONLY event_edges + ADD CONSTRAINT event_edges_event_id_prev_event_id_room_id_is_state_key UNIQUE (event_id, prev_event_id, room_id, is_state); + + + +ALTER TABLE ONLY event_forward_extremities + ADD CONSTRAINT event_forward_extremities_event_id_room_id_key UNIQUE (event_id, room_id); + + + +ALTER TABLE ONLY event_push_actions + ADD CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag); + + + +ALTER TABLE ONLY event_json + ADD CONSTRAINT event_json_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY event_push_summary_stream_ordering + ADD CONSTRAINT event_push_summary_stream_ordering_lock_key UNIQUE (lock); + + + +ALTER TABLE ONLY event_reference_hashes + ADD CONSTRAINT event_reference_hashes_event_id_algorithm_key UNIQUE (event_id, algorithm); + + + +ALTER TABLE ONLY event_reports + ADD CONSTRAINT event_reports_pkey PRIMARY KEY (id); + + + +ALTER TABLE ONLY event_to_state_groups + ADD CONSTRAINT event_to_state_groups_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY events + ADD CONSTRAINT events_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY events + ADD CONSTRAINT events_pkey PRIMARY KEY (stream_ordering); + + + +ALTER TABLE ONLY ex_outlier_stream + ADD CONSTRAINT ex_outlier_stream_pkey PRIMARY KEY (event_stream_ordering); + + + +ALTER TABLE ONLY group_roles + ADD CONSTRAINT group_roles_group_id_role_id_key UNIQUE (group_id, role_id); + + + +ALTER TABLE ONLY group_room_categories + ADD CONSTRAINT group_room_categories_group_id_category_id_key UNIQUE (group_id, category_id); + + + +ALTER TABLE ONLY group_summary_roles + ADD CONSTRAINT group_summary_roles_group_id_role_id_role_order_key UNIQUE (group_id, role_id, role_order); + + + +ALTER TABLE ONLY group_summary_room_categories + ADD CONSTRAINT group_summary_room_categories_group_id_category_id_cat_orde_key UNIQUE (group_id, category_id, cat_order); + + + +ALTER TABLE ONLY group_summary_rooms + ADD CONSTRAINT group_summary_rooms_group_id_category_id_room_id_room_order_key UNIQUE (group_id, category_id, room_id, room_order); + + + +ALTER TABLE ONLY guest_access + ADD CONSTRAINT guest_access_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY history_visibility + ADD CONSTRAINT history_visibility_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY local_media_repository + ADD CONSTRAINT local_media_repository_media_id_key UNIQUE (media_id); + + + +ALTER TABLE ONLY local_media_repository_thumbnails + ADD CONSTRAINT local_media_repository_thumbn_media_id_thumbnail_width_thum_key UNIQUE (media_id, thumbnail_width, thumbnail_height, thumbnail_type); + + + +ALTER TABLE ONLY user_threepids + ADD CONSTRAINT medium_address UNIQUE (medium, address); + + + +ALTER TABLE ONLY open_id_tokens + ADD CONSTRAINT open_id_tokens_pkey PRIMARY KEY (token); + + + +ALTER TABLE ONLY presence_allow_inbound + ADD CONSTRAINT presence_allow_inbound_observed_user_id_observer_user_id_key UNIQUE (observed_user_id, observer_user_id); + + + +ALTER TABLE ONLY presence + ADD CONSTRAINT presence_user_id_key UNIQUE (user_id); + + + +ALTER TABLE ONLY account_data_max_stream_id + ADD CONSTRAINT private_user_data_max_stream_id_lock_key UNIQUE (lock); + + + +ALTER TABLE ONLY profiles + ADD CONSTRAINT profiles_user_id_key UNIQUE (user_id); + + + +ALTER TABLE ONLY push_rules_enable + ADD CONSTRAINT push_rules_enable_pkey PRIMARY KEY (id); + + + +ALTER TABLE ONLY push_rules_enable + ADD CONSTRAINT push_rules_enable_user_name_rule_id_key UNIQUE (user_name, rule_id); + + + +ALTER TABLE ONLY push_rules + ADD CONSTRAINT push_rules_pkey PRIMARY KEY (id); + + + +ALTER TABLE ONLY push_rules + ADD CONSTRAINT push_rules_user_name_rule_id_key UNIQUE (user_name, rule_id); + + + +ALTER TABLE ONLY pusher_throttle + ADD CONSTRAINT pusher_throttle_pkey PRIMARY KEY (pusher, room_id); + + + +ALTER TABLE ONLY pushers + ADD CONSTRAINT pushers2_app_id_pushkey_user_name_key UNIQUE (app_id, pushkey, user_name); + + + +ALTER TABLE ONLY pushers + ADD CONSTRAINT pushers2_pkey PRIMARY KEY (id); + + + +ALTER TABLE ONLY receipts_graph + ADD CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id); + + + +ALTER TABLE ONLY receipts_linearized + ADD CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id); + + + +ALTER TABLE ONLY received_transactions + ADD CONSTRAINT received_transactions_transaction_id_origin_key UNIQUE (transaction_id, origin); + + + +ALTER TABLE ONLY redactions + ADD CONSTRAINT redactions_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY rejections + ADD CONSTRAINT rejections_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY remote_media_cache + ADD CONSTRAINT remote_media_cache_media_origin_media_id_key UNIQUE (media_origin, media_id); + + + +ALTER TABLE ONLY remote_media_cache_thumbnails + ADD CONSTRAINT remote_media_cache_thumbnails_media_origin_media_id_thumbna_key UNIQUE (media_origin, media_id, thumbnail_width, thumbnail_height, thumbnail_type); + + + +ALTER TABLE ONLY room_account_data + ADD CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type); + + + +ALTER TABLE ONLY room_aliases + ADD CONSTRAINT room_aliases_room_alias_key UNIQUE (room_alias); + + + +ALTER TABLE ONLY room_depth + ADD CONSTRAINT room_depth_room_id_key UNIQUE (room_id); + + + +ALTER TABLE ONLY room_memberships + ADD CONSTRAINT room_memberships_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY room_names + ADD CONSTRAINT room_names_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY room_tags_revisions + ADD CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id); + + + +ALTER TABLE ONLY room_tags + ADD CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag); + + + +ALTER TABLE ONLY rooms + ADD CONSTRAINT rooms_pkey PRIMARY KEY (room_id); + + + +ALTER TABLE ONLY schema_version + ADD CONSTRAINT schema_version_lock_key UNIQUE (lock); + + + +ALTER TABLE ONLY server_keys_json + ADD CONSTRAINT server_keys_json_uniqueness UNIQUE (server_name, key_id, from_server); + + + +ALTER TABLE ONLY server_signature_keys + ADD CONSTRAINT server_signature_keys_server_name_key_id_key UNIQUE (server_name, key_id); + + + +ALTER TABLE ONLY state_events + ADD CONSTRAINT state_events_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY state_groups + ADD CONSTRAINT state_groups_pkey PRIMARY KEY (id); + + + +ALTER TABLE ONLY stats_stream_pos + ADD CONSTRAINT stats_stream_pos_lock_key UNIQUE (lock); + + + +ALTER TABLE ONLY topics + ADD CONSTRAINT topics_event_id_key UNIQUE (event_id); + + + +ALTER TABLE ONLY user_directory_stream_pos + ADD CONSTRAINT user_directory_stream_pos_lock_key UNIQUE (lock); + + + +ALTER TABLE ONLY users + ADD CONSTRAINT users_name_key UNIQUE (name); + + + +CREATE INDEX _extremities_to_check_id ON _extremities_to_check USING btree (event_id); + + + +CREATE INDEX account_data_stream_id ON account_data USING btree (user_id, stream_id); + + + +CREATE INDEX application_services_txns_id ON application_services_txns USING btree (as_id); + + + +CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list USING btree (appservice_id, network_id, room_id); + + + +CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms USING btree (room_id); + + + +CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream USING btree (stream_id); + + + +CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream USING btree (stream_id); + + + +CREATE INDEX deleted_pushers_stream_id ON deleted_pushers USING btree (stream_id); + + + +CREATE INDEX device_federation_inbox_sender_id ON device_federation_inbox USING btree (origin, message_id); + + + +CREATE INDEX device_federation_outbox_destination_id ON device_federation_outbox USING btree (destination, stream_id); + + + +CREATE INDEX device_federation_outbox_id ON device_federation_outbox USING btree (stream_id); + + + +CREATE INDEX device_inbox_stream_id ON device_inbox USING btree (stream_id); + + + +CREATE INDEX device_inbox_user_stream_id ON device_inbox USING btree (user_id, device_id, stream_id); + + + +CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success USING btree (destination, user_id, stream_id); + + + +CREATE INDEX device_lists_outbound_pokes_id ON device_lists_outbound_pokes USING btree (destination, stream_id); + + + +CREATE INDEX device_lists_outbound_pokes_stream ON device_lists_outbound_pokes USING btree (stream_id); + + + +CREATE INDEX device_lists_outbound_pokes_user ON device_lists_outbound_pokes USING btree (destination, user_id); + + + +CREATE INDEX device_lists_stream_id ON device_lists_stream USING btree (stream_id, user_id); + + + +CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys USING btree (user_id, room_id, session_id); + + + +CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions USING btree (user_id, version); + + + +CREATE UNIQUE INDEX erased_users_user ON erased_users USING btree (user_id); + + + +CREATE INDEX ev_b_extrem_id ON event_backward_extremities USING btree (event_id); + + + +CREATE INDEX ev_b_extrem_room ON event_backward_extremities USING btree (room_id); + + + +CREATE INDEX ev_edges_id ON event_edges USING btree (event_id); + + + +CREATE INDEX ev_edges_prev_id ON event_edges USING btree (prev_event_id); + + + +CREATE INDEX ev_extrem_id ON event_forward_extremities USING btree (event_id); + + + +CREATE INDEX ev_extrem_room ON event_forward_extremities USING btree (room_id); + + + +CREATE INDEX evauth_edges_id ON event_auth USING btree (event_id); + + + +CREATE INDEX event_json_room_id ON event_json USING btree (room_id); + + + +CREATE INDEX event_push_actions_rm_tokens ON event_push_actions USING btree (user_id, room_id, topological_ordering, stream_ordering); + + + +CREATE INDEX event_push_actions_room_id_user_id ON event_push_actions USING btree (room_id, user_id); + + + +CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging USING btree (event_id); + + + +CREATE INDEX event_push_actions_stream_ordering ON event_push_actions USING btree (stream_ordering, user_id); + + + +CREATE INDEX event_push_summary_user_rm ON event_push_summary USING btree (user_id, room_id); + + + +CREATE INDEX event_reference_hashes_id ON event_reference_hashes USING btree (event_id); + + + +CREATE UNIQUE INDEX event_relations_id ON event_relations USING btree (event_id); + + + +CREATE INDEX event_relations_relates ON event_relations USING btree (relates_to_id, relation_type, aggregation_key); + + + +CREATE INDEX event_search_ev_ridx ON event_search USING btree (room_id); + + + +CREATE INDEX event_search_fts_idx ON event_search USING gin (vector); + + + +CREATE INDEX events_order_room ON events USING btree (room_id, topological_ordering, stream_ordering); + + + +CREATE INDEX events_room_stream ON events USING btree (room_id, stream_ordering); + + + +CREATE INDEX events_ts ON events USING btree (origin_server_ts, stream_ordering); + + + +CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote USING btree (group_id, user_id); + + + +CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote USING btree (user_id); + + + +CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote USING btree (valid_until_ms); + + + +CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals USING btree (group_id, user_id); + + + +CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals USING btree (user_id); + + + +CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals USING btree (valid_until_ms); + + + +CREATE UNIQUE INDEX group_invites_g_idx ON group_invites USING btree (group_id, user_id); + + + +CREATE INDEX group_invites_u_idx ON group_invites USING btree (user_id); + + + +CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms USING btree (group_id, room_id); + + + +CREATE INDEX group_rooms_r_idx ON group_rooms USING btree (room_id); + + + +CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms USING btree (group_id, room_id, category_id); + + + +CREATE INDEX group_summary_users_g_idx ON group_summary_users USING btree (group_id); + + + +CREATE UNIQUE INDEX group_users_g_idx ON group_users USING btree (group_id, user_id); + + + +CREATE INDEX group_users_u_idx ON group_users USING btree (user_id); + + + +CREATE UNIQUE INDEX groups_idx ON groups USING btree (group_id); + + + +CREATE INDEX local_group_membership_g_idx ON local_group_membership USING btree (group_id); + + + +CREATE INDEX local_group_membership_u_idx ON local_group_membership USING btree (user_id, group_id); + + + +CREATE INDEX local_invites_for_user_idx ON local_invites USING btree (invitee, locally_rejected, replaced_by, room_id); + + + +CREATE INDEX local_invites_id ON local_invites USING btree (stream_id); + + + +CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails USING btree (media_id); + + + +CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache USING btree (url, download_ts); + + + +CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache USING btree (expires_ts); + + + +CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache USING btree (media_id); + + + +CREATE INDEX monthly_active_users_time_stamp ON monthly_active_users USING btree ("timestamp"); + + + +CREATE UNIQUE INDEX monthly_active_users_users ON monthly_active_users USING btree (user_id); + + + +CREATE INDEX open_id_tokens_ts_valid_until_ms ON open_id_tokens USING btree (ts_valid_until_ms); + + + +CREATE INDEX presence_stream_id ON presence_stream USING btree (stream_id, user_id); + + + +CREATE INDEX presence_stream_user_id ON presence_stream USING btree (user_id); + + + +CREATE INDEX room_index ON rooms USING btree (is_; + + + +CREATE INDEX room_list_stream_idx ON room_list_stream USING btree (stream_id); + + + +CREATE INDEX room_list_stream_rm_idx ON room_list_stream USING btree (room_id, stream_id); + + + +CREATE INDEX push_rules_enable_user_name ON push_rules_enable USING btree (user_name); + + + +CREATE INDEX push_rules_stream_id ON push_rules_stream USING btree (stream_id); + + + +CREATE INDEX push_rules_stream_user_stream_id ON push_rules_stream USING btree (user_id, stream_id); + + + +CREATE INDEX push_rules_user_name ON push_rules USING btree (user_name); + + + +CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override USING btree (user_id); + + + +CREATE INDEX receipts_linearized_id ON receipts_linearized USING btree (stream_id); + + + +CREATE INDEX receipts_linearized_room_stream ON receipts_linearized USING btree (room_id, stream_id); + + + +CREATE INDEX receipts_linearized_user ON receipts_linearized USING btree (user_id); + + + +CREATE INDEX received_transactions_ts ON received_transactions USING btree (ts); + + + +CREATE INDEX redactions_redacts ON redactions USING btree (redacts); + + + +CREATE INDEX remote_profile_cache_time ON remote_profile_cache USING btree (last_check); + + + +CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache USING btree (user_id); + + + +CREATE INDEX room_account_data_stream_id ON room_account_data USING btree (user_id, stream_id); + + + +CREATE INDEX room_alias_servers_alias ON room_alias_servers USING btree (room_alias); + + + +CREATE INDEX room_aliases_id ON room_aliases USING btree (room_id); + + + +CREATE INDEX room_depth_room ON room_depth USING btree (room_id); + + + +CREATE INDEX room_memberships_room_id ON room_memberships USING btree (room_id); + + + +CREATE INDEX room_memberships_user_id ON room_memberships USING btree (user_id); + + + +CREATE INDEX room_names_room_id ON room_names USING btree (room_id); + + + +CREATE UNIQUE INDEX room_state_room ON room_state USING btree (room_id); + + + +CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token USING btree (room_id); + + + +CREATE UNIQUE INDEX room_stats_room_ts ON room_stats USING btree (room_id, ts); + + + +CREATE INDEX state_group_edges_idx ON state_group_edges USING btree (state_group); + + + +CREATE INDEX state_group_edges_prev_idx ON state_group_edges USING btree (prev_state_group); + + + +CREATE INDEX state_groups_state_id ON state_groups_state USING btree (state_group); + + + +CREATE INDEX stream_ordering_to_exterm_idx ON stream_ordering_to_exterm USING btree (stream_ordering); + + + +CREATE INDEX stream_ordering_to_exterm_rm_idx ON stream_ordering_to_exterm USING btree (room_id, stream_ordering); + + + +CREATE UNIQUE INDEX threepid_guest_access_tokens_index ON threepid_guest_access_tokens USING btree (medium, address); + + + +CREATE INDEX topics_room_id ON topics USING btree (room_id); + + + +CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits USING btree ("timestamp"); + + + +CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits USING btree (user_id, "timestamp"); + + + +CREATE INDEX user_directory_room_idx ON user_directory USING btree (room_id); + + + +CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin (vector); + + + +CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search USING btree (user_id); + + + +CREATE UNIQUE INDEX user_directory_user_idx ON user_directory USING btree (user_id); + + + +CREATE INDEX user_filters_by_user_id_filter_id ON user_filters USING btree (user_id, filter_id); + + + +CREATE INDEX user_ips_user_ip ON user_ips USING btree (user_id, access_token, ip); + + + +CREATE UNIQUE INDEX user_stats_user_ts ON user_stats USING btree (user_id, ts); + + + +CREATE UNIQUE INDEX user_threepid_id_server_idx ON user_threepid_id_server USING btree (user_id, medium, address, id_server); + + + +CREATE INDEX user_threepids_medium_address ON user_threepids USING btree (medium, address); + + + +CREATE INDEX user_threepids_user_id ON user_threepids USING btree (user_id); + + + +CREATE UNIQUE INDEX users_in_rooms_u_idx ON users_in_rooms USING btree (user_id, room_id); + + + +CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms USING btree (other_user_id); + + + +CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms USING btree (room_id); + + + +CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms USING btree (user_id, other_user_id, room_id); + + + diff --git a/synapse/storage/schema/full_schemas/54/full.sql.sqlite b/synapse/storage/schema/full_schemas/54/full.sql.sqlite new file mode 100644 index 0000000000..0b60a6c789 --- /dev/null +++ b/synapse/storage/schema/full_schemas/54/full.sql.sqlite @@ -0,0 +1,261 @@ +CREATE TABLE application_services_state( as_id TEXT PRIMARY KEY, state VARCHAR(5), last_txn INTEGER ); +CREATE TABLE application_services_txns( as_id TEXT NOT NULL, txn_id INTEGER NOT NULL, event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) ); +CREATE INDEX application_services_txns_id ON application_services_txns ( as_id ); +CREATE TABLE presence( user_id TEXT NOT NULL, state VARCHAR(20), status_msg TEXT, mtime BIGINT, UNIQUE (user_id) ); +CREATE TABLE presence_allow_inbound( observed_user_id TEXT NOT NULL, observer_user_id TEXT NOT NULL, UNIQUE (observed_user_id, observer_user_id) ); +CREATE TABLE users( name TEXT, password_hash TEXT, creation_ts BIGINT, admin SMALLINT DEFAULT 0 NOT NULL, upgrade_ts BIGINT, is_guest SMALLINT DEFAULT 0 NOT NULL, appservice_id TEXT, consent_version TEXT, consent_server_notice_sent TEXT, user_type TEXT DEFAULT NULL, UNIQUE(name) ); +CREATE TABLE access_tokens( id BIGINT PRIMARY KEY, user_id TEXT NOT NULL, device_id TEXT, token TEXT NOT NULL, last_used BIGINT, UNIQUE(token) ); +CREATE TABLE user_ips ( user_id TEXT NOT NULL, access_token TEXT NOT NULL, device_id TEXT, ip TEXT NOT NULL, user_agent TEXT NOT NULL, last_seen BIGINT NOT NULL ); +CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, UNIQUE(user_id) ); +CREATE TABLE received_transactions( transaction_id TEXT, origin TEXT, ts BIGINT, response_code INTEGER, response_json bytea, has_been_referenced smallint default 0, UNIQUE (transaction_id, origin) ); +CREATE TABLE destinations( destination TEXT PRIMARY KEY, retry_last_ts BIGINT, retry_interval INTEGER ); +CREATE TABLE events( stream_ordering INTEGER PRIMARY KEY, topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, content TEXT, unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, origin_server_ts BIGINT, received_ts BIGINT, sender TEXT, contains_url BOOLEAN, UNIQUE (event_id) ); +CREATE INDEX events_order_room ON events ( room_id, topological_ordering, stream_ordering ); +CREATE TABLE event_json( event_id TEXT NOT NULL, room_id TEXT NOT NULL, internal_metadata TEXT NOT NULL, json TEXT NOT NULL, format_version INTEGER, UNIQUE (event_id) ); +CREATE INDEX event_json_room_id ON event_json(room_id); +CREATE TABLE state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, prev_state TEXT, UNIQUE (event_id) ); +CREATE TABLE current_state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, UNIQUE (event_id), UNIQUE (room_id, type, state_key) ); +CREATE TABLE room_memberships( event_id TEXT NOT NULL, user_id TEXT NOT NULL, sender TEXT NOT NULL, room_id TEXT NOT NULL, membership TEXT NOT NULL, forgotten INTEGER DEFAULT 0, display_name TEXT, avatar_url TEXT, UNIQUE (event_id) ); +CREATE INDEX room_memberships_room_id ON room_memberships (room_id); +CREATE INDEX room_memberships_user_id ON room_memberships (user_id); +CREATE TABLE topics( event_id TEXT NOT NULL, room_id TEXT NOT NULL, topic TEXT NOT NULL, UNIQUE (event_id) ); +CREATE INDEX topics_room_id ON topics(room_id); +CREATE TABLE room_names( event_id TEXT NOT NULL, room_id TEXT NOT NULL, name TEXT NOT NULL, UNIQUE (event_id) ); +CREATE INDEX room_names_room_id ON room_names(room_id); +CREATE TABLE rooms( room_id TEXT PRIMARY KEY NOT NULL, is_public BOOL, creator TEXT ); +CREATE TABLE server_signature_keys( server_name TEXT, key_id TEXT, from_server TEXT, ts_added_ms BIGINT, verify_key bytea, ts_valid_until_ms BIGINT, UNIQUE (server_name, key_id) ); +CREATE TABLE rejections( event_id TEXT NOT NULL, reason TEXT NOT NULL, last_check TEXT NOT NULL, UNIQUE (event_id) ); +CREATE TABLE push_rules ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, rule_id TEXT NOT NULL, priority_class SMALLINT NOT NULL, priority INTEGER NOT NULL DEFAULT 0, conditions TEXT NOT NULL, actions TEXT NOT NULL, UNIQUE(user_name, rule_id) ); +CREATE INDEX push_rules_user_name on push_rules (user_name); +CREATE TABLE user_filters( user_id TEXT, filter_id BIGINT, filter_json bytea ); +CREATE INDEX user_filters_by_user_id_filter_id ON user_filters( user_id, filter_id ); +CREATE TABLE push_rules_enable ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, rule_id TEXT NOT NULL, enabled SMALLINT, UNIQUE(user_name, rule_id) ); +CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name); +CREATE TABLE event_forward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, UNIQUE (event_id, room_id) ); +CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); +CREATE TABLE event_backward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, UNIQUE (event_id, room_id) ); +CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id); +CREATE TABLE event_edges( event_id TEXT NOT NULL, prev_event_id TEXT NOT NULL, room_id TEXT NOT NULL, is_state BOOL NOT NULL, UNIQUE (event_id, prev_event_id, room_id, is_state) ); +CREATE INDEX ev_edges_id ON event_edges(event_id); +CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id); +CREATE TABLE room_depth( room_id TEXT NOT NULL, min_depth INTEGER NOT NULL, UNIQUE (room_id) ); +CREATE INDEX room_depth_room ON room_depth(room_id); +CREATE TABLE state_groups( id BIGINT PRIMARY KEY, room_id TEXT NOT NULL, event_id TEXT NOT NULL ); +CREATE TABLE state_groups_state( state_group BIGINT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT NOT NULL ); +CREATE TABLE event_to_state_groups( event_id TEXT NOT NULL, state_group BIGINT NOT NULL, UNIQUE (event_id) ); +CREATE TABLE local_media_repository ( media_id TEXT, media_type TEXT, media_length INTEGER, created_ts BIGINT, upload_name TEXT, user_id TEXT, quarantined_by TEXT, url_cache TEXT, last_access_ts BIGINT, UNIQUE (media_id) ); +CREATE TABLE local_media_repository_thumbnails ( media_id TEXT, thumbnail_width INTEGER, thumbnail_height INTEGER, thumbnail_type TEXT, thumbnail_method TEXT, thumbnail_length INTEGER, UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) ); +CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id); +CREATE TABLE remote_media_cache ( media_origin TEXT, media_id TEXT, media_type TEXT, created_ts BIGINT, upload_name TEXT, media_length INTEGER, filesystem_id TEXT, last_access_ts BIGINT, quarantined_by TEXT, UNIQUE (media_origin, media_id) ); +CREATE TABLE remote_media_cache_thumbnails ( media_origin TEXT, media_id TEXT, thumbnail_width INTEGER, thumbnail_height INTEGER, thumbnail_method TEXT, thumbnail_type TEXT, thumbnail_length INTEGER, filesystem_id TEXT, UNIQUE ( media_origin, media_id, thumbnail_width, thumbnail_height, thumbnail_type ) ); +CREATE TABLE redactions ( event_id TEXT NOT NULL, redacts TEXT NOT NULL, UNIQUE (event_id) ); +CREATE INDEX redactions_redacts ON redactions (redacts); +CREATE TABLE room_aliases( room_alias TEXT NOT NULL, room_id TEXT NOT NULL, creator TEXT, UNIQUE (room_alias) ); +CREATE INDEX room_aliases_id ON room_aliases(room_id); +CREATE TABLE room_alias_servers( room_alias TEXT NOT NULL, server TEXT NOT NULL ); +CREATE INDEX room_alias_servers_alias ON room_alias_servers(room_alias); +CREATE TABLE event_reference_hashes ( event_id TEXT, algorithm TEXT, hash bytea, UNIQUE (event_id, algorithm) ); +CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id); +CREATE TABLE IF NOT EXISTS "server_keys_json" ( server_name TEXT NOT NULL, key_id TEXT NOT NULL, from_server TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, ts_valid_until_ms BIGINT NOT NULL, key_json bytea NOT NULL, CONSTRAINT server_keys_json_uniqueness UNIQUE (server_name, key_id, from_server) ); +CREATE TABLE e2e_device_keys_json ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, key_json TEXT NOT NULL, CONSTRAINT e2e_device_keys_json_uniqueness UNIQUE (user_id, device_id) ); +CREATE TABLE e2e_one_time_keys_json ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, algorithm TEXT NOT NULL, key_id TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, key_json TEXT NOT NULL, CONSTRAINT e2e_one_time_keys_json_uniqueness UNIQUE (user_id, device_id, algorithm, key_id) ); +CREATE TABLE receipts_graph( room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, event_ids TEXT NOT NULL, data TEXT NOT NULL, CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id) ); +CREATE TABLE receipts_linearized ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, event_id TEXT NOT NULL, data TEXT NOT NULL, CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id) ); +CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id ); +CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id ); +CREATE TABLE IF NOT EXISTS "user_threepids" ( user_id TEXT NOT NULL, medium TEXT NOT NULL, address TEXT NOT NULL, validated_at BIGINT NOT NULL, added_at BIGINT NOT NULL, CONSTRAINT medium_address UNIQUE (medium, address) ); +CREATE INDEX user_threepids_user_id ON user_threepids(user_id); +CREATE TABLE background_updates( update_name TEXT NOT NULL, progress_json TEXT NOT NULL, depends_on TEXT, CONSTRAINT background_updates_uniqueness UNIQUE (update_name) ); +CREATE VIRTUAL TABLE event_search USING fts4 ( event_id, room_id, sender, key, value ) +/* event_search(event_id,room_id,sender,"key",value) */; +CREATE TABLE IF NOT EXISTS 'event_search_content'(docid INTEGER PRIMARY KEY, 'c0event_id', 'c1room_id', 'c2sender', 'c3key', 'c4value'); +CREATE TABLE IF NOT EXISTS 'event_search_segments'(blockid INTEGER PRIMARY KEY, block BLOB); +CREATE TABLE IF NOT EXISTS 'event_search_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx)); +CREATE TABLE IF NOT EXISTS 'event_search_docsize'(docid INTEGER PRIMARY KEY, size BLOB); +CREATE TABLE IF NOT EXISTS 'event_search_stat'(id INTEGER PRIMARY KEY, value BLOB); +CREATE TABLE guest_access( event_id TEXT NOT NULL, room_id TEXT NOT NULL, guest_access TEXT NOT NULL, UNIQUE (event_id) ); +CREATE TABLE history_visibility( event_id TEXT NOT NULL, room_id TEXT NOT NULL, history_visibility TEXT NOT NULL, UNIQUE (event_id) ); +CREATE TABLE room_tags( user_id TEXT NOT NULL, room_id TEXT NOT NULL, tag TEXT NOT NULL, content TEXT NOT NULL, CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag) ); +CREATE TABLE room_tags_revisions ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, stream_id BIGINT NOT NULL, CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id) ); +CREATE TABLE IF NOT EXISTS "account_data_max_stream_id"( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_id BIGINT NOT NULL, CHECK (Lock='X') ); +CREATE TABLE account_data( user_id TEXT NOT NULL, account_data_type TEXT NOT NULL, stream_id BIGINT NOT NULL, content TEXT NOT NULL, CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) ); +CREATE TABLE room_account_data( user_id TEXT NOT NULL, room_id TEXT NOT NULL, account_data_type TEXT NOT NULL, stream_id BIGINT NOT NULL, content TEXT NOT NULL, CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) ); +CREATE INDEX account_data_stream_id on account_data(user_id, stream_id); +CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id); +CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering); +CREATE TABLE event_push_actions( room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, profile_tag VARCHAR(32), actions TEXT NOT NULL, topological_ordering BIGINT, stream_ordering BIGINT, notif SMALLINT, highlight SMALLINT, CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) ); +CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id); +CREATE INDEX events_room_stream on events(room_id, stream_ordering); +CREATE INDEX public_room_index on rooms(is_public); +CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id ); +CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering ); +CREATE TABLE presence_stream( stream_id BIGINT, user_id TEXT, state TEXT, last_active_ts BIGINT, last_federation_update_ts BIGINT, last_user_sync_ts BIGINT, status_msg TEXT, currently_active BOOLEAN ); +CREATE INDEX presence_stream_id ON presence_stream(stream_id, user_id); +CREATE INDEX presence_stream_user_id ON presence_stream(user_id); +CREATE TABLE push_rules_stream( stream_id BIGINT NOT NULL, event_stream_ordering BIGINT NOT NULL, user_id TEXT NOT NULL, rule_id TEXT NOT NULL, op TEXT NOT NULL, priority_class SMALLINT, priority INTEGER, conditions TEXT, actions TEXT ); +CREATE INDEX push_rules_stream_id ON push_rules_stream(stream_id); +CREATE INDEX push_rules_stream_user_stream_id on push_rules_stream(user_id, stream_id); +CREATE TABLE ex_outlier_stream( event_stream_ordering BIGINT PRIMARY KEY NOT NULL, event_id TEXT NOT NULL, state_group BIGINT NOT NULL ); +CREATE TABLE threepid_guest_access_tokens( medium TEXT, address TEXT, guest_access_token TEXT, first_inviter TEXT ); +CREATE UNIQUE INDEX threepid_guest_access_tokens_index ON threepid_guest_access_tokens(medium, address); +CREATE TABLE local_invites( stream_id BIGINT NOT NULL, inviter TEXT NOT NULL, invitee TEXT NOT NULL, event_id TEXT NOT NULL, room_id TEXT NOT NULL, locally_rejected TEXT, replaced_by TEXT ); +CREATE INDEX local_invites_id ON local_invites(stream_id); +CREATE INDEX local_invites_for_user_idx ON local_invites(invitee, locally_rejected, replaced_by, room_id); +CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id ); +CREATE TABLE open_id_tokens ( token TEXT NOT NULL PRIMARY KEY, ts_valid_until_ms bigint NOT NULL, user_id TEXT NOT NULL, UNIQUE (token) ); +CREATE INDEX open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms); +CREATE TABLE pusher_throttle( pusher BIGINT NOT NULL, room_id TEXT NOT NULL, last_sent_ts BIGINT, throttle_ms BIGINT, PRIMARY KEY (pusher, room_id) ); +CREATE TABLE event_reports( id BIGINT NOT NULL PRIMARY KEY, received_ts BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, reason TEXT, content TEXT ); +CREATE TABLE devices ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, display_name TEXT, CONSTRAINT device_uniqueness UNIQUE (user_id, device_id) ); +CREATE TABLE appservice_stream_position( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_ordering BIGINT, CHECK (Lock='X') ); +CREATE TABLE device_inbox ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, stream_id BIGINT NOT NULL, message_json TEXT NOT NULL ); +CREATE INDEX device_inbox_user_stream_id ON device_inbox(user_id, device_id, stream_id); +CREATE INDEX received_transactions_ts ON received_transactions(ts); +CREATE TABLE device_federation_outbox ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, queued_ts BIGINT NOT NULL, messages_json TEXT NOT NULL ); +CREATE INDEX device_federation_outbox_destination_id ON device_federation_outbox(destination, stream_id); +CREATE TABLE device_federation_inbox ( origin TEXT NOT NULL, message_id TEXT NOT NULL, received_ts BIGINT NOT NULL ); +CREATE INDEX device_federation_inbox_sender_id ON device_federation_inbox(origin, message_id); +CREATE TABLE device_max_stream_id ( stream_id BIGINT NOT NULL ); +CREATE TABLE public_room_list_stream ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, visibility BOOLEAN NOT NULL , appservice_id TEXT, network_id TEXT); +CREATE INDEX public_room_list_stream_idx on public_room_list_stream( stream_id ); +CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream( room_id, stream_id ); +CREATE TABLE state_group_edges( state_group BIGINT NOT NULL, prev_state_group BIGINT NOT NULL ); +CREATE INDEX state_group_edges_idx ON state_group_edges(state_group); +CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group); +CREATE TABLE stream_ordering_to_exterm ( stream_ordering BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL ); +CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm( stream_ordering ); +CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm( room_id, stream_ordering ); +CREATE TABLE IF NOT EXISTS "event_auth"( event_id TEXT NOT NULL, auth_id TEXT NOT NULL, room_id TEXT NOT NULL ); +CREATE INDEX evauth_edges_id ON event_auth(event_id); +CREATE INDEX user_threepids_medium_address on user_threepids (medium, address); +CREATE TABLE appservice_room_list( appservice_id TEXT NOT NULL, network_id TEXT NOT NULL, room_id TEXT NOT NULL ); +CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list( appservice_id, network_id, room_id ); +CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id); +CREATE TABLE federation_stream_position( type TEXT NOT NULL, stream_id INTEGER NOT NULL ); +CREATE TABLE device_lists_remote_cache ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, content TEXT NOT NULL ); +CREATE TABLE device_lists_remote_extremeties ( user_id TEXT NOT NULL, stream_id TEXT NOT NULL ); +CREATE TABLE device_lists_stream ( stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL ); +CREATE INDEX device_lists_stream_id ON device_lists_stream(stream_id, user_id); +CREATE TABLE device_lists_outbound_pokes ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL, sent BOOLEAN NOT NULL, ts BIGINT NOT NULL ); +CREATE INDEX device_lists_outbound_pokes_id ON device_lists_outbound_pokes(destination, stream_id); +CREATE INDEX device_lists_outbound_pokes_user ON device_lists_outbound_pokes(destination, user_id); +CREATE TABLE event_push_summary ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, notif_count BIGINT NOT NULL, stream_ordering BIGINT NOT NULL ); +CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id); +CREATE TABLE event_push_summary_stream_ordering ( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_ordering BIGINT NOT NULL, CHECK (Lock='X') ); +CREATE TABLE IF NOT EXISTS "pushers" ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, access_token BIGINT DEFAULT NULL, profile_tag TEXT NOT NULL, kind TEXT NOT NULL, app_id TEXT NOT NULL, app_display_name TEXT NOT NULL, device_display_name TEXT NOT NULL, pushkey TEXT NOT NULL, ts BIGINT NOT NULL, lang TEXT, data TEXT, last_stream_ordering INTEGER, last_success BIGINT, failing_since BIGINT, UNIQUE (app_id, pushkey, user_name) ); +CREATE INDEX device_lists_outbound_pokes_stream ON device_lists_outbound_pokes(stream_id); +CREATE TABLE ratelimit_override ( user_id TEXT NOT NULL, messages_per_second BIGINT, burst_count BIGINT ); +CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override(user_id); +CREATE TABLE current_state_delta_stream ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT, prev_event_id TEXT ); +CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id); +CREATE TABLE device_lists_outbound_last_success ( destination TEXT NOT NULL, user_id TEXT NOT NULL, stream_id BIGINT NOT NULL ); +CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success( destination, user_id, stream_id ); +CREATE TABLE user_directory_stream_pos ( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_id BIGINT, CHECK (Lock='X') ); +CREATE VIRTUAL TABLE user_directory_search USING fts4 ( user_id, value ) +/* user_directory_search(user_id,value) */; +CREATE TABLE IF NOT EXISTS 'user_directory_search_content'(docid INTEGER PRIMARY KEY, 'c0user_id', 'c1value'); +CREATE TABLE IF NOT EXISTS 'user_directory_search_segments'(blockid INTEGER PRIMARY KEY, block BLOB); +CREATE TABLE IF NOT EXISTS 'user_directory_search_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx)); +CREATE TABLE IF NOT EXISTS 'user_directory_search_docsize'(docid INTEGER PRIMARY KEY, size BLOB); +CREATE TABLE IF NOT EXISTS 'user_directory_search_stat'(id INTEGER PRIMARY KEY, value BLOB); +CREATE TABLE blocked_rooms ( room_id TEXT NOT NULL, user_id TEXT NOT NULL ); +CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms(room_id); +CREATE TABLE IF NOT EXISTS "local_media_repository_url_cache"( url TEXT, response_code INTEGER, etag TEXT, expires_ts BIGINT, og TEXT, media_id TEXT, download_ts BIGINT ); +CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts); +CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache(url, download_ts); +CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache(media_id); +CREATE TABLE group_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, is_admin BOOLEAN NOT NULL, is_public BOOLEAN NOT NULL ); +CREATE TABLE group_invites ( group_id TEXT NOT NULL, user_id TEXT NOT NULL ); +CREATE TABLE group_rooms ( group_id TEXT NOT NULL, room_id TEXT NOT NULL, is_public BOOLEAN NOT NULL ); +CREATE TABLE group_summary_rooms ( group_id TEXT NOT NULL, room_id TEXT NOT NULL, category_id TEXT NOT NULL, room_order BIGINT NOT NULL, is_public BOOLEAN NOT NULL, UNIQUE (group_id, category_id, room_id, room_order), CHECK (room_order > 0) ); +CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id); +CREATE TABLE group_summary_room_categories ( group_id TEXT NOT NULL, category_id TEXT NOT NULL, cat_order BIGINT NOT NULL, UNIQUE (group_id, category_id, cat_order), CHECK (cat_order > 0) ); +CREATE TABLE group_room_categories ( group_id TEXT NOT NULL, category_id TEXT NOT NULL, profile TEXT NOT NULL, is_public BOOLEAN NOT NULL, UNIQUE (group_id, category_id) ); +CREATE TABLE group_summary_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, role_id TEXT NOT NULL, user_order BIGINT NOT NULL, is_public BOOLEAN NOT NULL ); +CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id); +CREATE TABLE group_summary_roles ( group_id TEXT NOT NULL, role_id TEXT NOT NULL, role_order BIGINT NOT NULL, UNIQUE (group_id, role_id, role_order), CHECK (role_order > 0) ); +CREATE TABLE group_roles ( group_id TEXT NOT NULL, role_id TEXT NOT NULL, profile TEXT NOT NULL, is_public BOOLEAN NOT NULL, UNIQUE (group_id, role_id) ); +CREATE TABLE group_attestations_renewals ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, valid_until_ms BIGINT NOT NULL ); +CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(group_id, user_id); +CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id); +CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms); +CREATE TABLE group_attestations_remote ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, valid_until_ms BIGINT NOT NULL, attestation_json TEXT NOT NULL ); +CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id); +CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id); +CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms); +CREATE TABLE local_group_membership ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, is_admin BOOLEAN NOT NULL, membership TEXT NOT NULL, is_publicised BOOLEAN NOT NULL, content TEXT NOT NULL ); +CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id); +CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id); +CREATE TABLE local_group_updates ( stream_id BIGINT NOT NULL, group_id TEXT NOT NULL, user_id TEXT NOT NULL, type TEXT NOT NULL, content TEXT NOT NULL ); +CREATE TABLE remote_profile_cache ( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, last_check BIGINT NOT NULL ); +CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id); +CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check); +CREATE TABLE IF NOT EXISTS "deleted_pushers" ( stream_id BIGINT NOT NULL, app_id TEXT NOT NULL, pushkey TEXT NOT NULL, user_id TEXT NOT NULL ); +CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id); +CREATE TABLE IF NOT EXISTS "groups" ( group_id TEXT NOT NULL, name TEXT, avatar_url TEXT, short_description TEXT, long_description TEXT, is_public BOOL NOT NULL , join_policy TEXT NOT NULL DEFAULT 'invite'); +CREATE UNIQUE INDEX groups_idx ON groups(group_id); +CREATE TABLE IF NOT EXISTS "user_directory" ( user_id TEXT NOT NULL, room_id TEXT, display_name TEXT, avatar_url TEXT ); +CREATE INDEX user_directory_room_idx ON user_directory(room_id); +CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE TABLE event_push_actions_staging ( event_id TEXT NOT NULL, user_id TEXT NOT NULL, actions TEXT NOT NULL, notif SMALLINT NOT NULL, highlight SMALLINT NOT NULL ); +CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id); +CREATE TABLE users_pending_deactivation ( user_id TEXT NOT NULL ); +CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id); +CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id); +CREATE INDEX group_users_u_idx ON group_users(user_id); +CREATE INDEX group_invites_u_idx ON group_invites(user_id); +CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id); +CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); +CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL, device_id TEXT, timestamp BIGINT NOT NULL ); +CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp); +CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp); +CREATE TABLE erased_users ( user_id TEXT NOT NULL ); +CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id); +CREATE TABLE monthly_active_users ( user_id TEXT NOT NULL, timestamp BIGINT NOT NULL ); +CREATE UNIQUE INDEX monthly_active_users_users ON monthly_active_users(user_id); +CREATE INDEX monthly_active_users_time_stamp ON monthly_active_users(timestamp); +CREATE TABLE IF NOT EXISTS "e2e_room_keys_versions" ( user_id TEXT NOT NULL, version BIGINT NOT NULL, algorithm TEXT NOT NULL, auth_data TEXT NOT NULL, deleted SMALLINT DEFAULT 0 NOT NULL ); +CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version); +CREATE TABLE IF NOT EXISTS "e2e_room_keys" ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, session_id TEXT NOT NULL, version BIGINT NOT NULL, first_message_index INT, forwarded_count INT, is_verified BOOLEAN, session_data TEXT NOT NULL ); +CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys(user_id, room_id, session_id); +CREATE TABLE users_who_share_private_rooms ( user_id TEXT NOT NULL, other_user_id TEXT NOT NULL, room_id TEXT NOT NULL ); +CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms(user_id, other_user_id, room_id); +CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms(room_id); +CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms(other_user_id); +CREATE TABLE user_threepid_id_server ( user_id TEXT NOT NULL, medium TEXT NOT NULL, address TEXT NOT NULL, id_server TEXT NOT NULL ); +CREATE UNIQUE INDEX user_threepid_id_server_idx ON user_threepid_id_server( user_id, medium, address, id_server ); +CREATE TABLE users_in_public_rooms ( user_id TEXT NOT NULL, room_id TEXT NOT NULL ); +CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms(user_id, room_id); +CREATE TABLE account_validity ( user_id TEXT PRIMARY KEY, expiration_ts_ms BIGINT NOT NULL, email_sent BOOLEAN NOT NULL, renewal_token TEXT ); +CREATE TABLE event_relations ( event_id TEXT NOT NULL, relates_to_id TEXT NOT NULL, relation_type TEXT NOT NULL, aggregation_key TEXT ); +CREATE UNIQUE INDEX event_relations_id ON event_relations(event_id); +CREATE INDEX event_relations_relates ON event_relations(relates_to_id, relation_type, aggregation_key); +CREATE TABLE stats_stream_pos ( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_id BIGINT, CHECK (Lock='X') ); +CREATE TABLE user_stats ( user_id TEXT NOT NULL, ts BIGINT NOT NULL, bucket_size INT NOT NULL, public_rooms INT NOT NULL, private_rooms INT NOT NULL ); +CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts); +CREATE TABLE room_stats ( room_id TEXT NOT NULL, ts BIGINT NOT NULL, bucket_size INT NOT NULL, current_state_events INT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, banned_members INT NOT NULL, state_events INT NOT NULL ); +CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts); +CREATE TABLE room_state ( room_id TEXT NOT NULL, join_rules TEXT, history_visibility TEXT, encryption TEXT, name TEXT, topic TEXT, avatar TEXT, canonical_alias TEXT ); +CREATE UNIQUE INDEX room_state_room ON room_state(room_id); +CREATE TABLE room_stats_earliest_token ( room_id TEXT NOT NULL, token BIGINT NOT NULL ); +CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id); +CREATE INDEX access_tokens_device_id ON access_tokens (user_id, device_id); +CREATE INDEX user_ips_device_id ON user_ips (user_id, device_id, last_seen); +CREATE INDEX event_contains_url_index ON events (room_id, topological_ordering, stream_ordering); +CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering); +CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering); +CREATE INDEX current_state_events_member_index ON current_state_events (state_key); +CREATE INDEX device_inbox_stream_id_user_id ON device_inbox (stream_id, user_id); +CREATE INDEX device_lists_stream_user_id ON device_lists_stream (user_id, device_id); +CREATE INDEX local_media_repository_url_idx ON local_media_repository (created_ts); +CREATE INDEX user_ips_last_seen ON user_ips (user_id, last_seen); +CREATE INDEX user_ips_last_seen_only ON user_ips (last_seen); +CREATE INDEX users_creation_ts ON users (creation_ts); +CREATE INDEX event_to_state_groups_sg_index ON event_to_state_groups (state_group); +CREATE UNIQUE INDEX device_lists_remote_cache_unique_id ON device_lists_remote_cache (user_id, device_id); +CREATE TABLE sqlite_stat1(tbl,idx,stat); +CREATE INDEX state_groups_state_type_idx ON state_groups_state(state_group, type, state_key); +CREATE UNIQUE INDEX device_lists_remote_extremeties_unique_idx ON device_lists_remote_extremeties (user_id); +CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index ON user_ips (user_id, access_token, ip); diff --git a/synapse/storage/schema/full_schemas/README.txt b/synapse/storage/schema/full_schemas/README.txt new file mode 100644 index 0000000000..12d4eb0746 --- /dev/null +++ b/synapse/storage/schema/full_schemas/README.txt @@ -0,0 +1,14 @@ +Building full schema dumps +========================== + +Postgres +-------- + +$ pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner $DATABASE_NAME| sed -e '/^--/d' -e 's/public.//g' -e '/^SET /d' -e '/^SELECT /d' > full.sql.postgres + +SQLite +------ + +$ sqlite3 $DATABASE_FILE ".schema" > full.sql.sqlite + +Delete the CREATE statements for "schema_version", "applied_schema_deltas", and "applied_module_schemas". \ No newline at end of file -- cgit 1.5.1 From 7f81b967ca1d4004832e96513ad33e802f9a6f78 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 3 Jun 2019 22:23:40 +1000 Subject: fix schemas --- synapse/storage/prepare_database.py | 4 +- .../schema/full_schemas/54/full.sql.postgres | 72 +++++----------------- .../storage/schema/full_schemas/54/full.sql.sqlite | 1 - synapse/storage/schema/full_schemas/README.txt | 4 +- tests/handlers/test_user_directory.py | 2 + 5 files changed, 22 insertions(+), 61 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 07478b6672..b81c05369f 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -146,9 +146,9 @@ def _setup_new_database(cur, database_engine): directory_entries = os.listdir(sql_dir) - for filename in fnmatch.filter(directory_entries, "*.sql") + fnmatch.filter( + for filename in sorted(fnmatch.filter(directory_entries, "*.sql") + fnmatch.filter( directory_entries, "*.sql." + specific - ): + )): sql_loc = os.path.join(sql_dir, filename) logger.debug("Applying schema %s", sql_loc) executescript(cur, sql_loc) diff --git a/synapse/storage/schema/full_schemas/54/full.sql.postgres b/synapse/storage/schema/full_schemas/54/full.sql.postgres index 5fb54cfe77..ea3859fd24 100644 --- a/synapse/storage/schema/full_schemas/54/full.sql.postgres +++ b/synapse/storage/schema/full_schemas/54/full.sql.postgres @@ -60,21 +60,6 @@ CREATE TABLE application_services_txns ( ); - -CREATE TABLE applied_module_schemas ( - module_name text NOT NULL, - file text NOT NULL -); - - - -CREATE TABLE applied_schema_deltas ( - version integer NOT NULL, - file text NOT NULL -); - - - CREATE TABLE appservice_room_list ( appservice_id text NOT NULL, network_id text NOT NULL, @@ -475,7 +460,7 @@ CREATE TABLE group_roles ( group_id text NOT NULL, role_id text NOT NULL, profile text NOT NULL, - is_boolean NOT NULL + is_public boolean NOT NULL ); @@ -484,7 +469,7 @@ CREATE TABLE group_room_categories ( group_id text NOT NULL, category_id text NOT NULL, profile text NOT NULL, - is_boolean NOT NULL + is_public boolean NOT NULL ); @@ -492,7 +477,7 @@ CREATE TABLE group_room_categories ( CREATE TABLE group_rooms ( group_id text NOT NULL, room_id text NOT NULL, - is_boolean NOT NULL + is_public boolean NOT NULL ); @@ -520,7 +505,7 @@ CREATE TABLE group_summary_rooms ( room_id text NOT NULL, category_id text NOT NULL, room_order bigint NOT NULL, - is_boolean NOT NULL, + is_public boolean NOT NULL, CONSTRAINT group_summary_rooms_room_order_check CHECK ((room_order > 0)) ); @@ -531,7 +516,7 @@ CREATE TABLE group_summary_users ( user_id text NOT NULL, role_id text NOT NULL, user_order bigint NOT NULL, - is_boolean NOT NULL + is_public boolean NOT NULL ); @@ -540,7 +525,7 @@ CREATE TABLE group_users ( group_id text NOT NULL, user_id text NOT NULL, is_admin boolean NOT NULL, - is_boolean NOT NULL + is_public boolean NOT NULL ); @@ -551,7 +536,7 @@ CREATE TABLE groups ( avatar_url text, short_description text, long_description text, - is_boolean NOT NULL, + is_public boolean NOT NULL, join_policy text DEFAULT 'invite'::text NOT NULL ); @@ -578,7 +563,7 @@ CREATE TABLE local_group_membership ( user_id text NOT NULL, is_admin boolean NOT NULL, membership text NOT NULL, - is_sed boolean NOT NULL, + is_publicised boolean NOT NULL, content text NOT NULL ); @@ -695,7 +680,7 @@ CREATE TABLE profiles ( -CREATE TABLE room_list_stream ( +CREATE TABLE public_room_list_stream ( stream_id bigint NOT NULL, room_id text NOT NULL, visibility boolean NOT NULL, @@ -966,21 +951,12 @@ CREATE TABLE room_tags_revisions ( CREATE TABLE rooms ( room_id text NOT NULL, - is_boolean, + is_public boolean, creator text ); -CREATE TABLE schema_version ( - lock character(1) DEFAULT 'X'::bpchar NOT NULL, - version integer NOT NULL, - upgraded boolean NOT NULL, - CONSTRAINT schema_version_lock_check CHECK ((lock = 'X'::bpchar)) -); - - - CREATE TABLE server_keys_json ( server_name text NOT NULL, key_id text NOT NULL, @@ -1135,7 +1111,7 @@ CREATE TABLE user_stats ( user_id text NOT NULL, ts bigint NOT NULL, bucket_size integer NOT NULL, - rooms integer NOT NULL, + public_rooms integer NOT NULL, private_rooms integer NOT NULL ); @@ -1175,7 +1151,7 @@ CREATE TABLE users ( -CREATE TABLE users_in_rooms ( +CREATE TABLE users_in_public_rooms ( user_id text NOT NULL, room_id text NOT NULL ); @@ -1225,17 +1201,6 @@ ALTER TABLE ONLY application_services_txns ADD CONSTRAINT application_services_txns_as_id_txn_id_key UNIQUE (as_id, txn_id); - -ALTER TABLE ONLY applied_module_schemas - ADD CONSTRAINT applied_module_schemas_module_name_file_key UNIQUE (module_name, file); - - - -ALTER TABLE ONLY applied_schema_deltas - ADD CONSTRAINT applied_schema_deltas_version_file_key UNIQUE (version, file); - - - ALTER TABLE ONLY appservice_stream_position ADD CONSTRAINT appservice_stream_position_lock_key UNIQUE (lock); @@ -1521,11 +1486,6 @@ ALTER TABLE ONLY rooms -ALTER TABLE ONLY schema_version - ADD CONSTRAINT schema_version_lock_key UNIQUE (lock); - - - ALTER TABLE ONLY server_keys_json ADD CONSTRAINT server_keys_json_uniqueness UNIQUE (server_name, key_id, from_server); @@ -1846,15 +1806,15 @@ CREATE INDEX presence_stream_user_id ON presence_stream USING btree (user_id); -CREATE INDEX room_index ON rooms USING btree (is_; +CREATE INDEX public_room_index ON rooms USING btree (is_public); -CREATE INDEX room_list_stream_idx ON room_list_stream USING btree (stream_id); +CREATE INDEX public_room_list_stream_idx ON public_room_list_stream USING btree (stream_id); -CREATE INDEX room_list_stream_rm_idx ON room_list_stream USING btree (room_id, stream_id); +CREATE INDEX public_room_list_stream_rm_idx ON public_room_list_stream USING btree (room_id, stream_id); @@ -2022,7 +1982,7 @@ CREATE INDEX user_threepids_user_id ON user_threepids USING btree (user_id); -CREATE UNIQUE INDEX users_in_rooms_u_idx ON users_in_rooms USING btree (user_id, room_id); +CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms USING btree (user_id, room_id); diff --git a/synapse/storage/schema/full_schemas/54/full.sql.sqlite b/synapse/storage/schema/full_schemas/54/full.sql.sqlite index 0b60a6c789..be9295e4c9 100644 --- a/synapse/storage/schema/full_schemas/54/full.sql.sqlite +++ b/synapse/storage/schema/full_schemas/54/full.sql.sqlite @@ -255,7 +255,6 @@ CREATE INDEX user_ips_last_seen_only ON user_ips (last_seen); CREATE INDEX users_creation_ts ON users (creation_ts); CREATE INDEX event_to_state_groups_sg_index ON event_to_state_groups (state_group); CREATE UNIQUE INDEX device_lists_remote_cache_unique_id ON device_lists_remote_cache (user_id, device_id); -CREATE TABLE sqlite_stat1(tbl,idx,stat); CREATE INDEX state_groups_state_type_idx ON state_groups_state(state_group, type, state_key); CREATE UNIQUE INDEX device_lists_remote_extremeties_unique_idx ON device_lists_remote_extremeties (user_id); CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index ON user_ips (user_id, access_token, ip); diff --git a/synapse/storage/schema/full_schemas/README.txt b/synapse/storage/schema/full_schemas/README.txt index 12d4eb0746..df49f9b39e 100644 --- a/synapse/storage/schema/full_schemas/README.txt +++ b/synapse/storage/schema/full_schemas/README.txt @@ -4,11 +4,11 @@ Building full schema dumps Postgres -------- -$ pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner $DATABASE_NAME| sed -e '/^--/d' -e 's/public.//g' -e '/^SET /d' -e '/^SELECT /d' > full.sql.postgres +$ pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner $DATABASE_NAME| sed -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > full.sql.postgres SQLite ------ $ sqlite3 $DATABASE_FILE ".schema" > full.sql.sqlite -Delete the CREATE statements for "schema_version", "applied_schema_deltas", and "applied_module_schemas". \ No newline at end of file +Delete the CREATE statements for "sqlite_stat1", "schema_version", "applied_schema_deltas", and "applied_module_schemas". \ No newline at end of file diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 9021e647fe..b919694f54 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -96,6 +96,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): self.get_success(self.handler.handle_user_deactivated(r_user_id)) self.store.remove_from_user_dir.called_once_with(r_user_id) + @unittest.DEBUG def test_private_room(self): """ A user can be searched for only by people that are either in a public @@ -340,6 +341,7 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase): return hs + @unittest.DEBUG def test_disabling_room_list(self): self.config.user_directory_search_enabled = True -- cgit 1.5.1 From be452fc9ace4f501398be769766e3f8bbd798571 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 3 Jun 2019 22:24:23 +1000 Subject: more fix --- .../schema/full_schemas/54/stream_positions.sql | 38 ++++++++++++++++++++++ tests/handlers/test_user_directory.py | 2 -- 2 files changed, 38 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/schema/full_schemas/54/stream_positions.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/full_schemas/54/stream_positions.sql b/synapse/storage/schema/full_schemas/54/stream_positions.sql new file mode 100644 index 0000000000..d6433a5af2 --- /dev/null +++ b/synapse/storage/schema/full_schemas/54/stream_positions.sql @@ -0,0 +1,38 @@ + +INSERT INTO appservice_stream_position (stream_ordering) SELECT COALESCE(MAX(stream_ordering), 0) FROM events; +INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1); +INSERT INTO federation_stream_position (type, stream_id) SELECT 'events', coalesce(max(stream_ordering), -1) FROM events; +INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); +INSERT INTO stats_stream_pos (stream_id) VALUES (null); + +--- User dir population + +-- Set up staging tables +INSERT INTO background_updates (update_name, progress_json) VALUES + ('populate_user_directory_createtables', '{}'); + +-- Run through each room and update the user directory according to who is in it +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'); + +-- Insert all users, if search_all_users is on +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'); + +-- Clean up staging tables +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users'); + +--- Stats population + +-- Set up staging tables +INSERT INTO background_updates (update_name, progress_json) VALUES + ('populate_stats_createtables', '{}'); + +-- Run through each room and update stats +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_rooms', '{}', 'populate_stats_createtables'); + +-- Clean up staging tables +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_cleanup', '{}', 'populate_stats_process_rooms'); diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index b919694f54..9021e647fe 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -96,7 +96,6 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): self.get_success(self.handler.handle_user_deactivated(r_user_id)) self.store.remove_from_user_dir.called_once_with(r_user_id) - @unittest.DEBUG def test_private_room(self): """ A user can be searched for only by people that are either in a public @@ -341,7 +340,6 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase): return hs - @unittest.DEBUG def test_disabling_room_list(self): self.config.user_directory_search_enabled = True -- cgit 1.5.1 From 4e75c5e02a6dead87e9a24cbdb8fb015221070ce Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 3 Jun 2019 22:42:12 +1000 Subject: WHY IS THIS CALLED A SLIGHTLY DIFFERENT THING --- synapse/storage/schema/full_schemas/54/stream_positions.sql | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/full_schemas/54/stream_positions.sql b/synapse/storage/schema/full_schemas/54/stream_positions.sql index d6433a5af2..0febedcc5e 100644 --- a/synapse/storage/schema/full_schemas/54/stream_positions.sql +++ b/synapse/storage/schema/full_schemas/54/stream_positions.sql @@ -4,6 +4,7 @@ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', - INSERT INTO federation_stream_position (type, stream_id) SELECT 'events', coalesce(max(stream_ordering), -1) FROM events; INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); INSERT INTO stats_stream_pos (stream_id) VALUES (null); +INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0); --- User dir population -- cgit 1.5.1 From 2198b7ce2a1316dfaf8dde061c1a6f30a818ed5a Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 4 Jun 2019 01:06:00 +1000 Subject: add stuff in bg updates --- .../schema/full_schemas/54/full.sql.postgres | 72 +++++++++++++++++++--- 1 file changed, 62 insertions(+), 10 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/full_schemas/54/full.sql.postgres b/synapse/storage/schema/full_schemas/54/full.sql.postgres index ea3859fd24..098434356f 100644 --- a/synapse/storage/schema/full_schemas/54/full.sql.postgres +++ b/synapse/storage/schema/full_schemas/54/full.sql.postgres @@ -3,12 +3,6 @@ -CREATE TABLE _extremities_to_check ( - event_id text -); - - - CREATE TABLE access_tokens ( id bigint NOT NULL, user_id text NOT NULL, @@ -60,6 +54,7 @@ CREATE TABLE application_services_txns ( ); + CREATE TABLE appservice_room_list ( appservice_id text NOT NULL, network_id text NOT NULL, @@ -1201,6 +1196,7 @@ ALTER TABLE ONLY application_services_txns ADD CONSTRAINT application_services_txns_as_id_txn_id_key UNIQUE (as_id, txn_id); + ALTER TABLE ONLY appservice_stream_position ADD CONSTRAINT appservice_stream_position_lock_key UNIQUE (lock); @@ -1526,7 +1522,7 @@ ALTER TABLE ONLY users -CREATE INDEX _extremities_to_check_id ON _extremities_to_check USING btree (event_id); +CREATE INDEX access_tokens_device_id ON access_tokens USING btree (user_id, device_id); @@ -1554,6 +1550,10 @@ CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream USING +CREATE INDEX current_state_events_member_index ON current_state_events USING btree (state_key) WHERE (type = 'm.room.member'::text); + + + CREATE INDEX deleted_pushers_stream_id ON deleted_pushers USING btree (stream_id); @@ -1570,7 +1570,7 @@ CREATE INDEX device_federation_outbox_id ON device_federation_outbox USING btree -CREATE INDEX device_inbox_stream_id ON device_inbox USING btree (stream_id); +CREATE INDEX device_inbox_stream_id_user_id ON device_inbox USING btree (stream_id, user_id); @@ -1594,10 +1594,22 @@ CREATE INDEX device_lists_outbound_pokes_user ON device_lists_outbound_pokes USI +CREATE UNIQUE INDEX device_lists_remote_cache_unique_id ON device_lists_remote_cache USING btree (user_id, device_id); + + + +CREATE UNIQUE INDEX device_lists_remote_extremeties_unique_idx ON device_lists_remote_extremeties USING btree (user_id); + + + CREATE INDEX device_lists_stream_id ON device_lists_stream USING btree (stream_id, user_id); +CREATE INDEX device_lists_stream_user_id ON device_lists_stream USING btree (user_id, device_id); + + + CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys USING btree (user_id, room_id, session_id); @@ -1638,10 +1650,18 @@ CREATE INDEX evauth_edges_id ON event_auth USING btree (event_id); +CREATE INDEX event_contains_url_index ON events USING btree (room_id, topological_ordering, stream_ordering) WHERE ((contains_url = true) AND (outlier = false)); + + + CREATE INDEX event_json_room_id ON event_json USING btree (room_id); +CREATE INDEX event_push_actions_highlights_index ON event_push_actions USING btree (user_id, room_id, topological_ordering, stream_ordering) WHERE (highlight = 1); + + + CREATE INDEX event_push_actions_rm_tokens ON event_push_actions USING btree (user_id, room_id, topological_ordering, stream_ordering); @@ -1658,6 +1678,10 @@ CREATE INDEX event_push_actions_stream_ordering ON event_push_actions USING btre +CREATE INDEX event_push_actions_u_highlight ON event_push_actions USING btree (user_id, stream_ordering); + + + CREATE INDEX event_push_summary_user_rm ON event_push_summary USING btree (user_id, room_id); @@ -1678,10 +1702,18 @@ CREATE INDEX event_search_ev_ridx ON event_search USING btree (room_id); +CREATE UNIQUE INDEX event_search_event_id_idx ON event_search USING btree (event_id); + + + CREATE INDEX event_search_fts_idx ON event_search USING gin (vector); +CREATE INDEX event_to_state_groups_sg_index ON event_to_state_groups USING btree (state_group); + + + CREATE INDEX events_order_room ON events USING btree (room_id, topological_ordering, stream_ordering); @@ -1786,6 +1818,10 @@ CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repositor +CREATE INDEX local_media_repository_url_idx ON local_media_repository USING btree (created_ts) WHERE (url_cache IS NOT NULL); + + + CREATE INDEX monthly_active_users_time_stamp ON monthly_active_users USING btree ("timestamp"); @@ -1914,7 +1950,7 @@ CREATE INDEX state_group_edges_prev_idx ON state_group_edges USING btree (prev_s -CREATE INDEX state_groups_state_id ON state_groups_state USING btree (state_group); +CREATE INDEX state_groups_state_type_idx ON state_groups_state USING btree (state_group, type, state_key); @@ -1962,7 +1998,19 @@ CREATE INDEX user_filters_by_user_id_filter_id ON user_filters USING btree (user -CREATE INDEX user_ips_user_ip ON user_ips USING btree (user_id, access_token, ip); +CREATE INDEX user_ips_device_id ON user_ips USING btree (user_id, device_id, last_seen); + + + +CREATE INDEX user_ips_last_seen ON user_ips USING btree (user_id, last_seen); + + + +CREATE INDEX user_ips_last_seen_only ON user_ips USING btree (last_seen); + + + +CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index ON user_ips USING btree (user_id, access_token, ip); @@ -1982,6 +2030,10 @@ CREATE INDEX user_threepids_user_id ON user_threepids USING btree (user_id); +CREATE INDEX users_creation_ts ON users USING btree (creation_ts); + + + CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms USING btree (user_id, room_id); -- cgit 1.5.1 From 6362e3af14cff28bc51d0e66d207b84bae7fd422 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 6 Jun 2019 04:20:35 +1000 Subject: add more comments --- synapse/storage/schema/full_schemas/README.txt | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/full_schemas/README.txt b/synapse/storage/schema/full_schemas/README.txt index df49f9b39e..d3f6401344 100644 --- a/synapse/storage/schema/full_schemas/README.txt +++ b/synapse/storage/schema/full_schemas/README.txt @@ -1,6 +1,8 @@ Building full schema dumps ========================== +These schemas need to be made from a database that has had all background updates run. + Postgres -------- @@ -11,4 +13,7 @@ SQLite $ sqlite3 $DATABASE_FILE ".schema" > full.sql.sqlite +After +----- + Delete the CREATE statements for "sqlite_stat1", "schema_version", "applied_schema_deltas", and "applied_module_schemas". \ No newline at end of file -- cgit 1.5.1 From 3b6645d3bf3d3be449e1162e4135b677a1086ade Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Fri, 7 Jun 2019 01:20:58 +1000 Subject: remove background updates that arent needed --- .../schema/full_schemas/54/stream_positions.sql | 34 +--------------------- 1 file changed, 1 insertion(+), 33 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/full_schemas/54/stream_positions.sql b/synapse/storage/schema/full_schemas/54/stream_positions.sql index 0febedcc5e..084a70db65 100644 --- a/synapse/storage/schema/full_schemas/54/stream_positions.sql +++ b/synapse/storage/schema/full_schemas/54/stream_positions.sql @@ -4,36 +4,4 @@ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', - INSERT INTO federation_stream_position (type, stream_id) SELECT 'events', coalesce(max(stream_ordering), -1) FROM events; INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); INSERT INTO stats_stream_pos (stream_id) VALUES (null); -INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0); - ---- User dir population - --- Set up staging tables -INSERT INTO background_updates (update_name, progress_json) VALUES - ('populate_user_directory_createtables', '{}'); - --- Run through each room and update the user directory according to who is in it -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'); - --- Insert all users, if search_all_users is on -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'); - --- Clean up staging tables -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users'); - ---- Stats population - --- Set up staging tables -INSERT INTO background_updates (update_name, progress_json) VALUES - ('populate_stats_createtables', '{}'); - --- Run through each room and update stats -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_stats_process_rooms', '{}', 'populate_stats_createtables'); - --- Clean up staging tables -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_stats_cleanup', '{}', 'populate_stats_process_rooms'); +INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0); \ No newline at end of file -- cgit 1.5.1 From 837e32ef551b9c53d23615cdde56cc9babcc9059 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Fri, 7 Jun 2019 01:49:25 +1000 Subject: just user dir? --- .../schema/full_schemas/54/stream_positions.sql | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/full_schemas/54/stream_positions.sql b/synapse/storage/schema/full_schemas/54/stream_positions.sql index 084a70db65..575ab6b354 100644 --- a/synapse/storage/schema/full_schemas/54/stream_positions.sql +++ b/synapse/storage/schema/full_schemas/54/stream_positions.sql @@ -4,4 +4,22 @@ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', - INSERT INTO federation_stream_position (type, stream_id) SELECT 'events', coalesce(max(stream_ordering), -1) FROM events; INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); INSERT INTO stats_stream_pos (stream_id) VALUES (null); -INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0); \ No newline at end of file +INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0); + +--- User dir population + +-- Set up staging tables +INSERT INTO background_updates (update_name, progress_json) VALUES + ('populate_user_directory_createtables', '{}'); + +-- Run through each room and update the user directory according to who is in it +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'); + +-- Insert all users, if search_all_users is on +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'); + +-- Clean up staging tables +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users'); -- cgit 1.5.1 From 3719680ee42b72b8480fa76a1455576897b65ef0 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 6 Jun 2019 17:34:07 +0100 Subject: Add ability to perform password reset via email without trusting the identity server (#5377) Sends password reset emails from the homeserver instead of proxying to the identity server. This is now the default behaviour for security reasons. If you wish to continue proxying password reset requests to the identity server you must now enable the email.trust_identity_server_for_password_resets option. This PR is a culmination of 3 smaller PRs which have each been separately reviewed: * #5308 * #5345 * #5368 --- changelog.d/5377.feature | 1 + docs/sample_config.yaml | 60 ++++- synapse/api/errors.py | 9 + synapse/app/homeserver.py | 1 + synapse/config/emailconfig.py | 153 +++++++++-- synapse/handlers/auth.py | 64 ++++- synapse/handlers/identity.py | 13 +- synapse/push/mailer.py | 85 ++++-- synapse/push/pusher.py | 4 +- synapse/python_dependencies.py | 2 +- synapse/res/templates/password_reset.html | 9 + synapse/res/templates/password_reset.txt | 7 + synapse/res/templates/password_reset_failure.html | 6 + synapse/res/templates/password_reset_success.html | 6 + synapse/rest/client/v2_alpha/account.py | 243 ++++++++++++++++- synapse/storage/_base.py | 6 +- synapse/storage/prepare_database.py | 2 +- synapse/storage/registration.py | 290 ++++++++++++++++++++- .../schema/delta/55/track_threepid_validations.sql | 31 +++ tests/utils.py | 1 - 20 files changed, 922 insertions(+), 71 deletions(-) create mode 100644 changelog.d/5377.feature create mode 100644 synapse/res/templates/password_reset.html create mode 100644 synapse/res/templates/password_reset.txt create mode 100644 synapse/res/templates/password_reset_failure.html create mode 100644 synapse/res/templates/password_reset_success.html create mode 100644 synapse/storage/schema/delta/55/track_threepid_validations.sql (limited to 'synapse/storage/schema') diff --git a/changelog.d/5377.feature b/changelog.d/5377.feature new file mode 100644 index 0000000000..6aae41847a --- /dev/null +++ b/changelog.d/5377.feature @@ -0,0 +1 @@ +Add ability to perform password reset via email without trusting the identity server. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index a2e815ea52..ea73306fb9 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1065,10 +1065,8 @@ password_config: -# Enable sending emails for notification events or expiry notices -# Defining a custom URL for Riot is only needed if email notifications -# should contain links to a self-hosted installation of Riot; when set -# the "app_name" setting is ignored. +# Enable sending emails for password resets, notification events or +# account expiry notices # # If your SMTP server requires authentication, the optional smtp_user & # smtp_pass variables should be used @@ -1076,22 +1074,64 @@ password_config: #email: # enable_notifs: false # smtp_host: "localhost" -# smtp_port: 25 +# smtp_port: 25 # SSL: 465, STARTTLS: 587 # smtp_user: "exampleusername" # smtp_pass: "examplepassword" # require_transport_security: False # notif_from: "Your Friendly %(app)s Home Server " # app_name: Matrix -# # if template_dir is unset, uses the example templates that are part of -# # the Synapse distribution. +# +# # Enable email notifications by default +# notif_for_new_users: True +# +# # Defining a custom URL for Riot is only needed if email notifications +# # should contain links to a self-hosted installation of Riot; when set +# # the "app_name" setting is ignored +# riot_base_url: "http://localhost/riot" +# +# # Enable sending password reset emails via the configured, trusted +# # identity servers +# # +# # IMPORTANT! This will give a malicious or overtaken identity server +# # the ability to reset passwords for your users! Make absolutely sure +# # that you want to do this! It is strongly recommended that password +# # reset emails be sent by the homeserver instead +# # +# # If this option is set to false and SMTP options have not been +# # configured, resetting user passwords via email will be disabled +# #trust_identity_server_for_password_resets: false +# +# # Configure the time that a validation email or text message code +# # will expire after sending +# # +# # This is currently used for password resets +# #validation_token_lifetime: 1h +# +# # Template directory. All template files should be stored within this +# # directory +# # # #template_dir: res/templates +# +# # Templates for email notifications +# # # notif_template_html: notif_mail.html # notif_template_text: notif_mail.txt -# # Templates for account expiry notices. +# +# # Templates for account expiry notices +# # # expiry_template_html: notice_expiry.html # expiry_template_text: notice_expiry.txt -# notif_for_new_users: True -# riot_base_url: "http://localhost/riot" +# +# # Templates for password reset emails sent by the homeserver +# # +# #password_reset_template_html: password_reset.html +# #password_reset_template_text: password_reset.txt +# +# # Templates for password reset success and failure pages that a user +# # will see after attempting to reset their password +# # +# #password_reset_template_success_html: password_reset_success.html +# #password_reset_template_failure_html: password_reset_failure.html #password_providers: diff --git a/synapse/api/errors.py b/synapse/api/errors.py index e91697049c..66201d6efe 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -339,6 +339,15 @@ class UnsupportedRoomVersionError(SynapseError): ) +class ThreepidValidationError(SynapseError): + """An error raised when there was a problem authorising an event.""" + + def __init__(self, *args, **kwargs): + if "errcode" not in kwargs: + kwargs["errcode"] = Codes.FORBIDDEN + super(ThreepidValidationError, self).__init__(*args, **kwargs) + + class IncompatibleRoomVersionError(SynapseError): """A server is trying to join a room whose version it does not support. diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 1045d28949..df524a23dd 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -176,6 +176,7 @@ class SynapseHomeServer(HomeServer): resources.update({ "/_matrix/client/api/v1": client_resource, + "/_synapse/password_reset": client_resource, "/_matrix/client/r0": client_resource, "/_matrix/client/unstable": client_resource, "/_matrix/client/v2_alpha": client_resource, diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 8400471f40..ae04252906 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -50,6 +50,11 @@ class EmailConfig(Config): else: self.email_app_name = "Matrix" + # TODO: Rename notif_from to something more generic, or have a separate + # from for password resets, message notifications, etc? + # Currently the email section is a bit bogged down with settings for + # multiple functions. Would be good to split it out into separate + # sections and only put the common ones under email: self.email_notif_from = email_config.get("notif_from", None) if self.email_notif_from is not None: # make sure it's valid @@ -74,7 +79,28 @@ class EmailConfig(Config): "account_validity", {}, ).get("renew_at") - if self.email_enable_notifs or account_validity_renewal_enabled: + email_trust_identity_server_for_password_resets = email_config.get( + "trust_identity_server_for_password_resets", False, + ) + self.email_password_reset_behaviour = ( + "remote" if email_trust_identity_server_for_password_resets else "local" + ) + if self.email_password_reset_behaviour == "local" and email_config == {}: + logger.warn( + "User password resets have been disabled due to lack of email config" + ) + self.email_password_reset_behaviour = "off" + + # Get lifetime of a validation token in milliseconds + self.email_validation_token_lifetime = self.parse_duration( + email_config.get("validation_token_lifetime", "1h") + ) + + if ( + self.email_enable_notifs + or account_validity_renewal_enabled + or self.email_password_reset_behaviour == "local" + ): # make sure we can import the required deps import jinja2 import bleach @@ -82,6 +108,67 @@ class EmailConfig(Config): jinja2 bleach + if self.email_password_reset_behaviour == "local": + required = [ + "smtp_host", + "smtp_port", + "notif_from", + ] + + missing = [] + for k in required: + if k not in email_config: + missing.append(k) + + if (len(missing) > 0): + raise RuntimeError( + "email.password_reset_behaviour is set to 'local' " + "but required keys are missing: %s" % + (", ".join(["email." + k for k in missing]),) + ) + + # Templates for password reset emails + self.email_password_reset_template_html = email_config.get( + "password_reset_template_html", "password_reset.html", + ) + self.email_password_reset_template_text = email_config.get( + "password_reset_template_text", "password_reset.txt", + ) + self.email_password_reset_failure_template = email_config.get( + "password_reset_failure_template", "password_reset_failure.html", + ) + # This template does not support any replaceable variables, so we will + # read it from the disk once during setup + email_password_reset_success_template = email_config.get( + "password_reset_success_template", "password_reset_success.html", + ) + + # Check templates exist + for f in [self.email_password_reset_template_html, + self.email_password_reset_template_text, + self.email_password_reset_failure_template, + email_password_reset_success_template]: + p = os.path.join(self.email_template_dir, f) + if not os.path.isfile(p): + raise ConfigError("Unable to find template file %s" % (p, )) + + # Retrieve content of web templates + filepath = os.path.join( + self.email_template_dir, + email_password_reset_success_template, + ) + self.email_password_reset_success_html_content = self.read_file( + filepath, + "email.password_reset_template_success_html", + ) + + if config.get("public_baseurl") is None: + raise RuntimeError( + "email.password_reset_behaviour is set to 'local' but no " + "public_baseurl is set. This is necessary to generate password " + "reset links" + ) + if self.email_enable_notifs: required = [ "smtp_host", @@ -121,10 +208,6 @@ class EmailConfig(Config): self.email_riot_base_url = email_config.get( "riot_base_url", None ) - else: - self.email_enable_notifs = False - # Not much point setting defaults for the rest: it would be an - # error for them to be used. if account_validity_renewal_enabled: self.email_expiry_template_html = email_config.get( @@ -141,10 +224,8 @@ class EmailConfig(Config): def default_config(self, config_dir_path, server_name, **kwargs): return """ - # Enable sending emails for notification events or expiry notices - # Defining a custom URL for Riot is only needed if email notifications - # should contain links to a self-hosted installation of Riot; when set - # the "app_name" setting is ignored. + # Enable sending emails for password resets, notification events or + # account expiry notices # # If your SMTP server requires authentication, the optional smtp_user & # smtp_pass variables should be used @@ -152,20 +233,62 @@ class EmailConfig(Config): #email: # enable_notifs: false # smtp_host: "localhost" - # smtp_port: 25 + # smtp_port: 25 # SSL: 465, STARTTLS: 587 # smtp_user: "exampleusername" # smtp_pass: "examplepassword" # require_transport_security: False # notif_from: "Your Friendly %(app)s Home Server " # app_name: Matrix - # # if template_dir is unset, uses the example templates that are part of - # # the Synapse distribution. + # + # # Enable email notifications by default + # notif_for_new_users: True + # + # # Defining a custom URL for Riot is only needed if email notifications + # # should contain links to a self-hosted installation of Riot; when set + # # the "app_name" setting is ignored + # riot_base_url: "http://localhost/riot" + # + # # Enable sending password reset emails via the configured, trusted + # # identity servers + # # + # # IMPORTANT! This will give a malicious or overtaken identity server + # # the ability to reset passwords for your users! Make absolutely sure + # # that you want to do this! It is strongly recommended that password + # # reset emails be sent by the homeserver instead + # # + # # If this option is set to false and SMTP options have not been + # # configured, resetting user passwords via email will be disabled + # #trust_identity_server_for_password_resets: false + # + # # Configure the time that a validation email or text message code + # # will expire after sending + # # + # # This is currently used for password resets + # #validation_token_lifetime: 1h + # + # # Template directory. All template files should be stored within this + # # directory + # # # #template_dir: res/templates + # + # # Templates for email notifications + # # # notif_template_html: notif_mail.html # notif_template_text: notif_mail.txt - # # Templates for account expiry notices. + # + # # Templates for account expiry notices + # # # expiry_template_html: notice_expiry.html # expiry_template_text: notice_expiry.txt - # notif_for_new_users: True - # riot_base_url: "http://localhost/riot" + # + # # Templates for password reset emails sent by the homeserver + # # + # #password_reset_template_html: password_reset.html + # #password_reset_template_text: password_reset.txt + # + # # Templates for password reset success and failure pages that a user + # # will see after attempting to reset their password + # # + # #password_reset_template_success_html: password_reset_success.html + # #password_reset_template_failure_html: password_reset_failure.html """ diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index aa5d89a9ac..7f8ddc99c6 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -162,7 +162,7 @@ class AuthHandler(BaseHandler): defer.returnValue(params) @defer.inlineCallbacks - def check_auth(self, flows, clientdict, clientip): + def check_auth(self, flows, clientdict, clientip, password_servlet=False): """ Takes a dictionary sent by the client in the login / registration protocol and handles the User-Interactive Auth flow. @@ -186,6 +186,16 @@ class AuthHandler(BaseHandler): clientip (str): The IP address of the client. + password_servlet (bool): Whether the request originated from + PasswordRestServlet. + XXX: This is a temporary hack to distinguish between checking + for threepid validations locally (in the case of password + resets) and using the identity server (in the case of binding + a 3PID during registration). Once we start using the + homeserver for both tasks, this distinction will no longer be + necessary. + + Returns: defer.Deferred[dict, dict, str]: a deferred tuple of (creds, params, session_id). @@ -241,7 +251,9 @@ class AuthHandler(BaseHandler): if 'type' in authdict: login_type = authdict['type'] try: - result = yield self._check_auth_dict(authdict, clientip) + result = yield self._check_auth_dict( + authdict, clientip, password_servlet=password_servlet, + ) if result: creds[login_type] = result self._save_session(session) @@ -351,7 +363,7 @@ class AuthHandler(BaseHandler): return sess.setdefault('serverdict', {}).get(key, default) @defer.inlineCallbacks - def _check_auth_dict(self, authdict, clientip): + def _check_auth_dict(self, authdict, clientip, password_servlet=False): """Attempt to validate the auth dict provided by a client Args: @@ -369,7 +381,13 @@ class AuthHandler(BaseHandler): login_type = authdict['type'] checker = self.checkers.get(login_type) if checker is not None: - res = yield checker(authdict, clientip) + # XXX: Temporary workaround for having Synapse handle password resets + # See AuthHandler.check_auth for further details + res = yield checker( + authdict, + clientip=clientip, + password_servlet=password_servlet, + ) defer.returnValue(res) # build a v1-login-style dict out of the authdict and fall back to the @@ -383,7 +401,7 @@ class AuthHandler(BaseHandler): defer.returnValue(canonical_id) @defer.inlineCallbacks - def _check_recaptcha(self, authdict, clientip): + def _check_recaptcha(self, authdict, clientip, **kwargs): try: user_response = authdict["response"] except KeyError: @@ -429,20 +447,20 @@ class AuthHandler(BaseHandler): defer.returnValue(True) raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) - def _check_email_identity(self, authdict, _): - return self._check_threepid('email', authdict) + def _check_email_identity(self, authdict, **kwargs): + return self._check_threepid('email', authdict, **kwargs) - def _check_msisdn(self, authdict, _): + def _check_msisdn(self, authdict, **kwargs): return self._check_threepid('msisdn', authdict) - def _check_dummy_auth(self, authdict, _): + def _check_dummy_auth(self, authdict, **kwargs): return defer.succeed(True) - def _check_terms_auth(self, authdict, _): + def _check_terms_auth(self, authdict, **kwargs): return defer.succeed(True) @defer.inlineCallbacks - def _check_threepid(self, medium, authdict): + def _check_threepid(self, medium, authdict, password_servlet=False, **kwargs): if 'threepid_creds' not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) @@ -451,7 +469,29 @@ class AuthHandler(BaseHandler): identity_handler = self.hs.get_handlers().identity_handler logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,)) - threepid = yield identity_handler.threepid_from_creds(threepid_creds) + if ( + not password_servlet + or self.hs.config.email_password_reset_behaviour == "remote" + ): + threepid = yield identity_handler.threepid_from_creds(threepid_creds) + elif self.hs.config.email_password_reset_behaviour == "local": + row = yield self.store.get_threepid_validation_session( + medium, + threepid_creds["client_secret"], + sid=threepid_creds["sid"], + ) + + threepid = { + "medium": row["medium"], + "address": row["address"], + "validated_at": row["validated_at"], + } if row else None + + if row: + # Valid threepid returned, delete from the db + yield self.store.delete_threepid_session(threepid_creds["sid"]) + else: + raise SynapseError(400, "Password resets are not enabled on this homeserver") if not threepid: raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 22469486d7..04caf65793 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -247,7 +247,14 @@ class IdentityHandler(BaseHandler): defer.returnValue(changed) @defer.inlineCallbacks - def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): + def requestEmailToken( + self, + id_server, + email, + client_secret, + send_attempt, + next_link=None, + ): if not self._should_trust_id_server(id_server): raise SynapseError( 400, "Untrusted ID server '%s'" % id_server, @@ -259,7 +266,9 @@ class IdentityHandler(BaseHandler): 'client_secret': client_secret, 'send_attempt': send_attempt, } - params.update(kwargs) + + if next_link: + params.update({'next_link': next_link}) try: data = yield self.http_client.post_json_get_json( diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index c269bcf4a4..4bc9eb7313 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -80,10 +80,10 @@ ALLOWED_ATTRS = { class Mailer(object): - def __init__(self, hs, app_name, notif_template_html, notif_template_text): + def __init__(self, hs, app_name, template_html, template_text): self.hs = hs - self.notif_template_html = notif_template_html - self.notif_template_text = notif_template_text + self.template_html = template_html + self.template_text = template_text self.sendmail = self.hs.get_sendmail() self.store = self.hs.get_datastore() @@ -94,21 +94,48 @@ class Mailer(object): logger.info("Created Mailer for app_name %s" % app_name) @defer.inlineCallbacks - def send_notification_mail(self, app_id, user_id, email_address, - push_actions, reason): - try: - from_string = self.hs.config.email_notif_from % { - "app": self.app_name - } - except TypeError: - from_string = self.hs.config.email_notif_from + def send_password_reset_mail( + self, + email_address, + token, + client_secret, + sid, + ): + """Send an email with a password reset link to a user + + Args: + email_address (str): Email address we're sending the password + reset to + token (str): Unique token generated by the server to verify + password reset email was received + client_secret (str): Unique token generated by the client to + group together multiple email sending attempts + sid (str): The generated session ID + """ + if email.utils.parseaddr(email_address)[1] == '': + raise RuntimeError("Invalid 'to' email address") + + link = ( + self.hs.config.public_baseurl + + "_synapse/password_reset/email/submit_token" + "?token=%s&client_secret=%s&sid=%s" % + (token, client_secret, sid) + ) - raw_from = email.utils.parseaddr(from_string)[1] - raw_to = email.utils.parseaddr(email_address)[1] + template_vars = { + "link": link, + } - if raw_to == '': - raise RuntimeError("Invalid 'to' address") + yield self.send_email( + email_address, + "[%s] Password Reset Email" % self.hs.config.server_name, + template_vars, + ) + @defer.inlineCallbacks + def send_notification_mail(self, app_id, user_id, email_address, + push_actions, reason): + """Send email regarding a user's room notifications""" rooms_in_order = deduped_ordered_list( [pa['room_id'] for pa in push_actions] ) @@ -176,14 +203,36 @@ class Mailer(object): "reason": reason, } - html_text = self.notif_template_html.render(**template_vars) + yield self.send_email( + email_address, + "[%s] %s" % (self.app_name, summary_text), + template_vars, + ) + + @defer.inlineCallbacks + def send_email(self, email_address, subject, template_vars): + """Send an email with the given information and template text""" + try: + from_string = self.hs.config.email_notif_from % { + "app": self.app_name + } + except TypeError: + from_string = self.hs.config.email_notif_from + + raw_from = email.utils.parseaddr(from_string)[1] + raw_to = email.utils.parseaddr(email_address)[1] + + if raw_to == '': + raise RuntimeError("Invalid 'to' address") + + html_text = self.template_html.render(**template_vars) html_part = MIMEText(html_text, "html", "utf8") - plain_text = self.notif_template_text.render(**template_vars) + plain_text = self.template_text.render(**template_vars) text_part = MIMEText(plain_text, "plain", "utf8") multipart_msg = MIMEMultipart('alternative') - multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text) + multipart_msg['Subject'] = subject multipart_msg['From'] = from_string multipart_msg['To'] = email_address multipart_msg['Date'] = email.utils.formatdate() diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index 14bc7823cf..aff85daeb5 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -70,8 +70,8 @@ class PusherFactory(object): mailer = Mailer( hs=self.hs, app_name=app_name, - notif_template_html=self.notif_template_html, - notif_template_text=self.notif_template_text, + template_html=self.notif_template_html, + template_text=self.notif_template_text, ) self.mailers[app_name] = mailer return EmailPusher(self.hs, pusherdict, mailer) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index f64baa4d58..c78f2cb15e 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -77,7 +77,7 @@ REQUIREMENTS = [ ] CONDITIONAL_REQUIREMENTS = { - "email.enable_notifs": ["Jinja2>=2.9", "bleach>=1.4.2"], + "email": ["Jinja2>=2.9", "bleach>=1.4.2"], "matrix-synapse-ldap3": ["matrix-synapse-ldap3>=0.1"], # we use execute_batch, which arrived in psycopg 2.7. diff --git a/synapse/res/templates/password_reset.html b/synapse/res/templates/password_reset.html new file mode 100644 index 0000000000..4fa7b36734 --- /dev/null +++ b/synapse/res/templates/password_reset.html @@ -0,0 +1,9 @@ + + +

A password reset request has been received for your Matrix account. If this was you, please click the link below to confirm resetting your password:

+ + {{ link }} + +

If this was not you, please disregard this email and contact your server administrator. Thank you.

+ + diff --git a/synapse/res/templates/password_reset.txt b/synapse/res/templates/password_reset.txt new file mode 100644 index 0000000000..f0deff59a7 --- /dev/null +++ b/synapse/res/templates/password_reset.txt @@ -0,0 +1,7 @@ +A password reset request has been received for your Matrix account. If this +was you, please click the link below to confirm resetting your password: + +{{ link }} + +If this was not you, please disregard this email and contact your server +administrator. Thank you. diff --git a/synapse/res/templates/password_reset_failure.html b/synapse/res/templates/password_reset_failure.html new file mode 100644 index 0000000000..0b132cf8db --- /dev/null +++ b/synapse/res/templates/password_reset_failure.html @@ -0,0 +1,6 @@ + + + +

{{ failure_reason }}. Your password has not been reset.

+ + diff --git a/synapse/res/templates/password_reset_success.html b/synapse/res/templates/password_reset_success.html new file mode 100644 index 0000000000..7b6fa5e6f0 --- /dev/null +++ b/synapse/res/templates/password_reset_success.html @@ -0,0 +1,6 @@ + + + +

Your password was successfully reset. You may now close this window.

+ + diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index ca35dc3c83..e4c63b69b9 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -15,19 +15,25 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import re from six.moves import http_client +import jinja2 + from twisted.internet import defer from synapse.api.constants import LoginType -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, SynapseError, ThreepidValidationError +from synapse.http.server import finish_request from synapse.http.servlet import ( RestServlet, assert_params_in_dict, parse_json_object_from_request, + parse_string, ) from synapse.util.msisdn import phone_number_to_msisdn +from synapse.util.stringutils import random_string from synapse.util.threepids import check_3pid_allowed from ._base import client_patterns, interactive_auth_handler @@ -41,17 +47,42 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): def __init__(self, hs): super(EmailPasswordRequestTokenRestServlet, self).__init__() self.hs = hs + self.datastore = hs.get_datastore() + self.config = hs.config self.identity_handler = hs.get_handlers().identity_handler + if self.config.email_password_reset_behaviour == "local": + from synapse.push.mailer import Mailer, load_jinja2_templates + templates = load_jinja2_templates( + config=hs.config, + template_html_name=hs.config.email_password_reset_template_html, + template_text_name=hs.config.email_password_reset_template_text, + ) + self.mailer = Mailer( + hs=self.hs, + app_name=self.config.email_app_name, + template_html=templates[0], + template_text=templates[1], + ) + @defer.inlineCallbacks def on_POST(self, request): + if self.config.email_password_reset_behaviour == "off": + raise SynapseError(400, "Password resets have been disabled on this server") + body = parse_json_object_from_request(request) assert_params_in_dict(body, [ - 'id_server', 'client_secret', 'email', 'send_attempt' + 'client_secret', 'email', 'send_attempt' ]) - if not check_3pid_allowed(self.hs, "email", body['email']): + # Extract params from body + client_secret = body["client_secret"] + email = body["email"] + send_attempt = body["send_attempt"] + next_link = body.get("next_link") # Optional param + + if not check_3pid_allowed(self.hs, "email", email): raise SynapseError( 403, "Your email domain is not authorized on this server", @@ -59,15 +90,100 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): ) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( - 'email', body['email'] + 'email', email, ) if existingUid is None: raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND) - ret = yield self.identity_handler.requestEmailToken(**body) + if self.config.email_password_reset_behaviour == "remote": + if 'id_server' not in body: + raise SynapseError(400, "Missing 'id_server' param in body") + + # Have the identity server handle the password reset flow + ret = yield self.identity_handler.requestEmailToken( + body["id_server"], email, client_secret, send_attempt, next_link, + ) + else: + # Send password reset emails from Synapse + sid = yield self.send_password_reset( + email, client_secret, send_attempt, next_link, + ) + + # Wrap the session id in a JSON object + ret = {"sid": sid} + defer.returnValue((200, ret)) + @defer.inlineCallbacks + def send_password_reset( + self, + email, + client_secret, + send_attempt, + next_link=None, + ): + """Send a password reset email + + Args: + email (str): The user's email address + client_secret (str): The provided client secret + send_attempt (int): Which send attempt this is + + Returns: + The new session_id upon success + + Raises: + SynapseError is an error occurred when sending the email + """ + # Check that this email/client_secret/send_attempt combo is new or + # greater than what we've seen previously + session = yield self.datastore.get_threepid_validation_session( + "email", client_secret, address=email, validated=False, + ) + + # Check to see if a session already exists and that it is not yet + # marked as validated + if session and session.get("validated_at") is None: + session_id = session['session_id'] + last_send_attempt = session['last_send_attempt'] + + # Check that the send_attempt is higher than previous attempts + if send_attempt <= last_send_attempt: + # If not, just return a success without sending an email + defer.returnValue(session_id) + else: + # An non-validated session does not exist yet. + # Generate a session id + session_id = random_string(16) + + # Generate a new validation token + token = random_string(32) + + # Send the mail with the link containing the token, client_secret + # and session_id + try: + yield self.mailer.send_password_reset_mail( + email, token, client_secret, session_id, + ) + except Exception: + logger.exception( + "Error sending a password reset email to %s", email, + ) + raise SynapseError( + 500, "An error was encountered when sending the password reset email" + ) + + token_expires = (self.hs.clock.time_msec() + + self.config.email_validation_token_lifetime) + + yield self.datastore.start_or_continue_validation_session( + "email", email, session_id, client_secret, send_attempt, + next_link, token, token_expires, + ) + + defer.returnValue(session_id) + class MsisdnPasswordRequestTokenRestServlet(RestServlet): PATTERNS = client_patterns("/account/password/msisdn/requestToken$") @@ -80,6 +196,9 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): + if not self.config.email_password_reset_behaviour == "off": + raise SynapseError(400, "Password resets have been disabled on this server") + body = parse_json_object_from_request(request) assert_params_in_dict(body, [ @@ -107,6 +226,118 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet): defer.returnValue((200, ret)) +class PasswordResetSubmitTokenServlet(RestServlet): + """Handles 3PID validation token submission""" + PATTERNS = [ + re.compile("^/_synapse/password_reset/(?P[^/]*)/submit_token/*$"), + ] + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(PasswordResetSubmitTokenServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.config = hs.config + self.clock = hs.get_clock() + self.datastore = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, medium): + if medium != "email": + raise SynapseError( + 400, + "This medium is currently not supported for password resets", + ) + + sid = parse_string(request, "sid") + client_secret = parse_string(request, "client_secret") + token = parse_string(request, "token") + + # Attempt to validate a 3PID sesssion + try: + # Mark the session as valid + next_link = yield self.datastore.validate_threepid_session( + sid, + client_secret, + token, + self.clock.time_msec(), + ) + + # Perform a 302 redirect if next_link is set + if next_link: + if next_link.startswith("file:///"): + logger.warn( + "Not redirecting to next_link as it is a local file: address" + ) + else: + request.setResponseCode(302) + request.setHeader("Location", next_link) + finish_request(request) + defer.returnValue(None) + + # Otherwise show the success template + html = self.config.email_password_reset_success_html_content + request.setResponseCode(200) + except ThreepidValidationError as e: + # Show a failure page with a reason + html = self.load_jinja2_template( + self.config.email_template_dir, + self.config.email_password_reset_failure_template, + template_vars={ + "failure_reason": e.msg, + } + ) + request.setResponseCode(e.code) + + request.write(html.encode('utf-8')) + finish_request(request) + defer.returnValue(None) + + def load_jinja2_template(self, template_dir, template_filename, template_vars): + """Loads a jinja2 template with variables to insert + + Args: + template_dir (str): The directory where templates are stored + template_filename (str): The name of the template in the template_dir + template_vars (Dict): Dictionary of keys in the template + alongside their values to insert + + Returns: + str containing the contents of the rendered template + """ + loader = jinja2.FileSystemLoader(template_dir) + env = jinja2.Environment(loader=loader) + + template = env.get_template(template_filename) + return template.render(**template_vars) + + @defer.inlineCallbacks + def on_POST(self, request, medium): + if medium != "email": + raise SynapseError( + 400, + "This medium is currently not supported for password resets", + ) + + body = parse_json_object_from_request(request) + assert_params_in_dict(body, [ + 'sid', 'client_secret', 'token', + ]) + + valid, _ = yield self.datastore.validate_threepid_validation_token( + body['sid'], + body['client_secret'], + body['token'], + self.clock.time_msec(), + ) + response_code = 200 if valid else 400 + + defer.returnValue((response_code, {"success": valid})) + + class PasswordRestServlet(RestServlet): PATTERNS = client_patterns("/account/password$") @@ -144,6 +375,7 @@ class PasswordRestServlet(RestServlet): result, params, _ = yield self.auth_handler.check_auth( [[LoginType.EMAIL_IDENTITY], [LoginType.MSISDN]], body, self.hs.get_ip_from_request(request), + password_servlet=True, ) if LoginType.EMAIL_IDENTITY in result: @@ -417,6 +649,7 @@ class WhoamiRestServlet(RestServlet): def register_servlets(hs, http_server): EmailPasswordRequestTokenRestServlet(hs).register(http_server) MsisdnPasswordRequestTokenRestServlet(hs).register(http_server) + PasswordResetSubmitTokenServlet(hs).register(http_server) PasswordRestServlet(hs).register(http_server) DeactivateAccountRestServlet(hs).register(http_server) EmailThreepidRequestTokenRestServlet(hs).register(http_server) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 52891bb9eb..ae891aa332 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -588,6 +588,10 @@ class SQLBaseStore(object): Args: table : string giving the table name values : dict of new column names and values for them + or_ignore : bool stating whether an exception should be raised + when a conflicting row already exists. If True, False will be + returned by the function instead + desc : string giving a description of the transaction Returns: bool: Whether the row was inserted or not. Only useful when @@ -1228,8 +1232,8 @@ class SQLBaseStore(object): ) txn.execute(select_sql, list(keyvalues.values())) - row = txn.fetchone() + if not row: if allow_none: return None diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c1711bc8bd..23a4baa484 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 54 +SCHEMA_VERSION = 55 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 4cf159ba81..9b41cbd757 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -17,17 +17,20 @@ import re +from six import iterkeys from six.moves import range from twisted.internet import defer from synapse.api.constants import UserTypes -from synapse.api.errors import Codes, StoreError +from synapse.api.errors import Codes, StoreError, ThreepidValidationError from synapse.storage import background_updates from synapse.storage._base import SQLBaseStore from synapse.types import UserID from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 + class RegistrationWorkerStore(SQLBaseStore): def __init__(self, db_conn, hs): @@ -422,7 +425,7 @@ class RegistrationWorkerStore(SQLBaseStore): defer.returnValue(None) @defer.inlineCallbacks - def get_user_id_by_threepid(self, medium, address): + def get_user_id_by_threepid(self, medium, address, require_verified=False): """Returns user id from threepid Args: @@ -595,6 +598,11 @@ class RegistrationStore( "user_threepids_grandfather", self._bg_user_threepids_grandfather, ) + # Create a background job for culling expired 3PID validity tokens + hs.get_clock().looping_call( + self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS, + ) + @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id=None): """Adds an access token for the given user. @@ -963,7 +971,6 @@ class RegistrationStore( We do this by grandfathering in existing user threepids assuming that they used one of the server configured trusted identity servers. """ - id_servers = set(self.config.trusted_third_party_id_servers) def _bg_user_threepids_grandfather_txn(txn): @@ -984,3 +991,280 @@ class RegistrationStore( yield self._end_background_update("user_threepids_grandfather") defer.returnValue(1) + + def get_threepid_validation_session( + self, + medium, + client_secret, + address=None, + sid=None, + validated=None, + ): + """Gets a session_id and last_send_attempt (if available) for a + client_secret/medium/(address|session_id) combo + + Args: + medium (str|None): The medium of the 3PID + address (str|None): The address of the 3PID + sid (str|None): The ID of the validation session + client_secret (str|None): A unique string provided by the client to + help identify this validation attempt + validated (bool|None): Whether sessions should be filtered by + whether they have been validated already or not. None to + perform no filtering + + Returns: + deferred {str, int}|None: A dict containing the + latest session_id and send_attempt count for this 3PID. + Otherwise None if there hasn't been a previous attempt + """ + keyvalues = { + "medium": medium, + "client_secret": client_secret, + } + if address: + keyvalues["address"] = address + if sid: + keyvalues["session_id"] = sid + + assert(address or sid) + + def get_threepid_validation_session_txn(txn): + sql = """ + SELECT address, session_id, medium, client_secret, + last_send_attempt, validated_at + FROM threepid_validation_session WHERE %s + """ % (" AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),) + + if validated is not None: + sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL") + + sql += " LIMIT 1" + + txn.execute(sql, list(keyvalues.values())) + rows = self.cursor_to_dict(txn) + if not rows: + return None + + return rows[0] + + return self.runInteraction( + "get_threepid_validation_session", + get_threepid_validation_session_txn, + ) + + def validate_threepid_session( + self, + session_id, + client_secret, + token, + current_ts, + ): + """Attempt to validate a threepid session using a token + + Args: + session_id (str): The id of a validation session + client_secret (str): A unique string provided by the client to + help identify this validation attempt + token (str): A validation token + current_ts (int): The current unix time in milliseconds. Used for + checking token expiry status + + Returns: + deferred str|None: A str representing a link to redirect the user + to if there is one. + """ + # Insert everything into a transaction in order to run atomically + def validate_threepid_session_txn(txn): + row = self._simple_select_one_txn( + txn, + table="threepid_validation_session", + keyvalues={"session_id": session_id}, + retcols=["client_secret", "validated_at"], + allow_none=True, + ) + + if not row: + raise ThreepidValidationError(400, "Unknown session_id") + retrieved_client_secret = row["client_secret"] + validated_at = row["validated_at"] + + if retrieved_client_secret != client_secret: + raise ThreepidValidationError( + 400, "This client_secret does not match the provided session_id", + ) + + row = self._simple_select_one_txn( + txn, + table="threepid_validation_token", + keyvalues={"session_id": session_id, "token": token}, + retcols=["expires", "next_link"], + allow_none=True, + ) + + if not row: + raise ThreepidValidationError( + 400, "Validation token not found or has expired", + ) + expires = row["expires"] + next_link = row["next_link"] + + # If the session is already validated, no need to revalidate + if validated_at: + return next_link + + if expires <= current_ts: + raise ThreepidValidationError( + 400, "This token has expired. Please request a new one", + ) + + # Looks good. Validate the session + self._simple_update_txn( + txn, + table="threepid_validation_session", + keyvalues={"session_id": session_id}, + updatevalues={"validated_at": self.clock.time_msec()}, + ) + + return next_link + + # Return next_link if it exists + return self.runInteraction( + "validate_threepid_session_txn", + validate_threepid_session_txn, + ) + + def upsert_threepid_validation_session( + self, + medium, + address, + client_secret, + send_attempt, + session_id, + validated_at=None, + ): + """Upsert a threepid validation session + Args: + medium (str): The medium of the 3PID + address (str): The address of the 3PID + client_secret (str): A unique string provided by the client to + help identify this validation attempt + send_attempt (int): The latest send_attempt on this session + session_id (str): The id of this validation session + validated_at (int|None): The unix timestamp in milliseconds of + when the session was marked as valid + """ + insertion_values = { + "medium": medium, + "address": address, + "client_secret": client_secret, + } + + if validated_at: + insertion_values["validated_at"] = validated_at + + return self._simple_upsert( + table="threepid_validation_session", + keyvalues={"session_id": session_id}, + values={"last_send_attempt": send_attempt}, + insertion_values=insertion_values, + desc="upsert_threepid_validation_session", + ) + + def start_or_continue_validation_session( + self, + medium, + address, + session_id, + client_secret, + send_attempt, + next_link, + token, + token_expires, + ): + """Creates a new threepid validation session if it does not already + exist and associates a new validation token with it + + Args: + medium (str): The medium of the 3PID + address (str): The address of the 3PID + session_id (str): The id of this validation session + client_secret (str): A unique string provided by the client to + help identify this validation attempt + send_attempt (int): The latest send_attempt on this session + next_link (str|None): The link to redirect the user to upon + successful validation + token (str): The validation token + token_expires (int): The timestamp for which after the token + will no longer be valid + """ + def start_or_continue_validation_session_txn(txn): + # Create or update a validation session + self._simple_upsert_txn( + txn, + table="threepid_validation_session", + keyvalues={"session_id": session_id}, + values={"last_send_attempt": send_attempt}, + insertion_values={ + "medium": medium, + "address": address, + "client_secret": client_secret, + }, + ) + + # Create a new validation token with this session ID + self._simple_insert_txn( + txn, + table="threepid_validation_token", + values={ + "session_id": session_id, + "token": token, + "next_link": next_link, + "expires": token_expires, + }, + ) + + return self.runInteraction( + "start_or_continue_validation_session", + start_or_continue_validation_session_txn, + ) + + def cull_expired_threepid_validation_tokens(self): + """Remove threepid validation tokens with expiry dates that have passed""" + def cull_expired_threepid_validation_tokens_txn(txn, ts): + sql = """ + DELETE FROM threepid_validation_token WHERE + expires < ? + """ + return txn.execute(sql, (ts,)) + + return self.runInteraction( + "cull_expired_threepid_validation_tokens", + cull_expired_threepid_validation_tokens_txn, + self.clock.time_msec(), + ) + + def delete_threepid_session(self, session_id): + """Removes a threepid validation session from the database. This can + be done after validation has been performed and whatever action was + waiting on it has been carried out + + Args: + session_id (str): The ID of the session to delete + """ + def delete_threepid_session_txn(txn): + self._simple_delete_txn( + txn, + table="threepid_validation_token", + keyvalues={"session_id": session_id}, + ) + self._simple_delete_txn( + txn, + table="threepid_validation_session", + keyvalues={"session_id": session_id}, + ) + + return self.runInteraction( + "delete_threepid_session", + delete_threepid_session_txn, + ) diff --git a/synapse/storage/schema/delta/55/track_threepid_validations.sql b/synapse/storage/schema/delta/55/track_threepid_validations.sql new file mode 100644 index 0000000000..a8eced2e0a --- /dev/null +++ b/synapse/storage/schema/delta/55/track_threepid_validations.sql @@ -0,0 +1,31 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE IF NOT EXISTS threepid_validation_session ( + session_id TEXT PRIMARY KEY, + medium TEXT NOT NULL, + address TEXT NOT NULL, + client_secret TEXT NOT NULL, + last_send_attempt BIGINT NOT NULL, + validated_at BIGINT +); + +CREATE TABLE IF NOT EXISTS threepid_validation_token ( + token TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + next_link TEXT, + expires BIGINT NOT NULL +); + +CREATE INDEX threepid_validation_token_session_id ON threepid_validation_token(session_id); diff --git a/tests/utils.py b/tests/utils.py index 200c1ceabe..b2817cf22c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -131,7 +131,6 @@ def default_config(name, parse=False): "password_providers": [], "worker_replication_url": "", "worker_app": None, - "email_enable_notifs": False, "block_non_admin_invites": False, "federation_domain_whitelist": None, "filter_timeline_limit": 5000, -- cgit 1.5.1 From ed872db8df61f5ee019bdfdb68e1e83e9e2b7298 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Fri, 7 Jun 2019 02:53:47 +1000 Subject: fix maybe --- .../schema/full_schemas/54/stream_positions.sql | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/full_schemas/54/stream_positions.sql b/synapse/storage/schema/full_schemas/54/stream_positions.sql index 575ab6b354..c265fd20e2 100644 --- a/synapse/storage/schema/full_schemas/54/stream_positions.sql +++ b/synapse/storage/schema/full_schemas/54/stream_positions.sql @@ -2,24 +2,6 @@ INSERT INTO appservice_stream_position (stream_ordering) SELECT COALESCE(MAX(stream_ordering), 0) FROM events; INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1); INSERT INTO federation_stream_position (type, stream_id) SELECT 'events', coalesce(max(stream_ordering), -1) FROM events; -INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); -INSERT INTO stats_stream_pos (stream_id) VALUES (null); +INSERT INTO user_directory_stream_pos (stream_id) VALUES (0); +INSERT INTO stats_stream_pos (stream_id) VALUES (0); INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0); - ---- User dir population - --- Set up staging tables -INSERT INTO background_updates (update_name, progress_json) VALUES - ('populate_user_directory_createtables', '{}'); - --- Run through each room and update the user directory according to who is in it -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'); - --- Insert all users, if search_all_users is on -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'); - --- Clean up staging tables -INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users'); -- cgit 1.5.1 From d0530382eeff053547304532167c0e4654af172c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 14 Jun 2019 13:18:24 +0100 Subject: Track deactivated accounts in the database (#5378) --- changelog.d/5378.misc | 1 + synapse/handlers/deactivate_account.py | 4 + synapse/storage/registration.py | 114 +++++++++++++++++++++ .../schema/delta/55/users_alter_deactivated.sql | 19 ++++ tests/rest/client/v2_alpha/test_account.py | 45 ++++++++ 5 files changed, 183 insertions(+) create mode 100644 changelog.d/5378.misc create mode 100644 synapse/storage/schema/delta/55/users_alter_deactivated.sql (limited to 'synapse/storage/schema') diff --git a/changelog.d/5378.misc b/changelog.d/5378.misc new file mode 100644 index 0000000000..365e49d634 --- /dev/null +++ b/changelog.d/5378.misc @@ -0,0 +1 @@ +Track deactivated accounts in the database. diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 6a91f7698e..b29089d82c 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2017, 2018 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -114,6 +115,9 @@ class DeactivateAccountHandler(BaseHandler): # parts users from rooms (if it isn't already running) self._start_user_parting() + # Mark the user as deactivated. + yield self.store.set_user_deactivated_status(user_id, True) + defer.returnValue(identity_server_supports_unbinding) def _start_user_parting(self): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 1dd1182e82..4c5751b57f 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import re from six import iterkeys @@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 +logger = logging.getLogger(__name__) + class RegistrationWorkerStore(SQLBaseStore): def __init__(self, db_conn, hs): @@ -598,11 +601,75 @@ class RegistrationStore( "user_threepids_grandfather", self._bg_user_threepids_grandfather, ) + self.register_background_update_handler( + "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag, + ) + # Create a background job for culling expired 3PID validity tokens hs.get_clock().looping_call( self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS, ) + @defer.inlineCallbacks + def _backgroud_update_set_deactivated_flag(self, progress, batch_size): + """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1 + for each of them. + """ + + last_user = progress.get("user_id", "") + + def _backgroud_update_set_deactivated_flag_txn(txn): + txn.execute( + """ + SELECT + users.name, + COUNT(access_tokens.token) AS count_tokens, + COUNT(user_threepids.address) AS count_threepids + FROM users + LEFT JOIN access_tokens ON (access_tokens.user_id = users.name) + LEFT JOIN user_threepids ON (user_threepids.user_id = users.name) + WHERE password_hash IS NULL OR password_hash = '' + AND users.name > ? + GROUP BY users.name + ORDER BY users.name ASC + LIMIT ?; + """, + (last_user, batch_size), + ) + + rows = self.cursor_to_dict(txn) + + if not rows: + return True + + rows_processed_nb = 0 + + for user in rows: + if not user["count_tokens"] and not user["count_threepids"]: + self.set_user_deactivated_status_txn(txn, user["user_id"], True) + rows_processed_nb += 1 + + logger.info("Marked %d rows as deactivated", rows_processed_nb) + + self._background_update_progress_txn( + txn, "users_set_deactivated_flag", {"user_id": rows[-1]["user_id"]} + ) + + if batch_size > len(rows): + return True + else: + return False + + end = yield self.runInteraction( + "users_set_deactivated_flag", + _backgroud_update_set_deactivated_flag_txn, + ) + + if end: + yield self._end_background_update("users_set_deactivated_flag") + + defer.returnValue(batch_size) + @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id=None): """Adds an access token for the given user. @@ -1268,3 +1335,50 @@ class RegistrationStore( "delete_threepid_session", delete_threepid_session_txn, ) + + def set_user_deactivated_status_txn(self, txn, user_id, deactivated): + self._simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"deactivated": 1 if deactivated else 0}, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_deactivated_status, (user_id,), + ) + + @defer.inlineCallbacks + def set_user_deactivated_status(self, user_id, deactivated): + """Set the `deactivated` property for the provided user to the provided value. + + Args: + user_id (str): The ID of the user to set the status for. + deactivated (bool): The value to set for `deactivated`. + """ + + yield self.runInteraction( + "set_user_deactivated_status", + self.set_user_deactivated_status_txn, + user_id, deactivated, + ) + + @cachedInlineCallbacks() + def get_user_deactivated_status(self, user_id): + """Retrieve the value for the `deactivated` property for the provided user. + + Args: + user_id (str): The ID of the user to retrieve the status for. + + Returns: + defer.Deferred(bool): The requested value. + """ + + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user_id}, + retcol="deactivated", + desc="get_user_deactivated_status", + ) + + # Convert the integer into a boolean. + defer.returnValue(res == 1) diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql new file mode 100644 index 0000000000..dabdde489b --- /dev/null +++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql @@ -0,0 +1,19 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('users_set_deactivated_flag', '{}'); diff --git a/tests/rest/client/v2_alpha/test_account.py b/tests/rest/client/v2_alpha/test_account.py index 0d1c0868ce..a60a4a3b87 100644 --- a/tests/rest/client/v2_alpha/test_account.py +++ b/tests/rest/client/v2_alpha/test_account.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import os import re from email.parser import Parser @@ -239,3 +240,47 @@ class PasswordResetTestCase(unittest.HomeserverTestCase): ) self.render(request) self.assertEquals(expected_code, channel.code, channel.result) + + +class DeactivateTestCase(unittest.HomeserverTestCase): + + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + login.register_servlets, + account.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver() + return hs + + def test_deactivate_account(self): + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + + request_data = json.dumps({ + "auth": { + "type": "m.login.password", + "user": user_id, + "password": "test", + }, + "erase": False, + }) + request, channel = self.make_request( + "POST", + "account/deactivate", + request_data, + access_token=tok, + ) + self.render(request) + self.assertEqual(request.code, 200) + + store = self.hs.get_datastore() + + # Check that the user has been marked as deactivated. + self.assertTrue(self.get_success(store.get_user_deactivated_status(user_id))) + + # Check that this access token has been invalidated. + request, channel = self.make_request("GET", "account/whoami") + self.render(request) + self.assertEqual(request.code, 401) -- cgit 1.5.1