From b3e843be88d67633d11711ecc80d4e0390b1e723 Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Wed, 27 Oct 2021 10:48:02 -0400
Subject: Fix URL preview errors when previewing XML documents. (#11196)

---
 changelog.d/11196.bugfix                      |  1 +
 synapse/rest/media/v1/preview_url_resource.py |  9 ++++++---
 tests/test_preview.py                         | 15 +++++++++++++++
 3 files changed, 22 insertions(+), 3 deletions(-)
 create mode 100644 changelog.d/11196.bugfix

diff --git a/changelog.d/11196.bugfix b/changelog.d/11196.bugfix
new file mode 100644
index 0000000000..3861eeb908
--- /dev/null
+++ b/changelog.d/11196.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.46.0rc1 where URL previews of some XML documents would fail.

diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 278fd901e2..8ca97b5b18 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -718,9 +718,12 @@ def decode_body(
     if not body:
         return None

+    # The idea here is that multiple encodings are tried until one works.
+    # The decoded result is deliberately unused: lxml will decode the string
+    # again itself, using the encoding found here.
     for encoding in get_html_media_encodings(body, content_type):
         try:
-            body_str = body.decode(encoding)
+            body.decode(encoding)
         except Exception:
             pass
         else:
@@ -732,11 +735,11 @@
     from lxml import etree

     # Create an HTML parser.
-    parser = etree.HTMLParser(recover=True, encoding="utf-8")
+    parser = etree.HTMLParser(recover=True, encoding=encoding)

     # Attempt to parse the body. Returns None if the body was successfully
     # parsed, but no tree was found.
-    return etree.fromstring(body_str, parser)
+    return etree.fromstring(body, parser)


 def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]:

diff --git a/tests/test_preview.py b/tests/test_preview.py
index 9a576f9a4e..40b89fb2ef 100644
--- a/tests/test_preview.py
+++ b/tests/test_preview.py
@@ -277,6 +277,21 @@ class CalcOgTestCase(unittest.TestCase):
         tree = decode_body(html, "http://example.com/test.html")
         self.assertIsNone(tree)

+    def test_xml(self):
+        """Test decoding XML and ensure it works properly."""
+        # Note that the strip() call is important to ensure the xml tag starts
+        # at the initial byte.
+        html = b"""
+        <?xml version="1.0" encoding="UTF-8"?>
+        <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+        <html xmlns="http://www.w3.org/1999/xhtml">
+        <head><title>Foo</title></head><body>Some text.</body></html>
+        """.strip()
+        tree = decode_body(html, "http://example.com/test.html")
+        og = _calc_og(tree, "http://example.com/test.html")
+        self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
+
     def test_invalid_encoding(self):
         """An invalid character encoding should be ignored and treated as UTF-8, if possible."""
         html = b"""
-- cgit 1.4.1


From 576921c66a35fa1023f7e9baf97b6304ff463549 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Wed, 27 Oct 2021 18:06:32 +0200
Subject: Force deb compression with `xz`. (#11197)

Fixes a problem where `impish` packages could not be processed by `reprepro`.
---
 debian/changelog | 6 ++++++
 debian/rules     | 6 ++++++
 2 files changed, 12 insertions(+)

diff --git a/debian/changelog b/debian/changelog
index ea96676f74..c2ea5d2cfb 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+matrix-synapse-py3 (1.46.0~rc1ubuntu1) UNRELEASED; urgency=medium
+
+ * Compress debs with xz, to fix incompatibility of impish debs with reprepro.
+
+ -- Richard van der Hoff <richard@matrix.org>  Wed, 27 Oct 2021 15:32:51 +0100
+
 matrix-synapse-py3 (1.46.0~rc1) stable; urgency=medium

 * New synapse release 1.46.0~rc1.
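For context on the URL-preview fix in the first patch above: lxml refuses to parse a Python `str` that still carries an encoding declaration, which is why `decode_body` now hands the raw bytes to `etree.fromstring` and lets the parser do the decoding. A minimal standalone sketch of the failure mode (it only assumes lxml is installed; it is an illustration, not part of the patch):

```python
from lxml import etree

body = b'<?xml version="1.0" encoding="UTF-8"?><html><body>Some text.</body></html>'
parser = etree.HTMLParser(recover=True, encoding="utf-8")

# Bytes plus an explicit parser encoding: parses fine, as in the fixed code.
tree = etree.fromstring(body, parser)

# The pre-fix code path decoded first and parsed the resulting str. For XML
# documents, which start with an encoding declaration, lxml raises ValueError.
try:
    etree.fromstring(body.decode("utf-8"), parser)
except ValueError as e:
    print(e)  # "Unicode strings with encoding declaration are not supported..."
```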
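On the deb-compression fix: a `.deb` is an `ar` archive, and the compressor chosen by `dpkg-deb` shows up in the extension of its `data.tar` member. A hedged way to confirm that the `-Zxz` override in the `debian/rules` hunk below took effect (this assumes binutils' `ar` is available and the package filename is a placeholder for whatever the build produced):

```python
import subprocess

# List the archive members of the built package; with -Zxz we expect
# data.tar.xz, whereas dpkg-deb's zstd default on impish would give data.tar.zst.
members = subprocess.run(
    ["ar", "t", "matrix-synapse-py3_1.46.0~rc1ubuntu1_amd64.deb"],
    capture_output=True, text=True, check=True,
).stdout
assert "data.tar.xz" in members
```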
diff --git a/debian/rules b/debian/rules
index b9d490adc9..5baf2475f0 100755
--- a/debian/rules
+++ b/debian/rules
@@ -51,5 +51,11 @@ override_dh_shlibdeps:
 override_dh_virtualenv:
 	./debian/build_virtualenv

+override_dh_builddeb:
+	# force the compression to xz, to stop dpkg-deb on impish defaulting to zstd
+	# (which requires reprepro 5.3.0-1.3, which is currently only in 'experimental' in Debian:
+	# https://metadata.ftp-master.debian.org/changelogs/main/r/reprepro/reprepro_5.3.0-1.3_changelog)
+	dh_builddeb -- -Zxz
+
 %:
 	dh $@ --with python-virtualenv
-- cgit 1.4.1


From 71f9966f2790c6b24281bb9f109bff28ff05d962 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Mon, 1 Nov 2021 15:10:16 +0000
Subject: Support for serving server well-known files (#11211)

Fixes https://github.com/matrix-org/synapse/issues/8308
---
 changelog.d/11211.feature     |  1 +
 docs/delegate.md              | 82 ++++++++++++++++++++++++-------------------
 docs/sample_config.yaml       | 18 ++++++++++
 synapse/app/generic_worker.py |  3 ++
 synapse/app/homeserver.py     |  4 +--
 synapse/config/server.py      | 19 ++++++++++
 synapse/rest/well_known.py    | 47 +++++++++++++++++++++++--
 tests/rest/test_well_known.py | 32 +++++++++++++----
 8 files changed, 159 insertions(+), 47 deletions(-)
 create mode 100644 changelog.d/11211.feature

diff --git a/changelog.d/11211.feature b/changelog.d/11211.feature
new file mode 100644
index 0000000000..feeb0cf089
--- /dev/null
+++ b/changelog.d/11211.feature
@@ -0,0 +1 @@
+Add support for serving `/.well-known/matrix/server` files, to redirect federation traffic to port 443.

diff --git a/docs/delegate.md b/docs/delegate.md
index f3f89075d1..ee9cbb3b1c 100644
--- a/docs/delegate.md
+++ b/docs/delegate.md
@@ -1,4 +1,8 @@
-# Delegation
+# Delegation of incoming federation traffic
+
+In the following documentation, we use the term `server_name` to refer to that setting
+in your homeserver configuration file. It appears at the ends of user ids, and tells
+other homeservers where they can find your server.

 By default, other homeservers will expect to be able to reach yours via
 your `server_name`, on port 8448. For example, if you set your `server_name`
 to a different server and/or port (e.g. `synapse.example.com:443`).

 ## .well-known delegation

-To use this method, you need to be able to alter the
-`server_name` 's https server to serve the `/.well-known/matrix/server`
-URL. Having an active server (with a valid TLS certificate) serving your
-`server_name` domain is out of the scope of this documentation.
+To use this method, you need to be able to configure the server at
+`https://<server_name>` to serve a file at
+`https://<server_name>/.well-known/matrix/server`. There are two ways to do this, shown below.
+
+Note that the `.well-known` file is hosted on the default port for `https` (port 443).
+
+### External server
+
+For maximum flexibility, you need to configure an external server such as nginx, Apache
+or HAProxy to serve the `https://<server_name>/.well-known/matrix/server` file. Setting
+up such a server is out of the scope of this documentation, but note that it is often
+possible to configure your [reverse proxy](reverse_proxy.md) for this.

-The URL `https://<server_name>/.well-known/matrix/server` should
-return a JSON structure containing the key `m.server` like so:
+The URL `https://<server_name>/.well-known/matrix/server` should be configured
+to return a JSON structure containing the key `m.server` like this:

 ```json
 {
     "m.server": "<synapse.server.name>[:<port>]"
 }
 ```

-In our example, this would mean that URL `https://example.com/.well-known/matrix/server`
-should return:
+In our example (where we want federation traffic to be routed to
+`https://synapse.example.com`, on port 443), this would mean that
+`https://example.com/.well-known/matrix/server` should return:

 ```json
 {
     "m.server": "synapse.example.com:443"
 }
 ```

 Note, specifying a port is optional. If no port is specified, then it defaults
 to 8448.

-With .well-known delegation, federating servers will check for a valid TLS
-certificate for the delegated hostname (in our example: `synapse.example.com`).
+### Serving a `.well-known/matrix/server` file with Synapse
+
+If you are able to set up your domain so that `https://<server_name>` is routed to
+Synapse (i.e., the only change needed is to direct federation traffic to port 443
+instead of port 8448), then it is possible to configure Synapse to serve a suitable
+`.well-known/matrix/server` file. To do so, add the following to your `homeserver.yaml`
+file:
+
+```yaml
+serve_server_wellknown: true
+```
+
+**Note**: this *only* works if `https://<server_name>` is routed to Synapse, so is
+generally not suitable if Synapse is hosted at a subdomain such as
+`https://synapse.example.com`.

 ## SRV DNS record delegation

-It is also possible to do delegation using a SRV DNS record. However, that is
-considered an advanced topic since it's a bit complex to set up, and `.well-known`
-delegation is already enough in most cases.
+It is also possible to do delegation using a SRV DNS record. However, that is generally
+not recommended, as it can be difficult to configure the TLS certificates correctly in
+this case, and it offers little advantage over `.well-known` delegation.

-However, if you really need it, you can find some documentation on how such a
+However, if you really need it, you can find some documentation on what such a
 record should look like and how Synapse will use it in [the Matrix
 specification](https://matrix.org/docs/spec/server_server/latest#resolving-server-names).

@@ -68,27 +94,9 @@ wouldn't need any delegation set up.
 domain `server_name` points to, you will need to let other servers know how to
 find it using delegation.

-### Do you still recommend against using a reverse proxy on the federation port?
-
-We no longer actively recommend against using a reverse proxy. Many admins will
-find it easier to direct federation traffic to a reverse proxy and manage their
-own TLS certificates, and this is a supported configuration.
+### Should I use a reverse proxy for federation traffic?

-See [the reverse proxy documentation](reverse_proxy.md) for information on setting up a
+Generally, using a reverse proxy for both the federation and client traffic is a good
+idea, since it saves handling TLS traffic in Synapse. See
+[the reverse proxy documentation](reverse_proxy.md) for information on setting up a
 reverse proxy.
-
-### Do I still need to give my TLS certificates to Synapse if I am using a reverse proxy?
-
-This is no longer necessary. If you are using a reverse proxy for all of your
-TLS traffic, then you can set `no_tls: True` in the Synapse config.
-
-In that case, the only reason Synapse needs the certificate is to populate a legacy
-`tls_fingerprints` field in the federation API. This is ignored by Synapse 0.99.0
-and later, and the only time pre-0.99 Synapses will check it is when attempting to
-fetch the server keys - and generally this is delegated via `matrix.org`, which
-is running a modern version of Synapse.
-
-### Do I need the same certificate for the client and federation port?
-
-No. There is nothing stopping you from using different certificates,
-particularly if you are using a reverse proxy.
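A quick way to sanity-check the feature documented above: once `serve_server_wellknown` is enabled, the delegation file can be fetched from the homeserver itself. A hedged sketch (the host and port are placeholders for a local homeserver's client listener; not part of the patch):

```python
import json
import urllib.request

# With serve_server_wellknown: true, Synapse serves the delegation file
# directly; other homeservers will then federate via port 443.
with urllib.request.urlopen("http://localhost:8008/.well-known/matrix/server") as resp:
    print(json.load(resp))  # e.g. {"m.server": "example.com:443"}
```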
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index b90ed62d61..c3a4148f74 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -93,6 +93,24 @@ pid_file: DATADIR/homeserver.pid
 #
 #public_baseurl: https://example.com/

+# Uncomment the following to tell other servers to send federation traffic on
+# port 443.
+#
+# By default, other servers will try to reach our server on port 8448, which can
+# be inconvenient in some environments.
+#
+# Provided 'https://<SERVER_NAME>/' on port 443 is routed to Synapse, this
+# option configures Synapse to serve a file at
+# 'https://<SERVER_NAME>/.well-known/matrix/server'. This will tell other
+# servers to send traffic to port 443 instead.
+#
+# See https://matrix-org.github.io/synapse/latest/delegate.html for more
+# information.
+#
+# Defaults to 'false'.
+#
+#serve_server_wellknown: true
+
 # Set the soft limit on the number of file descriptors synapse can use
 # Zero is used to indicate synapse should set the soft limit to the
 # hard limit.

diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 51eadf122d..218826741e 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -100,6 +100,7 @@ from synapse.rest.client.register import (
 from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.synapse.client import build_synapse_client_resource_tree
+from synapse.rest.well_known import well_known_resource
 from synapse.server import HomeServer
 from synapse.storage.databases.main.censor_events import CensorEventsStore
 from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
@@ -318,6 +319,8 @@ class GenericWorkerServer(HomeServer):
                     resources.update({CLIENT_API_PREFIX: resource})

                     resources.update(build_synapse_client_resource_tree(self))
+                    resources.update({"/.well-known": well_known_resource(self)})
+
                 elif name == "federation":
                     resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
                 elif name == "media":

diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 93e2299266..336c279a44 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -66,7 +66,7 @@ from synapse.rest.admin import AdminRestResource
 from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.synapse.client import build_synapse_client_resource_tree
-from synapse.rest.well_known import WellKnownResource
+from synapse.rest.well_known import well_known_resource
 from synapse.server import HomeServer
 from synapse.storage import DataStore
 from synapse.util.httpresourcetree import create_resource_tree
@@ -189,7 +189,7 @@ class SynapseHomeServer(HomeServer):
             "/_matrix/client/unstable": client_resource,
             "/_matrix/client/v2_alpha": client_resource,
             "/_matrix/client/versions": client_resource,
-            "/.well-known/matrix/client": WellKnownResource(self),
+            "/.well-known": well_known_resource(self),
             "/_synapse/admin": AdminRestResource(self),
             **build_synapse_client_resource_tree(self),
         }

diff --git a/synapse/config/server.py b/synapse/config/server.py
index ed094bdc44..a387fd9310 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -262,6 +262,7 @@ class ServerConfig(Config):
         self.print_pidfile = config.get("print_pidfile")
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
+        self.serve_server_wellknown = config.get("serve_server_wellknown", False)

         self.public_baseurl = config.get("public_baseurl")
         if self.public_baseurl is not None:
@@ -774,6 +775,24 @@ class ServerConfig(Config):
        #
        #public_baseurl: https://example.com/

+        # Uncomment the following to tell other servers to send federation traffic on
+        # port 443.
+        #
+        # By default, other servers will try to reach our server on port 8448, which can
+        # be inconvenient in some environments.
+        #
+        # Provided 'https://<SERVER_NAME>/' on port 443 is routed to Synapse, this
+        # option configures Synapse to serve a file at
+        # 'https://<SERVER_NAME>/.well-known/matrix/server'. This will tell other
+        # servers to send traffic to port 443 instead.
+        #
+        # See https://matrix-org.github.io/synapse/latest/delegate.html for more
+        # information.
+        #
+        # Defaults to 'false'.
+        #
+        #serve_server_wellknown: true
+
        # Set the soft limit on the number of file descriptors synapse can use
        # Zero is used to indicate synapse should set the soft limit to the
        # hard limit.

diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py
index 7ac01faab4..edbf5ce5d0 100644
--- a/synapse/rest/well_known.py
+++ b/synapse/rest/well_known.py
@@ -21,6 +21,7 @@ from twisted.web.server import Request
 from synapse.http.server import set_cors_headers
 from synapse.types import JsonDict
 from synapse.util import json_encoder
+from synapse.util.stringutils import parse_server_name

 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -47,8 +48,8 @@ class WellKnownBuilder:
         return result


-class WellKnownResource(Resource):
-    """A Twisted web resource which renders the .well-known file"""
+class ClientWellKnownResource(Resource):
+    """A Twisted web resource which renders the .well-known/matrix/client file"""

     isLeaf = 1

@@ -67,3 +68,45 @@
         logger.debug("returning: %s", r)
         request.setHeader(b"Content-Type", b"application/json")
         return json_encoder.encode(r).encode("utf-8")
+
+
+class ServerWellKnownResource(Resource):
+    """Resource for .well-known/matrix/server, redirecting to port 443"""
+
+    isLeaf = 1
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self._serve_server_wellknown = hs.config.server.serve_server_wellknown
+
+        host, port = parse_server_name(hs.config.server.server_name)
+
+        # If we've got this far, then https://<server_name>/ must route to us, so
+        # we just redirect the traffic to port 443 instead of 8448.
+        if port is None:
+            port = 443
+
+        self._response = json_encoder.encode({"m.server": f"{host}:{port}"}).encode(
+            "utf-8"
+        )
+
+    def render_GET(self, request: Request) -> bytes:
+        if not self._serve_server_wellknown:
+            request.setResponseCode(404)
+            request.setHeader(b"Content-Type", b"text/plain")
+            return b"404. Is anything ever truly *well* known?\n"
+
+        request.setHeader(b"Content-Type", b"application/json")
+        return self._response
+
+
+def well_known_resource(hs: "HomeServer") -> Resource:
+    """Returns a Twisted web resource which handles '.well-known' requests"""
+    res = Resource()
+    matrix_resource = Resource()
+    res.putChild(b"matrix", matrix_resource)
+
+    matrix_resource.putChild(b"server", ServerWellKnownResource(hs))
+    matrix_resource.putChild(b"client", ClientWellKnownResource(hs))
+
+    return res

diff --git a/tests/rest/test_well_known.py b/tests/rest/test_well_known.py
index b2c0279ba0..118aa93a32 100644
--- a/tests/rest/test_well_known.py
+++ b/tests/rest/test_well_known.py
@@ -11,17 +11,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.web.resource import Resource

-
-from synapse.rest.well_known import WellKnownResource
+from synapse.rest.well_known import well_known_resource

 from tests import unittest


 class WellKnownTests(unittest.HomeserverTestCase):
     def create_test_resource(self):
-        # replace the JsonResource with a WellKnownResource
-        return WellKnownResource(self.hs)
+        # replace the JsonResource with a Resource wrapping the WellKnownResource
+        res = Resource()
+        res.putChild(b".well-known", well_known_resource(self.hs))
+        return res

     @unittest.override_config(
         {
@@ -29,7 +31,7 @@
             "default_identity_server": "https://testis",
         }
     )
-    def test_well_known(self):
+    def test_client_well_known(self):
         channel = self.make_request(
             "GET", "/.well-known/matrix/client", shorthand=False
         )
@@ -48,9 +50,27 @@
             "public_baseurl": None,
         }
     )
-    def test_well_known_no_public_baseurl(self):
+    def test_client_well_known_no_public_baseurl(self):
         channel = self.make_request(
             "GET", "/.well-known/matrix/client", shorthand=False
         )

         self.assertEqual(channel.code, 404)
+
+    @unittest.override_config({"serve_server_wellknown": True})
+    def test_server_well_known(self):
+        channel = self.make_request(
+            "GET", "/.well-known/matrix/server", shorthand=False
+        )
+
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(
+            channel.json_body,
+            {"m.server": "test:443"},
+        )
+
+    def test_server_well_known_disabled(self):
+        channel = self.make_request(
+            "GET", "/.well-known/matrix/server", shorthand=False
+        )
+        self.assertEqual(channel.code, 404)
-- cgit 1.4.1


From 66bdca3e317d1fa764cf52547aee7409acc59676 Mon Sep 17 00:00:00 2001
From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com>
Date: Mon, 1 Nov 2021 16:11:24 +0100
Subject: Remove deprecated delete room admin API (#11213)

Remove deprecated delete room admin API,
`POST /_synapse/admin/v1/rooms/<room_id>/delete`
---
 changelog.d/11213.removal      |   1 +
 docs/admin_api/rooms.md        |  10 ---
 docs/upgrade.md                |  10 +++
 synapse/rest/admin/__init__.py |   2 -
 synapse/rest/admin/rooms.py    | 141 ++++++++++++++++-------------------------
 tests/rest/admin/test_room.py  |  39 +++++-------
 6 files changed, 79 insertions(+), 124 deletions(-)
 create mode 100644 changelog.d/11213.removal

diff --git a/changelog.d/11213.removal b/changelog.d/11213.removal
new file mode 100644
index 0000000000..9e5ec936e3
--- /dev/null
+++ b/changelog.d/11213.removal
@@ -0,0 +1 @@
+Remove deprecated admin API to delete rooms (`POST /_synapse/admin/v1/rooms/<room_id>/delete`).
\ No newline at end of file
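For scripts affected by this removal: a hedged sketch of migrating a call from the removed `POST .../delete` endpoint to the current Delete Room API (the homeserver URL, admin token, and room ID are placeholders; the endpoint and body follow the Delete Room API referenced in the upgrade notes below):

```python
import json
import urllib.request

# DELETE /_synapse/admin/v1/rooms/<room_id> replaces the old POST .../delete.
# The room ID must be URL-encoded ("!room:example.com" -> "%21room%3Aexample.com").
req = urllib.request.Request(
    "http://localhost:8008/_synapse/admin/v1/rooms/%21room%3Aexample.com",
    data=json.dumps({"block": True, "purge": True}).encode("utf-8"),
    headers={
        "Authorization": "Bearer <admin_access_token>",  # placeholder
        "Content-Type": "application/json",
    },
    method="DELETE",
)
print(json.load(urllib.request.urlopen(req)))
```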
diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md
index acf1cab2a2..62eeff9e1a 100644
--- a/docs/admin_api/rooms.md
+++ b/docs/admin_api/rooms.md
@@ -520,16 +520,6 @@ With all that being said, if you still want to try and recover the room:
 4. If `new_room_user_id` was given, a 'Content Violation' will have been
    created. Consider whether you want to delete that room.

-## Deprecated endpoint
-
-The previous deprecated API will be removed in a future release, it was:
-
-```
-POST /_synapse/admin/v1/rooms/<room_id>/delete
-```
-
-It behaves the same way than the current endpoint except the path and the method.
-
 # Make Room Admin API

 Grants another user the highest power available to a local user who is in the room.

diff --git a/docs/upgrade.md b/docs/upgrade.md
index 06f479f86c..136c806c41 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -87,6 +87,16 @@ process, for example:

 # Upgrading to v1.47.0

+## Removal of old Room Admin API
+
+The following admin APIs were deprecated in [Synapse 1.34](https://github.com/matrix-org/synapse/blob/v1.34.0/CHANGES.md#deprecations-and-removals)
+(released on 2021-05-17) and have now been removed:
+
+- `POST /_synapse/admin/v1/rooms/<room_id>/delete`
+
+Any scripts still using the above APIs should be converted to use the
+[Delete Room API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#delete-room-api).
+
 ## Deprecation of the `user_may_create_room_with_invites` module callback

 The `user_may_create_room_with_invites` is deprecated and will be removed in a future

diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index e1506deb2b..70514e814f 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -42,7 +42,6 @@ from synapse.rest.admin.registration_tokens import (
     RegistrationTokenRestServlet,
 )
 from synapse.rest.admin.rooms import (
-    DeleteRoomRestServlet,
     ForwardExtremitiesRestServlet,
     JoinRoomAliasServlet,
     ListRoomRestServlet,
@@ -221,7 +220,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     RoomStateRestServlet(hs).register(http_server)
     RoomRestServlet(hs).register(http_server)
     RoomMembersRestServlet(hs).register(http_server)
-    DeleteRoomRestServlet(hs).register(http_server)
     JoinRoomAliasServlet(hs).register(http_server)
     VersionServlet(hs).register(http_server)
     UserAdminServlet(hs).register(http_server)

diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index a4823ca6e7..05c5b4bf0c 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -46,41 +46,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)


-class DeleteRoomRestServlet(RestServlet):
-    """Delete a room from server.
-
-    It is a combination and improvement of shutdown and purge room.
-
-    Shuts down a room by removing all local users from the room.
-    Blocking all future invites and joins to the room is optional.
-
-    If desired any local aliases will be repointed to a new room
-    created by `new_room_user_id` and kicked users will be auto-
-    joined to the new room.
-
-    If 'purge' is true, it will remove all traces of a room from the database.
-    """
-
-    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]+)/delete$")
-
-    def __init__(self, hs: "HomeServer"):
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.room_shutdown_handler = hs.get_room_shutdown_handler()
-        self.pagination_handler = hs.get_pagination_handler()
-
-    async def on_POST(
-        self, request: SynapseRequest, room_id: str
-    ) -> Tuple[int, JsonDict]:
-        return await _delete_room(
-            request,
-            room_id,
-            self.auth,
-            self.room_shutdown_handler,
-            self.pagination_handler,
-        )
-
-
 class ListRoomRestServlet(RestServlet):
     """
     List all rooms that are known to the homeserver. Results are returned
@@ -218,7 +183,7 @@ class RoomRestServlet(RestServlet):
     async def on_DELETE(
         self, request: SynapseRequest, room_id: str
     ) -> Tuple[int, JsonDict]:
-        return await _delete_room(
+        return await self._delete_room(
             request,
             room_id,
             self.auth,
@@ -226,6 +191,58 @@
             self.room_shutdown_handler,
             self.pagination_handler,
         )

+    async def _delete_room(
+        self,
+        request: SynapseRequest,
+        room_id: str,
+        auth: "Auth",
+        room_shutdown_handler: "RoomShutdownHandler",
+        pagination_handler: "PaginationHandler",
+    ) -> Tuple[int, JsonDict]:
+        requester = await auth.get_user_by_req(request)
+        await assert_user_is_admin(auth, requester.user)
+
+        content = parse_json_object_from_request(request)
+
+        block = content.get("block", False)
+        if not isinstance(block, bool):
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Param 'block' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
+        purge = content.get("purge", True)
+        if not isinstance(purge, bool):
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Param 'purge' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
+        force_purge = content.get("force_purge", False)
+        if not isinstance(force_purge, bool):
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Param 'force_purge' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
+        ret = await room_shutdown_handler.shutdown_room(
+            room_id=room_id,
+            new_room_user_id=content.get("new_room_user_id"),
+            new_room_name=content.get("room_name"),
+            message=content.get("message"),
+            requester_user_id=requester.user.to_string(),
+            block=block,
+        )
+
+        # Purge room
+        if purge:
+            await pagination_handler.purge_room(room_id, force=force_purge)
+
+        return 200, ret
+

 class RoomMembersRestServlet(RestServlet):
     """
@@ -617,55 +634,3 @@ class RoomEventContextServlet(RestServlet):
         )

         return 200, results
-
-
-async def _delete_room(
-    request: SynapseRequest,
-    room_id: str,
-    auth: "Auth",
-    room_shutdown_handler: "RoomShutdownHandler",
-    pagination_handler: "PaginationHandler",
-) -> Tuple[int, JsonDict]:
-    requester = await auth.get_user_by_req(request)
-    await assert_user_is_admin(auth, requester.user)
-
-    content = parse_json_object_from_request(request)
-
-    block = content.get("block", False)
-    if not isinstance(block, bool):
-        raise SynapseError(
-            HTTPStatus.BAD_REQUEST,
-            "Param 'block' must be a boolean, if given",
-            Codes.BAD_JSON,
-        )
-
-    purge = content.get("purge", True)
-    if not isinstance(purge, bool):
-        raise SynapseError(
-            HTTPStatus.BAD_REQUEST,
-            "Param 'purge' must be a boolean, if given",
-            Codes.BAD_JSON,
-        )
-
-    force_purge = content.get("force_purge", False)
-    if not isinstance(force_purge, bool):
-        raise SynapseError(
-            HTTPStatus.BAD_REQUEST,
-            "Param 'force_purge' must be a boolean, if given",
-            Codes.BAD_JSON,
-        )
-
-    ret = await room_shutdown_handler.shutdown_room(
-        room_id=room_id,
-        new_room_user_id=content.get("new_room_user_id"),
-        new_room_name=content.get("room_name"),
-        message=content.get("message"),
-        requester_user_id=requester.user.to_string(),
-        block=block,
-    )
-
-    # Purge room
-    if purge:
-        await pagination_handler.purge_room(room_id, force=force_purge)
-
-    return 200, ret

diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 0fa55e03b4..ba6db51c4c 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -17,8 +17,6 @@ import urllib.parse
 from typing import List, Optional
 from unittest.mock import Mock

-from parameterized import parameterized_class
-
 import synapse.rest.admin
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import Codes
@@ -29,13 +27,6 @@ from tests import unittest

 """Tests admin REST events for /rooms paths."""


-@parameterized_class(
-    ("method", "url_template"),
-    [
-        ("POST", "/_synapse/admin/v1/rooms/%s/delete"),
-        ("DELETE", "/_synapse/admin/v1/rooms/%s"),
-    ],
-)
 class DeleteRoomTestCase(unittest.HomeserverTestCase):
     servlets = [
         synapse.rest.admin.register_servlets,
@@ -67,7 +58,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         self.room_id = self.helper.create_room_as(
             self.other_user, tok=self.other_user_tok
         )
-        self.url = self.url_template % self.room_id
+        self.url = "/_synapse/admin/v1/rooms/%s" % self.room_id

     def test_requester_is_no_admin(self):
         """
@@ -75,7 +66,7 @@
         """

         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url,
             json.dumps({}),
             access_token=self.other_user_tok,
@@ -88,10 +79,10 @@
         """
         Check that unknown rooms/server return error 404.
         """
-        url = self.url_template % "!unknown:test"
+        url = "/_synapse/admin/v1/rooms/%s" % "!unknown:test"

         channel = self.make_request(
-            self.method,
+            "DELETE",
             url,
             json.dumps({}),
             access_token=self.admin_user_tok,
@@ -104,10 +95,10 @@
         """
         Check that invalid room names return an error 400.
""" - url = self.url_template % "invalidroom" + url = "/_synapse/admin/v1/rooms/%s" % "invalidroom" channel = self.make_request( - self.method, + "DELETE", url, json.dumps({}), access_token=self.admin_user_tok, @@ -126,7 +117,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"new_room_user_id": "@unknown:test"}) channel = self.make_request( - self.method, + "DELETE", self.url, content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -145,7 +136,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"new_room_user_id": "@not:exist.bla"}) channel = self.make_request( - self.method, + "DELETE", self.url, content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -164,7 +155,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"block": "NotBool"}) channel = self.make_request( - self.method, + "DELETE", self.url, content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -180,7 +171,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"purge": "NotBool"}) channel = self.make_request( - self.method, + "DELETE", self.url, content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -206,7 +197,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"block": True, "purge": True}) channel = self.make_request( - self.method, + "DELETE", self.url.encode("ascii"), content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -239,7 +230,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"block": False, "purge": True}) channel = self.make_request( - self.method, + "DELETE", self.url.encode("ascii"), content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -273,7 +264,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): body = json.dumps({"block": False, "purge": False}) channel = self.make_request( - self.method, + "DELETE", self.url.encode("ascii"), content=body.encode(encoding="utf_8"), access_token=self.admin_user_tok, @@ -319,7 +310,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): # Test that the admin can still send shutdown channel = self.make_request( - self.method, + "DELETE", self.url, json.dumps({"new_room_user_id": self.admin_user}), access_token=self.admin_user_tok, @@ -365,7 +356,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): # Test that the admin can still send shutdown channel = self.make_request( - self.method, + "DELETE", self.url, json.dumps({"new_room_user_id": self.admin_user}), access_token=self.admin_user_tok, -- cgit 1.4.1 From 69ab3dddbc1595ee64c428df7a7f3c861a84b5b0 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Mon, 1 Nov 2021 15:45:56 +0000 Subject: Make `check_event_allowed` module API callback not fail open (accept events) when an exception is raised (#11033) --- changelog.d/11033.bugfix | 1 + docs/modules/third_party_rules_callbacks.md | 8 ++++++++ synapse/api/errors.py | 7 +++++++ synapse/events/third_party_rules.py | 9 +++++---- tests/rest/client/test_third_party_rules.py | 16 +++------------- 5 files changed, 24 insertions(+), 17 deletions(-) create mode 100644 changelog.d/11033.bugfix diff --git a/changelog.d/11033.bugfix b/changelog.d/11033.bugfix new file mode 100644 index 0000000000..fa99f187b8 --- /dev/null +++ b/changelog.d/11033.bugfix @@ -0,0 +1 @@ +Do not accept events if a third-party rule module API callback raises an exception. 
diff --git a/docs/modules/third_party_rules_callbacks.md b/docs/modules/third_party_rules_callbacks.md
index a16e272f79..a3a17096a8 100644
--- a/docs/modules/third_party_rules_callbacks.md
+++ b/docs/modules/third_party_rules_callbacks.md
@@ -43,6 +43,14 @@ event with new data by returning the new event's data as a dictionary. In order
 to do that, it is recommended the module calls `event.get_dict()` to get the current
 event as a dictionary, and modify the returned dictionary accordingly.

+If `check_event_allowed` raises an exception, the module is assumed to have failed.
+The event will not be accepted but is not treated as explicitly rejected, either.
+An HTTP request causing the module check will likely result in a 500 Internal
+Server Error.
+
+When the boolean returned by the module is `False`, the event is rejected.
+(Module developers should not use exceptions for rejection.)
+
 Note that replacing the event only works for events sent by local users, not for
 events received over federation.

diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 685d1c25cf..85302163da 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -596,3 +596,10 @@ class ShadowBanError(Exception):

     This should be caught and a proper "fake" success response sent to the user.
     """
+
+
+class ModuleFailedException(Exception):
+    """
+    Raised when a module API callback fails, for example because it raised an
+    exception.
+    """

diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py
index 8816ef4b76..1bb8ca7145 100644
--- a/synapse/events/third_party_rules.py
+++ b/synapse/events/third_party_rules.py
@@ -14,7 +14,7 @@
 import logging
 from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tuple

-from synapse.api.errors import SynapseError
+from synapse.api.errors import ModuleFailedException, SynapseError
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.types import Requester, StateMap
@@ -233,9 +233,10 @@ class ThirdPartyEventRules:
                 # This module callback needs a rework so that hacks such as
                 # this one are not necessary.
                 raise e
-            except Exception as e:
-                logger.warning("Failed to run module API callback %s: %s", callback, e)
-                continue
+            except Exception:
+                raise ModuleFailedException(
+                    "Failed to run `check_event_allowed` module API callback"
+                )

         # Return if the event shouldn't be allowed or if the module came up with a
         # replacement dict for the event.

diff --git a/tests/rest/client/test_third_party_rules.py b/tests/rest/client/test_third_party_rules.py
index 1c42c46630..4e71b6ec12 100644
--- a/tests/rest/client/test_third_party_rules.py
+++ b/tests/rest/client/test_third_party_rules.py
@@ -216,19 +216,9 @@ class ThirdPartyRulesTestCase(unittest.FederatingHomeserverTestCase):
             {"x": "x"},
             access_token=self.tok,
         )
-        # check_event_allowed has some error handling, so it shouldn't 500 just because a
-        # module did something bad.
- self.assertEqual(channel.code, 200, channel.result) - event_id = channel.json_body["event_id"] - - channel = self.make_request( - "GET", - "/_matrix/client/r0/rooms/%s/event/%s" % (self.room_id, event_id), - access_token=self.tok, - ) - self.assertEqual(channel.code, 200, channel.result) - ev = channel.json_body - self.assertEqual(ev["content"]["x"], "x") + # Because check_event_allowed raises an exception, it leads to a + # 500 Internal Server Error + self.assertEqual(channel.code, 500, channel.result) def test_modify_event(self): """The module can return a modified version of the event""" -- cgit 1.4.1 From caa706d82545cda8d0f7c7243623a6de898b55bc Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Mon, 1 Nov 2021 17:10:09 +0100 Subject: Fix a bug in unit test `test_block_room_and_not_purge` (#11226) --- changelog.d/11226.misc | 1 + tests/rest/admin/test_room.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changelog.d/11226.misc diff --git a/changelog.d/11226.misc b/changelog.d/11226.misc new file mode 100644 index 0000000000..9ed4760ae0 --- /dev/null +++ b/changelog.d/11226.misc @@ -0,0 +1 @@ +Fix a bug in unit test `test_block_room_and_not_purge`. diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index ba6db51c4c..b62a7248e8 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -261,7 +261,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): # Assert one user in room self._is_member(room_id=self.room_id, user_id=self.other_user) - body = json.dumps({"block": False, "purge": False}) + body = json.dumps({"block": True, "purge": False}) channel = self.make_request( "DELETE", @@ -278,7 +278,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): with self.assertRaises(AssertionError): self._is_purged(self.room_id) - self._is_blocked(self.room_id, expect=False) + self._is_blocked(self.room_id, expect=True) self._has_no_members(self.room_id) def test_shutdown_room_consent(self): -- cgit 1.4.1 From e81fa9264873369653171157514ff68226491fff Mon Sep 17 00:00:00 2001 From: Shay Date: Mon, 1 Nov 2021 09:28:04 -0700 Subject: Add `use_float=true` to ijson calls in Synapse (#11217) * add use_float=true to ijson calls * lints * add changelog * Update changelog.d/11217.bugfix Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- changelog.d/11217.bugfix | 1 + synapse/federation/transport/client.py | 3 +++ 2 files changed, 4 insertions(+) create mode 100644 changelog.d/11217.bugfix diff --git a/changelog.d/11217.bugfix b/changelog.d/11217.bugfix new file mode 100644 index 0000000000..67ebb0d0e3 --- /dev/null +++ b/changelog.d/11217.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in 1.35.0 which made it impossible to join rooms that return a `send_join` response containing floats. 
\ No newline at end of file diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d963178838..10b5aa5af8 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -1310,14 +1310,17 @@ class SendJoinParser(ByteParser[SendJoinResponse]): self._coro_state = ijson.items_coro( _event_list_parser(room_version, self._response.state), prefix + "state.item", + use_float=True, ) self._coro_auth = ijson.items_coro( _event_list_parser(room_version, self._response.auth_events), prefix + "auth_chain.item", + use_float=True, ) self._coro_event = ijson.kvitems_coro( _event_parser(self._response.event_dict), prefix + "org.matrix.msc3083.v2.event", + use_float=True, ) def write(self, data: bytes) -> int: -- cgit 1.4.1 From f5c6a80886ac00482aaffa8e8ce3d98b31eab661 Mon Sep 17 00:00:00 2001 From: Shay Date: Mon, 1 Nov 2021 10:26:02 -0700 Subject: Handle missing Content-Type header when accessing remote media (#11200) * add code to handle missing content-type header and a test to verify that it works * add handling for missing content-type in the /upload endpoint as well * slightly refactor test code to put private method in approriate place * handle possible null value for content-type when pulling from the local db * add changelog * refactor test and add code to handle missing content-type in cached remote media * requested changes * Update changelog.d/11200.bugfix Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- changelog.d/11200.bugfix | 1 + synapse/rest/media/v1/media_repository.py | 12 +++++++++++- synapse/rest/media/v1/upload_resource.py | 2 +- tests/rest/media/v1/test_media_storage.py | 18 ++++++++++++++++-- 4 files changed, 29 insertions(+), 4 deletions(-) create mode 100644 changelog.d/11200.bugfix diff --git a/changelog.d/11200.bugfix b/changelog.d/11200.bugfix new file mode 100644 index 0000000000..c855081986 --- /dev/null +++ b/changelog.d/11200.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug wherein a missing `Content-Type` header when downloading remote media would cause Synapse to throw an error. 
\ No newline at end of file diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index abd88a2d4f..244ba261bb 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -215,6 +215,8 @@ class MediaRepository: self.mark_recently_accessed(None, media_id) media_type = media_info["media_type"] + if not media_type: + media_type = "application/octet-stream" media_length = media_info["media_length"] upload_name = name if name else media_info["upload_name"] url_cache = media_info["url_cache"] @@ -333,6 +335,9 @@ class MediaRepository: logger.info("Media is quarantined") raise NotFoundError() + if not media_info["media_type"]: + media_info["media_type"] = "application/octet-stream" + responder = await self.media_storage.fetch_media(file_info) if responder: return responder, media_info @@ -354,6 +359,8 @@ class MediaRepository: raise e file_id = media_info["filesystem_id"] + if not media_info["media_type"]: + media_info["media_type"] = "application/octet-stream" file_info = FileInfo(server_name, file_id) # We generate thumbnails even if another process downloaded the media @@ -445,7 +452,10 @@ class MediaRepository: await finish() - media_type = headers[b"Content-Type"][0].decode("ascii") + if b"Content-Type" in headers: + media_type = headers[b"Content-Type"][0].decode("ascii") + else: + media_type = "application/octet-stream" upload_name = get_filename_from_headers(headers) time_now_ms = self.clock.time_msec() diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 7dcb1428e4..8162094cf6 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -80,7 +80,7 @@ class UploadResource(DirectServeJsonResource): assert content_type_headers # for mypy media_type = content_type_headers[0].decode("ascii") else: - raise SynapseError(msg="Upload request missing 'Content-Type'", code=400) + media_type = "application/octet-stream" # if headers.hasHeader(b"Content-Disposition"): # disposition = headers.getRawHeaders(b"Content-Disposition")[0] diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py index 4ae00755c9..4cf1ed5ddf 100644 --- a/tests/rest/media/v1/test_media_storage.py +++ b/tests/rest/media/v1/test_media_storage.py @@ -248,7 +248,7 @@ class MediaRepoTests(unittest.HomeserverTestCase): self.media_id = "example.com/12345" - def _req(self, content_disposition): + def _req(self, content_disposition, include_content_type=True): channel = make_request( self.reactor, @@ -271,8 +271,11 @@ class MediaRepoTests(unittest.HomeserverTestCase): headers = { b"Content-Length": [b"%d" % (len(self.test_image.data))], - b"Content-Type": [self.test_image.content_type], } + + if include_content_type: + headers[b"Content-Type"] = [self.test_image.content_type] + if content_disposition: headers[b"Content-Disposition"] = [content_disposition] @@ -285,6 +288,17 @@ class MediaRepoTests(unittest.HomeserverTestCase): return channel + def test_handle_missing_content_type(self): + channel = self._req( + b"inline; filename=out" + self.test_image.extension, + include_content_type=False, + ) + headers = channel.headers + self.assertEqual(channel.code, 200) + self.assertEqual( + headers.getRawHeaders(b"Content-Type"), [b"application/octet-stream"] + ) + def test_disposition_filename_ascii(self): """ If the filename is filename= then Synapse will decode it as an -- cgit 1.4.1 From 
93aa670642cafcb6ed732094887bb2aac4b34b0f Mon Sep 17 00:00:00 2001
From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com>
Date: Mon, 1 Nov 2021 18:29:51 +0100
Subject: Update outdated links in `PULL_REQUEST_TEMPLATE.md` (#11225)

---
 .github/PULL_REQUEST_TEMPLATE.md | 9 +++++----
 changelog.d/11225.misc           | 1 +
 2 files changed, 6 insertions(+), 4 deletions(-)
 create mode 100644 changelog.d/11225.misc

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index fc22d89426..6c3a998499 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -1,12 +1,13 @@
 ### Pull Request Checklist

-<!-- Please read CONTRIBUTING.md before submitting your pull request -->
+<!-- Please read https://matrix-org.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request -->

 * [ ] Pull request is based on the develop branch
-* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#changelog). The entry should:
+* [ ] Pull request includes a [changelog file](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
   - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
   - Use markdown where necessary, mostly for `code blocks`.
   - End with either a period (.) or an exclamation mark (!).
   - Start with a capital letter.
-* [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#sign-off)
-* [ ] Code style is correct (run the [linters](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#code-style))
+* [ ] Pull request includes a [sign off](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#sign-off)
+* [ ] [Code style](https://matrix-org.github.io/synapse/latest/code_style.html) is correct
+  (run the [linters](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

diff --git a/changelog.d/11225.misc b/changelog.d/11225.misc
new file mode 100644
index 0000000000..f14f65f9d4
--- /dev/null
+++ b/changelog.d/11225.misc
@@ -0,0 +1 @@
+Replace outdated links in the pull request checklist with links to the rendered documentation.
-- cgit 1.4.1


From 46d0937447479761a22a8c843f6ba51bbcdc914b Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Tue, 2 Nov 2021 00:17:35 +0000
Subject: ObservableDeferred: run observers in order (#11229)

---
 changelog.d/11229.misc                   |   1 +
 synapse/util/async_helpers.py            |  34 +++---
 tests/util/caches/test_deferred_cache.py |   4 +-
 tests/util/test_async_helpers.py         | 173 +++++++++++++++++++++++++++++++
 tests/util/test_async_utils.py           | 106 -------------------
 5 files changed, 193 insertions(+), 125 deletions(-)
 create mode 100644 changelog.d/11229.misc
 create mode 100644 tests/util/test_async_helpers.py
 delete mode 100644 tests/util/test_async_utils.py

diff --git a/changelog.d/11229.misc b/changelog.d/11229.misc
new file mode 100644
index 0000000000..7bb01cf079
--- /dev/null
+++ b/changelog.d/11229.misc
@@ -0,0 +1 @@
+`ObservableDeferred`: run registered observers in order.
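A minimal sketch of the ordering guarantee introduced by the hunk below (a standalone illustration assuming Synapse's utilities are importable; not part of the patch):

```python
from twisted.internet import defer

from synapse.util.async_helpers import ObservableDeferred

underlying = defer.Deferred()
observable = ObservableDeferred(underlying)

first = observable.observe()
second = observable.observe()
first.addCallback(lambda r: print("first observer:", r) or r)
second.addCallback(lambda r: print("second observer:", r) or r)

# With observers kept in a list rather than a set, resolving the underlying
# deferred fires them in registration order:
# "first observer: 42" then "second observer: 42".
underlying.callback(42)
```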
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 5df80ea8e7..96efc5f3e3 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -22,11 +22,11 @@ from typing import ( Any, Awaitable, Callable, + Collection, Dict, Generic, Hashable, Iterable, - List, Optional, Set, TypeVar, @@ -76,12 +76,17 @@ class ObservableDeferred(Generic[_T]): def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False): object.__setattr__(self, "_deferred", deferred) object.__setattr__(self, "_result", None) - object.__setattr__(self, "_observers", set()) + object.__setattr__(self, "_observers", []) def callback(r): object.__setattr__(self, "_result", (True, r)) - while self._observers: - observer = self._observers.pop() + + # once we have set _result, no more entries will be added to _observers, + # so it's safe to replace it with the empty tuple. + observers = self._observers + object.__setattr__(self, "_observers", ()) + + for observer in observers: try: observer.callback(r) except Exception as e: @@ -95,12 +100,16 @@ class ObservableDeferred(Generic[_T]): def errback(f): object.__setattr__(self, "_result", (False, f)) - while self._observers: + + # once we have set _result, no more entries will be added to _observers, + # so it's safe to replace it with the empty tuple. + observers = self._observers + object.__setattr__(self, "_observers", ()) + + for observer in observers: # This is a little bit of magic to correctly propagate stack # traces when we `await` on one of the observer deferreds. f.value.__failure__ = f - - observer = self._observers.pop() try: observer.errback(f) except Exception as e: @@ -127,20 +136,13 @@ class ObservableDeferred(Generic[_T]): """ if not self._result: d: "defer.Deferred[_T]" = defer.Deferred() - - def remove(r): - self._observers.discard(d) - return r - - d.addBoth(remove) - - self._observers.add(d) + self._observers.append(d) return d else: success, res = self._result return defer.succeed(res) if success else defer.fail(res) - def observers(self) -> "List[defer.Deferred[_T]]": + def observers(self) -> "Collection[defer.Deferred[_T]]": return self._observers def has_called(self) -> bool: diff --git a/tests/util/caches/test_deferred_cache.py b/tests/util/caches/test_deferred_cache.py index 54a88a8325..c613ce3f10 100644 --- a/tests/util/caches/test_deferred_cache.py +++ b/tests/util/caches/test_deferred_cache.py @@ -47,9 +47,7 @@ class DeferredCacheTestCase(TestCase): self.assertTrue(set_d.called) return r - # TODO: Actually ObservableDeferred *doesn't* run its tests in order on py3.8. - # maybe we should fix that? - # get_d.addCallback(check1) + get_d.addCallback(check1) # now fire off all the deferreds origin_d.callback(99) diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py new file mode 100644 index 0000000000..ab89cab812 --- /dev/null +++ b/tests/util/test_async_helpers.py @@ -0,0 +1,173 @@ +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from twisted.internet import defer +from twisted.internet.defer import CancelledError, Deferred +from twisted.internet.task import Clock + +from synapse.logging.context import ( + SENTINEL_CONTEXT, + LoggingContext, + PreserveLoggingContext, + current_context, +) +from synapse.util.async_helpers import ObservableDeferred, timeout_deferred + +from tests.unittest import TestCase + + +class ObservableDeferredTest(TestCase): + def test_succeed(self): + origin_d = Deferred() + observable = ObservableDeferred(origin_d) + + observer1 = observable.observe() + observer2 = observable.observe() + + self.assertFalse(observer1.called) + self.assertFalse(observer2.called) + + # check the first observer is called first + def check_called_first(res): + self.assertFalse(observer2.called) + return res + + observer1.addBoth(check_called_first) + + # store the results + results = [None, None] + + def check_val(res, idx): + results[idx] = res + return res + + observer1.addCallback(check_val, 0) + observer2.addCallback(check_val, 1) + + origin_d.callback(123) + self.assertEqual(results[0], 123, "observer 1 callback result") + self.assertEqual(results[1], 123, "observer 2 callback result") + + def test_failure(self): + origin_d = Deferred() + observable = ObservableDeferred(origin_d, consumeErrors=True) + + observer1 = observable.observe() + observer2 = observable.observe() + + self.assertFalse(observer1.called) + self.assertFalse(observer2.called) + + # check the first observer is called first + def check_called_first(res): + self.assertFalse(observer2.called) + return res + + observer1.addBoth(check_called_first) + + # store the results + results = [None, None] + + def check_val(res, idx): + results[idx] = res + return None + + observer1.addErrback(check_val, 0) + observer2.addErrback(check_val, 1) + + try: + raise Exception("gah!") + except Exception as e: + origin_d.errback(e) + self.assertEqual(str(results[0].value), "gah!", "observer 1 errback result") + self.assertEqual(str(results[1].value), "gah!", "observer 2 errback result") + + +class TimeoutDeferredTest(TestCase): + def setUp(self): + self.clock = Clock() + + def test_times_out(self): + """Basic test case that checks that the original deferred is cancelled and that + the timing-out deferred is errbacked + """ + cancelled = [False] + + def canceller(_d): + cancelled[0] = True + + non_completing_d = Deferred(canceller) + timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock) + + self.assertNoResult(timing_out_d) + self.assertFalse(cancelled[0], "deferred was cancelled prematurely") + + self.clock.pump((1.0,)) + + self.assertTrue(cancelled[0], "deferred was not cancelled by timeout") + self.failureResultOf(timing_out_d, defer.TimeoutError) + + def test_times_out_when_canceller_throws(self): + """Test that we have successfully worked around + https://twistedmatrix.com/trac/ticket/9534""" + + def canceller(_d): + raise Exception("can't cancel this deferred") + + non_completing_d = Deferred(canceller) + timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock) + + self.assertNoResult(timing_out_d) + + self.clock.pump((1.0,)) + + self.failureResultOf(timing_out_d, defer.TimeoutError) + + def test_logcontext_is_preserved_on_cancellation(self): + blocking_was_cancelled = [False] + + @defer.inlineCallbacks + def blocking(): + non_completing_d = Deferred() + with PreserveLoggingContext(): + try: + yield non_completing_d + except CancelledError: + blocking_was_cancelled[0] = True + raise + + with LoggingContext("one") as context_one: + # 
the errbacks should be run in the test logcontext + def errback(res, deferred_name): + self.assertIs( + current_context(), + context_one, + "errback %s run in unexpected logcontext %s" + % (deferred_name, current_context()), + ) + return res + + original_deferred = blocking() + original_deferred.addErrback(errback, "orig") + timing_out_d = timeout_deferred(original_deferred, 1.0, self.clock) + self.assertNoResult(timing_out_d) + self.assertIs(current_context(), SENTINEL_CONTEXT) + timing_out_d.addErrback(errback, "timingout") + + self.clock.pump((1.0,)) + + self.assertTrue( + blocking_was_cancelled[0], "non-completing deferred was not cancelled" + ) + self.failureResultOf(timing_out_d, defer.TimeoutError) + self.assertIs(current_context(), context_one) diff --git a/tests/util/test_async_utils.py b/tests/util/test_async_utils.py deleted file mode 100644 index 069f875962..0000000000 --- a/tests/util/test_async_utils.py +++ /dev/null @@ -1,106 +0,0 @@ -# Copyright 2019 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from twisted.internet import defer -from twisted.internet.defer import CancelledError, Deferred -from twisted.internet.task import Clock - -from synapse.logging.context import ( - SENTINEL_CONTEXT, - LoggingContext, - PreserveLoggingContext, - current_context, -) -from synapse.util.async_helpers import timeout_deferred - -from tests.unittest import TestCase - - -class TimeoutDeferredTest(TestCase): - def setUp(self): - self.clock = Clock() - - def test_times_out(self): - """Basic test case that checks that the original deferred is cancelled and that - the timing-out deferred is errbacked - """ - cancelled = [False] - - def canceller(_d): - cancelled[0] = True - - non_completing_d = Deferred(canceller) - timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock) - - self.assertNoResult(timing_out_d) - self.assertFalse(cancelled[0], "deferred was cancelled prematurely") - - self.clock.pump((1.0,)) - - self.assertTrue(cancelled[0], "deferred was not cancelled by timeout") - self.failureResultOf(timing_out_d, defer.TimeoutError) - - def test_times_out_when_canceller_throws(self): - """Test that we have successfully worked around - https://twistedmatrix.com/trac/ticket/9534""" - - def canceller(_d): - raise Exception("can't cancel this deferred") - - non_completing_d = Deferred(canceller) - timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock) - - self.assertNoResult(timing_out_d) - - self.clock.pump((1.0,)) - - self.failureResultOf(timing_out_d, defer.TimeoutError) - - def test_logcontext_is_preserved_on_cancellation(self): - blocking_was_cancelled = [False] - - @defer.inlineCallbacks - def blocking(): - non_completing_d = Deferred() - with PreserveLoggingContext(): - try: - yield non_completing_d - except CancelledError: - blocking_was_cancelled[0] = True - raise - - with LoggingContext("one") as context_one: - # the errbacks should be run in the test logcontext - def errback(res, deferred_name): - self.assertIs( - current_context(), - context_one, - 
"errback %s run in unexpected logcontext %s" - % (deferred_name, current_context()), - ) - return res - - original_deferred = blocking() - original_deferred.addErrback(errback, "orig") - timing_out_d = timeout_deferred(original_deferred, 1.0, self.clock) - self.assertNoResult(timing_out_d) - self.assertIs(current_context(), SENTINEL_CONTEXT) - timing_out_d.addErrback(errback, "timingout") - - self.clock.pump((1.0,)) - - self.assertTrue( - blocking_was_cancelled[0], "non-completing deferred was not cancelled" - ) - self.failureResultOf(timing_out_d, defer.TimeoutError) - self.assertIs(current_context(), context_one) -- cgit 1.4.1 From 753720184042e01bf56478d15bd8c8db11da4b69 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 2 Nov 2021 11:01:13 +0100 Subject: Add search by room ID and room alias to List Room admin API (#11099) Fixes: #10874 Signed-off-by: Dirk Klimpel dirk@klimpel.org --- changelog.d/11099.feature | 1 + docs/admin_api/rooms.md | 11 +++-- synapse/storage/databases/main/room.py | 29 ++++++----- tests/rest/admin/test_room.py | 88 +++++++++++++++++++--------------- 4 files changed, 76 insertions(+), 53 deletions(-) create mode 100644 changelog.d/11099.feature diff --git a/changelog.d/11099.feature b/changelog.d/11099.feature new file mode 100644 index 0000000000..c9126d4a9d --- /dev/null +++ b/changelog.d/11099.feature @@ -0,0 +1 @@ +Add search by room ID and room alias to List Room admin API. \ No newline at end of file diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 62eeff9e1a..1fc3cc3c42 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -38,9 +38,14 @@ The following query parameters are available: - `history_visibility` - Rooms are ordered alphabetically by visibility of history of the room. - `state_events` - Rooms are ordered by number of state events. Largest to smallest. * `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting - this value to `b` will reverse the above sort order. Defaults to `f`. -* `search_term` - Filter rooms by their room name. Search term can be contained in any - part of the room name. Defaults to no filtering. + this value to `b` will reverse the above sort order. Defaults to `f`. +* `search_term` - Filter rooms by their room name, canonical alias and room id. + Specifically, rooms are selected if the search term is contained in + - the room's name, + - the local part of the room's canonical alias, or + - the complete (local and server part) room's id (case sensitive). + + Defaults to no filtering. **Response** diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index f879bbe7c7..cefc77fa0f 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -412,22 +412,33 @@ class RoomWorkerStore(SQLBaseStore): limit: maximum amount of rooms to retrieve order_by: the sort order of the returned list reverse_order: whether to reverse the room list - search_term: a string to filter room names by + search_term: a string to filter room names, + canonical alias and room ids by. + Room ID must match exactly. Canonical alias must match a substring of the local part. Returns: A list of room dicts and an integer representing the total number of rooms that exist given this query """ # Filter room names by a string where_statement = "" + search_pattern = [] if search_term: - where_statement = "WHERE LOWER(state.name) LIKE ?" 
+ where_statement = """ + WHERE LOWER(state.name) LIKE ? + OR LOWER(state.canonical_alias) LIKE ? + OR state.room_id = ? + """ # Our postgres db driver converts ? -> %s in SQL strings as that's the # placeholder for postgres. # HOWEVER, if you put a % into your SQL then everything goes wibbly. # To get around this, we're going to surround search_term with %'s # before giving it to the database in python instead - search_term = "%" + search_term.lower() + "%" + search_pattern = [ + "%" + search_term.lower() + "%", + "#%" + search_term.lower() + "%:%", + search_term, + ] # Set ordering if RoomSortOrder(order_by) == RoomSortOrder.SIZE: @@ -519,12 +530,9 @@ class RoomWorkerStore(SQLBaseStore): ) def _get_rooms_paginate_txn(txn): - # Execute the data query - sql_values = (limit, start) - if search_term: - # Add the search term into the WHERE clause - sql_values = (search_term,) + sql_values - txn.execute(info_sql, sql_values) + # Add the search term into the WHERE clause + # and execute the data query + txn.execute(info_sql, search_pattern + [limit, start]) # Refactor room query data into a structured dictionary rooms = [] @@ -551,8 +559,7 @@ class RoomWorkerStore(SQLBaseStore): # Execute the count query # Add the search term into the WHERE clause if present - sql_values = (search_term,) if search_term else () - txn.execute(count_sql, sql_values) + txn.execute(count_sql, search_pattern) room_count = txn.fetchone() return rooms, room_count[0] diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index b62a7248e8..46116644ce 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -680,36 +680,6 @@ class RoomTestCase(unittest.HomeserverTestCase): reversing the order, etc. """ - def _set_canonical_alias(room_id: str, test_alias: str, admin_user_tok: str): - # Create a new alias to this room - url = "/_matrix/client/r0/directory/room/%s" % ( - urllib.parse.quote(test_alias), - ) - channel = self.make_request( - "PUT", - url.encode("ascii"), - {"room_id": room_id}, - access_token=admin_user_tok, - ) - self.assertEqual( - 200, int(channel.result["code"]), msg=channel.result["body"] - ) - - # Set this new alias as the canonical alias for this room - self.helper.send_state( - room_id, - "m.room.aliases", - {"aliases": [test_alias]}, - tok=admin_user_tok, - state_key="test", - ) - self.helper.send_state( - room_id, - "m.room.canonical_alias", - {"alias": test_alias}, - tok=admin_user_tok, - ) - def _order_test( order_type: str, expected_room_list: List[str], @@ -781,9 +751,9 @@ class RoomTestCase(unittest.HomeserverTestCase): ) # Set room canonical room aliases - _set_canonical_alias(room_id_1, "#A_alias:test", self.admin_user_tok) - _set_canonical_alias(room_id_2, "#B_alias:test", self.admin_user_tok) - _set_canonical_alias(room_id_3, "#C_alias:test", self.admin_user_tok) + self._set_canonical_alias(room_id_1, "#A_alias:test", self.admin_user_tok) + self._set_canonical_alias(room_id_2, "#B_alias:test", self.admin_user_tok) + self._set_canonical_alias(room_id_3, "#C_alias:test", self.admin_user_tok) # Set room member size in the reverse order. 
room 1 -> 1 member, 2 -> 2, 3 -> 3
         user_1 = self.register_user("bob1", "pass")
@@ -850,7 +820,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
         room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)

         room_name_1 = "something"
-        room_name_2 = "else"
+        room_name_2 = "LoremIpsum"

         # Set the name for each room
         self.helper.send_state(
@@ -866,6 +836,8 @@ class RoomTestCase(unittest.HomeserverTestCase):
             tok=self.admin_user_tok,
         )

+        self._set_canonical_alias(room_id_1, "#Room_Alias1:test", self.admin_user_tok)
+
         def _search_test(
             expected_room_id: Optional[str],
             search_term: str,
@@ -914,24 +886,36 @@ class RoomTestCase(unittest.HomeserverTestCase):
                 r = rooms[0]
                 self.assertEqual(expected_room_id, r["room_id"])

-        # Perform search tests
+        # Test searching by room name
         _search_test(room_id_1, "something")
         _search_test(room_id_1, "thing")
-        _search_test(room_id_2, "else")
-        _search_test(room_id_2, "se")
+        _search_test(room_id_2, "LoremIpsum")
+        _search_test(room_id_2, "lorem")

         # Test case insensitive
         _search_test(room_id_1, "SOMETHING")
         _search_test(room_id_1, "THING")
-        _search_test(room_id_2, "ELSE")
-        _search_test(room_id_2, "SE")
+        _search_test(room_id_2, "LOREMIPSUM")
+        _search_test(room_id_2, "LOREM")

         _search_test(None, "foo")
         _search_test(None, "bar")
         _search_test(None, "", expected_http_code=400)

+        # Test that the whole room id returns the room
+        _search_test(room_id_1, room_id_1)
+        # Test that searching by room id is case sensitive
+        _search_test(None, room_id_1.lower())
+        # Test that searching part of the local part of the room id does not match
+        _search_test(None, room_id_1[1:10])
+
+        # Test that the whole room alias returns no result, because of the domain part
+        _search_test(None, "#Room_Alias1:test")
+        # Test searching by the local part of the alias
+        _search_test(room_id_1, "alias1")
+
     def test_search_term_non_ascii(self):
         """Test that searching for a room with non-ASCII characters works
         correctly"""
@@ -1114,6 +1098,32 @@ class RoomTestCase(unittest.HomeserverTestCase):
         # the create_room already does the right thing, so no need to verify that we got
         # the state events it created.
+ def _set_canonical_alias(self, room_id: str, test_alias: str, admin_user_tok: str): + # Create a new alias to this room + url = "/_matrix/client/r0/directory/room/%s" % (urllib.parse.quote(test_alias),) + channel = self.make_request( + "PUT", + url.encode("ascii"), + {"room_id": room_id}, + access_token=admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + + # Set this new alias as the canonical alias for this room + self.helper.send_state( + room_id, + "m.room.aliases", + {"aliases": [test_alias]}, + tok=admin_user_tok, + state_key="test", + ) + self.helper.send_state( + room_id, + "m.room.canonical_alias", + {"alias": test_alias}, + tok=admin_user_tok, + ) + class JoinAliasRoomTestCase(unittest.HomeserverTestCase): -- cgit 1.4.1 From c9c3aea9b189cb606d7ec2905dad2c87acc039ef Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 2 Nov 2021 10:39:02 +0000 Subject: Fix providing a `RoomStreamToken` instance to `_notify_app_services_ephemeral` (#11137) Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/11137.misc | 1 + synapse/handlers/appservice.py | 22 +++++++++++++---- synapse/notifier.py | 38 +++++++----------------------- synapse/storage/databases/main/devices.py | 4 ++-- synapse/storage/databases/main/presence.py | 2 +- 5 files changed, 30 insertions(+), 37 deletions(-) create mode 100644 changelog.d/11137.misc diff --git a/changelog.d/11137.misc b/changelog.d/11137.misc new file mode 100644 index 0000000000..f0d6476f48 --- /dev/null +++ b/changelog.d/11137.misc @@ -0,0 +1 @@ +Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code. \ No newline at end of file diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 36c206dae6..67f8ffcaff 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -182,7 +182,7 @@ class ApplicationServicesHandler: def notify_interested_services_ephemeral( self, stream_key: str, - new_token: Optional[int], + new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, ) -> None: """ @@ -203,7 +203,7 @@ class ApplicationServicesHandler: Appservices will only receive ephemeral events that fall within their registered user and room namespaces. - new_token: The latest stream token. + new_token: The stream token of the event. users: The users that should be informed of the new event, if any. """ if not self.notify_appservices: @@ -212,6 +212,19 @@ class ApplicationServicesHandler: if stream_key not in ("typing_key", "receipt_key", "presence_key"): return + # Assert that new_token is an integer (and not a RoomStreamToken). + # All of the supported streams that this function handles use an + # integer to track progress (rather than a RoomStreamToken - a + # vector clock implementation) as they don't support multiple + # stream writers. + # + # As a result, we simply assert that new_token is an integer. + # If we do end up needing to pass a RoomStreamToken down here + # in the future, using RoomStreamToken.stream (the minimum stream + # position) to convert to an ascending integer value should work. 
+ # Additional context: https://github.com/matrix-org/synapse/pull/11137 + assert isinstance(new_token, int) + services = [ service for service in self.store.get_app_services() @@ -231,14 +244,13 @@ class ApplicationServicesHandler: self, services: List[ApplicationService], stream_key: str, - new_token: Optional[int], + new_token: int, users: Collection[Union[str, UserID]], ) -> None: logger.debug("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: - # Only handle typing if we have the latest token - if stream_key == "typing_key" and new_token is not None: + if stream_key == "typing_key": # Note that we don't persist the token (via set_type_stream_id_for_appservice) # for typing_key due to performance reasons and due to their highly # ephemeral nature. diff --git a/synapse/notifier.py b/synapse/notifier.py index 1882fffd2a..60e5409895 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -383,29 +383,6 @@ class Notifier: except Exception: logger.exception("Error notifying application services of event") - def _notify_app_services_ephemeral( - self, - stream_key: str, - new_token: Union[int, RoomStreamToken], - users: Optional[Collection[Union[str, UserID]]] = None, - ) -> None: - """Notify application services of ephemeral event activity. - - Args: - stream_key: The stream the event came from. - new_token: The value of the new stream token. - users: The users that should be informed of the new event, if any. - """ - try: - stream_token = None - if isinstance(new_token, int): - stream_token = new_token - self.appservice_handler.notify_interested_services_ephemeral( - stream_key, stream_token, users or [] - ) - except Exception: - logger.exception("Error notifying application services of event") - def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: self._pusher_pool.on_new_notifications(max_room_stream_token) @@ -467,12 +444,15 @@ class Notifier: self.notify_replication() - # Notify appservices - self._notify_app_services_ephemeral( - stream_key, - new_token, - users, - ) + # Notify appservices. + try: + self.appservice_handler.notify_interested_services_ephemeral( + stream_key, + new_token, + users, + ) + except Exception: + logger.exception("Error notifying application services of event") def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happened diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index b15cd030e0..9ccc66e589 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -427,7 +427,7 @@ class DeviceWorkerStore(SQLBaseStore): user_ids: the users who were signed Returns: - THe new stream ID. + The new stream ID. """ async with self._device_list_id_gen.get_next() as stream_id: @@ -1322,7 +1322,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): async def add_device_change_to_streams( self, user_id: str, device_ids: Collection[str], hosts: List[str] - ): + ) -> int: """Persist that a user's devices have been updated, and which hosts (if any) should be poked. 
""" diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 12cf6995eb..cc0eebdb46 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -92,7 +92,7 @@ class PresenceStore(PresenceBackgroundUpdateStore): prefilled_cache=presence_cache_prefill, ) - async def update_presence(self, presence_states): + async def update_presence(self, presence_states) -> Tuple[int, int]: assert self._can_persist_presence stream_ordering_manager = self._presence_id_gen.get_next_mult( -- cgit 1.4.1 From 4535532526581834ab798996ffe73f6d19c25123 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 2 Nov 2021 14:18:30 +0100 Subject: Delete messages for hidden devices from `device_inbox` (#11199) --- changelog.d/11199.bugfix | 1 + synapse/storage/databases/main/deviceinbox.py | 89 ++++++++++++++++++++++ .../03remove_hidden_devices_from_device_inbox.sql | 22 ++++++ tests/storage/databases/main/test_deviceinbox.py | 74 ++++++++++++++++++ 4 files changed, 186 insertions(+) create mode 100644 changelog.d/11199.bugfix create mode 100644 synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql diff --git a/changelog.d/11199.bugfix b/changelog.d/11199.bugfix new file mode 100644 index 0000000000..dc3ea8d515 --- /dev/null +++ b/changelog.d/11199.bugfix @@ -0,0 +1 @@ +Delete `to_device` messages for hidden devices that will never be read, reducing database size. \ No newline at end of file diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 25e9c1efe1..264e625bd7 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -561,6 +561,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" + REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -581,6 +582,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): self._remove_deleted_devices_from_device_inbox, ) + self.db_pool.updates.register_background_update_handler( + self.REMOVE_HIDDEN_DEVICES, + self._remove_hidden_devices_from_device_inbox, + ) + async def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): txn = conn.cursor() @@ -676,6 +682,89 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): return number_deleted + async def _remove_hidden_devices_from_device_inbox( + self, progress: JsonDict, batch_size: int + ) -> int: + """A background update that deletes all device_inboxes for hidden devices. 
+
+        This should only need to be run once (when users upgrade to v1.47.0).
+
+        Args:
+            progress: JsonDict used to store progress of this background update
+            batch_size: the maximum number of rows to retrieve in a single select query
+
+        Returns:
+            The number of deleted rows
+        """
+
+        def _remove_hidden_devices_from_device_inbox_txn(
+            txn: LoggingTransaction,
+        ) -> int:
+            """Since `stream_id` is not unique, we need to use an inclusive
+            `stream_id >= ?` clause: we might not have deleted all hidden-device
+            messages for the stream_id returned by the previous query.
+
+            We then delete only rows matching the `(user_id, device_id, stream_id)`
+            tuple, to avoid deleting a large number of rows all at once when a
+            single device has accumulated many device messages.
+            """
+
+            last_stream_id = progress.get("stream_id", 0)
+
+            sql = """
+                SELECT device_id, user_id, stream_id
+                FROM device_inbox
+                WHERE
+                    stream_id >= ?
+                    AND (device_id, user_id) IN (
+                        SELECT device_id, user_id FROM devices WHERE hidden = ?
+                    )
+                ORDER BY stream_id
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_stream_id, True, batch_size))
+            rows = txn.fetchall()
+
+            num_deleted = 0
+            for row in rows:
+                num_deleted += self.db_pool.simple_delete_txn(
+                    txn,
+                    "device_inbox",
+                    {"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
+                )
+
+            if rows:
+                # We don't save just the `stream_id` in the progress JSON:
+                # in large deployments the stream_id may not change over
+                # several runs, which would make it look in the log file
+                # as though no progress were being made.
+                self.db_pool.updates._background_update_progress_txn(
+                    txn,
+                    self.REMOVE_HIDDEN_DEVICES,
+                    {
+                        "device_id": rows[-1][0],
+                        "user_id": rows[-1][1],
+                        "stream_id": rows[-1][2],
+                    },
+                )
+
+            return num_deleted
+
+        number_deleted = await self.db_pool.runInteraction(
+            "_remove_hidden_devices_from_device_inbox",
+            _remove_hidden_devices_from_device_inbox_txn,
+        )
+
+        # The task is finished when no more rows are deleted.
+        if not number_deleted:
+            await self.db_pool.updates._end_background_update(
+                self.REMOVE_HIDDEN_DEVICES
+            )
+
+        return number_deleted
+

 class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
     pass
diff --git a/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql
new file mode 100644
index 0000000000..7b3592dcf0
--- /dev/null
+++ b/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql
@@ -0,0 +1,22 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Remove messages from the device_inbox table which were orphaned
+-- because a device was hidden using a Synapse version earlier than 1.47.0.
+-- This runs as a background task, but may take a while to finish.
+ +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6503, 'remove_hidden_devices_from_device_inbox', '{}'); diff --git a/tests/storage/databases/main/test_deviceinbox.py b/tests/storage/databases/main/test_deviceinbox.py index 4cfd2677f7..4b67bd15b7 100644 --- a/tests/storage/databases/main/test_deviceinbox.py +++ b/tests/storage/databases/main/test_deviceinbox.py @@ -88,3 +88,77 @@ class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase): ) self.assertEqual(1, len(res)) self.assertEqual(res[0], "cur_device") + + def test_background_remove_hidden_devices_from_device_inbox(self): + """Test that the background task to delete hidden devices + from device_inboxes works properly.""" + + # create a valid device + self.get_success( + self.store.store_device(self.user_id, "cur_device", "display_name") + ) + + # create a hidden device + self.get_success( + self.store.db_pool.simple_insert( + "devices", + values={ + "user_id": self.user_id, + "device_id": "hidden_device", + "display_name": "hidden_display_name", + "hidden": True, + }, + ) + ) + + # Add device_inbox to devices + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": self.user_id, + "device_id": "cur_device", + "stream_id": 1, + "message_json": "{}", + }, + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + "device_inbox", + { + "user_id": self.user_id, + "device_id": "hidden_device", + "stream_id": 2, + "message_json": "{}", + }, + ) + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": "remove_hidden_devices_from_device_inbox", + "progress_json": "{}", + }, + ) + ) + + # ... and tell the DataStore that it hasn't finished all updates yet + self.store.db_pool.updates._all_done = False + + self.wait_for_background_updates() + + # Make sure the background task deleted hidden devices from device_inbox + res = self.get_success( + self.store.db_pool.simple_select_onecol( + table="device_inbox", + keyvalues={}, + retcol="device_id", + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(1, len(res)) + self.assertEqual(res[0], "cur_device") -- cgit 1.4.1 From df84ad602b21a4cea3a63c9117b5cd7884f1ab05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Nov 2021 13:23:01 +0000 Subject: 1.46.0 --- CHANGES.md | 9 +++++++++ changelog.d/11196.bugfix | 1 - debian/changelog | 8 ++++++-- synapse/__init__.py | 2 +- 4 files changed, 16 insertions(+), 4 deletions(-) delete mode 100644 changelog.d/11196.bugfix diff --git a/CHANGES.md b/CHANGES.md index f61d5c706f..124bdf320a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +Synapse 1.46.0 (2021-11-02) +=========================== + +Bugfixes +-------- + +- Fix a bug introduced in v1.46.0rc1 where URL previews of some XML documents would fail. ([\#11196](https://github.com/matrix-org/synapse/issues/11196)) + + Synapse 1.46.0rc1 (2021-10-27) ============================== diff --git a/changelog.d/11196.bugfix b/changelog.d/11196.bugfix deleted file mode 100644 index 3861eeb908..0000000000 --- a/changelog.d/11196.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix a bug introduced in v1.46.0rc1 where URL previews of some XML documents would fail. 
diff --git a/debian/changelog b/debian/changelog index c2ea5d2cfb..06e7a0862d 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,8 +1,12 @@ -matrix-synapse-py3 (1.46.0~rc1ubuntu1) UNRELEASED; urgency=medium +matrix-synapse-py3 (1.46.0) stable; urgency=medium + [ Richard van der Hoff ] * Compress debs with xz, to fix incompatibility of impish debs with reprepro. - -- Richard van der Hoff Wed, 27 Oct 2021 15:32:51 +0100 + [ Synapse Packaging team ] + * New synapse release 1.46.0. + + -- Synapse Packaging team Tue, 02 Nov 2021 13:22:53 +0000 matrix-synapse-py3 (1.46.0~rc1) stable; urgency=medium diff --git a/synapse/__init__.py b/synapse/__init__.py index 355b36fc63..5ef34bce40 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.46.0rc1" +__version__ = "1.46.0" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when -- cgit 1.4.1 From 2d44ee6868805d4ff23489a8dd6b4072ff358663 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Nov 2021 13:25:42 +0000 Subject: Update changelog --- CHANGES.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 124bdf320a..e74544f489 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ Synapse 1.46.0 (2021-11-02) =========================== +The cause of the [performance regression affecting Synapse 1.44](https://github.com/matrix-org/synapse/issues/11049) has been identified and fixed. ([\#11177](https://github.com/matrix-org/synapse/issues/11177)) + Bugfixes -------- @@ -10,8 +12,6 @@ Bugfixes Synapse 1.46.0rc1 (2021-10-27) ============================== -The cause of the [performance regression affecting Synapse 1.44](https://github.com/matrix-org/synapse/issues/11049) has been identified and fixed. ([\#11177](https://github.com/matrix-org/synapse/issues/11177)) - Features -------- -- cgit 1.4.1 From c01bc5f43d1c7d0a25f397b542ced57894395519 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 2 Nov 2021 09:55:52 -0400 Subject: Add remaining type hints to `synapse.events`. (#11098) --- changelog.d/11098.misc | 1 + mypy.ini | 8 +- synapse/events/__init__.py | 227 +++++++++++++++++---------- synapse/events/validator.py | 2 +- synapse/handlers/federation_event.py | 2 +- synapse/handlers/message.py | 14 +- synapse/handlers/room.py | 2 +- synapse/handlers/room_batch.py | 2 +- synapse/handlers/room_member.py | 4 +- synapse/push/bulk_push_rule_evaluator.py | 4 +- synapse/push/push_rule_evaluator.py | 10 +- synapse/rest/client/room_batch.py | 2 +- synapse/state/__init__.py | 2 +- synapse/storage/databases/main/events.py | 7 +- synapse/storage/databases/main/roommember.py | 8 +- 15 files changed, 185 insertions(+), 110 deletions(-) create mode 100644 changelog.d/11098.misc diff --git a/changelog.d/11098.misc b/changelog.d/11098.misc new file mode 100644 index 0000000000..1e337bee54 --- /dev/null +++ b/changelog.d/11098.misc @@ -0,0 +1 @@ +Add type hints to `synapse.events`. 
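At the core of this change is `DictProperty`, the descriptor that `EventBase` uses to
expose keys of its underlying event `_dict` as attributes; the hunks below make it
generic so each attribute carries a concrete type. A minimal, self-contained sketch of
the pattern (illustrative names only, not Synapse's actual class):

    from typing import Any, Dict, Generic, Optional, Type, TypeVar

    T = TypeVar("T")

    class DictProp(Generic[T]):
        """A descriptor that proxies attribute access to the owner's `_dict`."""

        def __init__(self, key: str) -> None:
            self.key = key

        def __get__(self, instance: Optional[Any], owner: Optional[Type[Any]] = None) -> T:
            if instance is None:
                # Accessed on the class rather than an instance: return the
                # descriptor itself (the real class expresses this with overloads).
                return self  # type: ignore[return-value]
            return instance._dict[self.key]

        def __set__(self, instance: Any, value: T) -> None:
            instance._dict[self.key] = value

    class Event:
        # Annotating the attribute as DictProp[str] is what lets a type checker
        # infer `Event(...).room_id` as `str`.
        room_id: "DictProp[str]" = DictProp("room_id")

        def __init__(self, event_dict: Dict[str, Any]) -> None:
            self._dict = event_dict

    assert Event({"room_id": "!abc:example.com"}).room_id == "!abc:example.com"

Since the descriptor reads straight out of `_dict` at runtime, the typed annotations
added in the diff (`DictProperty[bool]`, `DictProperty[str]`, and so on) change only
what mypy sees, not runtime behaviour.
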
diff --git a/mypy.ini b/mypy.ini index 119a7d8c91..600402a5d3 100644 --- a/mypy.ini +++ b/mypy.ini @@ -22,13 +22,7 @@ files = synapse/config, synapse/crypto, synapse/event_auth.py, - synapse/events/builder.py, - synapse/events/presence_router.py, - synapse/events/snapshot.py, - synapse/events/spamcheck.py, - synapse/events/third_party_rules.py, - synapse/events/utils.py, - synapse/events/validator.py, + synapse/events, synapse/federation, synapse/groups, synapse/handlers, diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 157669ea88..38f3cf4d33 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -16,8 +16,23 @@ import abc import os -from typing import Dict, Optional, Tuple, Type - +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Generic, + Iterable, + List, + Optional, + Sequence, + Tuple, + Type, + TypeVar, + Union, + overload, +) + +from typing_extensions import Literal from unpaddedbase64 import encode_base64 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions @@ -26,6 +41,9 @@ from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze from synapse.util.stringutils import strtobool +if TYPE_CHECKING: + from synapse.events.builder import EventBuilder + # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents # bugs where we accidentally share e.g. signature dicts. However, converting a # dict to frozen_dicts is expensive. @@ -37,7 +55,23 @@ from synapse.util.stringutils import strtobool USE_FROZEN_DICTS = strtobool(os.environ.get("SYNAPSE_USE_FROZEN_DICTS", "0")) -class DictProperty: +T = TypeVar("T") + + +# DictProperty (and DefaultDictProperty) require the classes they're used with to +# have a _dict property to pull properties from. +# +# TODO _DictPropertyInstance should not include EventBuilder but due to +# https://github.com/python/mypy/issues/5570 it thinks the DictProperty and +# DefaultDictProperty get applied to EventBuilder when it is in a Union with +# EventBase. This is the least invasive hack to get mypy to comply. +# +# Note that DictProperty/DefaultDictProperty cannot actually be used with +# EventBuilder as it lacks a _dict property. +_DictPropertyInstance = Union["_EventInternalMetadata", "EventBase", "EventBuilder"] + + +class DictProperty(Generic[T]): """An object property which delegates to the `_dict` within its parent object.""" __slots__ = ["key"] @@ -45,12 +79,33 @@ class DictProperty: def __init__(self, key: str): self.key = key - def __get__(self, instance, owner=None): + @overload + def __get__( + self, + instance: Literal[None], + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> "DictProperty": + ... + + @overload + def __get__( + self, + instance: _DictPropertyInstance, + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> T: + ... 
+ + def __get__( + self, + instance: Optional[_DictPropertyInstance], + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> Union[T, "DictProperty"]: # if the property is accessed as a class property rather than an instance # property, return the property itself rather than the value if instance is None: return self try: + assert isinstance(instance, (EventBase, _EventInternalMetadata)) return instance._dict[self.key] except KeyError as e1: # We want this to look like a regular attribute error (mostly so that @@ -65,10 +120,12 @@ class DictProperty: "'%s' has no '%s' property" % (type(instance), self.key) ) from e1.__context__ - def __set__(self, instance, v): + def __set__(self, instance: _DictPropertyInstance, v: T) -> None: + assert isinstance(instance, (EventBase, _EventInternalMetadata)) instance._dict[self.key] = v - def __delete__(self, instance): + def __delete__(self, instance: _DictPropertyInstance) -> None: + assert isinstance(instance, (EventBase, _EventInternalMetadata)) try: del instance._dict[self.key] except KeyError as e1: @@ -77,7 +134,7 @@ class DictProperty: ) from e1.__context__ -class DefaultDictProperty(DictProperty): +class DefaultDictProperty(DictProperty, Generic[T]): """An extension of DictProperty which provides a default if the property is not present in the parent's _dict. @@ -86,13 +143,34 @@ class DefaultDictProperty(DictProperty): __slots__ = ["default"] - def __init__(self, key, default): + def __init__(self, key: str, default: T): super().__init__(key) self.default = default - def __get__(self, instance, owner=None): + @overload + def __get__( + self, + instance: Literal[None], + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> "DefaultDictProperty": + ... + + @overload + def __get__( + self, + instance: _DictPropertyInstance, + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> T: + ... + + def __get__( + self, + instance: Optional[_DictPropertyInstance], + owner: Optional[Type[_DictPropertyInstance]] = None, + ) -> Union[T, "DefaultDictProperty"]: if instance is None: return self + assert isinstance(instance, (EventBase, _EventInternalMetadata)) return instance._dict.get(self.key, self.default) @@ -111,22 +189,22 @@ class _EventInternalMetadata: # in the DAG) self.outlier = False - out_of_band_membership: bool = DictProperty("out_of_band_membership") - send_on_behalf_of: str = DictProperty("send_on_behalf_of") - recheck_redaction: bool = DictProperty("recheck_redaction") - soft_failed: bool = DictProperty("soft_failed") - proactively_send: bool = DictProperty("proactively_send") - redacted: bool = DictProperty("redacted") - txn_id: str = DictProperty("txn_id") - token_id: int = DictProperty("token_id") - historical: bool = DictProperty("historical") + out_of_band_membership: DictProperty[bool] = DictProperty("out_of_band_membership") + send_on_behalf_of: DictProperty[str] = DictProperty("send_on_behalf_of") + recheck_redaction: DictProperty[bool] = DictProperty("recheck_redaction") + soft_failed: DictProperty[bool] = DictProperty("soft_failed") + proactively_send: DictProperty[bool] = DictProperty("proactively_send") + redacted: DictProperty[bool] = DictProperty("redacted") + txn_id: DictProperty[str] = DictProperty("txn_id") + token_id: DictProperty[int] = DictProperty("token_id") + historical: DictProperty[bool] = DictProperty("historical") # XXX: These are set by StreamWorkerStore._set_before_and_after. 
# I'm pretty sure that these are never persisted to the database, so shouldn't # be here - before: RoomStreamToken = DictProperty("before") - after: RoomStreamToken = DictProperty("after") - order: Tuple[int, int] = DictProperty("order") + before: DictProperty[RoomStreamToken] = DictProperty("before") + after: DictProperty[RoomStreamToken] = DictProperty("after") + order: DictProperty[Tuple[int, int]] = DictProperty("order") def get_dict(self) -> JsonDict: return dict(self._dict) @@ -162,9 +240,6 @@ class _EventInternalMetadata: If the sender of the redaction event is allowed to redact any event due to auth rules, then this will always return false. - - Returns: - bool """ return self._dict.get("recheck_redaction", False) @@ -176,32 +251,23 @@ class _EventInternalMetadata: sent to clients. 2. They should not be added to the forward extremities (and therefore not to current state). - - Returns: - bool """ return self._dict.get("soft_failed", False) - def should_proactively_send(self): + def should_proactively_send(self) -> bool: """Whether the event, if ours, should be sent to other clients and servers. This is used for sending dummy events internally. Servers and clients can still explicitly fetch the event. - - Returns: - bool """ return self._dict.get("proactively_send", True) - def is_redacted(self): + def is_redacted(self) -> bool: """Whether the event has been redacted. This is used for efficiently checking whether an event has been marked as redacted without needing to make another database call. - - Returns: - bool """ return self._dict.get("redacted", False) @@ -241,29 +307,31 @@ class EventBase(metaclass=abc.ABCMeta): self.internal_metadata = _EventInternalMetadata(internal_metadata_dict) - auth_events = DictProperty("auth_events") - depth = DictProperty("depth") - content = DictProperty("content") - hashes = DictProperty("hashes") - origin = DictProperty("origin") - origin_server_ts = DictProperty("origin_server_ts") - prev_events = DictProperty("prev_events") - redacts = DefaultDictProperty("redacts", None) - room_id = DictProperty("room_id") - sender = DictProperty("sender") - state_key = DictProperty("state_key") - type = DictProperty("type") - user_id = DictProperty("sender") + depth: DictProperty[int] = DictProperty("depth") + content: DictProperty[JsonDict] = DictProperty("content") + hashes: DictProperty[Dict[str, str]] = DictProperty("hashes") + origin: DictProperty[str] = DictProperty("origin") + origin_server_ts: DictProperty[int] = DictProperty("origin_server_ts") + redacts: DefaultDictProperty[Optional[str]] = DefaultDictProperty("redacts", None) + room_id: DictProperty[str] = DictProperty("room_id") + sender: DictProperty[str] = DictProperty("sender") + # TODO state_key should be Optional[str], this is generally asserted in Synapse + # by calling is_state() first (which ensures this), but it is hard (not possible?) + # to properly annotate that calling is_state() asserts that state_key exists + # and is non-None. 
+ state_key: DictProperty[str] = DictProperty("state_key") + type: DictProperty[str] = DictProperty("type") + user_id: DictProperty[str] = DictProperty("sender") @property def event_id(self) -> str: raise NotImplementedError() @property - def membership(self): + def membership(self) -> str: return self.content["membership"] - def is_state(self): + def is_state(self) -> bool: return hasattr(self, "state_key") and self.state_key is not None def get_dict(self) -> JsonDict: @@ -272,13 +340,13 @@ class EventBase(metaclass=abc.ABCMeta): return d - def get(self, key, default=None): + def get(self, key: str, default: Optional[Any] = None) -> Any: return self._dict.get(key, default) - def get_internal_metadata_dict(self): + def get_internal_metadata_dict(self) -> JsonDict: return self.internal_metadata.get_dict() - def get_pdu_json(self, time_now=None) -> JsonDict: + def get_pdu_json(self, time_now: Optional[int] = None) -> JsonDict: pdu_json = self.get_dict() if time_now is not None and "age_ts" in pdu_json["unsigned"]: @@ -305,49 +373,46 @@ class EventBase(metaclass=abc.ABCMeta): return template_json - def __set__(self, instance, value): - raise AttributeError("Unrecognized attribute %s" % (instance,)) - - def __getitem__(self, field): + def __getitem__(self, field: str) -> Optional[Any]: return self._dict[field] - def __contains__(self, field): + def __contains__(self, field: str) -> bool: return field in self._dict - def items(self): + def items(self) -> List[Tuple[str, Optional[Any]]]: return list(self._dict.items()) - def keys(self): + def keys(self) -> Iterable[str]: return self._dict.keys() - def prev_event_ids(self): + def prev_event_ids(self) -> Sequence[str]: """Returns the list of prev event IDs. The order matches the order specified in the event, though there is no meaning to it. Returns: - list[str]: The list of event IDs of this event's prev_events + The list of event IDs of this event's prev_events """ - return [e for e, _ in self.prev_events] + return [e for e, _ in self._dict["prev_events"]] - def auth_event_ids(self): + def auth_event_ids(self) -> Sequence[str]: """Returns the list of auth event IDs. The order matches the order specified in the event, though there is no meaning to it. Returns: - list[str]: The list of event IDs of this event's auth_events + The list of event IDs of this event's auth_events """ - return [e for e, _ in self.auth_events] + return [e for e, _ in self._dict["auth_events"]] - def freeze(self): + def freeze(self) -> None: """'Freeze' the event dict, so it cannot be modified by accident""" # this will be a no-op if the event dict is already frozen. self._dict = freeze(self._dict) - def __str__(self): + def __str__(self) -> str: return self.__repr__() - def __repr__(self): + def __repr__(self) -> str: rejection = f"REJECTED={self.rejected_reason}, " if self.rejected_reason else "" return ( @@ -443,7 +508,7 @@ class FrozenEventV2(EventBase): else: frozen_dict = event_dict - self._event_id = None + self._event_id: Optional[str] = None super().__init__( frozen_dict, @@ -455,7 +520,7 @@ class FrozenEventV2(EventBase): ) @property - def event_id(self): + def event_id(self) -> str: # We have to import this here as otherwise we get an import loop which # is hard to break. 
from synapse.crypto.event_signing import compute_event_reference_hash

@@ -465,23 +530,23 @@ class FrozenEventV2(EventBase):
             self._event_id = "$" + encode_base64(compute_event_reference_hash(self)[1])
         return self._event_id

-    def prev_event_ids(self):
+    def prev_event_ids(self) -> Sequence[str]:
         """Returns the list of prev event IDs. The order matches the order
         specified in the event, though there is no meaning to it.

         Returns:
-            list[str]: The list of event IDs of this event's prev_events
+            The list of event IDs of this event's prev_events
         """
-        return self.prev_events
+        return self._dict["prev_events"]

-    def auth_event_ids(self):
+    def auth_event_ids(self) -> Sequence[str]:
         """Returns the list of auth event IDs. The order matches the order
         specified in the event, though there is no meaning to it.

         Returns:
-            list[str]: The list of event IDs of this event's auth_events
+            The list of event IDs of this event's auth_events
         """
-        return self.auth_events
+        return self._dict["auth_events"]


 class FrozenEventV3(FrozenEventV2):
@@ -490,7 +555,7 @@ class FrozenEventV3(FrozenEventV2):
     format_version = EventFormatVersions.V3  # All events of this type are V3

     @property
-    def event_id(self):
+    def event_id(self) -> str:
         # We have to import this here as otherwise we get an import loop which
         # is hard to break.
         from synapse.crypto.event_signing import compute_event_reference_hash
@@ -503,12 +568,14 @@ class FrozenEventV3(FrozenEventV2):
         return self._event_id


-def _event_type_from_format_version(format_version: int) -> Type[EventBase]:
+def _event_type_from_format_version(
+    format_version: int,
+) -> Type[Union[FrozenEvent, FrozenEventV2, FrozenEventV3]]:
     """Returns the python type to use to construct an Event object for the
     given event format version.

     Args:
-        format_version (int): The event format version
+        format_version: The event format version

     Returns:
         type: A type that can be initialized as per the initializer of
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 4d459c17f1..cf86934968 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -55,7 +55,7 @@ class EventValidator:
         ]

         for k in required:
-            if not hasattr(event, k):
+            if k not in event:
                 raise SynapseError(400, "Event does not have key %s" % (k,))

         # Check that the following keys have string values
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index e617db4c0d..1a1cd93b1a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1643,7 +1643,7 @@ class FederationEventHandler:
             event: the event whose auth_events we want

         Returns:
-            all of the events in `event.auth_events`, after deduplication
+            all of the events listed in `event.auth_event_ids()`, after deduplication

         Raises:
             AuthError if we were unable to fetch the auth_events for any reason.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4a0fccfcc6..b7bc187169 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1318,6 +1318,8 @@ class EventCreationHandler:
             # user is actually admin or not).
is_admin_redaction = False if event.type == EventTypes.Redaction: + assert event.redacts is not None + original_event = await self.store.get_event( event.redacts, redact_behaviour=EventRedactBehaviour.AS_IS, @@ -1413,6 +1415,8 @@ class EventCreationHandler: ) if event.type == EventTypes.Redaction: + assert event.redacts is not None + original_event = await self.store.get_event( event.redacts, redact_behaviour=EventRedactBehaviour.AS_IS, @@ -1500,11 +1504,13 @@ class EventCreationHandler: next_batch_id = event.content.get( EventContentFields.MSC2716_NEXT_BATCH_ID ) - conflicting_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id( - event.room_id, next_batch_id + conflicting_insertion_event_id = None + if next_batch_id: + conflicting_insertion_event_id = ( + await self.store.get_insertion_event_by_batch_id( + event.room_id, next_batch_id + ) ) - ) if conflicting_insertion_event_id is not None: # The current insertion event that we're processing is invalid # because an insertion event already exists in the room with the diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 99e9b37344..969eb3b9b0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -525,7 +525,7 @@ class RoomCreationHandler: ): await self.room_member_handler.update_membership( requester, - UserID.from_string(old_event["state_key"]), + UserID.from_string(old_event.state_key), new_room_id, "ban", ratelimit=False, diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 2f5a3e4d19..0723286383 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -355,7 +355,7 @@ class RoomBatchHandler: for (event, context) in reversed(events_to_persist): await self.event_creation_handler.handle_new_client_event( await self.create_requester_for_user_id_from_app_service( - event["sender"], app_service_requester.app_service + event.sender, app_service_requester.app_service ), event=event, context=context, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 74e6c7eca6..08244b690d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1669,7 +1669,9 @@ class RoomMemberMasterHandler(RoomMemberHandler): # # the prev_events consist solely of the previous membership event. prev_event_ids = [previous_membership_event.event_id] - auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids + auth_event_ids = ( + list(previous_membership_event.auth_event_ids()) + prev_event_ids + ) event, context = await self.event_creation_handler.create_event( requester, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 0622a37ae8..009d8e77b0 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -232,6 +232,8 @@ class BulkPushRuleEvaluator: # that user, as they might not be already joined. 
if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) + if not isinstance(display_name, str): + display_name = None if count_as_unread: # Add an element for the current user if the event needs to be marked as @@ -268,7 +270,7 @@ def _condition_checker( evaluator: PushRuleEvaluatorForEvent, conditions: List[dict], uid: str, - display_name: str, + display_name: Optional[str], cache: Dict[str, bool], ) -> bool: for cond in conditions: diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 7a8dc63976..7f68092ec5 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -18,7 +18,7 @@ import re from typing import Any, Dict, List, Optional, Pattern, Tuple, Union from synapse.events import EventBase -from synapse.types import UserID +from synapse.types import JsonDict, UserID from synapse.util import glob_to_regex, re_word_boundary from synapse.util.caches.lrucache import LruCache @@ -129,7 +129,7 @@ class PushRuleEvaluatorForEvent: self._value_cache = _flatten_dict(event) def matches( - self, condition: Dict[str, Any], user_id: str, display_name: str + self, condition: Dict[str, Any], user_id: str, display_name: Optional[str] ) -> bool: if condition["kind"] == "event_match": return self._event_match(condition, user_id) @@ -172,7 +172,7 @@ class PushRuleEvaluatorForEvent: return _glob_matches(pattern, haystack) - def _contains_display_name(self, display_name: str) -> bool: + def _contains_display_name(self, display_name: Optional[str]) -> bool: if not display_name: return False @@ -222,7 +222,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: def _flatten_dict( - d: Union[EventBase, dict], + d: Union[EventBase, JsonDict], prefix: Optional[List[str]] = None, result: Optional[Dict[str, str]] = None, ) -> Dict[str, str]: @@ -233,7 +233,7 @@ def _flatten_dict( for key, value in d.items(): if isinstance(value, str): result[".".join(prefix + [key])] = value.lower() - elif hasattr(value, "items"): + elif isinstance(value, dict): _flatten_dict(value, prefix=(prefix + [key]), result=result) return result diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 99f8156ad0..ab9a743bba 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -191,7 +191,7 @@ class RoomBatchSendEventRestServlet(RestServlet): depth=inherited_depth, ) - batch_id_to_connect_to = base_insertion_event["content"][ + batch_id_to_connect_to = base_insertion_event.content[ EventContentFields.MSC2716_NEXT_BATCH_ID ] diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 98a0239759..1605411b00 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -247,7 +247,7 @@ class StateHandler: return await self.get_hosts_in_room_at_events(room_id, event_ids) async def get_hosts_in_room_at_events( - self, room_id: str, event_ids: List[str] + self, room_id: str, event_ids: Iterable[str] ) -> Set[str]: """Get the hosts that were in a room at the given event ids diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 8d9086ecf0..596275c23c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -24,6 +24,7 @@ from typing import ( Iterable, List, Optional, + Sequence, Set, Tuple, ) @@ -494,7 +495,7 @@ class PersistEventsStore: event_chain_id_gen: SequenceGenerator, event_to_room_id: 
Dict[str, str], event_to_types: Dict[str, Tuple[str, str]], - event_to_auth_chain: Dict[str, List[str]], + event_to_auth_chain: Dict[str, Sequence[str]], ) -> None: """Calculate the chain cover index for the given events. @@ -786,7 +787,7 @@ class PersistEventsStore: event_chain_id_gen: SequenceGenerator, event_to_room_id: Dict[str, str], event_to_types: Dict[str, Tuple[str, str]], - event_to_auth_chain: Dict[str, List[str]], + event_to_auth_chain: Dict[str, Sequence[str]], events_to_calc_chain_id_for: Set[str], chain_map: Dict[str, Tuple[int, int]], ) -> Dict[str, Tuple[int, int]]: @@ -1794,7 +1795,7 @@ class PersistEventsStore: ) # Insert an edge for every prev_event connection - for prev_event_id in event.prev_events: + for prev_event_id in event.prev_event_ids(): self.db_pool.simple_insert_txn( txn, table="insertion_event_edges", diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4b288bb2e7..033a9831d6 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -570,7 +570,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): async def get_joined_users_from_context( self, event: EventBase, context: EventContext - ): + ) -> Dict[str, ProfileInfo]: state_group = context.state_group if not state_group: # If state_group is None it means it has yet to be assigned a @@ -584,7 +584,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): event.room_id, state_group, current_state_ids, event=event, context=context ) - async def get_joined_users_from_state(self, room_id, state_entry): + async def get_joined_users_from_state( + self, room_id, state_entry + ) -> Dict[str, ProfileInfo]: state_group = state_entry.state_group if not state_group: # If state_group is None it means it has yet to be assigned a @@ -607,7 +609,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): cache_context, event=None, context=None, - ): + ) -> Dict[str, ProfileInfo]: # We don't use `state_group`, it's there so that we can cache based # on it. However, it's important that it's never None, since two current_states # with a state_group of None are likely to be different. -- cgit 1.4.1 From 6250b95efe88385bb3ec2842d5eb76f42ef762ef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Nov 2021 15:46:48 +0000 Subject: Add index to `local_group_updates.stream_id` (#11231) This should speed up startup times and generally increase performance of groups. --- changelog.d/11231.misc | 1 + scripts/synapse_port_db | 2 ++ synapse/storage/databases/main/group_server.py | 17 ++++++++++++++++- .../schema/main/delta/65/04_local_group_updates.sql | 18 ++++++++++++++++++ 4 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11231.misc create mode 100644 synapse/storage/schema/main/delta/65/04_local_group_updates.sql diff --git a/changelog.d/11231.misc b/changelog.d/11231.misc new file mode 100644 index 0000000000..c7fca7071e --- /dev/null +++ b/changelog.d/11231.misc @@ -0,0 +1 @@ +Minor speed up to start up times and getting updates for groups by adding missing index to `local_group_updates.stream_id`. 
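The notable part of the diff below is that the index is created via
`register_background_index_update` rather than directly in the schema delta, so it is
built in the background after upgrade instead of blocking startup. A rough sketch of
the end state once the update has run — the table's column list is abbreviated here,
and a plain `CREATE UNIQUE INDEX` stands in for the helper's Postgres
`CREATE INDEX CONCURRENTLY` path:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    # Abbreviated stand-in for the real local_group_updates table.
    conn.execute(
        "CREATE TABLE local_group_updates (stream_id BIGINT, group_id TEXT, content TEXT)"
    )

    # register_background_index_update(..., columns=("stream_id",), unique=True)
    # ultimately issues something equivalent to:
    conn.execute(
        "CREATE UNIQUE INDEX IF NOT EXISTS local_group_updates_stream_id_index "
        "ON local_group_updates (stream_id)"
    )

    # Queries that filter or sort on stream_id (such as fetching group updates
    # since a given token at startup) can now use the index rather than scanning.
    plan = conn.execute(
        "EXPLAIN QUERY PLAN SELECT * FROM local_group_updates WHERE stream_id > ?",
        (5,),
    ).fetchall()
    print(plan)  # on SQLite, the plan mentions local_group_updates_stream_id_index
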
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 349866eb9a..640ff15277 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -43,6 +43,7 @@ from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyBackground from synapse.storage.databases.main.events_bg_updates import ( EventsBackgroundUpdatesStore, ) +from synapse.storage.databases.main.group_server import GroupServerWorkerStore from synapse.storage.databases.main.media_repository import ( MediaRepositoryBackgroundUpdateStore, ) @@ -181,6 +182,7 @@ class Store( StatsStore, PusherWorkerStore, PresenceBackgroundUpdateStore, + GroupServerWorkerStore, ): def execute(self, f, *args, **kwargs): return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs) diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py index e70d3649ff..bb621df0dd 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py @@ -13,15 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from typing_extensions import TypedDict from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import DatabasePool +from synapse.storage.types import Connection from synapse.types import JsonDict from synapse.util import json_encoder +if TYPE_CHECKING: + from synapse.server import HomeServer + # The category ID for the "default" category. We don't store as null in the # database to avoid the fun of null != null _DEFAULT_CATEGORY_ID = "" @@ -35,6 +40,16 @@ class _RoomInGroup(TypedDict): class GroupServerWorkerStore(SQLBaseStore): + def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): + database.updates.register_background_index_update( + update_name="local_group_updates_index", + index_name="local_group_updates_stream_id_index", + table="local_group_updates", + columns=("stream_id",), + unique=True, + ) + super().__init__(database, db_conn, hs) + async def get_group(self, group_id: str) -> Optional[Dict[str, Any]]: return await self.db_pool.simple_select_one( table="groups", diff --git a/synapse/storage/schema/main/delta/65/04_local_group_updates.sql b/synapse/storage/schema/main/delta/65/04_local_group_updates.sql new file mode 100644 index 0000000000..a178abfe12 --- /dev/null +++ b/synapse/storage/schema/main/delta/65/04_local_group_updates.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Check index on `local_group_updates.stream_id`. 
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6504, 'local_group_updates_index', '{}'); -- cgit 1.4.1 From da0040785e81f2cb26dd7b568c9d622abf2dd21b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Nov 2021 03:13:51 -0500 Subject: Support sending no `state_events_at_start` in the MSC2716 `/batch_send` endpoint (#11188) As brought up by @tulir, https://matrix.to/#/!SBYNQlpqkwJzFIdzxI:nevarro.space/$Gwnb2ZvXHc3poYXuBhho0cmoYq4KJ11Jh3m5s8kjNOM?via=nevarro.space&via=beeper.com&via=matrix.org This use case only works if the user is already joined in the current room state at the given `?prev_event_id` --- changelog.d/11188.bugfix | 1 + synapse/rest/client/room_batch.py | 29 +++++++++++++++++------------ 2 files changed, 18 insertions(+), 12 deletions(-) create mode 100644 changelog.d/11188.bugfix diff --git a/changelog.d/11188.bugfix b/changelog.d/11188.bugfix new file mode 100644 index 0000000000..0688743c00 --- /dev/null +++ b/changelog.d/11188.bugfix @@ -0,0 +1 @@ +Allow an empty list of `state_events_at_start` to be sent when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint and the author of the historical messages is already part of the current room state at the given `?prev_event_id`. diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index ab9a743bba..46f033eee2 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -131,20 +131,22 @@ class RoomBatchSendEventRestServlet(RestServlet): prev_event_ids_from_query ) + state_event_ids_at_start = [] # Create and persist all of the state events that float off on their own # before the batch. These will most likely be all of the invite/member # state events used to auth the upcoming historical messages. - state_event_ids_at_start = ( - await self.room_batch_handler.persist_state_events_at_start( - state_events_at_start=body["state_events_at_start"], - room_id=room_id, - initial_auth_event_ids=auth_event_ids, - app_service_requester=requester, + if body["state_events_at_start"]: + state_event_ids_at_start = ( + await self.room_batch_handler.persist_state_events_at_start( + state_events_at_start=body["state_events_at_start"], + room_id=room_id, + initial_auth_event_ids=auth_event_ids, + app_service_requester=requester, + ) ) - ) - # Update our ongoing auth event ID list with all of the new state we - # just created - auth_event_ids.extend(state_event_ids_at_start) + # Update our ongoing auth event ID list with all of the new state we + # just created + auth_event_ids.extend(state_event_ids_at_start) inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids( prev_event_ids_from_query @@ -197,8 +199,11 @@ class RoomBatchSendEventRestServlet(RestServlet): # Also connect the historical event chain to the end of the floating # state chain, which causes the HS to ask for the state at the start of - # the batch later. - prev_event_ids = [state_event_ids_at_start[-1]] + # the batch later. If there is no state chain to connect to, just make + # the insertion event float itself. + prev_event_ids = [] + if len(state_event_ids_at_start): + prev_event_ids = [state_event_ids_at_start[-1]] # Create and persist all of the historical events as well as insertion # and batch meta events to make the batch navigable in the DAG. 
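The effect of the hunks above is that a batch whose author is already part of the room
state at `?prev_event_id` can skip the floating state chain entirely. A condensed
sketch of the resulting control flow (the helper name here is illustrative; the real
logic is inline in the servlet):

    from typing import List

    def choose_prev_event_ids(state_event_ids_at_start: List[str]) -> List[str]:
        """Pick what the insertion event should point at as its prev_events."""
        if state_event_ids_at_start:
            # Connect the historical chain to the end of the floating state
            # chain, so the homeserver asks for the state at the start of the
            # batch later.
            return [state_event_ids_at_start[-1]]
        # No state chain was sent: let the insertion event float by itself.
        return []

    assert choose_prev_event_ids(["$member_event"]) == ["$member_event"]
    assert choose_prev_event_ids([]) == []
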
-- cgit 1.4.1
From d688a6dee5cb3fc82b63a48e5654629959a439d2 Mon Sep 17 00:00:00 2001
From: Andrew Morgan
Date: Wed, 3 Nov 2021 11:09:00 +0000
Subject: Fix a small typo in the Delete Room API docs

---
 docs/admin_api/rooms.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md
index 1fc3cc3c42..ab6b82a082 100644
--- a/docs/admin_api/rooms.md
+++ b/docs/admin_api/rooms.md
@@ -385,7 +385,7 @@ A response body like the following is returned:

 # Delete Room API

-The Delete Room admin API allows server admins to remove rooms from server
+The Delete Room admin API allows server admins to remove rooms from the server
 and block these rooms.

 Shuts down a room. Moves all local users and room aliases automatically to a
-- cgit 1.4.1
From bcc115c28d857fa54919bbe564f37e97f7a8ac81 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 3 Nov 2021 11:10:25 +0000
Subject: Add twine and towncrier as dev dependencies (#11233)

We don't pin them as we execute them as commands, rather than using them as
libraries.
---
 changelog.d/11233.misc | 1 +
 setup.py               | 3 +++
 2 files changed, 4 insertions(+)
 create mode 100644 changelog.d/11233.misc

diff --git a/changelog.d/11233.misc b/changelog.d/11233.misc
new file mode 100644
index 0000000000..fdf9e5642e
--- /dev/null
+++ b/changelog.d/11233.misc
@@ -0,0 +1 @@
+Add `twine` and `towncrier` as dev dependencies, as they're used by the release script.
diff --git a/setup.py b/setup.py
index 220084a49d..345cff09c3 100755
--- a/setup.py
+++ b/setup.py
@@ -132,6 +132,9 @@ CONDITIONAL_REQUIREMENTS["dev"] = (
         "GitPython==3.1.14",
         "commonmark==0.9.1",
         "pygithub==1.55",
+        # The following are executed as commands by the release script.
+        "twine",
+        "towncrier",
     ]
 )
-- cgit 1.4.1
From 2735b3e6f203813e72ca1845225dedd7d791dbb7 Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Wed, 3 Nov 2021 09:11:16 -0400
Subject: Remove a debug statement from tests. (#11239)

---
 changelog.d/11239.misc       | 1 +
 tests/module_api/test_api.py | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)
 create mode 100644 changelog.d/11239.misc

diff --git a/changelog.d/11239.misc b/changelog.d/11239.misc
new file mode 100644
index 0000000000..48a796bed0
--- /dev/null
+++ b/changelog.d/11239.misc
@@ -0,0 +1 @@
+Remove debugging statement in tests.
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 525b83141b..d16cd141a7 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -116,7 +116,6 @@ class ModuleApiTestCase(HomeserverTestCase):

         # Insert a second ip, agent at a later date. We should be able to retrieve it.
         last_seen_2 = last_seen_1 + 10000
-        print("%s => %s" % (last_seen_1, last_seen_2))

         self.get_success(
             self.store.insert_client_ip(
                 user_id, "access_token", "ip_2", "user_agent_2", "device_2", last_seen_2
-- cgit 1.4.1
From af54167516c7211937efa5b800853f3088ef5178 Mon Sep 17 00:00:00 2001
From: Nick Barrett
Date: Wed, 3 Nov 2021 14:25:47 +0000
Subject: Enable passing typing stream writers as a list. (#11237)

This makes the typing stream writer config match the other stream writers
that currently only support a single worker.
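The list form leans on the existing `_instance_to_list_converter` (applied to `typing` in the diff below), which normalises a bare instance name and a one-element list to the same value, so old configs keep working. A minimal standalone sketch of that normalisation, not a copy of Synapse's `WriterLocations` machinery:

```python
from typing import List, Union

def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
    """Normalise a config value so "worker1" and ["worker1"] are equivalent."""
    if isinstance(obj, str):
        return [obj]
    return obj

# Both spellings of `stream_writers.typing` end up as the same list:
assert _instance_to_list_converter("worker1") == ["worker1"]
assert _instance_to_list_converter(["worker1"]) == ["worker1"]
```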
--- changelog.d/11237.misc | 1 + synapse/config/workers.py | 18 +++++++++++++++--- synapse/federation/federation_server.py | 4 ---- synapse/handlers/typing.py | 6 +++--- synapse/replication/tcp/handler.py | 2 +- synapse/replication/tcp/streams/_base.py | 3 +-- synapse/rest/client/room.py | 2 +- synapse/server.py | 4 ++-- 8 files changed, 24 insertions(+), 16 deletions(-) create mode 100644 changelog.d/11237.misc diff --git a/changelog.d/11237.misc b/changelog.d/11237.misc new file mode 100644 index 0000000000..b90efc6535 --- /dev/null +++ b/changelog.d/11237.misc @@ -0,0 +1 @@ +Allow `stream_writers.typing` config to be a list of one worker. diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 462630201d..4507992031 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -63,7 +63,8 @@ class WriterLocations: Attributes: events: The instances that write to the event and backfill streams. - typing: The instance that writes to the typing stream. + typing: The instances that write to the typing stream. Currently + can only be a single instance. to_device: The instances that write to the to_device stream. Currently can only be a single instance. account_data: The instances that write to the account data streams. Currently @@ -75,9 +76,15 @@ class WriterLocations: """ events = attr.ib( - default=["master"], type=List[str], converter=_instance_to_list_converter + default=["master"], + type=List[str], + converter=_instance_to_list_converter, + ) + typing = attr.ib( + default=["master"], + type=List[str], + converter=_instance_to_list_converter, ) - typing = attr.ib(default="master", type=str) to_device = attr.ib( default=["master"], type=List[str], @@ -217,6 +224,11 @@ class WorkerConfig(Config): % (instance, stream) ) + if len(self.writers.typing) != 1: + raise ConfigError( + "Must only specify one instance to handle `typing` messages." + ) + if len(self.writers.to_device) != 1: raise ConfigError( "Must only specify one instance to handle `to_device` messages." 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 32a75993d9..42e3acecb4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1232,10 +1232,6 @@ class FederationHandlerRegistry: self.query_handlers[query_type] = handler - def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None: - """Register that the EDU handler is on a different instance than master.""" - self._edu_type_to_instance[edu_type] = [instance_name] - def register_instances_for_edu( self, edu_type: str, instance_names: List[str] ) -> None: diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c411d69924..22c6174821 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -62,8 +62,8 @@ class FollowerTypingHandler: if hs.should_send_federation(): self.federation = hs.get_federation_sender() - if hs.config.worker.writers.typing != hs.get_instance_name(): - hs.get_federation_registry().register_instance_for_edu( + if hs.get_instance_name() not in hs.config.worker.writers.typing: + hs.get_federation_registry().register_instances_for_edu( "m.typing", hs.config.worker.writers.typing, ) @@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - assert hs.config.worker.writers.typing == hs.get_instance_name() + assert hs.get_instance_name() in hs.config.worker.writers.typing self.auth = hs.get_auth() self.notifier = hs.get_notifier() diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 06fd06fdf3..21293038ef 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -138,7 +138,7 @@ class ReplicationCommandHandler: if isinstance(stream, TypingStream): # Only add TypingStream as a source on the instance in charge of # typing. - if hs.config.worker.writers.typing == hs.get_instance_name(): + if hs.get_instance_name() in hs.config.worker.writers.typing: self._streams_to_replicate.append(stream) continue diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index c8b188ae4e..743a01da08 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -328,8 +328,7 @@ class TypingStream(Stream): ROW_TYPE = TypingStreamRow def __init__(self, hs: "HomeServer"): - writer_instance = hs.config.worker.writers.typing - if writer_instance == hs.get_instance_name(): + if hs.get_instance_name() in hs.config.worker.writers.typing: # On the writer, query the typing handler typing_writer_handler = hs.get_typing_writer_handler() update_function: Callable[ diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index ed95189b6d..6a876cfa2f 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -914,7 +914,7 @@ class RoomTypingRestServlet(RestServlet): # If we're not on the typing writer instance we should scream if we get # requests. 
self._is_typing_writer = ( - hs.config.worker.writers.typing == hs.get_instance_name() + hs.get_instance_name() in hs.config.worker.writers.typing ) async def on_PUT( diff --git a/synapse/server.py b/synapse/server.py index 0fbf36ba99..013a7bacaa 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -463,7 +463,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_typing_writer_handler(self) -> TypingWriterHandler: - if self.config.worker.writers.typing == self.get_instance_name(): + if self.get_instance_name() in self.config.worker.writers.typing: return TypingWriterHandler(self) else: raise Exception("Workers cannot write typing") @@ -474,7 +474,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_typing_handler(self) -> FollowerTypingHandler: - if self.config.worker.writers.typing == self.get_instance_name(): + if self.get_instance_name() in self.config.worker.writers.typing: # Use get_typing_writer_handler to ensure that we use the same # cached version. return self.get_typing_writer_handler() -- cgit 1.4.1 From a271e233e9f846193c22b6d74f33ae7d7f2c1167 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Wed, 3 Nov 2021 16:51:00 +0000 Subject: Add a linearizer on (appservice, stream) when handling ephemeral events. (#11207) Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/11207.bugfix | 1 + synapse/handlers/appservice.py | 69 +++++++++++++++++++++++++++++---------- tests/handlers/test_appservice.py | 51 +++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 18 deletions(-) create mode 100644 changelog.d/11207.bugfix diff --git a/changelog.d/11207.bugfix b/changelog.d/11207.bugfix new file mode 100644 index 0000000000..7e98d565a1 --- /dev/null +++ b/changelog.d/11207.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 67f8ffcaff..ddc9105ee9 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.storage.databases.main.directory import RoomAliasMapping from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.util.async_helpers import Linearizer from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -58,6 +59,10 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False + self._ephemeral_events_linearizer = Linearizer( + name="appservice_ephemeral_events" + ) + def notify_interested_services(self, max_token: RoomStreamToken) -> None: """Notifies (pushes) all application services interested in this event. 
@@ -260,26 +265,37 @@ class ApplicationServicesHandler:
                 events = await self._handle_typing(service, new_token)
                 if events:
                     self.scheduler.submit_ephemeral_events_for_as(service, events)
+                continue

-            elif stream_key == "receipt_key":
-                events = await self._handle_receipts(service)
-                if events:
-                    self.scheduler.submit_ephemeral_events_for_as(service, events)
-
-                # Persist the latest handled stream token for this appservice
-                await self.store.set_type_stream_id_for_appservice(
-                    service, "read_receipt", new_token
+            # Since we read/update the stream position for this AS/stream, serialise processing per (appservice, stream) pair.
+            with (
+                await self._ephemeral_events_linearizer.queue(
+                    (service.id, stream_key)
                 )
+            ):
+                if stream_key == "receipt_key":
+                    events = await self._handle_receipts(service, new_token)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(
+                            service, events
+                        )
+
+                    # Persist the latest handled stream token for this appservice
+                    await self.store.set_type_stream_id_for_appservice(
+                        service, "read_receipt", new_token
+                    )

-            elif stream_key == "presence_key":
-                events = await self._handle_presence(service, users)
-                if events:
-                    self.scheduler.submit_ephemeral_events_for_as(service, events)
+                elif stream_key == "presence_key":
+                    events = await self._handle_presence(service, users, new_token)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(
+                            service, events
+                        )

-                # Persist the latest handled stream token for this appservice
-                await self.store.set_type_stream_id_for_appservice(
-                    service, "presence", new_token
-                )
+                    # Persist the latest handled stream token for this appservice
+                    await self.store.set_type_stream_id_for_appservice(
+                        service, "presence", new_token
+                    )

     async def _handle_typing(
         self, service: ApplicationService, new_token: int
@@ -316,7 +332,9 @@ class ApplicationServicesHandler:
         )
         return typing

-    async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+    async def _handle_receipts(
+        self, service: ApplicationService, new_token: Optional[int]
+    ) -> List[JsonDict]:
         """
         Return the latest read receipts that the given application service should receive.

@@ -335,6 +353,12 @@
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         receipts_source = self.event_sources.sources.receipt
         receipts, _ = await receipts_source.get_new_events_as(
             service=service, from_key=from_key
         )
         return receipts

     async def _handle_presence(
-        self, service: ApplicationService, users: Collection[Union[str, UserID]]
+        self,
+        service: ApplicationService,
+        users: Collection[Union[str, UserID]],
+        new_token: Optional[int],
     ) -> List[JsonDict]:
         """
         Return the latest presence updates that the given application service should receive.
@@ -365,6 +392,12 @@ class ApplicationServicesHandler: from_key = await self.store.get_type_stream_id_for_appservice( service, "presence" ) + if new_token is not None and new_token <= from_key: + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) + return [] + for user in users: if isinstance(user, str): user = UserID.from_string(user) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 43998020b2..1f6a924452 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -40,6 +40,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): hs.get_application_service_scheduler.return_value = self.mock_scheduler hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) + self.event_source = hs.get_event_sources() def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) @@ -252,6 +253,56 @@ class AppServiceHandlerTestCase(unittest.TestCase): }, ) + def test_notify_interested_services_ephemeral(self): + """ + Test sending ephemeral events to the appservice handler are scheduled + to be pushed out to interested appservices, and that the stream ID is + updated accordingly. + """ + interested_service = self._mkservice(is_interested=True) + services = [interested_service] + + self.mock_store.get_app_services.return_value = services + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( + 579 + ) + + event = Mock(event_id="event_1") + self.event_source.sources.receipt.get_new_events_as.return_value = ( + make_awaitable(([event], None)) + ) + + self.handler.notify_interested_services_ephemeral("receipt_key", 580) + self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( + interested_service, [event] + ) + self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( + interested_service, + "read_receipt", + 580, + ) + + def test_notify_interested_services_ephemeral_out_of_order(self): + """ + Test sending out of order ephemeral events to the appservice handler + are ignored. + """ + interested_service = self._mkservice(is_interested=True) + services = [interested_service] + + self.mock_store.get_app_services.return_value = services + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( + 580 + ) + + event = Mock(event_id="event_1") + self.event_source.sources.receipt.get_new_events_as.return_value = ( + make_awaitable(([event], None)) + ) + + self.handler.notify_interested_services_ephemeral("receipt_key", 579) + self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() + def _mkservice(self, is_interested, protocols=None): service = Mock() service.is_interested.return_value = make_awaitable(is_interested) -- cgit 1.4.1 From 8eec25a1d9d656905db18a2c62a5552e63db2667 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Thu, 4 Nov 2021 10:33:53 +0000 Subject: Track ongoing event fetches correctly in the presence of failure (#11240) When an event fetcher aborts due to an exception, `_event_fetch_ongoing` must be decremented, otherwise the event fetcher would never be replaced. If enough event fetchers were to fail, no more events would be fetched and requests would get stuck waiting for events. 
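The shape of the fix is the standard pattern of pairing a counter increment with a `try`/`finally` decrement, so that an exception inside the loop cannot leak the count. A self-contained sketch of the pattern, using plain `threading` and illustrative names rather than Synapse's actual fetcher machinery:

```python
import threading
from queue import Queue
from typing import Callable, Optional

MAX_FETCHERS = 5
_lock = threading.Lock()
_ongoing = 0  # how many fetcher threads we believe are running

def _fetch_loop(work: "Queue[Optional[Callable[[], None]]]") -> None:
    global _ongoing
    try:
        while True:
            item = work.get()
            if item is None:  # sentinel: no more work
                return
            item()  # a bad work item may raise here
    finally:
        # Runs on both clean exit and exception, so the count can never
        # leak upwards and block replacement fetchers from starting.
        with _lock:
            _ongoing -= 1

def maybe_start_fetcher(work: "Queue[Optional[Callable[[], None]]]") -> None:
    """Start another fetcher unless we are already at the limit."""
    global _ongoing
    with _lock:
        if _ongoing >= MAX_FETCHERS:
            return
        _ongoing += 1
    threading.Thread(target=_fetch_loop, args=(work,), daemon=True).start()
```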
--- changelog.d/11240.bugfix | 1 + synapse/storage/databases/main/events_worker.py | 56 +++++++++++++++---------- 2 files changed, 35 insertions(+), 22 deletions(-) create mode 100644 changelog.d/11240.bugfix diff --git a/changelog.d/11240.bugfix b/changelog.d/11240.bugfix new file mode 100644 index 0000000000..94d73f67e3 --- /dev/null +++ b/changelog.d/11240.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index ae37901be9..c6bf316d5b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -28,6 +28,7 @@ from typing import ( import attr from constantly import NamedConstant, Names +from prometheus_client import Gauge from typing_extensions import Literal from twisted.internet import defer @@ -81,6 +82,12 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +event_fetch_ongoing_gauge = Gauge( + "synapse_event_fetch_ongoing", + "The number of event fetchers that are running", +) + + @attr.s(slots=True, auto_attribs=True) class _EventCacheEntry: event: EventBase @@ -222,6 +229,7 @@ class EventsWorkerStore(SQLBaseStore): self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) # We define this sequence here so that it can be referenced from both # the DataStore and PersistEventStore. @@ -732,28 +740,31 @@ class EventsWorkerStore(SQLBaseStore): """Takes a database connection and waits for requests for events from the _event_fetch_list queue. 
""" - i = 0 - while True: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - single_threaded = self.database_engine.single_threaded - if ( - not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING - or single_threaded - or i > EVENT_QUEUE_ITERATIONS - ): - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 - - self._fetch_event_list(conn, event_list) + try: + i = 0 + while True: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + single_threaded = self.database_engine.single_threaded + if ( + not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING + or single_threaded + or i > EVENT_QUEUE_ITERATIONS + ): + break + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 + + self._fetch_event_list(conn, event_list) + finally: + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) def _fetch_event_list( self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]] @@ -977,6 +988,7 @@ class EventsWorkerStore(SQLBaseStore): if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: self._event_fetch_ongoing += 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) should_start = True else: should_start = False -- cgit 1.4.1 From f36434590c1baafad4621afc9d2b583e9f89b6bb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 4 Nov 2021 14:45:34 +0000 Subject: Additional test for `cachedList` (#11246) I was trying to understand how `cachedList` works, and ended up writing this extra test. I figure we may as well keep it. --- changelog.d/11246.misc | 1 + tests/util/caches/test_descriptors.py | 43 +++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 changelog.d/11246.misc diff --git a/changelog.d/11246.misc b/changelog.d/11246.misc new file mode 100644 index 0000000000..e5e912c1b0 --- /dev/null +++ b/changelog.d/11246.misc @@ -0,0 +1 @@ +Add an additional test for the `cachedList` method decorator. diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 39947a166b..ced3efd93f 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -17,6 +17,7 @@ from typing import Set from unittest import mock from twisted.internet import defer, reactor +from twisted.internet.defer import Deferred from synapse.api.errors import SynapseError from synapse.logging.context import ( @@ -703,6 +704,48 @@ class CachedListDescriptorTestCase(unittest.TestCase): obj.mock.assert_called_once_with((40,), 2) self.assertEqual(r, {10: "fish", 40: "gravy"}) + def test_concurrent_lookups(self): + """All concurrent lookups should get the same result""" + + class Cls: + def __init__(self): + self.mock = mock.Mock() + + @descriptors.cached() + def fn(self, arg1): + pass + + @descriptors.cachedList("fn", "args1") + def list_fn(self, args1) -> "Deferred[dict]": + return self.mock(args1) + + obj = Cls() + deferred_result = Deferred() + obj.mock.return_value = deferred_result + + # start off several concurrent lookups of the same key + d1 = obj.list_fn([10]) + d2 = obj.list_fn([10]) + d3 = obj.list_fn([10]) + + # the mock should have been called exactly once + obj.mock.assert_called_once_with((10,)) + obj.mock.reset_mock() + + # ... 
and none of the calls should yet be complete + self.assertFalse(d1.called) + self.assertFalse(d2.called) + self.assertFalse(d3.called) + + # complete the lookup. @cachedList functions need to complete with a map + # of input->result + deferred_result.callback({10: "peas"}) + + # ... which should give the right result to all the callers + self.assertEqual(self.successResultOf(d1), {10: "peas"}) + self.assertEqual(self.successResultOf(d2), {10: "peas"}) + self.assertEqual(self.successResultOf(d3), {10: "peas"}) + @defer.inlineCallbacks def test_invalidate(self): """Make sure that invalidation callbacks are called.""" -- cgit 1.4.1 From 499c44d69685c1c1e347ff252ad08f5dfe089a83 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Thu, 4 Nov 2021 17:10:11 +0000 Subject: Make minor correction to type of auth_checkers callbacks (#11253) --- changelog.d/11253.misc | 1 + docs/modules/password_auth_provider_callbacks.md | 2 +- synapse/handlers/auth.py | 4 +++- 3 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 changelog.d/11253.misc diff --git a/changelog.d/11253.misc b/changelog.d/11253.misc new file mode 100644 index 0000000000..71c55a2751 --- /dev/null +++ b/changelog.d/11253.misc @@ -0,0 +1 @@ +Make minor correction to the type of `auth_checkers` callbacks. diff --git a/docs/modules/password_auth_provider_callbacks.md b/docs/modules/password_auth_provider_callbacks.md index 0de60b128a..e53abf6409 100644 --- a/docs/modules/password_auth_provider_callbacks.md +++ b/docs/modules/password_auth_provider_callbacks.md @@ -11,7 +11,7 @@ registered by using the Module API's `register_password_auth_provider_callbacks` _First introduced in Synapse v1.46.0_ ```python - auth_checkers: Dict[Tuple[str,Tuple], Callable] +auth_checkers: Dict[Tuple[str, Tuple[str, ...]], Callable] ``` A dict mapping from tuples of a login type identifier (such as `m.login.password`) and a diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d508d7d32a..60e59d11a0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1989,7 +1989,9 @@ class PasswordAuthProvider: self, check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None, on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None, - auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None, + auth_checkers: Optional[ + Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK] + ] = None, ) -> None: # Register check_3pid_auth callback if check_3pid_auth is not None: -- cgit 1.4.1 From a37df1b091c3cc9c5549243ef02c4f2a9d90bd16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Nov 2021 11:12:10 +0000 Subject: Fix rolling back when using workers (#11255) Fixes #11252 --- changelog.d/11255.bugfix | 1 + synapse/storage/prepare_database.py | 23 ++++++------ tests/storage/test_rollback_worker.py | 69 +++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 11 deletions(-) create mode 100644 changelog.d/11255.bugfix create mode 100644 tests/storage/test_rollback_worker.py diff --git a/changelog.d/11255.bugfix b/changelog.d/11255.bugfix new file mode 100644 index 0000000000..ce72592624 --- /dev/null +++ b/changelog.d/11255.bugfix @@ -0,0 +1 @@ +Fix rolling back Synapse version when using workers. 
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 1629d2a53c..b5c1c14ee3 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -133,22 +133,23 @@ def prepare_database(

             # if it's a worker app, refuse to upgrade the database, to avoid multiple
             # workers doing it at once.
-            if (
-                config.worker.worker_app is not None
-                and version_info.current_version != SCHEMA_VERSION
-            ):
+            if config.worker.worker_app is None:
+                _upgrade_existing_database(
+                    cur,
+                    version_info,
+                    database_engine,
+                    config,
+                    databases=databases,
+                )
+            elif version_info.current_version < SCHEMA_VERSION:
+                # If the DB is on an older version than we expect then we refuse
+                # to start the worker (as the main process needs to run first to
+                # update the schema).
                 raise UpgradeDatabaseException(
                     OUTDATED_SCHEMA_ON_WORKER_ERROR
                     % (SCHEMA_VERSION, version_info.current_version)
                 )
-
-            _upgrade_existing_database(
-                cur,
-                version_info,
-                database_engine,
-                config,
-                databases=databases,
-            )
         else:
             logger.info("%r: Initialising new database", databases)

diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py
new file mode 100644
index 0000000000..a6be9a1bb1
--- /dev/null
+++ b/tests/storage/test_rollback_worker.py
@@ -0,0 +1,69 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.app.generic_worker import GenericWorkerServer
+from synapse.storage.database import LoggingDatabaseConnection
+from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
+from synapse.storage.schema import SCHEMA_VERSION
+
+from tests.unittest import HomeserverTestCase
+
+
+class WorkerSchemaTests(HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        hs = self.setup_test_homeserver(
+            federation_http_client=None, homeserver_to_use=GenericWorkerServer
+        )
+        return hs
+
+    def default_config(self):
+        conf = super().default_config()
+
+        # Mark this as a worker app.
+ conf["worker_app"] = "yes" + + return conf + + def test_rolling_back(self): + """Test that workers can start if the DB is a newer schema version""" + + db_pool = self.hs.get_datastore().db_pool + db_conn = LoggingDatabaseConnection( + db_pool._db_pool.connect(), + db_pool.engine, + "tests", + ) + + cur = db_conn.cursor() + cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION + 1,)) + + db_conn.commit() + + prepare_database(db_conn, db_pool.engine, self.hs.config) + + def test_not_upgraded(self): + """Test that workers don't start if the DB has an older schema version""" + db_pool = self.hs.get_datastore().db_pool + db_conn = LoggingDatabaseConnection( + db_pool._db_pool.connect(), + db_pool.engine, + "tests", + ) + + cur = db_conn.cursor() + cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION - 1,)) + + db_conn.commit() + + with self.assertRaises(PrepareDatabaseException): + prepare_database(db_conn, db_pool.engine, self.hs.config) -- cgit 1.4.1 From 09cb441a043947ee367820b56d189c02f5fd35a6 Mon Sep 17 00:00:00 2001 From: Julian <374571+l00ptr@users.noreply.github.com> Date: Fri, 5 Nov 2021 13:08:02 +0100 Subject: Add doc to integrate synapse with LemonLDAP OIDC (#11257) Co-authored-by: David Robertson Co-authored-by: Julian Vanden Broeck --- changelog.d/11257.doc | 1 + docs/openid.md | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 changelog.d/11257.doc diff --git a/changelog.d/11257.doc b/changelog.d/11257.doc new file mode 100644 index 0000000000..1205be2add --- /dev/null +++ b/changelog.d/11257.doc @@ -0,0 +1 @@ +Add documentation for using LemonLDAP as an OpenID Connect Identity Provider. Contributed by @l00ptr. diff --git a/docs/openid.md b/docs/openid.md index 4a340ef107..c74e8bda60 100644 --- a/docs/openid.md +++ b/docs/openid.md @@ -22,6 +22,7 @@ such as [Github][github-idp]. [google-idp]: https://developers.google.com/identity/protocols/oauth2/openid-connect [auth0]: https://auth0.com/ [authentik]: https://goauthentik.io/ +[lemonldap]: https://lemonldap-ng.org/ [okta]: https://www.okta.com/ [dex-idp]: https://github.com/dexidp/dex [keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols @@ -243,6 +244,43 @@ oidc_providers: display_name_template: "{{ user.preferred_username|capitalize }}" # TO BE FILLED: If your users have names in Authentik and you want those in Synapse, this should be replaced with user.name|capitalize. ``` +### LemonLDAP + +[LemonLDAP::NG][lemonldap] is an open-source IdP solution. + +1. Create an OpenID Connect Relying Parties in LemonLDAP::NG +2. 
The parameters are: +- Client ID under the basic menu of the new Relying Parties (`Options > Basic > + Client ID`) +- Client secret (`Options > Basic > Client secret`) +- JWT Algorithm: RS256 within the security menu of the new Relying Parties + (`Options > Security > ID Token signature algorithm` and `Options > Security > + Access Token signature algorithm`) +- Scopes: OpenID, Email and Profile +- Allowed redirection addresses for login (`Options > Basic > Allowed + redirection addresses for login` ) : + `[synapse public baseurl]/_synapse/client/oidc/callback` + +Synapse config: +```yaml +oidc_providers: + - idp_id: lemonldap + idp_name: lemonldap + discover: true + issuer: "https://auth.example.org/" # TO BE FILLED: replace with your domain + client_id: "your client id" # TO BE FILLED + client_secret: "your client secret" # TO BE FILLED + scopes: + - "openid" + - "profile" + - "email" + user_mapping_provider: + config: + localpart_template: "{{ user.preferred_username }}}" + # TO BE FILLED: If your users have names in LemonLDAP::NG and you want those in Synapse, this should be replaced with user.name|capitalize or any valid filter. + display_name_template: "{{ user.preferred_username|capitalize }}" +``` + ### GitHub [GitHub][github-idp] is a bit special as it is not an OpenID Connect compliant provider, but -- cgit 1.4.1 From 9799c569bb481622b5882b2008a24e6c4658c431 Mon Sep 17 00:00:00 2001 From: Dan Callahan Date: Sun, 7 Nov 2021 21:18:33 +0000 Subject: Minor cleanup to Debian packaging (#11269) * Remove unused Vagrant scripts * Change package Architecture to any * Preinstall the wheel package when building venvs. Addresses the following warnings during Debian builds: Using legacy 'setup.py install' for jaeger-client, since package 'wheel' is not installed. Using legacy 'setup.py install' for matrix-synapse-ldap3, since package 'wheel' is not installed. Using legacy 'setup.py install' for opentracing, since package 'wheel' is not installed. Using legacy 'setup.py install' for psycopg2, since package 'wheel' is not installed. Using legacy 'setup.py install' for systemd-python, since package 'wheel' is not installed. Using legacy 'setup.py install' for pympler, since package 'wheel' is not installed. Using legacy 'setup.py install' for threadloop, since package 'wheel' is not installed. Using legacy 'setup.py install' for thrift, since package 'wheel' is not installed. * Allow /etc/default/matrix-synapse to be missing Per the systemd.exec manpage, prefixing an EnvironmentFile with "-": > indicates that if the file does not exist, it will not be read and no > error or warning message is logged. 
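For readers unfamiliar with the unit-file syntax, the `-` prefix only changes how a missing file is treated. Below is a toy Python model of that one rule; systemd's real parser handles quoting, multiple `EnvironmentFile=` entries and much more:

```python
from pathlib import Path
from typing import Dict

def read_environment_file(spec: str) -> Dict[str, str]:
    """Toy model of systemd's EnvironmentFile= handling: a leading '-'
    means "skip silently if the file is absent" rather than fail."""
    optional = spec.startswith("-")
    path = Path(spec[1:] if optional else spec)
    if not path.exists():
        if optional:
            return {}  # no error and no warning; the unit still starts
        raise FileNotFoundError(path)
    env: Dict[str, str] = {}
    for line in path.read_text().splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            env[key.strip()] = value.strip()
    return env

# EnvironmentFile=-/etc/default/matrix-synapse: an absent file is fine.
print(read_environment_file("-/etc/default/matrix-synapse"))
```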
Signed-off-by: Dan Callahan --- changelog.d/11269.misc | 1 + debian/build_virtualenv | 1 + debian/changelog | 4 ++++ debian/control | 2 +- debian/matrix-synapse.service | 2 +- debian/test/.gitignore | 2 -- debian/test/provision.sh | 24 ---------------------- debian/test/stretch/Vagrantfile | 13 ------------ debian/test/xenial/Vagrantfile | 10 --------- .../system/matrix-synapse-worker@.service | 2 +- .../system/matrix-synapse.service | 2 +- 11 files changed, 10 insertions(+), 53 deletions(-) create mode 100644 changelog.d/11269.misc delete mode 100644 debian/test/.gitignore delete mode 100644 debian/test/provision.sh delete mode 100644 debian/test/stretch/Vagrantfile delete mode 100644 debian/test/xenial/Vagrantfile diff --git a/changelog.d/11269.misc b/changelog.d/11269.misc new file mode 100644 index 0000000000..a2149c2d2d --- /dev/null +++ b/changelog.d/11269.misc @@ -0,0 +1 @@ +Clean up trivial aspects of the Debian package build tooling. diff --git a/debian/build_virtualenv b/debian/build_virtualenv index 3097371d59..e691163619 100755 --- a/debian/build_virtualenv +++ b/debian/build_virtualenv @@ -40,6 +40,7 @@ dh_virtualenv \ --upgrade-pip \ --preinstall="lxml" \ --preinstall="mock" \ + --preinstall="wheel" \ --extra-pip-arg="--no-cache-dir" \ --extra-pip-arg="--compile" \ --extras="all,systemd,test" diff --git a/debian/changelog b/debian/changelog index 14748f8c25..7e41bde858 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,6 +1,10 @@ matrix-synapse-py3 (1.47.0+nmu1) UNRELEASED; urgency=medium * Update scripts to pass Shellcheck lints. + * Remove unused Vagrant scripts from debian/ directory. + * Change package Architecture to any. + * Preinstall the "wheel" package when building virtualenvs. + * Do not error if /etc/default/matrix-synapse is missing. -- root Fri, 22 Oct 2021 22:20:31 +0000 diff --git a/debian/control b/debian/control index 763fabd6f6..412a9e1d4c 100644 --- a/debian/control +++ b/debian/control @@ -19,7 +19,7 @@ Standards-Version: 3.9.8 Homepage: https://github.com/matrix-org/synapse Package: matrix-synapse-py3 -Architecture: amd64 +Architecture: any Provides: matrix-synapse Conflicts: matrix-synapse (<< 0.34.0.1-0matrix2), diff --git a/debian/matrix-synapse.service b/debian/matrix-synapse.service index 553babf549..bde1c6cb9f 100644 --- a/debian/matrix-synapse.service +++ b/debian/matrix-synapse.service @@ -5,7 +5,7 @@ Description=Synapse Matrix homeserver Type=notify User=matrix-synapse WorkingDirectory=/var/lib/matrix-synapse -EnvironmentFile=/etc/default/matrix-synapse +EnvironmentFile=-/etc/default/matrix-synapse ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ ExecReload=/bin/kill -HUP $MAINPID diff --git a/debian/test/.gitignore b/debian/test/.gitignore deleted file mode 100644 index 95eda73fcc..0000000000 --- a/debian/test/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -.vagrant -*.log diff --git a/debian/test/provision.sh b/debian/test/provision.sh deleted file mode 100644 index 55d7b8e03a..0000000000 --- a/debian/test/provision.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash -# -# provisioning script for vagrant boxes for testing the matrix-synapse debs. -# -# Will install the most recent matrix-synapse-py3 deb for this platform from -# the /debs directory. 
- -set -e - -apt-get update -apt-get install -y lsb-release - -deb=$(find /debs -name "matrix-synapse-py3_*+$(lsb_release -cs)*.deb" | sort | tail -n1) - -debconf-set-selections <> /etc/matrix-synapse/homeserver.yaml -systemctl restart matrix-synapse diff --git a/debian/test/stretch/Vagrantfile b/debian/test/stretch/Vagrantfile deleted file mode 100644 index d8eff6fe11..0000000000 --- a/debian/test/stretch/Vagrantfile +++ /dev/null @@ -1,13 +0,0 @@ -# -*- mode: ruby -*- -# vi: set ft=ruby : - -ver = `cd ../../..; dpkg-parsechangelog -S Version`.strip() - -Vagrant.configure("2") do |config| - config.vm.box = "debian/stretch64" - - config.vm.synced_folder ".", "/vagrant", disabled: true - config.vm.synced_folder "../../../../debs", "/debs", type: "nfs" - - config.vm.provision "shell", path: "../provision.sh" -end diff --git a/debian/test/xenial/Vagrantfile b/debian/test/xenial/Vagrantfile deleted file mode 100644 index 189236da17..0000000000 --- a/debian/test/xenial/Vagrantfile +++ /dev/null @@ -1,10 +0,0 @@ -# -*- mode: ruby -*- -# vi: set ft=ruby : - -Vagrant.configure("2") do |config| - config.vm.box = "ubuntu/xenial64" - - config.vm.synced_folder ".", "/vagrant", disabled: true - config.vm.synced_folder "../../../../debs", "/debs" - config.vm.provision "shell", path: "../provision.sh" -end diff --git a/docs/systemd-with-workers/system/matrix-synapse-worker@.service b/docs/systemd-with-workers/system/matrix-synapse-worker@.service index d164e8ce1f..8f5c44c9d4 100644 --- a/docs/systemd-with-workers/system/matrix-synapse-worker@.service +++ b/docs/systemd-with-workers/system/matrix-synapse-worker@.service @@ -15,7 +15,7 @@ Type=notify NotifyAccess=main User=matrix-synapse WorkingDirectory=/var/lib/matrix-synapse -EnvironmentFile=/etc/default/matrix-synapse +EnvironmentFile=-/etc/default/matrix-synapse ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.generic_worker --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --config-path=/etc/matrix-synapse/workers/%i.yaml ExecReload=/bin/kill -HUP $MAINPID Restart=always diff --git a/docs/systemd-with-workers/system/matrix-synapse.service b/docs/systemd-with-workers/system/matrix-synapse.service index f6b6dfd3ce..0c73fb55fb 100644 --- a/docs/systemd-with-workers/system/matrix-synapse.service +++ b/docs/systemd-with-workers/system/matrix-synapse.service @@ -10,7 +10,7 @@ Type=notify NotifyAccess=main User=matrix-synapse WorkingDirectory=/var/lib/matrix-synapse -EnvironmentFile=/etc/default/matrix-synapse +EnvironmentFile=-/etc/default/matrix-synapse ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ ExecReload=/bin/kill -HUP $MAINPID -- cgit 1.4.1 From 98c8fc6ce82d9d6b1bd21bf70df6a0e1ce91c1dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Nov 2021 09:54:47 +0000 Subject: Handle federation inbound instances being killed more gracefully (#11262) * Make lock better handle process being killed If the process gets killed and restarted (so that it didn't have a chance to drop its locks gracefully) then there may still be locks in the DB that are for the same instance that haven't yet timed out but are safe to delete. 
We handle this case by a) checking if the current instance already has taken out the lock, and b) if not then ignoring locks that are for the same instance. * Periodically check for old staged events This is to protect against other instances dying and their locks timing out. --- changelog.d/11262.bugfix | 1 + synapse/federation/federation_server.py | 5 +++++ synapse/storage/databases/main/lock.py | 31 +++++++++++++++++++++---------- 3 files changed, 27 insertions(+), 10 deletions(-) create mode 100644 changelog.d/11262.bugfix diff --git a/changelog.d/11262.bugfix b/changelog.d/11262.bugfix new file mode 100644 index 0000000000..768fbb8973 --- /dev/null +++ b/changelog.d/11262.bugfix @@ -0,0 +1 @@ +Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 42e3acecb4..9a8758e9a6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -213,6 +213,11 @@ class FederationServer(FederationBase): self._started_handling_of_staged_events = True self._handle_old_staged_events() + # Start a periodic check for old staged events. This is to handle + # the case where locks time out, e.g. if another process gets killed + # without dropping its locks. + self._clock.looping_call(self._handle_old_staged_events, 60 * 1000) + # keep this as early as possible to make the calculated origin ts as # accurate as possible. request_time = self._clock.time_msec() diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 3d1dff660b..3d0df0cbd4 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -14,6 +14,7 @@ import logging from types import TracebackType from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type +from weakref import WeakValueDictionary from twisted.internet.interfaces import IReactorCore @@ -61,7 +62,7 @@ class LockStore(SQLBaseStore): # A map from `(lock_name, lock_key)` to the token of any locks that we # think we currently hold. - self._live_tokens: Dict[Tuple[str, str], str] = {} + self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary() # When we shut down we want to remove the locks. Technically this can # lead to a race, as we may drop the lock while we are still processing. @@ -80,10 +81,10 @@ class LockStore(SQLBaseStore): # We need to take a copy of the tokens dict as dropping the locks will # cause the dictionary to change. - tokens = dict(self._live_tokens) + locks = dict(self._live_tokens) - for (lock_name, lock_key), token in tokens.items(): - await self._drop_lock(lock_name, lock_key, token) + for lock in locks.values(): + await lock.release() logger.info("Dropped locks due to shutdown") @@ -93,6 +94,11 @@ class LockStore(SQLBaseStore): used (otherwise the lock will leak). """ + # Check if this process has taken out a lock and if it's still valid. + lock = self._live_tokens.get((lock_name, lock_key)) + if lock and await lock.is_still_valid(): + return None + now = self._clock.time_msec() token = random_string(6) @@ -100,7 +106,9 @@ class LockStore(SQLBaseStore): def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: # We take out the lock if either a) there is no row for the lock - # already or b) the existing row has timed out. 
+ # already, b) the existing row has timed out, or c) the row is + # for this instance (which means the process got killed and + # restarted) sql = """ INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) VALUES (?, ?, ?, ?, ?) @@ -112,6 +120,7 @@ class LockStore(SQLBaseStore): last_renewed_ts = EXCLUDED.last_renewed_ts WHERE worker_locks.last_renewed_ts < ? + OR worker_locks.instance_name = EXCLUDED.instance_name """ txn.execute( sql, @@ -148,11 +157,11 @@ class LockStore(SQLBaseStore): WHERE lock_name = ? AND lock_key = ? - AND last_renewed_ts < ? + AND (last_renewed_ts < ? OR instance_name = ?) """ txn.execute( sql, - (lock_name, lock_key, now - _LOCK_TIMEOUT_MS), + (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name), ) inserted = self.db_pool.simple_upsert_txn_emulated( @@ -179,9 +188,7 @@ class LockStore(SQLBaseStore): if not did_lock: return None - self._live_tokens[(lock_name, lock_key)] = token - - return Lock( + lock = Lock( self._reactor, self._clock, self, @@ -190,6 +197,10 @@ class LockStore(SQLBaseStore): token=token, ) + self._live_tokens[(lock_name, lock_key)] = lock + + return lock + async def _is_lock_still_valid( self, lock_name: str, lock_key: str, token: str ) -> bool: -- cgit 1.4.1 From a55e1ec9afc45a36f962e1e552128aeab5b1ee3c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Nov 2021 10:37:43 +0000 Subject: Blacklist new sytest validation test (#11270) --- changelog.d/11270.misc | 1 + sytest-blacklist | 3 +++ 2 files changed, 4 insertions(+) create mode 100644 changelog.d/11270.misc diff --git a/changelog.d/11270.misc b/changelog.d/11270.misc new file mode 100644 index 0000000000..e2181b9b2a --- /dev/null +++ b/changelog.d/11270.misc @@ -0,0 +1 @@ +Blacklist new SyTest that checks that key uploads are valid pending the validation being implemented in Synapse. diff --git a/sytest-blacklist b/sytest-blacklist index 65bf1774e3..57e603a4a6 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -32,3 +32,6 @@ We can't peek into rooms with invited history_visibility We can't peek into rooms with joined history_visibility Local users can peek by room alias Peeked rooms only turn up in the sync for the device who peeked them + +# Validation needs to be added to Synapse: #10554 +Rejects invalid device keys -- cgit 1.4.1
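Returning to the locking change in #11262 above: the upsert there encodes a single rule, namely that an existing lock row may be claimed if it has timed out or if it already belongs to the claiming instance (meaning that instance previously died while holding it). A minimal in-memory sketch of that rule, with illustrative names rather than Synapse's actual `LockStore` API:

```python
import time
from typing import Dict, Optional, Tuple

_LOCK_TIMEOUT_MS = 2 * 60 * 1000

# (lock_name, lock_key) -> (instance_name, token, last_renewed_ts)
_locks: Dict[Tuple[str, str], Tuple[str, str, int]] = {}

def try_acquire(lock_name: str, lock_key: str, instance: str, token: str) -> Optional[str]:
    """Return the token if the lock was acquired, else None."""
    now = int(time.time() * 1000)
    row = _locks.get((lock_name, lock_key))
    if row is not None:
        holder, _, last_renewed = row
        live = last_renewed >= now - _LOCK_TIMEOUT_MS
        # Refuse only if the lock is live *and* held by someone else; a stale
        # row, or our own row left over from before a restart, may be claimed.
        if live and holder != instance:
            return None
    _locks[(lock_name, lock_key)] = (instance, token, now)
    return token

# A restarted "worker1" can reclaim its own still-live lock, but "worker2" cannot steal it:
assert try_acquire("federation", "inbound", "worker1", "tok1") == "tok1"
assert try_acquire("federation", "inbound", "worker2", "tok2") is None
assert try_acquire("federation", "inbound", "worker1", "tok3") == "tok3"
```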