summary refs log tree commit diff
path: root/synapse/util/batching_queue.py
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2021-08-31 17:16:11 +0100
committerBrendan Abolivier <babolivier@matrix.org>2021-08-31 17:16:11 +0100
commit200ee12326bc8b8e73556f81272eecdcbc8f856f (patch)
tree6250a311d2e812297c03243f77140051abacb0e3 /synapse/util/batching_queue.py
parentMerge tag 'v1.34.0' into babolivier/dinsic_1.41.0 (diff)
parentMerge v1.35.0rc3 into v1.35.0 due to incorrect tagging (diff)
downloadsynapse-200ee12326bc8b8e73556f81272eecdcbc8f856f.tar.xz
Merge tag 'v1.35.0' into babolivier/dinsic_1.41.0
Synapse 1.35.0 (2021-06-01)
===========================

Note that [the tag](https://github.com/matrix-org/synapse/releases/tag/v1.35.0rc3) and [docker images](https://hub.docker.com/layers/matrixdotorg/synapse/v1.35.0rc3/images/sha256-34ccc87bd99a17e2cbc0902e678b5937d16bdc1991ead097eee6096481ecf2c4?context=explore) for `v1.35.0rc3` were incorrectly built. If you are experiencing issues with either, it is recommended to upgrade to the equivalent tag or docker image for the `v1.35.0` release.

Deprecations and Removals
-------------------------

- The core Synapse development team plan to drop support for the [unstable API of MSC2858](https://github.com/matrix-org/matrix-doc/blob/master/proposals/2858-Multiple-SSO-Identity-Providers.md#unstable-prefix), including the undocumented `experimental.msc2858_enabled` config option, in August 2021. Client authors should ensure that their clients are updated to use the stable API (which has been supported since Synapse 1.30) well before that time, to give their users time to upgrade. ([\#10101](https://github.com/matrix-org/synapse/issues/10101))

Bugfixes
--------

- Fixed a bug causing replication requests to fail when receiving a lot of events via federation. Introduced in v1.33.0. ([\#10082](https://github.com/matrix-org/synapse/issues/10082))
- Fix HTTP response size limit to allow joining very large rooms over federation. Introduced in v1.33.0. ([\#10093](https://github.com/matrix-org/synapse/issues/10093))

Internal Changes
----------------

- Log method and path when dropping request due to size limit. ([\#10091](https://github.com/matrix-org/synapse/issues/10091))

Synapse 1.35.0rc2 (2021-05-27)
==============================

Bugfixes
--------

- Fix a bug introduced in v1.35.0rc1 when calling the spaces summary API via a GET request. ([\#10079](https://github.com/matrix-org/synapse/issues/10079))

Synapse 1.35.0rc1 (2021-05-25)
==============================

Features
--------

- Add experimental support to allow a user who could join a restricted room to view it in the spaces summary. ([\#9922](https://github.com/matrix-org/synapse/issues/9922), [\#10007](https://github.com/matrix-org/synapse/issues/10007), [\#10038](https://github.com/matrix-org/synapse/issues/10038))
- Reduce memory usage when joining very large rooms over federation. ([\#9958](https://github.com/matrix-org/synapse/issues/9958))
- Add a configuration option which allows enabling opentracing by user id. ([\#9978](https://github.com/matrix-org/synapse/issues/9978))
- Enable experimental support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) (spaces summary API) and [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) (restricted join rules) by default. ([\#10011](https://github.com/matrix-org/synapse/issues/10011))

Bugfixes
--------

- Fix a bug introduced in v1.26.0 which meant that `synapse_port_db` would not correctly initialise some postgres sequences, requiring manual updates afterwards. ([\#9991](https://github.com/matrix-org/synapse/issues/9991))
- Fix `synctl`'s `--no-daemonize` parameter to work correctly with worker processes. ([\#9995](https://github.com/matrix-org/synapse/issues/9995))
- Fix a validation bug introduced in v1.34.0 in the ordering of spaces in the space summary API. ([\#10002](https://github.com/matrix-org/synapse/issues/10002))
- Fixed deletion of new presence stream states from database. ([\#10014](https://github.com/matrix-org/synapse/issues/10014), [\#10033](https://github.com/matrix-org/synapse/issues/10033))
- Fixed a bug with very high resolution image uploads throwing internal server errors. ([\#10029](https://github.com/matrix-org/synapse/issues/10029))

Updates to the Docker image
---------------------------

- Fix bug introduced in Synapse 1.33.0 which caused a `Permission denied: '/homeserver.log'` error when starting Synapse with the generated log configuration. Contributed by Sergio Miguéns Iglesias. ([\#10045](https://github.com/matrix-org/synapse/issues/10045))

Improved Documentation
----------------------

- Add hardened systemd files as proposed in [#9760](https://github.com/matrix-org/synapse/issues/9760) and added them to `contrib/`. Change the docs to reflect the presence of these files. ([\#9803](https://github.com/matrix-org/synapse/issues/9803))
- Clarify documentation around SSO mapping providers generating unique IDs and localparts. ([\#9980](https://github.com/matrix-org/synapse/issues/9980))
- Updates to the PostgreSQL documentation (`postgres.md`). ([\#9988](https://github.com/matrix-org/synapse/issues/9988), [\#9989](https://github.com/matrix-org/synapse/issues/9989))
- Fix broken link in user directory documentation. Contributed by @junquera. ([\#10016](https://github.com/matrix-org/synapse/issues/10016))
- Add missing room state entry to the table of contents of room admin API. ([\#10043](https://github.com/matrix-org/synapse/issues/10043))

Deprecations and Removals
-------------------------

- Removed support for the deprecated `tls_fingerprints` configuration setting. Contributed by Jerin J Titus. ([\#9280](https://github.com/matrix-org/synapse/issues/9280))

Internal Changes
----------------

- Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`. ([\#9823](https://github.com/matrix-org/synapse/issues/9823))
- Update comments in the space summary handler. ([\#9974](https://github.com/matrix-org/synapse/issues/9974))
- Minor enhancements to the `@cachedList` descriptor. ([\#9975](https://github.com/matrix-org/synapse/issues/9975))
- Split multipart email sending into a dedicated handler. ([\#9977](https://github.com/matrix-org/synapse/issues/9977))
- Run `black` on files in the `scripts` directory. ([\#9981](https://github.com/matrix-org/synapse/issues/9981))
- Add missing type hints to `synapse.util` module. ([\#9982](https://github.com/matrix-org/synapse/issues/9982))
- Simplify a few helper functions. ([\#9984](https://github.com/matrix-org/synapse/issues/9984), [\#9985](https://github.com/matrix-org/synapse/issues/9985), [\#9986](https://github.com/matrix-org/synapse/issues/9986))
- Remove unnecessary property from SQLBaseStore. ([\#9987](https://github.com/matrix-org/synapse/issues/9987))
- Remove `keylen` param on `LruCache`. ([\#9993](https://github.com/matrix-org/synapse/issues/9993))
- Update the Grafana dashboard in `contrib/`. ([\#10001](https://github.com/matrix-org/synapse/issues/10001))
- Add a batching queue implementation. ([\#10017](https://github.com/matrix-org/synapse/issues/10017))
- Reduce memory usage when verifying signatures on large numbers of events at once. ([\#10018](https://github.com/matrix-org/synapse/issues/10018))
- Properly invalidate caches for destination retry timings every (instead of expiring entries every 5 minutes). ([\#10036](https://github.com/matrix-org/synapse/issues/10036))
- Fix running complement tests with Synapse workers. ([\#10039](https://github.com/matrix-org/synapse/issues/10039))
- Fix typo in `get_state_ids_for_event` docstring where the return type was incorrect. ([\#10050](https://github.com/matrix-org/synapse/issues/10050))
Diffstat (limited to 'synapse/util/batching_queue.py')
-rw-r--r--synapse/util/batching_queue.py153
1 files changed, 153 insertions, 0 deletions
diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py
new file mode 100644

index 0000000000..44bbb7b1a8 --- /dev/null +++ b/synapse/util/batching_queue.py
@@ -0,0 +1,153 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import ( + Awaitable, + Callable, + Dict, + Generic, + Hashable, + List, + Set, + Tuple, + TypeVar, +) + +from twisted.internet import defer + +from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import Clock + +logger = logging.getLogger(__name__) + + +V = TypeVar("V") +R = TypeVar("R") + + +class BatchingQueue(Generic[V, R]): + """A queue that batches up work, calling the provided processing function + with all pending work (for a given key). + + The provided processing function will only be called once at a time for each + key. It will be called the next reactor tick after `add_to_queue` has been + called, and will keep being called until the queue has been drained (for the + given key). + + Note that the return value of `add_to_queue` will be the return value of the + processing function that processed the given item. This means that the + returned value will likely include data for other items that were in the + batch. + """ + + def __init__( + self, + name: str, + clock: Clock, + process_batch_callback: Callable[[List[V]], Awaitable[R]], + ): + self._name = name + self._clock = clock + + # The set of keys currently being processed. + self._processing_keys = set() # type: Set[Hashable] + + # The currently pending batch of values by key, with a Deferred to call + # with the result of the corresponding `_process_batch_callback` call. + self._next_values = {} # type: Dict[Hashable, List[Tuple[V, defer.Deferred]]] + + # The function to call with batches of values. + self._process_batch_callback = process_batch_callback + + LaterGauge( + "synapse_util_batching_queue_number_queued", + "The number of items waiting in the queue across all keys", + labels=("name",), + caller=lambda: sum(len(v) for v in self._next_values.values()), + ) + + LaterGauge( + "synapse_util_batching_queue_number_of_keys", + "The number of distinct keys that have items queued", + labels=("name",), + caller=lambda: len(self._next_values), + ) + + async def add_to_queue(self, value: V, key: Hashable = ()) -> R: + """Adds the value to the queue with the given key, returning the result + of the processing function for the batch that included the given value. + + The optional `key` argument allows sharding the queue by some key. The + queues will then be processed in parallel, i.e. the process batch + function will be called in parallel with batched values from a single + key. + """ + + # First we create a defer and add it and the value to the list of + # pending items. + d = defer.Deferred() + self._next_values.setdefault(key, []).append((value, d)) + + # If we're not currently processing the key fire off a background + # process to start processing. + if key not in self._processing_keys: + run_as_background_process(self._name, self._process_queue, key) + + return await make_deferred_yieldable(d) + + async def _process_queue(self, key: Hashable) -> None: + """A background task to repeatedly pull things off the queue for the + given key and call the `self._process_batch_callback` with the values. + """ + + try: + if key in self._processing_keys: + return + + self._processing_keys.add(key) + + while True: + # We purposefully wait a reactor tick to allow us to batch + # together requests that we're about to receive. A common + # pattern is to call `add_to_queue` multiple times at once, and + # deferring to the next reactor tick allows us to batch all of + # those up. + await self._clock.sleep(0) + + next_values = self._next_values.pop(key, []) + if not next_values: + # We've exhausted the queue. + break + + try: + values = [value for value, _ in next_values] + results = await self._process_batch_callback(values) + + for _, deferred in next_values: + with PreserveLoggingContext(): + deferred.callback(results) + + except Exception as e: + for _, deferred in next_values: + if deferred.called: + continue + + with PreserveLoggingContext(): + deferred.errback(e) + + finally: + self._processing_keys.discard(key)