diff options
-rw-r--r-- | .buildkite/pipeline.yml | 12 | ||||
-rw-r--r-- | changelog.d/5678.removal | 1 | ||||
-rw-r--r-- | changelog.d/5695.misc | 1 | ||||
-rw-r--r-- | changelog.d/5706.misc | 1 | ||||
-rw-r--r-- | changelog.d/5713.misc | 1 | ||||
-rw-r--r-- | changelog.d/5715.misc | 1 | ||||
-rw-r--r-- | changelog.d/5717.misc | 1 | ||||
-rw-r--r-- | changelog.d/5719.misc | 1 | ||||
-rw-r--r-- | changelog.d/5720.misc | 1 | ||||
-rw-r--r-- | docs/code_style.rst | 124 | ||||
-rw-r--r-- | synapse/config/logger.py | 81 | ||||
-rw-r--r-- | synapse/config/workers.py | 6 | ||||
-rw-r--r-- | synapse/logging/opentracing.py | 3 | ||||
-rw-r--r-- | synapse/static/index.html | 4 | ||||
-rw-r--r-- | synapse/storage/_base.py | 18 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 2 | ||||
-rw-r--r-- | synapse/storage/events.py | 26 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 154 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/current_state_events_membership.sql | 25 | ||||
-rw-r--r-- | synapse/storage/state.py | 8 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 20 | ||||
-rw-r--r-- | synapse/storage/user_directory.py | 8 |
23 files changed, 346 insertions, 155 deletions
diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 7f42fad909..d5e5aeec6b 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -117,8 +117,10 @@ steps: limit: 2 - label: ":python: 3.5 / :postgres: 9.5" + agents: + queue: "medium" env: - TRIAL_FLAGS: "-j 4" + TRIAL_FLAGS: "-j 8" command: - "bash -c 'python -m pip install tox && python -m tox -e py35-postgres,codecov'" plugins: @@ -134,8 +136,10 @@ steps: limit: 2 - label: ":python: 3.7 / :postgres: 9.5" + agents: + queue: "medium" env: - TRIAL_FLAGS: "-j 4" + TRIAL_FLAGS: "-j 8" command: - "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'" plugins: @@ -151,8 +155,10 @@ steps: limit: 2 - label: ":python: 3.7 / :postgres: 11" + agents: + queue: "medium" env: - TRIAL_FLAGS: "-j 4" + TRIAL_FLAGS: "-j 8" command: - "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'" plugins: diff --git a/changelog.d/5678.removal b/changelog.d/5678.removal new file mode 100644 index 0000000000..085b84fda6 --- /dev/null +++ b/changelog.d/5678.removal @@ -0,0 +1 @@ +Synapse now no longer accepts the `-v`/`--verbose`, `-f`/`--log-file`, or `--log-config` command line flags, and removes the deprecated `verbose` and `log_file` configuration file options. Users of these options should migrate their options into the dedicated log configuration. diff --git a/changelog.d/5695.misc b/changelog.d/5695.misc new file mode 100644 index 0000000000..4741d32e25 --- /dev/null +++ b/changelog.d/5695.misc @@ -0,0 +1 @@ +Add precautionary measures to prevent future abuse of `window.opener` in default welcome page. diff --git a/changelog.d/5706.misc b/changelog.d/5706.misc new file mode 100644 index 0000000000..5e15dfd5fa --- /dev/null +++ b/changelog.d/5706.misc @@ -0,0 +1 @@ +Reduce database IO usage by optimising queries for current membership. diff --git a/changelog.d/5713.misc b/changelog.d/5713.misc new file mode 100644 index 0000000000..01ea1cf8d7 --- /dev/null +++ b/changelog.d/5713.misc @@ -0,0 +1 @@ +Improve caching when fetching `get_filtered_current_state_ids`. diff --git a/changelog.d/5715.misc b/changelog.d/5715.misc new file mode 100644 index 0000000000..a77366e0c0 --- /dev/null +++ b/changelog.d/5715.misc @@ -0,0 +1 @@ +Don't accept opentracing data from clients. diff --git a/changelog.d/5717.misc b/changelog.d/5717.misc new file mode 100644 index 0000000000..07dc3bca94 --- /dev/null +++ b/changelog.d/5717.misc @@ -0,0 +1 @@ +Speed up PostgreSQL unit tests in CI. diff --git a/changelog.d/5719.misc b/changelog.d/5719.misc new file mode 100644 index 0000000000..6d5294724c --- /dev/null +++ b/changelog.d/5719.misc @@ -0,0 +1 @@ +Update the coding style document. diff --git a/changelog.d/5720.misc b/changelog.d/5720.misc new file mode 100644 index 0000000000..590f64f19d --- /dev/null +++ b/changelog.d/5720.misc @@ -0,0 +1 @@ +Improve database query performance when recording retry intervals for remote hosts. diff --git a/docs/code_style.rst b/docs/code_style.rst index e3ca626bfd..39ac4ebedc 100644 --- a/docs/code_style.rst +++ b/docs/code_style.rst @@ -1,4 +1,8 @@ -# Code Style +Code Style +========== + +Formatting tools +---------------- The Synapse codebase uses a number of code formatting tools in order to quickly and automatically check for formatting (and sometimes logical) errors @@ -6,20 +10,20 @@ in code. The necessary tools are detailed below. -## Formatting tools +- **black** -The Synapse codebase uses [black](https://pypi.org/project/black/) as an -opinionated code formatter, ensuring all comitted code is properly -formatted. + The Synapse codebase uses `black <https://pypi.org/project/black/>`_ as an + opinionated code formatter, ensuring all comitted code is properly + formatted. -First install ``black`` with:: + First install ``black`` with:: - pip install --upgrade black + pip install --upgrade black -Have ``black`` auto-format your code (it shouldn't change any -functionality) with:: + Have ``black`` auto-format your code (it shouldn't change any functionality) + with:: - black . --exclude="\.tox|build|env" + black . --exclude="\.tox|build|env" - **flake8** @@ -54,17 +58,16 @@ functionality is supported in your editor for a more convenient development workflow. It is not, however, recommended to run ``flake8`` on save as it takes a while and is very resource intensive. -## General rules +General rules +------------- - **Naming**: - Use camel case for class and type names - Use underscores for functions and variables. -- Use double quotes ``"foo"`` rather than single quotes ``'foo'``. - -- **Comments**: should follow the `google code style - <http://google.github.io/styleguide/pyguide.html?showone=Comments#Comments>`_. +- **Docstrings**: should follow the `google code style + <https://google.github.io/styleguide/pyguide.html#38-comments-and-docstrings>`_. This is so that we can generate documentation with `sphinx <http://sphinxcontrib-napoleon.readthedocs.org/en/latest/>`_. See the `examples @@ -73,6 +76,8 @@ takes a while and is very resource intensive. - **Imports**: + - Imports should be sorted by ``isort`` as described above. + - Prefer to import classes and functions rather than packages or modules. Example:: @@ -92,25 +97,84 @@ takes a while and is very resource intensive. This goes against the advice in the Google style guide, but it means that errors in the name are caught early (at import time). - - Multiple imports from the same package can be combined onto one line:: + - Avoid wildcard imports (``from synapse.types import *``) and relative + imports (``from .types import UserID``). - from synapse.types import GroupID, RoomID, UserID +Configuration file format +------------------------- - An effort should be made to keep the individual imports in alphabetical - order. +The `sample configuration file <./sample_config.yaml>`_ acts as a reference to +Synapse's configuration options for server administrators. Remember that many +readers will be unfamiliar with YAML and server administration in general, so +that it is important that the file be as easy to understand as possible, which +includes following a consistent format. - If the list becomes long, wrap it with parentheses and split it over - multiple lines. +Some guidelines follow: - - As per `PEP-8 <https://www.python.org/dev/peps/pep-0008/#imports>`_, - imports should be grouped in the following order, with a blank line between - each group: +* Sections should be separated with a heading consisting of a single line + prefixed and suffixed with ``##``. There should be **two** blank lines + before the section header, and **one** after. - 1. standard library imports - 2. related third party imports - 3. local application/library specific imports +* Each option should be listed in the file with the following format: - - Imports within each group should be sorted alphabetically by module name. + * A comment describing the setting. Each line of this comment should be + prefixed with a hash (``#``) and a space. - - Avoid wildcard imports (``from synapse.types import *``) and relative - imports (``from .types import UserID``). + The comment should describe the default behaviour (ie, what happens if + the setting is omitted), as well as what the effect will be if the + setting is changed. + + Often, the comment end with something like "uncomment the + following to \<do action>". + + * A line consisting of only ``#``. + + * A commented-out example setting, prefixed with only ``#``. + + For boolean (on/off) options, convention is that this example should be + the *opposite* to the default (so the comment will end with "Uncomment + the following to enable [or disable] \<feature\>." For other options, + the example should give some non-default value which is likely to be + useful to the reader. + +* There should be a blank line between each option. + +* Where several settings are grouped into a single dict, *avoid* the + convention where the whole block is commented out, resulting in comment + lines starting ``# #``, as this is hard to read and confusing to + edit. Instead, leave the top-level config option uncommented, and follow + the conventions above for sub-options. Ensure that your code correctly + handles the top-level option being set to ``None`` (as it will be if no + sub-options are enabled). + +* Lines should be wrapped at 80 characters. + +Example:: + + ## Frobnication ## + + # The frobnicator will ensure that all requests are fully frobnicated. + # To enable it, uncomment the following. + # + #frobnicator_enabled: true + + # By default, the frobnicator will frobnicate with the default frobber. + # The following will make it use an alternative frobber. + # + #frobincator_frobber: special_frobber + + # Settings for the frobber + # + frobber: + # frobbing speed. Defaults to 1. + # + #speed: 10 + + # frobbing distance. Defaults to 1000. + # + #distance: 100 + +Note that the sample configuration is generated from the synapse code and is +maintained by a script, ``scripts-dev/generate_sample_config``. Making sure +that the output from this script matches the desired format is left as an +exercise for the reader! diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 40502a5798..d321d00b80 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import logging import logging.config import os @@ -75,10 +76,8 @@ root: class LoggingConfig(Config): def read_config(self, config, **kwargs): - self.verbosity = config.get("verbose", 0) - self.no_redirect_stdio = config.get("no_redirect_stdio", False) self.log_config = self.abspath(config.get("log_config")) - self.log_file = self.abspath(config.get("log_file")) + self.no_redirect_stdio = config.get("no_redirect_stdio", False) def generate_config_section(self, config_dir_path, server_name, **kwargs): log_config = os.path.join(config_dir_path, server_name + ".log.config") @@ -94,39 +93,13 @@ class LoggingConfig(Config): ) def read_arguments(self, args): - if args.verbose is not None: - self.verbosity = args.verbose if args.no_redirect_stdio is not None: self.no_redirect_stdio = args.no_redirect_stdio - if args.log_config is not None: - self.log_config = args.log_config - if args.log_file is not None: - self.log_file = args.log_file @staticmethod def add_arguments(parser): logging_group = parser.add_argument_group("logging") logging_group.add_argument( - "-v", - "--verbose", - dest="verbose", - action="count", - help="The verbosity level. Specify multiple times to increase " - "verbosity. (Ignored if --log-config is specified.)", - ) - logging_group.add_argument( - "-f", - "--log-file", - dest="log_file", - help="File to log to. (Ignored if --log-config is specified.)", - ) - logging_group.add_argument( - "--log-config", - dest="log_config", - default=None, - help="Python logging config file", - ) - logging_group.add_argument( "-n", "--no-redirect-stdio", action="store_true", @@ -153,58 +126,29 @@ def setup_logging(config, use_worker_options=False): config (LoggingConfig | synapse.config.workers.WorkerConfig): configuration data - use_worker_options (bool): True to use 'worker_log_config' and - 'worker_log_file' options instead of 'log_config' and 'log_file'. + use_worker_options (bool): True to use the 'worker_log_config' option + instead of 'log_config'. register_sighup (func | None): Function to call to register a sighup handler. """ log_config = config.worker_log_config if use_worker_options else config.log_config - log_file = config.worker_log_file if use_worker_options else config.log_file - - log_format = ( - "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" - " - %(message)s" - ) if log_config is None: - # We don't have a logfile, so fall back to the 'verbosity' param from - # the config or cmdline. (Note that we generate a log config for new - # installs, so this will be an unusual case) - level = logging.INFO - level_for_storage = logging.INFO - if config.verbosity: - level = logging.DEBUG - if config.verbosity > 1: - level_for_storage = logging.DEBUG + log_format = ( + "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" + " - %(message)s" + ) logger = logging.getLogger("") - logger.setLevel(level) - - logging.getLogger("synapse.storage.SQL").setLevel(level_for_storage) + logger.setLevel(logging.INFO) + logging.getLogger("synapse.storage.SQL").setLevel(logging.INFO) formatter = logging.Formatter(log_format) - if log_file: - # TODO: Customisable file size / backup count - handler = logging.handlers.RotatingFileHandler( - log_file, maxBytes=(1000 * 1000 * 100), backupCount=3, encoding="utf8" - ) - - def sighup(signum, stack): - logger.info("Closing log file due to SIGHUP") - handler.doRollover() - logger.info("Opened new log file due to SIGHUP") - - else: - handler = logging.StreamHandler() - - def sighup(*args): - pass + handler = logging.StreamHandler() handler.setFormatter(formatter) - handler.addFilter(LoggingContextFilter(request="")) - logger.addHandler(handler) else: @@ -218,8 +162,7 @@ def setup_logging(config, use_worker_options=False): logging.info("Reloaded log config from %s due to SIGHUP", log_config) load_log_config() - - appbase.register_sighup(sighup) + appbase.register_sighup(sighup) # make sure that the first thing we log is a thing we can grep backwards # for diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 3b75471d85..246d72cd61 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -31,8 +31,6 @@ class WorkerConfig(Config): self.worker_listeners = config.get("worker_listeners", []) self.worker_daemonize = config.get("worker_daemonize") self.worker_pid_file = config.get("worker_pid_file") - self.worker_log_file = config.get("worker_log_file") - self.worker_log_config = config.get("worker_log_config") # The host used to connect to the main synapse self.worker_replication_host = config.get("worker_replication_host", None) @@ -78,9 +76,5 @@ class WorkerConfig(Config): if args.daemonize is not None: self.worker_daemonize = args.daemonize - if args.log_config is not None: - self.worker_log_config = args.log_config - if args.log_file is not None: - self.worker_log_file = args.log_file if args.manhole is not None: self.worker_manhole = args.worker_manhole diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 3da33d7826..04393697c0 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -465,8 +465,7 @@ def trace_servlet(servlet_name, func): @wraps(func) @defer.inlineCallbacks def _trace_servlet_inner(request, *args, **kwargs): - with start_active_span_from_context( - request.requestHeaders, + with start_active_span( "incoming-client-request", tags={ "request_id": request.get_request_id(), diff --git a/synapse/static/index.html b/synapse/static/index.html index d3f1c7dce0..bf46df9097 100644 --- a/synapse/static/index.html +++ b/synapse/static/index.html @@ -48,13 +48,13 @@ </div> <h1>It works! Synapse is running</h1> <p>Your Synapse server is listening on this port and is ready for messages.</p> - <p>To use this server you'll need <a href="https://matrix.org/docs/projects/try-matrix-now.html#clients" target="_blank">a Matrix client</a>. + <p>To use this server you'll need <a href="https://matrix.org/docs/projects/try-matrix-now.html#clients" target="_blank" rel="noopener noreferrer">a Matrix client</a>. </p> <p>Welcome to the Matrix universe :)</p> <hr> <p> <small> - <a href="https://matrix.org" target="_blank"> + <a href="https://matrix.org" target="_blank" rel="noopener noreferrer"> matrix.org </a> </small> diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2f940dbae6..a7c93efa46 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,7 +86,21 @@ _CURRENT_STATE_CACHE_NAME = "cs_cache_fake" class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() - method.""" + method. + + Args: + txn: The database transcation object to wrap. + name (str): The name of this transactions for logging. + database_engine (Sqlite3Engine|PostgresEngine) + after_callbacks(list|None): A list that callbacks will be appended to + that have been added by `call_after` which should be run on + successful completion of the transaction. None indicates that no + callbacks should be allowed to be scheduled to run. + exception_callbacks(list|None): A list that callbacks will be appended + to that have been added by `call_on_exception` which should be run + if transaction ends with an error. None indicates that no callbacks + should be allowed to be scheduled to run. + """ __slots__ = [ "txn", @@ -97,7 +111,7 @@ class LoggingTransaction(object): ] def __init__( - self, txn, name, database_engine, after_callbacks, exception_callbacks + self, txn, name, database_engine, after_callbacks=None, exception_callbacks=None ): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index eca77069fd..dcfb67e029 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -79,8 +79,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): db_conn.cursor(), name="_find_stream_orderings_for_times_txn", database_engine=self.database_engine, - after_callbacks=[], - exception_callbacks=[], ) self._find_stream_orderings_for_times_txn(cur) cur.close() diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b486ca50eb..b70457bfc6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -918,8 +918,6 @@ class EventsStore( min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) - self._update_forward_extremities_txn( txn, new_forward_extremities=new_forward_extremeties, @@ -993,6 +991,10 @@ class EventsStore( backfilled=backfilled, ) + # We call this last as it assumes we've inserted the events into + # room_memberships, where applicable. + self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) + def _update_current_state_txn(self, txn, state_delta_by_room, stream_id): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple @@ -1062,16 +1064,16 @@ class EventsStore( ), ) - self._simple_insert_many_txn( - txn, - table="current_state_events", - values=[ - { - "event_id": ev_id, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - } + # We include the membership in the current state table, hence we do + # a lookup when we insert. This assumes that all events have already + # been inserted into room_memberships. + txn.executemany( + """INSERT INTO current_state_events + (room_id, type, state_key, event_id, membership) + VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) + """, + [ + (room_id, key[0], key[1], ev_id, ev_id) for key, ev_id in iteritems(to_insert) ], ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 7c4e1dc7ec..d20eacda59 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 55 +SCHEMA_VERSION = 56 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 32cfd010a5..257bcdb2f8 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -24,6 +24,8 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage._base import LoggingTransaction from synapse.storage.events_worker import EventsWorkerStore from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer @@ -53,9 +55,51 @@ ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name")) MemberSummary = namedtuple("MemberSummary", ("members", "count")) _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" +_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership" class RoomMemberWorkerStore(EventsWorkerStore): + def __init__(self, db_conn, hs): + super(RoomMemberWorkerStore, self).__init__(db_conn, hs) + + # Is the current_state_events.membership up to date? Or is the + # background update still running? + self._current_state_events_membership_up_to_date = False + + txn = LoggingTransaction( + db_conn.cursor(), + name="_check_safe_current_state_events_membership_updated", + database_engine=self.database_engine, + ) + self._check_safe_current_state_events_membership_updated_txn(txn) + txn.close() + + def _check_safe_current_state_events_membership_updated_txn(self, txn): + """Checks if it is safe to assume the new current_state_events + membership column is up to date + """ + + pending_update = self._simple_select_one_txn( + txn, + table="background_updates", + keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME}, + retcols=["update_name"], + allow_none=True, + ) + + self._current_state_events_membership_up_to_date = not pending_update + + # If the update is still running, reschedule to run. + if pending_update: + self._clock.call_later( + 15.0, + run_as_background_process, + "_check_safe_current_state_events_membership_updated", + self.runInteraction, + "_check_safe_current_state_events_membership_updated", + self._check_safe_current_state_events_membership_updated_txn, + ) + @cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True) def get_hosts_in_room(self, room_id, cache_context): """Returns the set of all hosts currently in the room @@ -69,14 +113,23 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(max_entries=100000, iterable=True) def get_users_in_room(self, room_id): def f(txn): - sql = ( - "SELECT m.user_id FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id " - " AND m.room_id = c.room_id " - " AND m.user_id = c.state_key" - " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" - ) + # If we can assume current_state_events.membership is up to date + # then we can avoid a join, which is a Very Good Thing given how + # frequently this function gets called. + if self._current_state_events_membership_up_to_date: + sql = """ + SELECT state_key FROM current_state_events + WHERE type = 'm.room.member' AND room_id = ? AND membership = ? + """ + else: + sql = """ + SELECT state_key FROM room_memberships as m + INNER JOIN current_state_events as c + ON m.event_id = c.event_id + AND m.room_id = c.room_id + AND m.user_id = c.state_key + WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? + """ txn.execute(sql, (room_id, Membership.JOIN)) return [to_ascii(r[0]) for r in txn] @@ -98,15 +151,26 @@ class RoomMemberWorkerStore(EventsWorkerStore): # first get counts. # We do this all in one transaction to keep the cache small. # FIXME: get rid of this when we have room_stats - sql = """ - SELECT count(*), m.membership FROM room_memberships as m - INNER JOIN current_state_events as c - ON m.event_id = c.event_id - AND m.room_id = c.room_id - AND m.user_id = c.state_key - WHERE c.type = 'm.room.member' AND c.room_id = ? - GROUP BY m.membership - """ + + # If we can assume current_state_events.membership is up to date + # then we can avoid a join, which is a Very Good Thing given how + # frequently this function gets called. + if self._current_state_events_membership_up_to_date: + sql = """ + SELECT count(*), membership FROM current_state_events + WHERE type = 'm.room.member' AND room_id = ? + GROUP BY membership + """ + else: + sql = """ + SELECT count(*), m.membership FROM room_memberships as m + INNER JOIN current_state_events as c + ON m.event_id = c.event_id + AND m.room_id = c.room_id + AND m.user_id = c.state_key + WHERE c.type = 'm.room.member' AND c.room_id = ? + GROUP BY m.membership + """ txn.execute(sql, (room_id,)) res = {} @@ -224,7 +288,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): results = [] if membership_list: where_clause = "user_id = ? AND (%s) AND forgotten = 0" % ( - " OR ".join(["membership = ?" for _ in membership_list]), + " OR ".join(["m.membership = ?" for _ in membership_list]), ) args = [user_id] @@ -453,8 +517,8 @@ class RoomMemberWorkerStore(EventsWorkerStore): sql = """ SELECT state_key FROM current_state_events AS c - INNER JOIN room_memberships USING (event_id) - WHERE membership = 'join' + INNER JOIN room_memberships AS m USING (event_id) + WHERE m.membership = 'join' AND type = 'm.room.member' AND c.room_id = ? AND state_key LIKE ? @@ -602,6 +666,10 @@ class RoomMemberStore(RoomMemberWorkerStore): self.register_background_update_handler( _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile ) + self.register_background_update_handler( + _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, + self._background_current_state_membership, + ) def _store_room_members_txn(self, txn, events, backfilled): """Store a room member in the database. @@ -781,6 +849,52 @@ class RoomMemberStore(RoomMemberWorkerStore): defer.returnValue(result) + @defer.inlineCallbacks + def _background_current_state_membership(self, progress, batch_size): + """Update the new membership column on current_state_events. + """ + + if "rooms" not in progress: + rooms = yield self._simple_select_onecol( + table="current_state_events", + keyvalues={}, + retcol="DISTINCT room_id", + desc="_background_current_state_membership_get_rooms", + ) + progress["rooms"] = rooms + + rooms = progress["rooms"] + + def _background_current_state_membership_txn(txn): + processed = 0 + while rooms and processed < batch_size: + sql = """ + UPDATE current_state_events AS c + SET membership = ( + SELECT membership FROM room_memberships + WHERE event_id = c.event_id + ) + WHERE room_id = ? + """ + txn.execute(sql, (rooms.pop(),)) + processed += txn.rowcount + + self._background_update_progress_txn( + txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress + ) + + return processed + + result = yield self.runInteraction( + "_background_current_state_membership_update", + _background_current_state_membership_txn, + ) + + if not rooms: + yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME) + + defer.returnValue(result) + class _JoinedHostsCache(object): """Cache for joined hosts in a room that is optimised to handle updates diff --git a/synapse/storage/schema/delta/56/current_state_events_membership.sql b/synapse/storage/schema/delta/56/current_state_events_membership.sql new file mode 100644 index 0000000000..b2e08cd85d --- /dev/null +++ b/synapse/storage/schema/delta/56/current_state_events_membership.sql @@ -0,0 +1,25 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- We add membership to current state so that we don't need to join against +-- room_memberships, which can be surprisingly costly (we do such queries +-- very frequently). +-- This will be null for non-membership events and the content.membership key +-- for membership events. (Will also be null for membership events until the +-- background update job has finished). +ALTER TABLE current_state_events ADD membership TEXT; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('current_state_events_membership', '{}'); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0bfe1b4550..a35289876d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -510,6 +510,12 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): event ID. """ + where_clause, where_args = state_filter.make_sql_filter_clause() + + if not where_clause: + # We delegate to the cached version + return self.get_current_state_ids(room_id) + def _get_filtered_current_state_ids_txn(txn): results = {} sql = """ @@ -517,8 +523,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): WHERE room_id = ? """ - where_clause, where_args = state_filter.make_sql_filter_clause() - if where_clause: sql += " AND (%s)" % (where_clause,) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index fd18619178..c585cf6cf7 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -196,6 +196,26 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings( self, txn, destination, retry_last_ts, retry_interval ): + + if self.database_engine.can_native_upsert: + # Upsert retry time interval if retry_interval is zero (i.e. we're + # resetting it) or greater than the existing retry interval. + + sql = """ + INSERT INTO destinations (destination, retry_last_ts, retry_interval) + VALUES (?, ?, ?) + ON CONFLICT (destination) DO UPDATE SET + retry_last_ts = EXCLUDED.retry_last_ts, + retry_interval = EXCLUDED.retry_interval + WHERE + EXCLUDED.retry_interval = 0 + OR destinations.retry_interval < EXCLUDED.retry_interval + """ + + txn.execute(sql, (destination, retry_last_ts, retry_interval)) + + return + self.database_engine.lock_table(txn, "destinations") # We need to be careful here as the data may have changed from under us diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 83466e25d9..7fd16fe65e 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -618,15 +618,15 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): sql = """ SELECT room_id FROM ( SELECT c.room_id FROM current_state_events AS c - INNER JOIN room_memberships USING (event_id) + INNER JOIN room_memberships AS m USING (event_id) WHERE type = 'm.room.member' - AND membership = 'join' + AND m.membership = 'join' AND state_key = ? ) AS f1 INNER JOIN ( SELECT c.room_id FROM current_state_events AS c - INNER JOIN room_memberships USING (event_id) + INNER JOIN room_memberships AS m USING (event_id) WHERE type = 'm.room.member' - AND membership = 'join' + AND m.membership = 'join' AND state_key = ? ) f2 USING (room_id) """ |