From 557635f69ab734142ae5889e215ea512f7678f21 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 May 2021 11:00:13 +0100 Subject: 1.35.0rc1 --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/__init__.py b/synapse/__init__.py index 7498a6016f..e60e9db71e 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.34.0" +__version__ = "1.35.0rc1" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when -- cgit 1.5.1 From 65e6c64d8317d3a10527a7e422753281f3e9ec81 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Wed, 26 May 2021 12:19:47 +0200 Subject: Add an admin API for unprotecting local media from quarantine (#10040) Signed-off-by: Dirk Klimpel dirk@klimpel.org --- changelog.d/10040.feature | 1 + docs/admin_api/media_admin_api.md | 21 +++++ synapse/rest/admin/media.py | 28 +++++- synapse/storage/databases/main/media_repository.py | 7 +- tests/rest/admin/test_media.py | 99 ++++++++++++++++++++++ 5 files changed, 151 insertions(+), 5 deletions(-) create mode 100644 changelog.d/10040.feature (limited to 'synapse') diff --git a/changelog.d/10040.feature b/changelog.d/10040.feature new file mode 100644 index 0000000000..ec78a30f00 --- /dev/null +++ b/changelog.d/10040.feature @@ -0,0 +1 @@ +Add an admin API for unprotecting local media from quarantine. Contributed by @dklimpel. diff --git a/docs/admin_api/media_admin_api.md b/docs/admin_api/media_admin_api.md index 9dbec68c19..d1b7e390d5 100644 --- a/docs/admin_api/media_admin_api.md +++ b/docs/admin_api/media_admin_api.md @@ -7,6 +7,7 @@ * [Quarantining media in a room](#quarantining-media-in-a-room) * [Quarantining all media of a user](#quarantining-all-media-of-a-user) * [Protecting media from being quarantined](#protecting-media-from-being-quarantined) + * [Unprotecting media from being quarantined](#unprotecting-media-from-being-quarantined) - [Delete local media](#delete-local-media) * [Delete a specific local media](#delete-a-specific-local-media) * [Delete local media by date or size](#delete-local-media-by-date-or-size) @@ -159,6 +160,26 @@ Response: {} ``` +## Unprotecting media from being quarantined + +This API reverts the protection of a media. + +Request: + +``` +POST /_synapse/admin/v1/media/unprotect/ + +{} +``` + +Where `media_id` is in the form of `abcdefg12345...`. + +Response: + +```json +{} +``` + # Delete local media This API deletes the *local* media from the disk of your own server. This includes any local thumbnails and copies of media downloaded from diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py index 24dd46113a..2c71af4279 100644 --- a/synapse/rest/admin/media.py +++ b/synapse/rest/admin/media.py @@ -137,8 +137,31 @@ class ProtectMediaByID(RestServlet): logging.info("Protecting local media by ID: %s", media_id) - # Quarantine this media id - await self.store.mark_local_media_as_safe(media_id) + # Protect this media id + await self.store.mark_local_media_as_safe(media_id, safe=True) + + return 200, {} + + +class UnprotectMediaByID(RestServlet): + """Unprotect local media from being quarantined.""" + + PATTERNS = admin_patterns("/media/unprotect/(?P[^/]+)") + + def __init__(self, hs: "HomeServer"): + self.store = hs.get_datastore() + self.auth = hs.get_auth() + + async def on_POST( + self, request: SynapseRequest, media_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + logging.info("Unprotecting local media by ID: %s", media_id) + + # Unprotect this media id + await self.store.mark_local_media_as_safe(media_id, safe=False) return 200, {} @@ -269,6 +292,7 @@ def register_servlets_for_media_repo(hs: "HomeServer", http_server): QuarantineMediaByID(hs).register(http_server) QuarantineMediaByUser(hs).register(http_server) ProtectMediaByID(hs).register(http_server) + UnprotectMediaByID(hs).register(http_server) ListMediaInRoom(hs).register(http_server) DeleteMediaByID(hs).register(http_server) DeleteMediaByDateSize(hs).register(http_server) diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index c584868188..2fa945d171 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -143,6 +143,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "created_ts", "quarantined_by", "url_cache", + "safe_from_quarantine", ), allow_none=True, desc="get_local_media", @@ -296,12 +297,12 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_local_media", ) - async def mark_local_media_as_safe(self, media_id: str) -> None: - """Mark a local media as safe from quarantining.""" + async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None: + """Mark a local media as safe or unsafe from quarantining.""" await self.db_pool.simple_update_one( table="local_media_repository", keyvalues={"media_id": media_id}, - updatevalues={"safe_from_quarantine": True}, + updatevalues={"safe_from_quarantine": safe}, desc="mark_local_media_as_safe", ) diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py index ac7b219700..f741121ea2 100644 --- a/tests/rest/admin/test_media.py +++ b/tests/rest/admin/test_media.py @@ -16,6 +16,8 @@ import json import os from binascii import unhexlify +from parameterized import parameterized + import synapse.rest.admin from synapse.api.errors import Codes from synapse.rest.client.v1 import login, profile, room @@ -562,3 +564,100 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): ) # Test that the file is deleted self.assertFalse(os.path.exists(local_path)) + + +class ProtectMediaByIDTestCase(unittest.HomeserverTestCase): + + servlets = [ + synapse.rest.admin.register_servlets, + synapse.rest.admin.register_servlets_for_media_repo, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + media_repo = hs.get_media_repository_resource() + self.store = hs.get_datastore() + + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + # Create media + upload_resource = media_repo.children[b"upload"] + # file size is 67 Byte + image_data = unhexlify( + b"89504e470d0a1a0a0000000d4948445200000001000000010806" + b"0000001f15c4890000000a49444154789c63000100000500010d" + b"0a2db40000000049454e44ae426082" + ) + + # Upload some media into the room + response = self.helper.upload_media( + upload_resource, image_data, tok=self.admin_user_tok, expect_code=200 + ) + # Extract media ID from the response + server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://' + self.media_id = server_and_media_id.split("/")[1] + + self.url = "/_synapse/admin/v1/media/%s/%s" + + @parameterized.expand(["protect", "unprotect"]) + def test_no_auth(self, action: str): + """ + Try to protect media without authentication. + """ + + channel = self.make_request("POST", self.url % (action, self.media_id), b"{}") + + self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) + + @parameterized.expand(["protect", "unprotect"]) + def test_requester_is_no_admin(self, action: str): + """ + If the user is not a server admin, an error is returned. + """ + self.other_user = self.register_user("user", "pass") + self.other_user_token = self.login("user", "pass") + + channel = self.make_request( + "POST", + self.url % (action, self.media_id), + access_token=self.other_user_token, + ) + + self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + + def test_protect_media(self): + """ + Tests that protect and unprotect a media is successfully + """ + + media_info = self.get_success(self.store.get_local_media(self.media_id)) + self.assertFalse(media_info["safe_from_quarantine"]) + + # protect + channel = self.make_request( + "POST", + self.url % ("protect", self.media_id), + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertFalse(channel.json_body) + + media_info = self.get_success(self.store.get_local_media(self.media_id)) + self.assertTrue(media_info["safe_from_quarantine"]) + + # unprotect + channel = self.make_request( + "POST", + self.url % ("unprotect", self.media_id), + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertFalse(channel.json_body) + + media_info = self.get_success(self.store.get_local_media(self.media_id)) + self.assertFalse(media_info["safe_from_quarantine"]) -- cgit 1.5.1 From f42e4c4eb9b5b84bd1da80a4f3938c1c06305364 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 26 May 2021 14:35:16 -0400 Subject: Remove the experimental spaces enabled flag. (#10063) In lieu of just always enabling the unstable spaces endpoint and unstable room version. --- changelog.d/10063.removal | 1 + docs/sample_config.yaml | 15 --------------- synapse/api/room_versions.py | 2 +- synapse/config/experimental.py | 23 ----------------------- synapse/federation/transport/server.py | 13 ++++++------- synapse/rest/client/v1/room.py | 4 +--- 6 files changed, 9 insertions(+), 49 deletions(-) create mode 100644 changelog.d/10063.removal (limited to 'synapse') diff --git a/changelog.d/10063.removal b/changelog.d/10063.removal new file mode 100644 index 0000000000..0f8889b6b4 --- /dev/null +++ b/changelog.d/10063.removal @@ -0,0 +1 @@ +Remove the experimental `spaces_enabled` flag. The spaces features are always available now. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 6576b153d0..7b97f73a29 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -2916,18 +2916,3 @@ redis: # Optional password if configured on the Redis instance # #password: - - -# Enable experimental features in Synapse. -# -# Experimental features might break or be removed without a deprecation -# period. -# -experimental_features: - # Support for Spaces (MSC1772), it enables the following: - # - # * The Spaces Summary API (MSC2946). - # * Restricting room membership based on space membership (MSC3083). - # - # Uncomment to disable support for Spaces. - #spaces_enabled: false diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index c9f9596ada..373a4669d0 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -181,6 +181,6 @@ KNOWN_ROOM_VERSIONS = { RoomVersions.V5, RoomVersions.V6, RoomVersions.MSC2176, + RoomVersions.MSC3083, ) - # Note that we do not include MSC3083 here unless it is enabled in the config. } # type: Dict[str, RoomVersion] diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index cc67377f0f..6ebce4b2f7 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.config._base import Config from synapse.types import JsonDict @@ -28,27 +27,5 @@ class ExperimentalConfig(Config): # MSC2858 (multiple SSO identity providers) self.msc2858_enabled = experimental.get("msc2858_enabled", False) # type: bool - # Spaces (MSC1772, MSC2946, MSC3083, etc) - self.spaces_enabled = experimental.get("spaces_enabled", True) # type: bool - if self.spaces_enabled: - KNOWN_ROOM_VERSIONS[RoomVersions.MSC3083.identifier] = RoomVersions.MSC3083 - # MSC3026 (busy presence state) self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool - - def generate_config_section(self, **kwargs): - return """\ - # Enable experimental features in Synapse. - # - # Experimental features might break or be removed without a deprecation - # period. - # - experimental_features: - # Support for Spaces (MSC1772), it enables the following: - # - # * The Spaces Summary API (MSC2946). - # * Restricting room membership based on space membership (MSC3083). - # - # Uncomment to disable support for Spaces. - #spaces_enabled: false - """ diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 9d50b05d01..00ff02c7cb 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1562,13 +1562,12 @@ def register_servlets( server_name=hs.hostname, ).register(resource) - if hs.config.experimental.spaces_enabled: - FederationSpaceSummaryServlet( - handler=hs.get_space_summary_handler(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) + FederationSpaceSummaryServlet( + handler=hs.get_space_summary_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) if "openid" in servlet_groups: for servletclass in OPENID_SERVLET_CLASSES: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 51813cccbe..d6d55893af 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -1060,9 +1060,7 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False): RoomRedactEventRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server) RoomEventContextServlet(hs).register(http_server) - - if hs.config.experimental.spaces_enabled: - RoomSpaceSummaryRestServlet(hs).register(http_server) + RoomSpaceSummaryRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. if not is_worker: -- cgit 1.5.1 From 224f2f949b1661094a64d1105efb64159ddf4aa0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 27 May 2021 10:33:56 +0100 Subject: Combine `LruCache.invalidate` and `invalidate_many` (#9973) * Make `invalidate` and `invalidate_many` do the same thing ... so that we can do either over the invalidation replication stream, and also because they always confused me a bit. * Kill off `invalidate_many` * changelog --- changelog.d/9973.misc | 1 + synapse/replication/slave/storage/devices.py | 2 +- synapse/storage/databases/main/cache.py | 6 ++-- synapse/storage/databases/main/devices.py | 2 +- .../storage/databases/main/event_push_actions.py | 2 +- synapse/storage/databases/main/events.py | 8 ++--- synapse/storage/databases/main/receipts.py | 6 ++-- synapse/util/caches/deferred_cache.py | 42 +++++++++------------- synapse/util/caches/descriptors.py | 8 +++-- synapse/util/caches/lrucache.py | 18 ++++++---- synapse/util/caches/treecache.py | 3 ++ tests/util/caches/test_descriptors.py | 6 ++-- 12 files changed, 52 insertions(+), 52 deletions(-) create mode 100644 changelog.d/9973.misc (limited to 'synapse') diff --git a/changelog.d/9973.misc b/changelog.d/9973.misc new file mode 100644 index 0000000000..7f22d42291 --- /dev/null +++ b/changelog.d/9973.misc @@ -0,0 +1 @@ +Make `LruCache.invalidate` support tree invalidation, and remove `invalidate_many`. diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 70207420a6..26bdead565 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -68,7 +68,7 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto if row.entity.startswith("@"): self._device_list_stream_cache.entity_has_changed(row.entity, token) self.get_cached_devices_for_user.invalidate((row.entity,)) - self._get_cached_user_device.invalidate_many((row.entity,)) + self._get_cached_user_device.invalidate((row.entity,)) self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,)) else: diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index ecc1f935e2..f7872501a0 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -171,7 +171,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self.get_latest_event_ids_in_room.invalidate((room_id,)) - self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,)) + self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,)) if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) @@ -184,8 +184,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self.get_invited_rooms_for_local_user.invalidate((state_key,)) if relates_to: - self.get_relations_for_event.invalidate_many((relates_to,)) - self.get_aggregation_groups_for_event.invalidate_many((relates_to,)) + self.get_relations_for_event.invalidate((relates_to,)) + self.get_aggregation_groups_for_event.invalidate((relates_to,)) self.get_applicable_edit.invalidate((relates_to,)) async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]): diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index fd87ba71ab..18f07d96dc 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1282,7 +1282,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ) txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,)) - txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,)) + txn.call_after(self._get_cached_user_device.invalidate, (user_id,)) txn.call_after( self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 5845322118..d1237c65cc 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -860,7 +860,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): not be deleted. """ txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + self.get_unread_event_push_actions_by_room_for_user.invalidate, (room_id, user_id), ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index fd25c8112d..897fa06639 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1748,9 +1748,9 @@ class PersistEventsStore: }, ) - txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,)) + txn.call_after(self.store.get_relations_for_event.invalidate, (parent_id,)) txn.call_after( - self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,) + self.store.get_aggregation_groups_for_event.invalidate, (parent_id,) ) if rel_type == RelationTypes.REPLACE: @@ -1903,7 +1903,7 @@ class PersistEventsStore: for user_id in user_ids: txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, + self.store.get_unread_event_push_actions_by_room_for_user.invalidate, (room_id, user_id), ) @@ -1917,7 +1917,7 @@ class PersistEventsStore: def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, + self.store.get_unread_event_push_actions_by_room_for_user.invalidate, (room_id,), ) txn.execute( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3647276acb..edeaacd7a6 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -460,7 +460,7 @@ class ReceiptsWorkerStore(SQLBaseStore): def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) - self._get_linearized_receipts_for_room.invalidate_many((room_id,)) + self._get_linearized_receipts_for_room.invalidate((room_id,)) self.get_last_receipt_event_id_for_user.invalidate( (user_id, room_id, receipt_type) ) @@ -659,9 +659,7 @@ class ReceiptsWorkerStore(SQLBaseStore): ) txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type)) # FIXME: This shouldn't invalidate the whole cache - txn.call_after( - self._get_linearized_receipts_for_room.invalidate_many, (room_id,) - ) + txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) self.db_pool.simple_delete_txn( txn, diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 371e7e4dd0..1044139119 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -16,16 +16,7 @@ import enum import threading -from typing import ( - Callable, - Generic, - Iterable, - MutableMapping, - Optional, - TypeVar, - Union, - cast, -) +from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, Union from prometheus_client import Gauge @@ -91,7 +82,7 @@ class DeferredCache(Generic[KT, VT]): # _pending_deferred_cache maps from the key value to a `CacheEntry` object. self._pending_deferred_cache = ( cache_type() - ) # type: MutableMapping[KT, CacheEntry] + ) # type: Union[TreeCache, MutableMapping[KT, CacheEntry]] def metrics_cb(): cache_pending_metric.labels(name).set(len(self._pending_deferred_cache)) @@ -287,8 +278,17 @@ class DeferredCache(Generic[KT, VT]): self.cache.set(key, value, callbacks=callbacks) def invalidate(self, key): + """Delete a key, or tree of entries + + If the cache is backed by a regular dict, then "key" must be of + the right type for this cache + + If the cache is backed by a TreeCache, then "key" must be a tuple, but + may be of lower cardinality than the TreeCache - in which case the whole + subtree is deleted. + """ self.check_thread() - self.cache.pop(key, None) + self.cache.del_multi(key) # if we have a pending lookup for this key, remove it from the # _pending_deferred_cache, which will (a) stop it being returned @@ -299,20 +299,10 @@ class DeferredCache(Generic[KT, VT]): # run the invalidation callbacks now, rather than waiting for the # deferred to resolve. if entry: - entry.invalidate() - - def invalidate_many(self, key: KT): - self.check_thread() - if not isinstance(key, tuple): - raise TypeError("The cache key must be a tuple not %r" % (type(key),)) - key = cast(KT, key) - self.cache.del_multi(key) - - # if we have a pending lookup for this key, remove it from the - # _pending_deferred_cache, as above - entry_dict = self._pending_deferred_cache.pop(key, None) - if entry_dict is not None: - for entry in iterate_tree_cache_entry(entry_dict): + # _pending_deferred_cache.pop should either return a CacheEntry, or, in the + # case of a TreeCache, a dict of keys to cache entries. Either way calling + # iterate_tree_cache_entry on it will do the right thing. + for entry in iterate_tree_cache_entry(entry): entry.invalidate() def invalidate_all(self): diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 2ac24a2f25..d77e8edeea 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -48,7 +48,6 @@ F = TypeVar("F", bound=Callable[..., Any]) class _CachedFunction(Generic[F]): invalidate = None # type: Any invalidate_all = None # type: Any - invalidate_many = None # type: Any prefill = None # type: Any cache = None # type: Any num_args = None # type: Any @@ -262,6 +261,11 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): ): super().__init__(orig, num_args=num_args, cache_context=cache_context) + if tree and self.num_args < 2: + raise RuntimeError( + "tree=True is nonsensical for cached functions with a single parameter" + ) + self.max_entries = max_entries self.tree = tree self.iterable = iterable @@ -302,11 +306,11 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): wrapped = cast(_CachedFunction, _wrapped) if self.num_args == 1: + assert not self.tree wrapped.invalidate = lambda key: cache.invalidate(key[0]) wrapped.prefill = lambda key, val: cache.prefill(key[0], val) else: wrapped.invalidate = cache.invalidate - wrapped.invalidate_many = cache.invalidate_many wrapped.prefill = cache.prefill wrapped.invalidate_all = cache.invalidate_all diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 54df407ff7..d89e9d9b1d 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -152,7 +152,6 @@ class LruCache(Generic[KT, VT]): """ Least-recently-used cache, supporting prometheus metrics and invalidation callbacks. - Supports del_multi only if cache_type=TreeCache If cache_type=TreeCache, all keys must be tuples. """ @@ -393,10 +392,16 @@ class LruCache(Generic[KT, VT]): @synchronized def cache_del_multi(key: KT) -> None: + """Delete an entry, or tree of entries + + If the LruCache is backed by a regular dict, then "key" must be of + the right type for this cache + + If the LruCache is backed by a TreeCache, then "key" must be a tuple, but + may be of lower cardinality than the TreeCache - in which case the whole + subtree is deleted. """ - This will only work if constructed with cache_type=TreeCache - """ - popped = cache.pop(key) + popped = cache.pop(key, None) if popped is None: return # for each deleted node, we now need to remove it from the linked list @@ -430,11 +435,10 @@ class LruCache(Generic[KT, VT]): self.set = cache_set self.setdefault = cache_set_default self.pop = cache_pop + self.del_multi = cache_del_multi # `invalidate` is exposed for consistency with DeferredCache, so that it can be # invalidated by the cache invalidation replication stream. - self.invalidate = cache_pop - if cache_type is TreeCache: - self.del_multi = cache_del_multi + self.invalidate = cache_del_multi self.len = synchronized(cache_len) self.contains = cache_contains self.clear = cache_clear diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 73502a8b06..a6df81ebff 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -89,6 +89,9 @@ class TreeCache: value. If the key is partial, the TreeCacheNode corresponding to the part of the tree that was removed. """ + if not isinstance(key, tuple): + raise TypeError("The cache key must be a tuple not %r" % (type(key),)) + # a list of the nodes we have touched on the way down the tree nodes = [] diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index bbbc276697..0277998cbe 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -622,17 +622,17 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase): self.assertEquals(callcount2[0], 1) a.func2.invalidate(("foo",)) - self.assertEquals(a.func2.cache.cache.pop.call_count, 1) + self.assertEquals(a.func2.cache.cache.del_multi.call_count, 1) yield a.func2("foo") a.func2.invalidate(("foo",)) - self.assertEquals(a.func2.cache.cache.pop.call_count, 2) + self.assertEquals(a.func2.cache.cache.del_multi.call_count, 2) self.assertEquals(callcount[0], 1) self.assertEquals(callcount2[0], 2) a.func.invalidate(("foo",)) - self.assertEquals(a.func2.cache.cache.pop.call_count, 3) + self.assertEquals(a.func2.cache.cache.del_multi.call_count, 3) yield a.func("foo") self.assertEquals(callcount[0], 2) -- cgit 1.5.1 From fe5dad46b0da00e9757ed54eb23304ed3c6ceadf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 27 May 2021 10:34:24 +0100 Subject: Remove redundant code to reload tls cert (#10054) we don't need to reload the tls cert if we don't have any tls listeners. Follow-up to #9280. --- changelog.d/10054.misc | 1 + synapse/app/_base.py | 5 +---- synapse/config/tls.py | 22 +++------------------- tests/config/test_tls.py | 3 +-- 4 files changed, 6 insertions(+), 25 deletions(-) create mode 100644 changelog.d/10054.misc (limited to 'synapse') diff --git a/changelog.d/10054.misc b/changelog.d/10054.misc new file mode 100644 index 0000000000..cebe39ce54 --- /dev/null +++ b/changelog.d/10054.misc @@ -0,0 +1 @@ +Remove some dead code regarding TLS certificate handling. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 59918d789e..1329af2e2b 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -261,13 +261,10 @@ def refresh_certificate(hs): Refresh the TLS certificates that Synapse is using by re-reading them from disk and updating the TLS context factories to use them. """ - if not hs.config.has_tls_listener(): - # attempt to reload the certs for the good of the tls_fingerprints - hs.config.read_certificate_from_disk(require_cert_and_key=False) return - hs.config.read_certificate_from_disk(require_cert_and_key=True) + hs.config.read_certificate_from_disk() hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config) if hs._listening_services: diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 26f1150ca5..0e9bba53c9 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -215,28 +215,12 @@ class TlsConfig(Config): days_remaining = (expires_on - now).days return days_remaining - def read_certificate_from_disk(self, require_cert_and_key: bool): + def read_certificate_from_disk(self): """ Read the certificates and private key from disk. - - Args: - require_cert_and_key: set to True to throw an error if the certificate - and key file are not given """ - if require_cert_and_key: - self.tls_private_key = self.read_tls_private_key() - self.tls_certificate = self.read_tls_certificate() - elif self.tls_certificate_file: - # we only need the certificate for the tls_fingerprints. Reload it if we - # can, but it's not a fatal error if we can't. - try: - self.tls_certificate = self.read_tls_certificate() - except Exception as e: - logger.info( - "Unable to read TLS certificate (%s). Ignoring as no " - "tls listeners enabled.", - e, - ) + self.tls_private_key = self.read_tls_private_key() + self.tls_certificate = self.read_tls_certificate() def generate_config_section( self, diff --git a/tests/config/test_tls.py b/tests/config/test_tls.py index 183034f7d4..dcf336416c 100644 --- a/tests/config/test_tls.py +++ b/tests/config/test_tls.py @@ -74,12 +74,11 @@ s4niecZKPBizL6aucT59CsunNmmb5Glq8rlAcU+1ZTZZzGYqVYhF6axB9Qg= config = { "tls_certificate_path": os.path.join(config_dir, "cert.pem"), - "tls_fingerprints": [], } t = TestConfig() t.read_config(config, config_dir_path="", data_dir_path="") - t.read_certificate_from_disk(require_cert_and_key=False) + t.read_tls_certificate() warnings = self.flushWarnings() self.assertEqual(len(warnings), 1) -- cgit 1.5.1 From 8e15c92c2f9e581e59ff68495fa99e998849bb4d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 27 May 2021 08:52:28 -0400 Subject: Pass the origin when calculating the spaces summary over GET. (#10079) Fixes a bug due to conflicting PRs which were merged. (One added a new caller to a method, the other added a new parameter to the same method.) --- changelog.d/10079.bugfix | 1 + synapse/federation/transport/server.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/10079.bugfix (limited to 'synapse') diff --git a/changelog.d/10079.bugfix b/changelog.d/10079.bugfix new file mode 100644 index 0000000000..2b93c4534a --- /dev/null +++ b/changelog.d/10079.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.35.0rc1 when calling the spaces summary API via a GET request. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 9d50b05d01..40eab45549 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1398,7 +1398,7 @@ class FederationSpaceSummaryServlet(BaseFederationServlet): ) return 200, await self.handler.federation_space_summary( - room_id, suggested_only, max_rooms_per_space, exclude_rooms + origin, room_id, suggested_only, max_rooms_per_space, exclude_rooms ) # TODO When switching to the stable endpoint, remove the POST handler. -- cgit 1.5.1 From 78b5102ae71f828deb851eca8e677381710bf716 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 May 2021 14:32:31 +0100 Subject: Fix up `BatchingQueue` (#10078) Fixes #10068 --- changelog.d/10078.misc | 1 + synapse/util/batching_queue.py | 70 ++++++++++++++++++++++++----------- tests/util/test_batching_queue.py | 78 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 125 insertions(+), 24 deletions(-) create mode 100644 changelog.d/10078.misc (limited to 'synapse') diff --git a/changelog.d/10078.misc b/changelog.d/10078.misc new file mode 100644 index 0000000000..a4b089d0fd --- /dev/null +++ b/changelog.d/10078.misc @@ -0,0 +1 @@ +Fix up `BatchingQueue` implementation. diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py index 44bbb7b1a8..8fd5bfb69b 100644 --- a/synapse/util/batching_queue.py +++ b/synapse/util/batching_queue.py @@ -25,10 +25,11 @@ from typing import ( TypeVar, ) +from prometheus_client import Gauge + from twisted.internet import defer from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable -from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util import Clock @@ -38,6 +39,24 @@ logger = logging.getLogger(__name__) V = TypeVar("V") R = TypeVar("R") +number_queued = Gauge( + "synapse_util_batching_queue_number_queued", + "The number of items waiting in the queue across all keys", + labelnames=("name",), +) + +number_in_flight = Gauge( + "synapse_util_batching_queue_number_pending", + "The number of items across all keys either being processed or waiting in a queue", + labelnames=("name",), +) + +number_of_keys = Gauge( + "synapse_util_batching_queue_number_of_keys", + "The number of distinct keys that have items queued", + labelnames=("name",), +) + class BatchingQueue(Generic[V, R]): """A queue that batches up work, calling the provided processing function @@ -48,10 +67,20 @@ class BatchingQueue(Generic[V, R]): called, and will keep being called until the queue has been drained (for the given key). + If the processing function raises an exception then the exception is proxied + through to the callers waiting on that batch of work. + Note that the return value of `add_to_queue` will be the return value of the processing function that processed the given item. This means that the returned value will likely include data for other items that were in the batch. + + Args: + name: A name for the queue, used for logging contexts and metrics. + This must be unique, otherwise the metrics will be wrong. + clock: The clock to use to schedule work. + process_batch_callback: The callback to to be run to process a batch of + work. """ def __init__( @@ -73,19 +102,15 @@ class BatchingQueue(Generic[V, R]): # The function to call with batches of values. self._process_batch_callback = process_batch_callback - LaterGauge( - "synapse_util_batching_queue_number_queued", - "The number of items waiting in the queue across all keys", - labels=("name",), - caller=lambda: sum(len(v) for v in self._next_values.values()), + number_queued.labels(self._name).set_function( + lambda: sum(len(q) for q in self._next_values.values()) ) - LaterGauge( - "synapse_util_batching_queue_number_of_keys", - "The number of distinct keys that have items queued", - labels=("name",), - caller=lambda: len(self._next_values), - ) + number_of_keys.labels(self._name).set_function(lambda: len(self._next_values)) + + self._number_in_flight_metric = number_in_flight.labels( + self._name + ) # type: Gauge async def add_to_queue(self, value: V, key: Hashable = ()) -> R: """Adds the value to the queue with the given key, returning the result @@ -107,17 +132,18 @@ class BatchingQueue(Generic[V, R]): if key not in self._processing_keys: run_as_background_process(self._name, self._process_queue, key) - return await make_deferred_yieldable(d) + with self._number_in_flight_metric.track_inprogress(): + return await make_deferred_yieldable(d) async def _process_queue(self, key: Hashable) -> None: """A background task to repeatedly pull things off the queue for the given key and call the `self._process_batch_callback` with the values. """ - try: - if key in self._processing_keys: - return + if key in self._processing_keys: + return + try: self._processing_keys.add(key) while True: @@ -137,16 +163,16 @@ class BatchingQueue(Generic[V, R]): values = [value for value, _ in next_values] results = await self._process_batch_callback(values) - for _, deferred in next_values: - with PreserveLoggingContext(): + with PreserveLoggingContext(): + for _, deferred in next_values: deferred.callback(results) except Exception as e: - for _, deferred in next_values: - if deferred.called: - continue + with PreserveLoggingContext(): + for _, deferred in next_values: + if deferred.called: + continue - with PreserveLoggingContext(): deferred.errback(e) finally: diff --git a/tests/util/test_batching_queue.py b/tests/util/test_batching_queue.py index 5def1e56c9..edf29e5b96 100644 --- a/tests/util/test_batching_queue.py +++ b/tests/util/test_batching_queue.py @@ -14,7 +14,12 @@ from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable -from synapse.util.batching_queue import BatchingQueue +from synapse.util.batching_queue import ( + BatchingQueue, + number_in_flight, + number_of_keys, + number_queued, +) from tests.server import get_clock from tests.unittest import TestCase @@ -24,6 +29,14 @@ class BatchingQueueTestCase(TestCase): def setUp(self): self.clock, hs_clock = get_clock() + # We ensure that we remove any existing metrics for "test_queue". + try: + number_queued.remove("test_queue") + number_of_keys.remove("test_queue") + number_in_flight.remove("test_queue") + except KeyError: + pass + self._pending_calls = [] self.queue = BatchingQueue("test_queue", hs_clock, self._process_queue) @@ -32,6 +45,41 @@ class BatchingQueueTestCase(TestCase): self._pending_calls.append((values, d)) return await make_deferred_yieldable(d) + def _assert_metrics(self, queued, keys, in_flight): + """Assert that the metrics are correct""" + + self.assertEqual(len(number_queued.collect()), 1) + self.assertEqual(len(number_queued.collect()[0].samples), 1) + self.assertEqual( + number_queued.collect()[0].samples[0].labels, + {"name": self.queue._name}, + ) + self.assertEqual( + number_queued.collect()[0].samples[0].value, + queued, + "number_queued", + ) + + self.assertEqual(len(number_of_keys.collect()), 1) + self.assertEqual(len(number_of_keys.collect()[0].samples), 1) + self.assertEqual( + number_queued.collect()[0].samples[0].labels, {"name": self.queue._name} + ) + self.assertEqual( + number_of_keys.collect()[0].samples[0].value, keys, "number_of_keys" + ) + + self.assertEqual(len(number_in_flight.collect()), 1) + self.assertEqual(len(number_in_flight.collect()[0].samples), 1) + self.assertEqual( + number_queued.collect()[0].samples[0].labels, {"name": self.queue._name} + ) + self.assertEqual( + number_in_flight.collect()[0].samples[0].value, + in_flight, + "number_in_flight", + ) + def test_simple(self): """Tests the basic case of calling `add_to_queue` once and having `_process_queue` return. @@ -41,6 +89,8 @@ class BatchingQueueTestCase(TestCase): queue_d = defer.ensureDeferred(self.queue.add_to_queue("foo")) + self._assert_metrics(queued=1, keys=1, in_flight=1) + # The queue should wait a reactor tick before calling the processing # function. self.assertFalse(self._pending_calls) @@ -52,12 +102,15 @@ class BatchingQueueTestCase(TestCase): self.assertEqual(len(self._pending_calls), 1) self.assertEqual(self._pending_calls[0][0], ["foo"]) self.assertFalse(queue_d.called) + self._assert_metrics(queued=0, keys=0, in_flight=1) # Return value of the `_process_queue` should be propagated back. self._pending_calls.pop()[1].callback("bar") self.assertEqual(self.successResultOf(queue_d), "bar") + self._assert_metrics(queued=0, keys=0, in_flight=0) + def test_batching(self): """Test that multiple calls at the same time get batched up into one call to `_process_queue`. @@ -68,6 +121,8 @@ class BatchingQueueTestCase(TestCase): queue_d1 = defer.ensureDeferred(self.queue.add_to_queue("foo1")) queue_d2 = defer.ensureDeferred(self.queue.add_to_queue("foo2")) + self._assert_metrics(queued=2, keys=1, in_flight=2) + self.clock.pump([0]) # We should see only *one* call to `_process_queue` @@ -75,12 +130,14 @@ class BatchingQueueTestCase(TestCase): self.assertEqual(self._pending_calls[0][0], ["foo1", "foo2"]) self.assertFalse(queue_d1.called) self.assertFalse(queue_d2.called) + self._assert_metrics(queued=0, keys=0, in_flight=2) # Return value of the `_process_queue` should be propagated back to both. self._pending_calls.pop()[1].callback("bar") self.assertEqual(self.successResultOf(queue_d1), "bar") self.assertEqual(self.successResultOf(queue_d2), "bar") + self._assert_metrics(queued=0, keys=0, in_flight=0) def test_queuing(self): """Test that we queue up requests while a `_process_queue` is being @@ -92,13 +149,20 @@ class BatchingQueueTestCase(TestCase): queue_d1 = defer.ensureDeferred(self.queue.add_to_queue("foo1")) self.clock.pump([0]) + self.assertEqual(len(self._pending_calls), 1) + + # We queue up work after the process function has been called, testing + # that they get correctly queued up. queue_d2 = defer.ensureDeferred(self.queue.add_to_queue("foo2")) + queue_d3 = defer.ensureDeferred(self.queue.add_to_queue("foo3")) # We should see only *one* call to `_process_queue` self.assertEqual(len(self._pending_calls), 1) self.assertEqual(self._pending_calls[0][0], ["foo1"]) self.assertFalse(queue_d1.called) self.assertFalse(queue_d2.called) + self.assertFalse(queue_d3.called) + self._assert_metrics(queued=2, keys=1, in_flight=3) # Return value of the `_process_queue` should be propagated back to the # first. @@ -106,18 +170,24 @@ class BatchingQueueTestCase(TestCase): self.assertEqual(self.successResultOf(queue_d1), "bar1") self.assertFalse(queue_d2.called) + self.assertFalse(queue_d3.called) + self._assert_metrics(queued=2, keys=1, in_flight=2) # We should now see a second call to `_process_queue` self.clock.pump([0]) self.assertEqual(len(self._pending_calls), 1) - self.assertEqual(self._pending_calls[0][0], ["foo2"]) + self.assertEqual(self._pending_calls[0][0], ["foo2", "foo3"]) self.assertFalse(queue_d2.called) + self.assertFalse(queue_d3.called) + self._assert_metrics(queued=0, keys=0, in_flight=2) # Return value of the `_process_queue` should be propagated back to the # second. self._pending_calls.pop()[1].callback("bar2") self.assertEqual(self.successResultOf(queue_d2), "bar2") + self.assertEqual(self.successResultOf(queue_d3), "bar2") + self._assert_metrics(queued=0, keys=0, in_flight=0) def test_different_keys(self): """Test that calls to different keys get processed in parallel.""" @@ -140,6 +210,7 @@ class BatchingQueueTestCase(TestCase): self.assertFalse(queue_d1.called) self.assertFalse(queue_d2.called) self.assertFalse(queue_d3.called) + self._assert_metrics(queued=1, keys=1, in_flight=3) # Return value of the `_process_queue` should be propagated back to the # first. @@ -148,6 +219,7 @@ class BatchingQueueTestCase(TestCase): self.assertEqual(self.successResultOf(queue_d1), "bar1") self.assertFalse(queue_d2.called) self.assertFalse(queue_d3.called) + self._assert_metrics(queued=1, keys=1, in_flight=2) # Return value of the `_process_queue` should be propagated back to the # second. @@ -161,9 +233,11 @@ class BatchingQueueTestCase(TestCase): self.assertEqual(len(self._pending_calls), 1) self.assertEqual(self._pending_calls[0][0], ["foo3"]) self.assertFalse(queue_d3.called) + self._assert_metrics(queued=0, keys=0, in_flight=1) # Return value of the `_process_queue` should be propagated back to the # third deferred. self._pending_calls.pop()[1].callback("bar4") self.assertEqual(self.successResultOf(queue_d3), "bar4") + self._assert_metrics(queued=0, keys=0, in_flight=0) -- cgit 1.5.1 From b1bc26a909f4f9af137e72ab27952a8d2dfc1cb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 May 2021 14:46:24 +0100 Subject: 1.35.0rc2 --- CHANGES.md | 9 +++++++++ changelog.d/10079.bugfix | 1 - synapse/__init__.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) delete mode 100644 changelog.d/10079.bugfix (limited to 'synapse') diff --git a/CHANGES.md b/CHANGES.md index 0e451f983c..1fac16580d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +Synapse 1.35.0rc2 (2021-05-27) +============================== + +Bugfixes +-------- + +- Fix a bug introduced in v1.35.0rc1 when calling the spaces summary API via a GET request. ([\#10079](https://github.com/matrix-org/synapse/issues/10079)) + + Synapse 1.35.0rc1 (2021-05-25) ============================== diff --git a/changelog.d/10079.bugfix b/changelog.d/10079.bugfix deleted file mode 100644 index 2b93c4534a..0000000000 --- a/changelog.d/10079.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix a bug introduced in v1.35.0rc1 when calling the spaces summary API via a GET request. diff --git a/synapse/__init__.py b/synapse/__init__.py index e60e9db71e..6faf31dbbc 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.35.0rc1" +__version__ = "1.35.0rc2" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when -- cgit 1.5.1 From f828a70be331105c98ebfbe3738ef57d9d54df5b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 27 May 2021 18:10:58 +0200 Subject: Limit the number of events sent over replication when persisting events. (#10082) --- changelog.d/10082.bugfix | 1 + synapse/handlers/federation.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) create mode 100644 changelog.d/10082.bugfix (limited to 'synapse') diff --git a/changelog.d/10082.bugfix b/changelog.d/10082.bugfix new file mode 100644 index 0000000000..b4f8bcc4fa --- /dev/null +++ b/changelog.d/10082.bugfix @@ -0,0 +1 @@ +Fixed a bug causing replication requests to fail when receiving a lot of events via federation. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 678f6b7707..bf11315251 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -91,6 +91,7 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.async_helpers import Linearizer, concurrently_execute +from synapse.util.iterutils import batch_iter from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server @@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler): """ instance = self.config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: - result = await self._send_events( - instance_name=instance, - store=self.store, - room_id=room_id, - event_and_contexts=event_and_contexts, - backfilled=backfilled, - ) + # Limit the number of events sent over federation. + for batch in batch_iter(event_and_contexts, 1000): + result = await self._send_events( + instance_name=instance, + store=self.store, + room_id=room_id, + event_and_contexts=batch, + backfilled=backfilled, + ) return result["max_stream_id"] else: assert self.storage.persistence -- cgit 1.5.1