From d4a7829b12197faf52eb487c443ee09acafeb37e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 6 Aug 2020 08:30:06 -0400 Subject: Convert synapse.api to async/await (#8031) --- tests/rest/client/v1/test_rooms.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 5ccda8b2bd..ef6b775ed2 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -23,8 +23,6 @@ from urllib import parse as urlparse from mock import Mock -from twisted.internet import defer - import synapse.rest.admin from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.handlers.pagination import PurgeStatus @@ -51,8 +49,8 @@ class RoomBase(unittest.HomeserverTestCase): self.hs.get_federation_handler = Mock(return_value=Mock()) - def _insert_client_ip(*args, **kwargs): - return defer.succeed(None) + async def _insert_client_ip(*args, **kwargs): + return None self.hs.get_datastore().insert_client_ip = _insert_client_ip -- cgit 1.5.1 From e259d63f73fd7599520d0c4a6f5082e5cd383d25 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 20 Aug 2020 15:07:42 -0400 Subject: Stop shadow-banned users from sending invites. (#8095) --- changelog.d/8095.feature | 1 + synapse/api/errors.py | 8 +++ synapse/handlers/room.py | 16 +++++- synapse/handlers/room_member.py | 62 ++++++++++++++++++++++- synapse/rest/admin/rooms.py | 3 ++ synapse/rest/client/v1/room.py | 67 +++++++++++++++---------- tests/rest/client/v1/test_rooms.py | 100 +++++++++++++++++++++++++++++++++++++ 7 files changed, 226 insertions(+), 31 deletions(-) create mode 100644 changelog.d/8095.feature (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/changelog.d/8095.feature b/changelog.d/8095.feature new file mode 100644 index 0000000000..813e6d0903 --- /dev/null +++ b/changelog.d/8095.feature @@ -0,0 +1 @@ +Add support for shadow-banning users (ignoring any message send requests). diff --git a/synapse/api/errors.py b/synapse/api/errors.py index a3f314118a..4888c0ec4d 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -604,3 +604,11 @@ class HttpResponseException(CodeMessageException): errmsg = j.pop("error", self.msg) return ProxiedRequestError(self.code, errmsg, errcode, j) + + +class ShadowBanError(Exception): + """ + Raised when a shadow-banned user attempts to perform an action. + + This should be caught and a proper "fake" success response sent to the user. + """ diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 442cca28e6..0fc71475c3 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -20,6 +20,7 @@ import itertools import logging import math +import random import string from collections import OrderedDict from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple @@ -626,6 +627,7 @@ class RoomCreationHandler(BaseHandler): if mapping: raise SynapseError(400, "Room alias already taken", Codes.ROOM_IN_USE) + invite_3pid_list = config.get("invite_3pid", []) invite_list = config.get("invite", []) for i in invite_list: try: @@ -634,6 +636,14 @@ class RoomCreationHandler(BaseHandler): except Exception: raise SynapseError(400, "Invalid user_id: %s" % (i,)) + if (invite_list or invite_3pid_list) and requester.shadow_banned: + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + + # Allow the request to go through, but remove any associated invites. + invite_3pid_list = [] + invite_list = [] + await self.event_creation_handler.assert_accepted_privacy_policy(requester) power_level_content_override = config.get("power_level_content_override") @@ -648,8 +658,6 @@ class RoomCreationHandler(BaseHandler): % (user_id,), ) - invite_3pid_list = config.get("invite_3pid", []) - visibility = config.get("visibility", None) is_public = visibility == "public" @@ -744,6 +752,8 @@ class RoomCreationHandler(BaseHandler): if is_direct: content["is_direct"] = is_direct + # Note that update_membership with an action of "invite" can raise a + # ShadowBanError, but this was handled above by emptying invite_list. _, last_stream_id = await self.room_member_handler.update_membership( requester, UserID.from_string(invitee), @@ -758,6 +768,8 @@ class RoomCreationHandler(BaseHandler): id_access_token = invite_3pid.get("id_access_token") # optional address = invite_3pid["address"] medium = invite_3pid["medium"] + # Note that do_3pid_invite can raise a ShadowBanError, but this was + # handled above by emptying invite_3pid_list. last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite( room_id, requester.user, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index aa1ccde211..3a6ee6378d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -15,6 +15,7 @@ import abc import logging +import random from http import HTTPStatus from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union @@ -22,7 +23,13 @@ from unpaddedbase64 import encode_base64 from synapse import types from synapse.api.constants import MAX_DEPTH, EventTypes, Membership -from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError +from synapse.api.errors import ( + AuthError, + Codes, + LimitExceededError, + ShadowBanError, + SynapseError, +) from synapse.api.ratelimiting import Ratelimiter from synapse.api.room_versions import EventFormatVersions from synapse.crypto.event_signing import compute_event_reference_hash @@ -285,6 +292,31 @@ class RoomMemberHandler(object): content: Optional[dict] = None, require_consent: bool = True, ) -> Tuple[str, int]: + """Update a user's membership in a room. + + Params: + requester: The user who is performing the update. + target: The user whose membership is being updated. + room_id: The room ID whose membership is being updated. + action: The membership change, see synapse.api.constants.Membership. + txn_id: The transaction ID, if given. + remote_room_hosts: Remote servers to send the update to. + third_party_signed: Information from a 3PID invite. + ratelimit: Whether to rate limit the request. + content: The content of the created event. + require_consent: Whether consent is required. + + Returns: + A tuple of the new event ID and stream ID. + + Raises: + ShadowBanError if a shadow-banned requester attempts to send an invite. + """ + if action == Membership.INVITE and requester.shadow_banned: + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + raise ShadowBanError() + key = (room_id,) with (await self.member_linearizer.queue(key)): @@ -773,6 +805,25 @@ class RoomMemberHandler(object): txn_id: Optional[str], id_access_token: Optional[str] = None, ) -> int: + """Invite a 3PID to a room. + + Args: + room_id: The room to invite the 3PID to. + inviter: The user sending the invite. + medium: The 3PID's medium. + address: The 3PID's address. + id_server: The identity server to use. + requester: The user making the request. + txn_id: The transaction ID this is part of, or None if this is not + part of a transaction. + id_access_token: The optional identity server access token. + + Returns: + The new stream ID. + + Raises: + ShadowBanError if the requester has been shadow-banned. + """ if self.config.block_non_admin_invites: is_requester_admin = await self.auth.is_server_admin(requester.user) if not is_requester_admin: @@ -780,6 +831,11 @@ class RoomMemberHandler(object): 403, "Invites have been disabled on this server", Codes.FORBIDDEN ) + if requester.shadow_banned: + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + raise ShadowBanError() + # We need to rate limit *before* we send out any 3PID invites, so we # can't just rely on the standard ratelimiting of events. await self.base_handler.ratelimit(requester) @@ -804,6 +860,8 @@ class RoomMemberHandler(object): ) if invitee: + # Note that update_membership with an action of "invite" can raise + # a ShadowBanError, but this was done above already. _, stream_id = await self.update_membership( requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id ) @@ -1042,7 +1100,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): return event_id, stream_id # The room is too large. Leave. - requester = types.create_requester(user, None, False, None) + requester = types.create_requester(user, None, False, False, None) await self.update_membership( requester=requester, target=user, room_id=room_id, action="leave" ) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 7c292ef3f9..09726d52d6 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -316,6 +316,9 @@ class JoinRoomAliasServlet(RestServlet): join_rules_event = room_state.get((EventTypes.JoinRules, "")) if join_rules_event: if not (join_rules_event.content.get("join_rule") == JoinRules.PUBLIC): + # update_membership with an action of "invite" can raise a + # ShadowBanError. This is not handled since it is assumed that + # an admin isn't going to call this API with a shadow-banned user. await self.room_member_handler.update_membership( requester=requester, target=fake_requester.user, diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index f216382636..a9dd3a6aec 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -27,6 +27,7 @@ from synapse.api.errors import ( Codes, HttpResponseException, InvalidClientCredentialsError, + ShadowBanError, SynapseError, ) from synapse.api.filtering import Filter @@ -45,6 +46,7 @@ from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID from synapse.util import json_decoder +from synapse.util.stringutils import random_string MYPY = False if MYPY: @@ -200,14 +202,17 @@ class RoomStateEventRestServlet(TransactionRestServlet): event_dict["state_key"] = state_key if event_type == EventTypes.Member: - membership = content.get("membership", None) - event_id, _ = await self.room_member_handler.update_membership( - requester, - target=UserID.from_string(state_key), - room_id=room_id, - action=membership, - content=content, - ) + try: + membership = content.get("membership", None) + event_id, _ = await self.room_member_handler.update_membership( + requester, + target=UserID.from_string(state_key), + room_id=room_id, + action=membership, + content=content, + ) + except ShadowBanError: + event_id = "$" + random_string(43) else: ( event, @@ -719,16 +724,20 @@ class RoomMembershipRestServlet(TransactionRestServlet): content = {} if membership_action == "invite" and self._has_3pid_invite_keys(content): - await self.room_member_handler.do_3pid_invite( - room_id, - requester.user, - content["medium"], - content["address"], - content["id_server"], - requester, - txn_id, - content.get("id_access_token"), - ) + try: + await self.room_member_handler.do_3pid_invite( + room_id, + requester.user, + content["medium"], + content["address"], + content["id_server"], + requester, + txn_id, + content.get("id_access_token"), + ) + except ShadowBanError: + # Pretend the request succeeded. + pass return 200, {} target = requester.user @@ -740,15 +749,19 @@ class RoomMembershipRestServlet(TransactionRestServlet): if "reason" in content: event_content = {"reason": content["reason"]} - await self.room_member_handler.update_membership( - requester=requester, - target=target, - room_id=room_id, - action=membership_action, - txn_id=txn_id, - third_party_signed=content.get("third_party_signed", None), - content=event_content, - ) + try: + await self.room_member_handler.update_membership( + requester=requester, + target=target, + room_id=room_id, + action=membership_action, + txn_id=txn_id, + third_party_signed=content.get("third_party_signed", None), + content=event_content, + ) + except ShadowBanError: + # Pretend the request succeeded. + pass return_value = {} diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index ef6b775ed2..e674eb90d7 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1974,3 +1974,103 @@ class RoomCanonicalAliasTestCase(unittest.HomeserverTestCase): """An alias which does not point to the room raises a SynapseError.""" self._set_canonical_alias({"alias": "@unknown:test"}, expected_code=400) self._set_canonical_alias({"alt_aliases": ["@unknown:test"]}, expected_code=400) + + +class ShadowBannedTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + directory.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def prepare(self, reactor, clock, homeserver): + self.banned_user_id = self.register_user("banned", "test") + self.banned_access_token = self.login("banned", "test") + + self.store = self.hs.get_datastore() + + self.get_success( + self.store.db_pool.simple_update( + table="users", + keyvalues={"name": self.banned_user_id}, + updatevalues={"shadow_banned": True}, + desc="shadow_ban", + ) + ) + + self.other_user_id = self.register_user("otheruser", "pass") + self.other_access_token = self.login("otheruser", "pass") + + def test_invite(self): + """Invites from shadow-banned users don't actually get sent.""" + + # The create works fine. + room_id = self.helper.create_room_as( + self.banned_user_id, tok=self.banned_access_token + ) + + # Inviting the user completes successfully. + self.helper.invite( + room=room_id, + src=self.banned_user_id, + tok=self.banned_access_token, + targ=self.other_user_id, + ) + + # But the user wasn't actually invited. + invited_rooms = self.get_success( + self.store.get_invited_rooms_for_local_user(self.other_user_id) + ) + self.assertEqual(invited_rooms, []) + + def test_invite_3pid(self): + """Ensure that a 3PID invite does not attempt to contact the identity server.""" + identity_handler = self.hs.get_handlers().identity_handler + identity_handler.lookup_3pid = Mock( + side_effect=AssertionError("This should not get called") + ) + + # The create works fine. + room_id = self.helper.create_room_as( + self.banned_user_id, tok=self.banned_access_token + ) + + # Inviting the user completes successfully. + request, channel = self.make_request( + "POST", + "/rooms/%s/invite" % (room_id,), + {"id_server": "test", "medium": "email", "address": "test@test.test"}, + access_token=self.banned_access_token, + ) + self.render(request) + self.assertEquals(200, channel.code, channel.result) + + # This should have raised an error earlier, but double check this wasn't called. + identity_handler.lookup_3pid.assert_not_called() + + def test_create_room(self): + """Invitations during a room creation should be discarded, but the room still gets created.""" + # The room creation is successful. + request, channel = self.make_request( + "POST", + "/_matrix/client/r0/createRoom", + {"visibility": "public", "invite": [self.other_user_id]}, + access_token=self.banned_access_token, + ) + self.render(request) + self.assertEquals(200, channel.code, channel.result) + room_id = channel.json_body["room_id"] + + # But the user wasn't actually invited. + invited_rooms = self.get_success( + self.store.get_invited_rooms_for_local_user(self.other_user_id) + ) + self.assertEqual(invited_rooms, []) + + # Since a real room was created, the other user should be able to join it. + self.helper.join(room_id, self.other_user_id, tok=self.other_access_token) + + # Both users should be in the room. + users = self.get_success(self.store.get_users_in_room(room_id)) + self.assertCountEqual(users, ["@banned:test", "@otheruser:test"]) -- cgit 1.5.1 From 393a811a41d51d7967f6d455017176a20eacd85c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 24 Aug 2020 18:06:04 +0100 Subject: Fix join ratelimiter breaking profile updates and idempotency (#8153) --- changelog.d/8153.bugfix | 1 + synapse/handlers/room_member.py | 46 +++++++++++--------- tests/rest/client/v1/test_rooms.py | 87 +++++++++++++++++++++++++++++++++++++- tests/rest/client/v1/utils.py | 10 +++-- 4 files changed, 119 insertions(+), 25 deletions(-) create mode 100644 changelog.d/8153.bugfix (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/changelog.d/8153.bugfix b/changelog.d/8153.bugfix new file mode 100644 index 0000000000..87a1f46ca1 --- /dev/null +++ b/changelog.d/8153.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.19.0 that would cause e.g. profile updates to fail due to incorrect application of rate limits on join requests. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0cd59bce3b..9fcabb22c7 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -210,24 +210,40 @@ class RoomMemberHandler(object): _, stream_id = await self.store.get_event_ordering(duplicate.event_id) return duplicate.event_id, stream_id - stream_id = await self.event_creation_handler.handle_new_client_event( - requester, event, context, extra_users=[target], ratelimit=ratelimit, - ) - prev_state_ids = await context.get_prev_state_ids() prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) + newly_joined = False if event.membership == Membership.JOIN: - # Only fire user_joined_room if the user has actually joined the - # room. Don't bother if the user is just changing their profile - # info. newly_joined = True if prev_member_event_id: prev_member_event = await self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN + + # Only rate-limit if the user actually joined the room, otherwise we'll end + # up blocking profile updates. if newly_joined: - await self._user_joined_room(target, room_id) + time_now_s = self.clock.time() + ( + allowed, + time_allowed, + ) = self._join_rate_limiter_local.can_requester_do_action(requester) + + if not allowed: + raise LimitExceededError( + retry_after_ms=int(1000 * (time_allowed - time_now_s)) + ) + + stream_id = await self.event_creation_handler.handle_new_client_event( + requester, event, context, extra_users=[target], ratelimit=ratelimit, + ) + + if event.membership == Membership.JOIN and newly_joined: + # Only fire user_joined_room if the user has actually joined the + # room. Don't bother if the user is just changing their profile + # info. + await self._user_joined_room(target, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = await self.store.get_event(prev_member_event_id) @@ -457,19 +473,7 @@ class RoomMemberHandler(object): # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") - if is_host_in_room: - time_now_s = self.clock.time() - ( - allowed, - time_allowed, - ) = self._join_rate_limiter_local.can_requester_do_action(requester,) - - if not allowed: - raise LimitExceededError( - retry_after_ms=int(1000 * (time_allowed - time_now_s)) - ) - - else: + if not is_host_in_room: time_now_s = self.clock.time() ( allowed, diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index ef6b775ed2..e74bddc1e5 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -28,7 +28,7 @@ from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.handlers.pagination import PurgeStatus from synapse.rest.client.v1 import directory, login, profile, room from synapse.rest.client.v2_alpha import account -from synapse.types import JsonDict, RoomAlias +from synapse.types import JsonDict, RoomAlias, UserID from synapse.util.stringutils import random_string from tests import unittest @@ -675,6 +675,91 @@ class RoomMemberStateTestCase(RoomBase): self.assertEquals(json.loads(content), channel.json_body) +class RoomJoinRatelimitTestCase(RoomBase): + user_id = "@sid1:red" + + servlets = [ + profile.register_servlets, + room.register_servlets, + ] + + @unittest.override_config( + {"rc_joins": {"local": {"per_second": 3, "burst_count": 3}}} + ) + def test_join_local_ratelimit(self): + """Tests that local joins are actually rate-limited.""" + for i in range(5): + self.helper.create_room_as(self.user_id) + + self.helper.create_room_as(self.user_id, expect_code=429) + + @unittest.override_config( + {"rc_joins": {"local": {"per_second": 3, "burst_count": 3}}} + ) + def test_join_local_ratelimit_profile_change(self): + """Tests that sending a profile update into all of the user's joined rooms isn't + rate-limited by the rate-limiter on joins.""" + + # Create and join more rooms than the rate-limiting config allows in a second. + room_ids = [ + self.helper.create_room_as(self.user_id), + self.helper.create_room_as(self.user_id), + self.helper.create_room_as(self.user_id), + ] + self.reactor.advance(1) + room_ids = room_ids + [ + self.helper.create_room_as(self.user_id), + self.helper.create_room_as(self.user_id), + self.helper.create_room_as(self.user_id), + ] + + # Create a profile for the user, since it hasn't been done on registration. + store = self.hs.get_datastore() + store.create_profile(UserID.from_string(self.user_id).localpart) + + # Update the display name for the user. + path = "/_matrix/client/r0/profile/%s/displayname" % self.user_id + request, channel = self.make_request("PUT", path, {"displayname": "John Doe"}) + self.render(request) + self.assertEquals(channel.code, 200, channel.json_body) + + # Check that all the rooms have been sent a profile update into. + for room_id in room_ids: + path = "/_matrix/client/r0/rooms/%s/state/m.room.member/%s" % ( + room_id, + self.user_id, + ) + + request, channel = self.make_request("GET", path) + self.render(request) + self.assertEquals(channel.code, 200) + + self.assertIn("displayname", channel.json_body) + self.assertEquals(channel.json_body["displayname"], "John Doe") + + @unittest.override_config( + {"rc_joins": {"local": {"per_second": 3, "burst_count": 3}}} + ) + def test_join_local_ratelimit_idempotent(self): + """Tests that the room join endpoints remain idempotent despite rate-limiting + on room joins.""" + room_id = self.helper.create_room_as(self.user_id) + + # Let's test both paths to be sure. + paths_to_test = [ + "/_matrix/client/r0/rooms/%s/join", + "/_matrix/client/r0/join/%s", + ] + + for path in paths_to_test: + # Make sure we send more requests than the rate-limiting config would allow + # if all of these requests ended up joining the user to a room. + for i in range(6): + request, channel = self.make_request("POST", path % room_id, {}) + self.render(request) + self.assertEquals(channel.code, 200) + + class RoomMessagesTestCase(RoomBase): """ Tests /rooms/$room_id/messages/$user_id/$msg_id REST events. """ diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 8933b560d2..e66c9a4c4c 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -39,7 +39,9 @@ class RestHelper(object): resource = attr.ib() auth_user_id = attr.ib() - def create_room_as(self, room_creator=None, is_public=True, tok=None): + def create_room_as( + self, room_creator=None, is_public=True, tok=None, expect_code=200, + ): temp_id = self.auth_user_id self.auth_user_id = room_creator path = "/_matrix/client/r0/createRoom" @@ -54,9 +56,11 @@ class RestHelper(object): ) render(request, self.resource, self.hs.get_reactor()) - assert channel.result["code"] == b"200", channel.result + assert channel.result["code"] == b"%d" % expect_code, channel.result self.auth_user_id = temp_id - return channel.json_body["room_id"] + + if expect_code == 200: + return channel.json_body["room_id"] def invite(self, room=None, src=None, targ=None, expect_code=200, tok=None): self.change_membership( -- cgit 1.5.1 From 3f8f96be00104e1d1d42fde8e513985fc66201bf Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 24 Aug 2020 13:08:33 -0400 Subject: Fix flaky shadow-ban tests. (#8152) --- changelog.d/8152.feature | 1 + tests/rest/client/v1/test_rooms.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 changelog.d/8152.feature (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/changelog.d/8152.feature b/changelog.d/8152.feature new file mode 100644 index 0000000000..813e6d0903 --- /dev/null +++ b/changelog.d/8152.feature @@ -0,0 +1 @@ +Add support for shadow-banning users (ignoring any message send requests). diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index e674eb90d7..286e0ccdcc 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -21,7 +21,7 @@ import json from urllib import parse as urlparse -from mock import Mock +from mock import Mock, patch import synapse.rest.admin from synapse.api.constants import EventContentFields, EventTypes, Membership @@ -1976,6 +1976,8 @@ class RoomCanonicalAliasTestCase(unittest.HomeserverTestCase): self._set_canonical_alias({"alt_aliases": ["@unknown:test"]}, expected_code=400) +# To avoid the tests timing out don't add a delay to "annoy the requester". +@patch("random.randint", new=lambda a, b: 0) class ShadowBannedTestCase(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets_for_client_rest_resource, -- cgit 1.5.1 From cbd8d83da7d24d7434c749c4c6cfece0c507b0b9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 24 Aug 2020 13:58:56 -0400 Subject: Stop shadow-banned users from sending non-member events. (#8142) --- changelog.d/8142.feature | 1 + synapse/handlers/directory.py | 6 ++ synapse/handlers/message.py | 10 +++ synapse/handlers/room.py | 19 +++++- synapse/rest/client/v1/room.py | 74 +++++++++++++--------- synapse/rest/client/v2_alpha/relations.py | 18 ++++-- .../client/v2_alpha/room_upgrade_rest_servlet.py | 14 ++-- tests/rest/client/v1/test_rooms.py | 55 +++++++++++++++- 8 files changed, 155 insertions(+), 42 deletions(-) create mode 100644 changelog.d/8142.feature (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/changelog.d/8142.feature b/changelog.d/8142.feature new file mode 100644 index 0000000000..813e6d0903 --- /dev/null +++ b/changelog.d/8142.feature @@ -0,0 +1 @@ +Add support for shadow-banning users (ignoring any message send requests). diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 79a2df6201..46826eb784 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -23,6 +23,7 @@ from synapse.api.errors import ( CodeMessageException, Codes, NotFoundError, + ShadowBanError, StoreError, SynapseError, ) @@ -199,6 +200,8 @@ class DirectoryHandler(BaseHandler): try: await self._update_canonical_alias(requester, user_id, room_id, room_alias) + except ShadowBanError as e: + logger.info("Failed to update alias events due to shadow-ban: %s", e) except AuthError as e: logger.info("Failed to update alias events: %s", e) @@ -292,6 +295,9 @@ class DirectoryHandler(BaseHandler): """ Send an updated canonical alias event if the removed alias was set as the canonical alias or listed in the alt_aliases field. + + Raises: + ShadowBanError if the requester has been shadow-banned. """ alias_event = await self.state.get_current_state( room_id, EventTypes.CanonicalAlias, "" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c955a86be0..593c0cc6f1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from canonicaljson import encode_canonical_json @@ -34,6 +35,7 @@ from synapse.api.errors import ( Codes, ConsentNotGivenError, NotFoundError, + ShadowBanError, SynapseError, ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions @@ -716,12 +718,20 @@ class EventCreationHandler(object): event_dict: dict, ratelimit: bool = True, txn_id: Optional[str] = None, + ignore_shadow_ban: bool = False, ) -> Tuple[EventBase, int]: """ Creates an event, then sends it. See self.create_event and self.send_nonmember_event. + + Raises: + ShadowBanError if the requester has been shadow-banned. """ + if not ignore_shadow_ban and requester.shadow_banned: + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + raise ShadowBanError() # We limit the number of concurrent event sends in a room so that we # don't fork the DAG too much. If we don't limit then we can end up in diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0fc71475c3..e4788ef86b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -136,6 +136,9 @@ class RoomCreationHandler(BaseHandler): Returns: the new room id + + Raises: + ShadowBanError if the requester is shadow-banned. """ await self.ratelimit(requester) @@ -171,6 +174,15 @@ class RoomCreationHandler(BaseHandler): async def _upgrade_room( self, requester: Requester, old_room_id: str, new_version: RoomVersion ): + """ + Args: + requester: the user requesting the upgrade + old_room_id: the id of the room to be replaced + new_versions: the version to upgrade the room to + + Raises: + ShadowBanError if the requester is shadow-banned. + """ user_id = requester.user.to_string() # start by allocating a new room id @@ -257,6 +269,9 @@ class RoomCreationHandler(BaseHandler): old_room_id: the id of the room to be replaced new_room_id: the id of the replacement room old_room_state: the state map for the old room + + Raises: + ShadowBanError if the requester is shadow-banned. """ old_room_pl_event_id = old_room_state.get((EventTypes.PowerLevels, "")) @@ -829,11 +844,13 @@ class RoomCreationHandler(BaseHandler): async def send(etype: str, content: JsonDict, **kwargs) -> int: event = create(etype, content, **kwargs) logger.debug("Sending %s in new room", etype) + # Allow these events to be sent even if the user is shadow-banned to + # allow the room creation to complete. ( _, last_stream_id, ) = await self.event_creation_handler.create_and_send_nonmember_event( - creator, event, ratelimit=False + creator, event, ratelimit=False, ignore_shadow_ban=True, ) return last_stream_id diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index a9dd3a6aec..11da8bc037 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -201,8 +201,8 @@ class RoomStateEventRestServlet(TransactionRestServlet): if state_key is not None: event_dict["state_key"] = state_key - if event_type == EventTypes.Member: - try: + try: + if event_type == EventTypes.Member: membership = content.get("membership", None) event_id, _ = await self.room_member_handler.update_membership( requester, @@ -211,16 +211,16 @@ class RoomStateEventRestServlet(TransactionRestServlet): action=membership, content=content, ) - except ShadowBanError: - event_id = "$" + random_string(43) - else: - ( - event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - requester, event_dict, txn_id=txn_id - ) - event_id = event.event_id + else: + ( + event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + requester, event_dict, txn_id=txn_id + ) + event_id = event.event_id + except ShadowBanError: + event_id = "$" + random_string(43) set_tag("event_id", event_id) ret = {"event_id": event_id} @@ -253,12 +253,19 @@ class RoomSendEventRestServlet(TransactionRestServlet): if b"ts" in request.args and requester.app_service: event_dict["origin_server_ts"] = parse_integer(request, "ts", 0) - event, _ = await self.event_creation_handler.create_and_send_nonmember_event( - requester, event_dict, txn_id=txn_id - ) + try: + ( + event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + requester, event_dict, txn_id=txn_id + ) + event_id = event.event_id + except ShadowBanError: + event_id = "$" + random_string(43) - set_tag("event_id", event.event_id) - return 200, {"event_id": event.event_id} + set_tag("event_id", event_id) + return 200, {"event_id": event_id} def on_GET(self, request, room_id, event_type, txn_id): return 200, "Not implemented" @@ -799,20 +806,27 @@ class RoomRedactEventRestServlet(TransactionRestServlet): requester = await self.auth.get_user_by_req(request) content = parse_json_object_from_request(request) - event, _ = await self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.Redaction, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - "redacts": event_id, - }, - txn_id=txn_id, - ) + try: + ( + event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.Redaction, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "redacts": event_id, + }, + txn_id=txn_id, + ) + event_id = event.event_id + except ShadowBanError: + event_id = "$" + random_string(43) - set_tag("event_id", event.event_id) - return 200, {"event_id": event.event_id} + set_tag("event_id", event_id) + return 200, {"event_id": event_id} def on_PUT(self, request, room_id, event_id, txn_id): set_tag("txn_id", txn_id) diff --git a/synapse/rest/client/v2_alpha/relations.py b/synapse/rest/client/v2_alpha/relations.py index 89002ffbff..e29f49f7f5 100644 --- a/synapse/rest/client/v2_alpha/relations.py +++ b/synapse/rest/client/v2_alpha/relations.py @@ -22,7 +22,7 @@ any time to reflect changes in the MSC. import logging from synapse.api.constants import EventTypes, RelationTypes -from synapse.api.errors import SynapseError +from synapse.api.errors import ShadowBanError, SynapseError from synapse.http.servlet import ( RestServlet, parse_integer, @@ -35,6 +35,7 @@ from synapse.storage.relations import ( PaginationChunk, RelationPaginationToken, ) +from synapse.util.stringutils import random_string from ._base import client_patterns @@ -111,11 +112,18 @@ class RelationSendServlet(RestServlet): "sender": requester.user.to_string(), } - event, _ = await self.event_creation_handler.create_and_send_nonmember_event( - requester, event_dict=event_dict, txn_id=txn_id - ) + try: + ( + event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + requester, event_dict=event_dict, txn_id=txn_id + ) + event_id = event.event_id + except ShadowBanError: + event_id = "$" + random_string(43) - return 200, {"event_id": event.event_id} + return 200, {"event_id": event_id} class RelationPaginationServlet(RestServlet): diff --git a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py index f357015a70..39a5518614 100644 --- a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py +++ b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py @@ -15,13 +15,14 @@ import logging -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, ShadowBanError, SynapseError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.http.servlet import ( RestServlet, assert_params_in_dict, parse_json_object_from_request, ) +from synapse.util import stringutils from ._base import client_patterns @@ -62,7 +63,6 @@ class RoomUpgradeRestServlet(RestServlet): content = parse_json_object_from_request(request) assert_params_in_dict(content, ("new_version",)) - new_version = content["new_version"] new_version = KNOWN_ROOM_VERSIONS.get(content["new_version"]) if new_version is None: @@ -72,9 +72,13 @@ class RoomUpgradeRestServlet(RestServlet): Codes.UNSUPPORTED_ROOM_VERSION, ) - new_room_id = await self._room_creation_handler.upgrade_room( - requester, room_id, new_version - ) + try: + new_room_id = await self._room_creation_handler.upgrade_room( + requester, room_id, new_version + ) + except ShadowBanError: + # Generate a random room ID. + new_room_id = stringutils.random_string(18) ret = {"replacement_room": new_room_id} diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 286e0ccdcc..60fef13e9f 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -27,7 +27,7 @@ import synapse.rest.admin from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.handlers.pagination import PurgeStatus from synapse.rest.client.v1 import directory, login, profile, room -from synapse.rest.client.v2_alpha import account +from synapse.rest.client.v2_alpha import account, room_upgrade_rest_servlet from synapse.types import JsonDict, RoomAlias from synapse.util.stringutils import random_string @@ -1984,6 +1984,7 @@ class ShadowBannedTestCase(unittest.HomeserverTestCase): directory.register_servlets, login.register_servlets, room.register_servlets, + room_upgrade_rest_servlet.register_servlets, ] def prepare(self, reactor, clock, homeserver): @@ -2076,3 +2077,55 @@ class ShadowBannedTestCase(unittest.HomeserverTestCase): # Both users should be in the room. users = self.get_success(self.store.get_users_in_room(room_id)) self.assertCountEqual(users, ["@banned:test", "@otheruser:test"]) + + def test_message(self): + """Messages from shadow-banned users don't actually get sent.""" + + room_id = self.helper.create_room_as( + self.other_user_id, tok=self.other_access_token + ) + + # The user should be in the room. + self.helper.join(room_id, self.banned_user_id, tok=self.banned_access_token) + + # Sending a message should complete successfully. + result = self.helper.send_event( + room_id=room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "with right label"}, + tok=self.banned_access_token, + ) + self.assertIn("event_id", result) + event_id = result["event_id"] + + latest_events = self.get_success( + self.store.get_latest_event_ids_in_room(room_id) + ) + self.assertNotIn(event_id, latest_events) + + def test_upgrade(self): + """A room upgrade should fail, but look like it succeeded.""" + + # The create works fine. + room_id = self.helper.create_room_as( + self.banned_user_id, tok=self.banned_access_token + ) + + request, channel = self.make_request( + "POST", + "/_matrix/client/r0/rooms/%s/upgrade" % (room_id,), + {"new_version": "6"}, + access_token=self.banned_access_token, + ) + self.render(request) + self.assertEquals(200, channel.code, channel.result) + # A new room_id should be returned. + self.assertIn("replacement_room", channel.json_body) + + new_room_id = channel.json_body["replacement_room"] + + # It doesn't really matter what API we use here, we just want to assert + # that the room doesn't exist. + summary = self.get_success(self.store.get_room_summary(new_room_id)) + # The summary should be empty since the room doesn't exist. + self.assertEqual(summary, {}) -- cgit 1.5.1 From 56efa9ec719c4416e2e87f49effc00faa1134828 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 26 Aug 2020 07:19:20 -0400 Subject: Fix rate limiting unit tests. (#8167) These were passing on the release-v1.19.1 branch but started failing once merged to develop. --- changelog.d/8167.misc | 1 + tests/rest/client/v1/test_rooms.py | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 13 deletions(-) create mode 100644 changelog.d/8167.misc (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/changelog.d/8167.misc b/changelog.d/8167.misc new file mode 100644 index 0000000000..e2ed9be7a4 --- /dev/null +++ b/changelog.d/8167.misc @@ -0,0 +1 @@ +Fix tests that were broken due to the merge of 1.19.1. diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index c6c6edeac2..68c4a6a8f7 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -684,38 +684,39 @@ class RoomJoinRatelimitTestCase(RoomBase): ] @unittest.override_config( - {"rc_joins": {"local": {"per_second": 3, "burst_count": 3}}} + {"rc_joins": {"local": {"per_second": 0.5, "burst_count": 3}}} ) def test_join_local_ratelimit(self): """Tests that local joins are actually rate-limited.""" - for i in range(5): + for i in range(3): self.helper.create_room_as(self.user_id) self.helper.create_room_as(self.user_id, expect_code=429) @unittest.override_config( - {"rc_joins": {"local": {"per_second": 3, "burst_count": 3}}} + {"rc_joins": {"local": {"per_second": 0.5, "burst_count": 3}}} ) def test_join_local_ratelimit_profile_change(self): """Tests that sending a profile update into all of the user's joined rooms isn't rate-limited by the rate-limiter on joins.""" - # Create and join more rooms than the rate-limiting config allows in a second. + # Create and join as many rooms as the rate-limiting config allows in a second. room_ids = [ self.helper.create_room_as(self.user_id), self.helper.create_room_as(self.user_id), self.helper.create_room_as(self.user_id), ] - self.reactor.advance(1) - room_ids = room_ids + [ - self.helper.create_room_as(self.user_id), - self.helper.create_room_as(self.user_id), - self.helper.create_room_as(self.user_id), - ] + # Let some time for the rate-limiter to forget about our multi-join. + self.reactor.advance(2) + # Add one to make sure we're joined to more rooms than the config allows us to + # join in a second. + room_ids.append(self.helper.create_room_as(self.user_id)) # Create a profile for the user, since it hasn't been done on registration. store = self.hs.get_datastore() - store.create_profile(UserID.from_string(self.user_id).localpart) + self.get_success( + store.create_profile(UserID.from_string(self.user_id).localpart) + ) # Update the display name for the user. path = "/_matrix/client/r0/profile/%s/displayname" % self.user_id @@ -738,7 +739,7 @@ class RoomJoinRatelimitTestCase(RoomBase): self.assertEquals(channel.json_body["displayname"], "John Doe") @unittest.override_config( - {"rc_joins": {"local": {"per_second": 3, "burst_count": 3}}} + {"rc_joins": {"local": {"per_second": 0.5, "burst_count": 3}}} ) def test_join_local_ratelimit_idempotent(self): """Tests that the room join endpoints remain idempotent despite rate-limiting @@ -754,7 +755,7 @@ class RoomJoinRatelimitTestCase(RoomBase): for path in paths_to_test: # Make sure we send more requests than the rate-limiting config would allow # if all of these requests ended up joining the user to a room. - for i in range(6): + for i in range(4): request, channel = self.make_request("POST", path % room_id, {}) self.render(request) self.assertEquals(channel.code, 200) -- cgit 1.5.1 From 2e6c90ff847b2d79ae372933e431d3a7ebaf5381 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 26 Aug 2020 08:49:01 -0400 Subject: Do not propagate profile changes of shadow-banned users into rooms. (#8157) --- changelog.d/8157.feature | 1 + synapse/handlers/profile.py | 17 +- synapse/handlers/room_member.py | 2 +- tests/rest/client/test_shadow_banned.py | 272 ++++++++++++++++++++++++++++++++ tests/rest/client/v1/test_rooms.py | 159 +------------------ 5 files changed, 291 insertions(+), 160 deletions(-) create mode 100644 changelog.d/8157.feature create mode 100644 tests/rest/client/test_shadow_banned.py (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/changelog.d/8157.feature b/changelog.d/8157.feature new file mode 100644 index 0000000000..813e6d0903 --- /dev/null +++ b/changelog.d/8157.feature @@ -0,0 +1 @@ +Add support for shadow-banning users (ignoring any message send requests). diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 31a2e5ea18..96c9d6bab4 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +import random from synapse.api.errors import ( AuthError, @@ -213,8 +214,14 @@ class BaseProfileHandler(BaseHandler): async def set_avatar_url( self, target_user, requester, new_avatar_url, by_admin=False ): - """target_user is the user whose avatar_url is to be changed; - auth_user is the user attempting to make this change.""" + """Set a new avatar URL for a user. + + Args: + target_user (UserID): the user whose avatar URL is to be changed. + requester (Requester): The user attempting to make this change. + new_avatar_url (str): The avatar URL to give this user. + by_admin (bool): Whether this change was made by an administrator. + """ if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this homeserver") @@ -278,6 +285,12 @@ class BaseProfileHandler(BaseHandler): await self.ratelimit(requester) + # Do not actually update the room state for shadow-banned users. + if requester.shadow_banned: + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + return + room_ids = await self.store.get_rooms_for_user(target_user.to_string()) for room_id in room_ids: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 804463b1c0..cae4d013b8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -380,7 +380,7 @@ class RoomMemberHandler(object): # later on. content = dict(content) - if not self.allow_per_room_profiles: + if not self.allow_per_room_profiles or requester.shadow_banned: # Strip profile data, knowing that new profile data will be added to the # event's content in event_creation_handler.create_event() using the target's # global profile. diff --git a/tests/rest/client/test_shadow_banned.py b/tests/rest/client/test_shadow_banned.py new file mode 100644 index 0000000000..3eb9aeaa9e --- /dev/null +++ b/tests/rest/client/test_shadow_banned.py @@ -0,0 +1,272 @@ +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock, patch + +import synapse.rest.admin +from synapse.api.constants import EventTypes +from synapse.rest.client.v1 import directory, login, profile, room +from synapse.rest.client.v2_alpha import room_upgrade_rest_servlet + +from tests import unittest + + +class _ShadowBannedBase(unittest.HomeserverTestCase): + def prepare(self, reactor, clock, homeserver): + # Create two users, one of which is shadow-banned. + self.banned_user_id = self.register_user("banned", "test") + self.banned_access_token = self.login("banned", "test") + + self.store = self.hs.get_datastore() + + self.get_success( + self.store.db_pool.simple_update( + table="users", + keyvalues={"name": self.banned_user_id}, + updatevalues={"shadow_banned": True}, + desc="shadow_ban", + ) + ) + + self.other_user_id = self.register_user("otheruser", "pass") + self.other_access_token = self.login("otheruser", "pass") + + +# To avoid the tests timing out don't add a delay to "annoy the requester". +@patch("random.randint", new=lambda a, b: 0) +class RoomTestCase(_ShadowBannedBase): + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + directory.register_servlets, + login.register_servlets, + room.register_servlets, + room_upgrade_rest_servlet.register_servlets, + ] + + def test_invite(self): + """Invites from shadow-banned users don't actually get sent.""" + + # The create works fine. + room_id = self.helper.create_room_as( + self.banned_user_id, tok=self.banned_access_token + ) + + # Inviting the user completes successfully. + self.helper.invite( + room=room_id, + src=self.banned_user_id, + tok=self.banned_access_token, + targ=self.other_user_id, + ) + + # But the user wasn't actually invited. + invited_rooms = self.get_success( + self.store.get_invited_rooms_for_local_user(self.other_user_id) + ) + self.assertEqual(invited_rooms, []) + + def test_invite_3pid(self): + """Ensure that a 3PID invite does not attempt to contact the identity server.""" + identity_handler = self.hs.get_handlers().identity_handler + identity_handler.lookup_3pid = Mock( + side_effect=AssertionError("This should not get called") + ) + + # The create works fine. + room_id = self.helper.create_room_as( + self.banned_user_id, tok=self.banned_access_token + ) + + # Inviting the user completes successfully. + request, channel = self.make_request( + "POST", + "/rooms/%s/invite" % (room_id,), + {"id_server": "test", "medium": "email", "address": "test@test.test"}, + access_token=self.banned_access_token, + ) + self.render(request) + self.assertEquals(200, channel.code, channel.result) + + # This should have raised an error earlier, but double check this wasn't called. + identity_handler.lookup_3pid.assert_not_called() + + def test_create_room(self): + """Invitations during a room creation should be discarded, but the room still gets created.""" + # The room creation is successful. + request, channel = self.make_request( + "POST", + "/_matrix/client/r0/createRoom", + {"visibility": "public", "invite": [self.other_user_id]}, + access_token=self.banned_access_token, + ) + self.render(request) + self.assertEquals(200, channel.code, channel.result) + room_id = channel.json_body["room_id"] + + # But the user wasn't actually invited. + invited_rooms = self.get_success( + self.store.get_invited_rooms_for_local_user(self.other_user_id) + ) + self.assertEqual(invited_rooms, []) + + # Since a real room was created, the other user should be able to join it. + self.helper.join(room_id, self.other_user_id, tok=self.other_access_token) + + # Both users should be in the room. + users = self.get_success(self.store.get_users_in_room(room_id)) + self.assertCountEqual(users, ["@banned:test", "@otheruser:test"]) + + def test_message(self): + """Messages from shadow-banned users don't actually get sent.""" + + room_id = self.helper.create_room_as( + self.other_user_id, tok=self.other_access_token + ) + + # The user should be in the room. + self.helper.join(room_id, self.banned_user_id, tok=self.banned_access_token) + + # Sending a message should complete successfully. + result = self.helper.send_event( + room_id=room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "with right label"}, + tok=self.banned_access_token, + ) + self.assertIn("event_id", result) + event_id = result["event_id"] + + latest_events = self.get_success( + self.store.get_latest_event_ids_in_room(room_id) + ) + self.assertNotIn(event_id, latest_events) + + def test_upgrade(self): + """A room upgrade should fail, but look like it succeeded.""" + + # The create works fine. + room_id = self.helper.create_room_as( + self.banned_user_id, tok=self.banned_access_token + ) + + request, channel = self.make_request( + "POST", + "/_matrix/client/r0/rooms/%s/upgrade" % (room_id,), + {"new_version": "6"}, + access_token=self.banned_access_token, + ) + self.render(request) + self.assertEquals(200, channel.code, channel.result) + # A new room_id should be returned. + self.assertIn("replacement_room", channel.json_body) + + new_room_id = channel.json_body["replacement_room"] + + # It doesn't really matter what API we use here, we just want to assert + # that the room doesn't exist. + summary = self.get_success(self.store.get_room_summary(new_room_id)) + # The summary should be empty since the room doesn't exist. + self.assertEqual(summary, {}) + + +# To avoid the tests timing out don't add a delay to "annoy the requester". +@patch("random.randint", new=lambda a, b: 0) +class ProfileTestCase(_ShadowBannedBase): + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + login.register_servlets, + profile.register_servlets, + room.register_servlets, + ] + + def test_displayname(self): + """Profile changes should succeed, but don't end up in a room.""" + original_display_name = "banned" + new_display_name = "new name" + + # Join a room. + room_id = self.helper.create_room_as( + self.banned_user_id, tok=self.banned_access_token + ) + + # The update should succeed. + request, channel = self.make_request( + "PUT", + "/_matrix/client/r0/profile/%s/displayname" % (self.banned_user_id,), + {"displayname": new_display_name}, + access_token=self.banned_access_token, + ) + self.render(request) + self.assertEquals(200, channel.code, channel.result) + self.assertEqual(channel.json_body, {}) + + # The user's display name should be updated. + request, channel = self.make_request( + "GET", "/profile/%s/displayname" % (self.banned_user_id,) + ) + self.render(request) + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual(channel.json_body["displayname"], new_display_name) + + # But the display name in the room should not be. + message_handler = self.hs.get_message_handler() + event = self.get_success( + message_handler.get_room_data( + self.banned_user_id, + room_id, + "m.room.member", + self.banned_user_id, + False, + ) + ) + self.assertEqual( + event.content, {"membership": "join", "displayname": original_display_name} + ) + + def test_room_displayname(self): + """Changes to state events for a room should be processed, but not end up in the room.""" + original_display_name = "banned" + new_display_name = "new name" + + # Join a room. + room_id = self.helper.create_room_as( + self.banned_user_id, tok=self.banned_access_token + ) + + # The update should succeed. + request, channel = self.make_request( + "PUT", + "/_matrix/client/r0/rooms/%s/state/m.room.member/%s" + % (room_id, self.banned_user_id), + {"membership": "join", "displayname": new_display_name}, + access_token=self.banned_access_token, + ) + self.render(request) + self.assertEquals(200, channel.code, channel.result) + self.assertIn("event_id", channel.json_body) + + # The display name in the room should not be changed. + message_handler = self.hs.get_message_handler() + event = self.get_success( + message_handler.get_room_data( + self.banned_user_id, + room_id, + "m.room.member", + self.banned_user_id, + False, + ) + ) + self.assertEqual( + event.content, {"membership": "join", "displayname": original_display_name} + ) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 68c4a6a8f7..0a567b032f 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -21,13 +21,13 @@ import json from urllib import parse as urlparse -from mock import Mock, patch +from mock import Mock import synapse.rest.admin from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.handlers.pagination import PurgeStatus from synapse.rest.client.v1 import directory, login, profile, room -from synapse.rest.client.v2_alpha import account, room_upgrade_rest_servlet +from synapse.rest.client.v2_alpha import account from synapse.types import JsonDict, RoomAlias, UserID from synapse.util.stringutils import random_string @@ -2060,158 +2060,3 @@ class RoomCanonicalAliasTestCase(unittest.HomeserverTestCase): """An alias which does not point to the room raises a SynapseError.""" self._set_canonical_alias({"alias": "@unknown:test"}, expected_code=400) self._set_canonical_alias({"alt_aliases": ["@unknown:test"]}, expected_code=400) - - -# To avoid the tests timing out don't add a delay to "annoy the requester". -@patch("random.randint", new=lambda a, b: 0) -class ShadowBannedTestCase(unittest.HomeserverTestCase): - servlets = [ - synapse.rest.admin.register_servlets_for_client_rest_resource, - directory.register_servlets, - login.register_servlets, - room.register_servlets, - room_upgrade_rest_servlet.register_servlets, - ] - - def prepare(self, reactor, clock, homeserver): - self.banned_user_id = self.register_user("banned", "test") - self.banned_access_token = self.login("banned", "test") - - self.store = self.hs.get_datastore() - - self.get_success( - self.store.db_pool.simple_update( - table="users", - keyvalues={"name": self.banned_user_id}, - updatevalues={"shadow_banned": True}, - desc="shadow_ban", - ) - ) - - self.other_user_id = self.register_user("otheruser", "pass") - self.other_access_token = self.login("otheruser", "pass") - - def test_invite(self): - """Invites from shadow-banned users don't actually get sent.""" - - # The create works fine. - room_id = self.helper.create_room_as( - self.banned_user_id, tok=self.banned_access_token - ) - - # Inviting the user completes successfully. - self.helper.invite( - room=room_id, - src=self.banned_user_id, - tok=self.banned_access_token, - targ=self.other_user_id, - ) - - # But the user wasn't actually invited. - invited_rooms = self.get_success( - self.store.get_invited_rooms_for_local_user(self.other_user_id) - ) - self.assertEqual(invited_rooms, []) - - def test_invite_3pid(self): - """Ensure that a 3PID invite does not attempt to contact the identity server.""" - identity_handler = self.hs.get_handlers().identity_handler - identity_handler.lookup_3pid = Mock( - side_effect=AssertionError("This should not get called") - ) - - # The create works fine. - room_id = self.helper.create_room_as( - self.banned_user_id, tok=self.banned_access_token - ) - - # Inviting the user completes successfully. - request, channel = self.make_request( - "POST", - "/rooms/%s/invite" % (room_id,), - {"id_server": "test", "medium": "email", "address": "test@test.test"}, - access_token=self.banned_access_token, - ) - self.render(request) - self.assertEquals(200, channel.code, channel.result) - - # This should have raised an error earlier, but double check this wasn't called. - identity_handler.lookup_3pid.assert_not_called() - - def test_create_room(self): - """Invitations during a room creation should be discarded, but the room still gets created.""" - # The room creation is successful. - request, channel = self.make_request( - "POST", - "/_matrix/client/r0/createRoom", - {"visibility": "public", "invite": [self.other_user_id]}, - access_token=self.banned_access_token, - ) - self.render(request) - self.assertEquals(200, channel.code, channel.result) - room_id = channel.json_body["room_id"] - - # But the user wasn't actually invited. - invited_rooms = self.get_success( - self.store.get_invited_rooms_for_local_user(self.other_user_id) - ) - self.assertEqual(invited_rooms, []) - - # Since a real room was created, the other user should be able to join it. - self.helper.join(room_id, self.other_user_id, tok=self.other_access_token) - - # Both users should be in the room. - users = self.get_success(self.store.get_users_in_room(room_id)) - self.assertCountEqual(users, ["@banned:test", "@otheruser:test"]) - - def test_message(self): - """Messages from shadow-banned users don't actually get sent.""" - - room_id = self.helper.create_room_as( - self.other_user_id, tok=self.other_access_token - ) - - # The user should be in the room. - self.helper.join(room_id, self.banned_user_id, tok=self.banned_access_token) - - # Sending a message should complete successfully. - result = self.helper.send_event( - room_id=room_id, - type=EventTypes.Message, - content={"msgtype": "m.text", "body": "with right label"}, - tok=self.banned_access_token, - ) - self.assertIn("event_id", result) - event_id = result["event_id"] - - latest_events = self.get_success( - self.store.get_latest_event_ids_in_room(room_id) - ) - self.assertNotIn(event_id, latest_events) - - def test_upgrade(self): - """A room upgrade should fail, but look like it succeeded.""" - - # The create works fine. - room_id = self.helper.create_room_as( - self.banned_user_id, tok=self.banned_access_token - ) - - request, channel = self.make_request( - "POST", - "/_matrix/client/r0/rooms/%s/upgrade" % (room_id,), - {"new_version": "6"}, - access_token=self.banned_access_token, - ) - self.render(request) - self.assertEquals(200, channel.code, channel.result) - # A new room_id should be returned. - self.assertIn("replacement_room", channel.json_body) - - new_room_id = channel.json_body["replacement_room"] - - # It doesn't really matter what API we use here, we just want to assert - # that the room doesn't exist. - summary = self.get_success(self.store.get_room_summary(new_room_id)) - # The summary should be empty since the room doesn't exist. - self.assertEqual(summary, {}) -- cgit 1.5.1 From ea70f1c362dc4bd6c0f8a67e16ed0971fe095e5b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Sep 2020 21:48:33 +0100 Subject: Various clean ups to room stream tokens. (#8423) --- changelog.d/8423.misc | 1 + synapse/events/__init__.py | 6 ++-- synapse/handlers/admin.py | 2 +- synapse/handlers/device.py | 4 +-- synapse/handlers/initial_sync.py | 3 +- synapse/handlers/pagination.py | 5 ++- synapse/handlers/room.py | 4 +-- synapse/handlers/sync.py | 20 ++++++++---- synapse/notifier.py | 4 +-- synapse/replication/tcp/client.py | 6 ++-- synapse/rest/admin/__init__.py | 3 +- synapse/storage/databases/main/stream.py | 38 +++++++++++++---------- synapse/storage/persist_events.py | 5 ++- synapse/types.py | 53 ++++++++++++++++++++------------ tests/rest/client/v1/test_rooms.py | 8 ++--- tests/storage/test_purge.py | 10 +++--- 16 files changed, 96 insertions(+), 76 deletions(-) create mode 100644 changelog.d/8423.misc (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/changelog.d/8423.misc b/changelog.d/8423.misc new file mode 100644 index 0000000000..7260e3fa41 --- /dev/null +++ b/changelog.d/8423.misc @@ -0,0 +1 @@ +Various refactors to simplify stream token handling. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bf800a3852..dc49df0812 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -23,7 +23,7 @@ from typing import Dict, Optional, Tuple, Type from unpaddedbase64 import encode_base64 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions -from synapse.types import JsonDict +from synapse.types import JsonDict, RoomStreamToken from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -118,8 +118,8 @@ class _EventInternalMetadata: # XXX: These are set by StreamWorkerStore._set_before_and_after. # I'm pretty sure that these are never persisted to the database, so shouldn't # be here - before = DictProperty("before") # type: str - after = DictProperty("after") # type: str + before = DictProperty("before") # type: RoomStreamToken + after = DictProperty("after") # type: RoomStreamToken order = DictProperty("order") # type: Tuple[int, int] def get_dict(self) -> JsonDict: diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index dd981c597e..1ce2091b46 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -153,7 +153,7 @@ class AdminHandler(BaseHandler): if not events: break - from_key = RoomStreamToken.parse(events[-1].internal_metadata.after) + from_key = events[-1].internal_metadata.after events = await filter_events_for_client(self.storage, user_id, events) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 4149520d6c..b9d9098104 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,7 +29,6 @@ from synapse.api.errors import ( from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ( - RoomStreamToken, StreamToken, get_domain_from_id, get_verify_key_from_cross_signing_key, @@ -113,8 +112,7 @@ class DeviceWorkerHandler(BaseHandler): set_tag("user_id", user_id) set_tag("from_token", from_token) - now_room_id = self.store.get_room_max_stream_ordering() - now_room_key = RoomStreamToken(None, now_room_id) + now_room_key = self.store.get_room_max_token() room_ids = await self.store.get_rooms_for_user(user_id) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 8cd7eb22a3..43f15435de 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -325,7 +325,8 @@ class InitialSyncHandler(BaseHandler): if limit is None: limit = 10 - stream_token = await self.store.get_stream_token_for_event(member_event_id) + leave_position = await self.store.get_position_for_event(member_event_id) + stream_token = leave_position.to_room_stream_token() messages, token = await self.store.get_recent_events_for_room( room_id, limit=limit, end_token=stream_token diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index a0b3bdb5e0..d6779a4b44 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import Requester, RoomStreamToken +from synapse.types import Requester from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client @@ -373,10 +373,9 @@ class PaginationHandler: # case "JOIN" would have been returned. assert member_event_id - leave_token_str = await self.store.get_topological_token_for_event( + leave_token = await self.store.get_topological_token_for_event( member_event_id ) - leave_token = RoomStreamToken.parse(leave_token_str) assert leave_token.topological is not None if leave_token.topological < curr_topo: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 11bf146bed..836b3f381a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1134,14 +1134,14 @@ class RoomEventSource: events[:] = events[:limit] if events: - end_key = RoomStreamToken.parse(events[-1].internal_metadata.after) + end_key = events[-1].internal_metadata.after else: end_key = to_key return (events, end_key) def get_current_key(self) -> RoomStreamToken: - return RoomStreamToken(None, self.store.get_room_max_stream_ordering()) + return self.store.get_room_max_token() def get_current_key_for_room(self, room_id: str) -> Awaitable[str]: return self.store.get_room_events_max_id(room_id) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e948efef2e..bfe2583002 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -519,7 +519,7 @@ class SyncHandler: if len(recents) > timeline_limit: limited = True recents = recents[-timeline_limit:] - room_key = RoomStreamToken.parse(recents[0].internal_metadata.before) + room_key = recents[0].internal_metadata.before prev_batch_token = now_token.copy_and_replace("room_key", room_key) @@ -1595,16 +1595,24 @@ class SyncHandler: if leave_events: leave_event = leave_events[-1] - leave_stream_token = await self.store.get_stream_token_for_event( + leave_position = await self.store.get_position_for_event( leave_event.event_id ) - leave_token = since_token.copy_and_replace( - "room_key", leave_stream_token - ) - if since_token and since_token.is_after(leave_token): + # If the leave event happened before the since token then we + # bail. + if since_token and not leave_position.persisted_after( + since_token.room_key + ): continue + # We can safely convert the position of the leave event into a + # stream token as it'll only be used in the context of this + # room. (c.f. the docstring of `to_room_stream_token`). + leave_token = since_token.copy_and_replace( + "room_key", leave_position.to_room_stream_token() + ) + # If this is an out of band message, like a remote invite # rejection, we include it in the recents batch. Otherwise, we # let _load_filtered_recents handle fetching the correct diff --git a/synapse/notifier.py b/synapse/notifier.py index 441b3d15e2..59415f6f88 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -163,7 +163,7 @@ class _NotifierUserStream: """ # Immediately wake up stream if something has already since happened # since their last token. - if self.last_notified_token.is_after(token): + if self.last_notified_token != token: return _NotificationListener(defer.succeed(self.current_token)) else: return _NotificationListener(self.notify_deferred.observe()) @@ -470,7 +470,7 @@ class Notifier: async def check_for_updates( before_token: StreamToken, after_token: StreamToken ) -> EventStreamResult: - if not after_token.is_after(before_token): + if after_token == before_token: return EventStreamResult([], (from_token, from_token)) events = [] # type: List[EventBase] diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 55af3d41ea..e165429cad 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -29,7 +29,7 @@ from synapse.replication.tcp.streams.events import ( EventsStreamEventRow, EventsStreamRow, ) -from synapse.types import PersistedEventPosition, RoomStreamToken, UserID +from synapse.types import PersistedEventPosition, UserID from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure @@ -152,9 +152,7 @@ class ReplicationDataHandler: if event.type == EventTypes.Member: extra_users = (UserID.from_string(event.state_key),) - max_token = RoomStreamToken( - None, self.store.get_room_max_stream_ordering() - ) + max_token = self.store.get_room_max_token() event_pos = PersistedEventPosition(instance_name, token) self.notifier.on_new_room_event( event, event_pos, max_token, extra_users diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 5c5f00b213..ba53f66f02 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -109,7 +109,8 @@ class PurgeHistoryRestServlet(RestServlet): if event.room_id != room_id: raise SynapseError(400, "Event is for wrong room.") - token = await self.store.get_topological_token_for_event(event_id) + room_token = await self.store.get_topological_token_for_event(event_id) + token = str(room_token) logger.info("[purge] purging up to token %s (event_id %s)", token, event_id) elif "purge_up_to_ts" in body: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 92e96468b4..37249f1e3f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -35,7 +35,6 @@ what sort order was used: - topological tokems: "t%d-%d", where the integers map to the topological and stream ordering columns respectively. """ - import abc import logging from collections import namedtuple @@ -54,7 +53,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -from synapse.types import Collection, RoomStreamToken +from synapse.types import Collection, PersistedEventPosition, RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache if TYPE_CHECKING: @@ -305,6 +304,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): def get_room_min_stream_ordering(self) -> int: raise NotImplementedError() + def get_room_max_token(self) -> RoomStreamToken: + return RoomStreamToken(None, self.get_room_max_stream_ordering()) + async def get_room_events_stream_for_rooms( self, room_ids: Collection[str], @@ -611,26 +613,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): allow_none=allow_none, ) - async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken: - """The stream token for an event - Args: - event_id: The id of the event to look up a stream token for. - Raises: - StoreError if the event wasn't in the database. - Returns: - A stream token. + async def get_position_for_event(self, event_id: str) -> PersistedEventPosition: + """Get the persisted position for an event """ - stream_id = await self.get_stream_id_for_event(event_id) - return RoomStreamToken(None, stream_id) + row = await self.db_pool.simple_select_one( + table="events", + keyvalues={"event_id": event_id}, + retcols=("stream_ordering", "instance_name"), + desc="get_position_for_event", + ) + + return PersistedEventPosition( + row["instance_name"] or "master", row["stream_ordering"] + ) - async def get_topological_token_for_event(self, event_id: str) -> str: + async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken: """The stream token for an event Args: event_id: The id of the event to look up a stream token for. Raises: StoreError if the event wasn't in the database. Returns: - A "t%d-%d" topological token. + A `RoomStreamToken` topological token. """ row = await self.db_pool.simple_select_one( table="events", @@ -638,7 +642,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): retcols=("stream_ordering", "topological_ordering"), desc="get_topological_token_for_event", ) - return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"]) + return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: """Gets the topological token in a room after or at the given stream @@ -687,8 +691,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): else: topo = None internal = event.internal_metadata - internal.before = str(RoomStreamToken(topo, stream - 1)) - internal.after = str(RoomStreamToken(topo, stream)) + internal.before = RoomStreamToken(topo, stream - 1) + internal.after = RoomStreamToken(topo, stream) internal.order = (int(topo) if topo else 0, int(stream)) async def get_events_around( diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index ded6cf9655..72939f3984 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -229,7 +229,7 @@ class EventsPersistenceStorage: defer.gatherResults(deferreds, consumeErrors=True) ) - return RoomStreamToken(None, self.main_store.get_current_events_token()) + return self.main_store.get_room_max_token() async def persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False @@ -247,11 +247,10 @@ class EventsPersistenceStorage: await make_deferred_yieldable(deferred) - max_persisted_id = self.main_store.get_current_events_token() event_stream_id = event.internal_metadata.stream_ordering pos = PersistedEventPosition(self._instance_name, event_stream_id) - return pos, RoomStreamToken(None, max_persisted_id) + return pos, self.main_store.get_room_max_token() def _maybe_start_persisting(self, room_id: str): async def persisting_queue(item): diff --git a/synapse/types.py b/synapse/types.py index ec39f9e1e8..02bcc197ec 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -413,6 +413,18 @@ class RoomStreamToken: pass raise SynapseError(400, "Invalid token %r" % (string,)) + def copy_and_advance(self, other: "RoomStreamToken") -> "RoomStreamToken": + """Return a new token such that if an event is after both this token and + the other token, then its after the returned token too. + """ + + if self.topological or other.topological: + raise Exception("Can't advance topological tokens") + + max_stream = max(self.stream, other.stream) + + return RoomStreamToken(None, max_stream) + def as_tuple(self) -> Tuple[Optional[int], int]: return (self.topological, self.stream) @@ -458,31 +470,20 @@ class StreamToken: def room_stream_id(self): return self.room_key.stream - def is_after(self, other): - """Does this token contain events that the other doesn't?""" - return ( - (other.room_stream_id < self.room_stream_id) - or (int(other.presence_key) < int(self.presence_key)) - or (int(other.typing_key) < int(self.typing_key)) - or (int(other.receipt_key) < int(self.receipt_key)) - or (int(other.account_data_key) < int(self.account_data_key)) - or (int(other.push_rules_key) < int(self.push_rules_key)) - or (int(other.to_device_key) < int(self.to_device_key)) - or (int(other.device_list_key) < int(self.device_list_key)) - or (int(other.groups_key) < int(self.groups_key)) - ) - def copy_and_advance(self, key, new_value) -> "StreamToken": """Advance the given key in the token to a new value if and only if the new value is after the old value. """ - new_token = self.copy_and_replace(key, new_value) if key == "room_key": - new_id = new_token.room_stream_id - old_id = self.room_stream_id - else: - new_id = int(getattr(new_token, key)) - old_id = int(getattr(self, key)) + new_token = self.copy_and_replace( + "room_key", self.room_key.copy_and_advance(new_value) + ) + return new_token + + new_token = self.copy_and_replace(key, new_value) + new_id = int(getattr(new_token, key)) + old_id = int(getattr(self, key)) + if old_id < new_id: return new_token else: @@ -509,6 +510,18 @@ class PersistedEventPosition: def persisted_after(self, token: RoomStreamToken) -> bool: return token.stream < self.stream + def to_room_stream_token(self) -> RoomStreamToken: + """Converts the position to a room stream token such that events + persisted in the same room after this position will be after the + returned `RoomStreamToken`. + + Note: no guarentees are made about ordering w.r.t. events in other + rooms. + """ + # Doing the naive thing satisfies the desired properties described in + # the docstring. + return RoomStreamToken(None, self.stream) + class ThirdPartyInstanceID( namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id")) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 0a567b032f..a3287011e9 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -902,15 +902,15 @@ class RoomMessageListTestCase(RoomBase): # Send a first message in the room, which will be removed by the purge. first_event_id = self.helper.send(self.room_id, "message 1")["event_id"] - first_token = self.get_success( - store.get_topological_token_for_event(first_event_id) + first_token = str( + self.get_success(store.get_topological_token_for_event(first_event_id)) ) # Send a second message in the room, which won't be removed, and which we'll # use as the marker to purge events before. second_event_id = self.helper.send(self.room_id, "message 2")["event_id"] - second_token = self.get_success( - store.get_topological_token_for_event(second_event_id) + second_token = str( + self.get_success(store.get_topological_token_for_event(second_event_id)) ) # Send a third event in the room to ensure we don't fall under any edge case diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 918387733b..723cd28933 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -47,8 +47,8 @@ class PurgeTests(HomeserverTestCase): storage = self.hs.get_storage() # Get the topological token - event = self.get_success( - store.get_topological_token_for_event(last["event_id"]) + event = str( + self.get_success(store.get_topological_token_for_event(last["event_id"])) ) # Purge everything before this topological token @@ -74,12 +74,10 @@ class PurgeTests(HomeserverTestCase): storage = self.hs.get_datastore() # Set the topological token higher than it should be - event = self.get_success( + token = self.get_success( storage.get_topological_token_for_event(last["event_id"]) ) - event = "t{}-{}".format( - *list(map(lambda x: x + 1, map(int, event[1:].split("-")))) - ) + event = "t{}-{}".format(token.topological + 1, token.stream + 1) # Purge everything before this topological token purge = defer.ensureDeferred(storage.purge_history(self.room_id, event, True)) -- cgit 1.5.1 From 7941372ec84786f85ae6d75fd2d7a4af5b72ac98 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2020 20:29:19 +0100 Subject: Make token serializing/deserializing async (#8427) The idea is that in future tokens will encode a mapping of instance to position. However, we don't want to include the full instance name in the string representation, so instead we'll have a mapping between instance name and an immutable integer ID in the DB that we can use instead. We'll then do the lookup when we serialize/deserialize the token (we could alternatively pass around an `Instance` type that includes both the name and ID, but that turns out to be a lot more invasive). --- changelog.d/8427.misc | 1 + synapse/handlers/events.py | 4 +-- synapse/handlers/initial_sync.py | 14 ++++----- synapse/handlers/pagination.py | 8 ++--- synapse/handlers/room.py | 8 +++-- synapse/handlers/search.py | 8 ++--- synapse/rest/admin/__init__.py | 2 +- synapse/rest/client/v1/events.py | 3 +- synapse/rest/client/v1/initial_sync.py | 3 +- synapse/rest/client/v1/room.py | 11 +++++-- synapse/rest/client/v2_alpha/keys.py | 3 +- synapse/rest/client/v2_alpha/sync.py | 10 +++--- synapse/storage/databases/main/purge_events.py | 8 ++--- synapse/streams/config.py | 9 +++--- synapse/types.py | 43 +++++++++++++++++++++----- tests/rest/client/v1/test_rooms.py | 30 +++++++++++++----- tests/storage/test_purge.py | 9 ++++-- 17 files changed, 115 insertions(+), 59 deletions(-) create mode 100644 changelog.d/8427.misc (limited to 'tests/rest/client/v1/test_rooms.py') diff --git a/changelog.d/8427.misc b/changelog.d/8427.misc new file mode 100644 index 0000000000..c9656b9112 --- /dev/null +++ b/changelog.d/8427.misc @@ -0,0 +1 @@ +Make stream token serializing/deserializing async. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 0875b74ea8..539b4fc32e 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -133,8 +133,8 @@ class EventStreamHandler(BaseHandler): chunk = { "chunk": chunks, - "start": tokens[0].to_string(), - "end": tokens[1].to_string(), + "start": await tokens[0].to_string(self.store), + "end": await tokens[1].to_string(self.store), } return chunk diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 43f15435de..39a85801c1 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -203,8 +203,8 @@ class InitialSyncHandler(BaseHandler): messages, time_now=time_now, as_client_event=as_client_event ) ), - "start": start_token.to_string(), - "end": end_token.to_string(), + "start": await start_token.to_string(self.store), + "end": await end_token.to_string(self.store), } d["state"] = await self._event_serializer.serialize_events( @@ -249,7 +249,7 @@ class InitialSyncHandler(BaseHandler): ], "account_data": account_data_events, "receipts": receipt, - "end": now_token.to_string(), + "end": await now_token.to_string(self.store), } return ret @@ -348,8 +348,8 @@ class InitialSyncHandler(BaseHandler): "chunk": ( await self._event_serializer.serialize_events(messages, time_now) ), - "start": start_token.to_string(), - "end": end_token.to_string(), + "start": await start_token.to_string(self.store), + "end": await end_token.to_string(self.store), }, "state": ( await self._event_serializer.serialize_events( @@ -447,8 +447,8 @@ class InitialSyncHandler(BaseHandler): "chunk": ( await self._event_serializer.serialize_events(messages, time_now) ), - "start": start_token.to_string(), - "end": end_token.to_string(), + "start": await start_token.to_string(self.store), + "end": await end_token.to_string(self.store), }, "state": state, "presence": presence, diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index d6779a4b44..2c2a633938 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -413,8 +413,8 @@ class PaginationHandler: if not events: return { "chunk": [], - "start": from_token.to_string(), - "end": next_token.to_string(), + "start": await from_token.to_string(self.store), + "end": await next_token.to_string(self.store), } state = None @@ -442,8 +442,8 @@ class PaginationHandler: events, time_now, as_client_event=as_client_event ) ), - "start": from_token.to_string(), - "end": next_token.to_string(), + "start": await from_token.to_string(self.store), + "end": await next_token.to_string(self.store), } if state: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 836b3f381a..d5f7c78edf 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1077,11 +1077,13 @@ class RoomContextHandler: # the token, which we replace. token = StreamToken.START - results["start"] = token.copy_and_replace( + results["start"] = await token.copy_and_replace( "room_key", results["start"] - ).to_string() + ).to_string(self.store) - results["end"] = token.copy_and_replace("room_key", results["end"]).to_string() + results["end"] = await token.copy_and_replace( + "room_key", results["end"] + ).to_string(self.store) return results diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 6a76c20d79..e9402e6e2e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -362,13 +362,13 @@ class SearchHandler(BaseHandler): self.storage, user.to_string(), res["events_after"] ) - res["start"] = now_token.copy_and_replace( + res["start"] = await now_token.copy_and_replace( "room_key", res["start"] - ).to_string() + ).to_string(self.store) - res["end"] = now_token.copy_and_replace( + res["end"] = await now_token.copy_and_replace( "room_key", res["end"] - ).to_string() + ).to_string(self.store) if include_profile: senders = { diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index ba53f66f02..57cac22252 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -110,7 +110,7 @@ class PurgeHistoryRestServlet(RestServlet): raise SynapseError(400, "Event is for wrong room.") room_token = await self.store.get_topological_token_for_event(event_id) - token = str(room_token) + token = await room_token.to_string(self.store) logger.info("[purge] purging up to token %s (event_id %s)", token, event_id) elif "purge_up_to_ts" in body: diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index 985d994f6b..1ecb77aa26 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -33,6 +33,7 @@ class EventStreamRestServlet(RestServlet): super().__init__() self.event_stream_handler = hs.get_event_stream_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request): requester = await self.auth.get_user_by_req(request, allow_guest=True) @@ -44,7 +45,7 @@ class EventStreamRestServlet(RestServlet): if b"room_id" in request.args: room_id = request.args[b"room_id"][0].decode("ascii") - pagin_config = PaginationConfig.from_request(request) + pagin_config = await PaginationConfig.from_request(self.store, request) timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS if b"timeout" in request.args: try: diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py index d7042786ce..91da0ee573 100644 --- a/synapse/rest/client/v1/initial_sync.py +++ b/synapse/rest/client/v1/initial_sync.py @@ -27,11 +27,12 @@ class InitialSyncRestServlet(RestServlet): super().__init__() self.initial_sync_handler = hs.get_initial_sync_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request): requester = await self.auth.get_user_by_req(request) as_client_event = b"raw" not in request.args - pagination_config = PaginationConfig.from_request(request) + pagination_config = await PaginationConfig.from_request(self.store, request) include_archived = parse_boolean(request, "archived", default=False) content = await self.initial_sync_handler.snapshot_all_rooms( user_id=requester.user.to_string(), diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 7e64a2e0fe..b63389e5fe 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -451,6 +451,7 @@ class RoomMemberListRestServlet(RestServlet): super().__init__() self.message_handler = hs.get_message_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) @@ -465,7 +466,7 @@ class RoomMemberListRestServlet(RestServlet): if at_token_string is None: at_token = None else: - at_token = StreamToken.from_string(at_token_string) + at_token = await StreamToken.from_string(self.store, at_token_string) # let you filter down on particular memberships. # XXX: this may not be the best shape for this API - we could pass in a filter @@ -521,10 +522,13 @@ class RoomMessageListRestServlet(RestServlet): super().__init__() self.pagination_handler = hs.get_pagination_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request, room_id): requester = await self.auth.get_user_by_req(request, allow_guest=True) - pagination_config = PaginationConfig.from_request(request, default_limit=10) + pagination_config = await PaginationConfig.from_request( + self.store, request, default_limit=10 + ) as_client_event = b"raw" not in request.args filter_str = parse_string(request, b"filter", encoding="utf-8") if filter_str: @@ -580,10 +584,11 @@ class RoomInitialSyncRestServlet(RestServlet): super().__init__() self.initial_sync_handler = hs.get_initial_sync_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request, room_id): requester = await self.auth.get_user_by_req(request, allow_guest=True) - pagination_config = PaginationConfig.from_request(request) + pagination_config = await PaginationConfig.from_request(self.store, request) content = await self.initial_sync_handler.room_initial_sync( room_id=room_id, requester=requester, pagin_config=pagination_config ) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 7abd6ff333..55c4606569 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -180,6 +180,7 @@ class KeyChangesServlet(RestServlet): super().__init__() self.auth = hs.get_auth() self.device_handler = hs.get_device_handler() + self.store = hs.get_datastore() async def on_GET(self, request): requester = await self.auth.get_user_by_req(request, allow_guest=True) @@ -191,7 +192,7 @@ class KeyChangesServlet(RestServlet): # changes after the "to" as well as before. set_tag("to", parse_string(request, "to")) - from_token = StreamToken.from_string(from_token_string) + from_token = await StreamToken.from_string(self.store, from_token_string) user_id = requester.user.to_string() diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 51e395cc64..6779df952f 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -77,6 +77,7 @@ class SyncRestServlet(RestServlet): super().__init__() self.hs = hs self.auth = hs.get_auth() + self.store = hs.get_datastore() self.sync_handler = hs.get_sync_handler() self.clock = hs.get_clock() self.filtering = hs.get_filtering() @@ -151,10 +152,9 @@ class SyncRestServlet(RestServlet): device_id=device_id, ) + since_token = None if since is not None: - since_token = StreamToken.from_string(since) - else: - since_token = None + since_token = await StreamToken.from_string(self.store, since) # send any outstanding server notices to the user. await self._server_notices_sender.on_user_syncing(user.to_string()) @@ -236,7 +236,7 @@ class SyncRestServlet(RestServlet): "leave": sync_result.groups.leave, }, "device_one_time_keys_count": sync_result.device_one_time_keys_count, - "next_batch": sync_result.next_batch.to_string(), + "next_batch": await sync_result.next_batch.to_string(self.store), } @staticmethod @@ -413,7 +413,7 @@ class SyncRestServlet(RestServlet): result = { "timeline": { "events": serialized_timeline, - "prev_batch": room.timeline.prev_batch.to_string(), + "prev_batch": await room.timeline.prev_batch.to_string(self.store), "limited": room.timeline.limited, }, "state": {"events": serialized_state}, diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index d7a03cbf7d..ecfc6717b3 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -42,17 +42,17 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): The set of state groups that are referenced by deleted events. """ + parsed_token = await RoomStreamToken.parse(self, token) + return await self.db_pool.runInteraction( "purge_history", self._purge_history_txn, room_id, - token, + parsed_token, delete_local_events, ) - def _purge_history_txn(self, txn, room_id, token_str, delete_local_events): - token = RoomStreamToken.parse(token_str) - + def _purge_history_txn(self, txn, room_id, token, delete_local_events): # Tables that should be pruned: # event_auth # event_backward_extremities diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 0bdf846edf..fdda21d165 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import Optional @@ -21,6 +20,7 @@ import attr from synapse.api.errors import SynapseError from synapse.http.servlet import parse_integer, parse_string from synapse.http.site import SynapseRequest +from synapse.storage.databases.main import DataStore from synapse.types import StreamToken logger = logging.getLogger(__name__) @@ -39,8 +39,9 @@ class PaginationConfig: limit = attr.ib(type=Optional[int]) @classmethod - def from_request( + async def from_request( cls, + store: "DataStore", request: SynapseRequest, raise_invalid_params: bool = True, default_limit: Optional[int] = None, @@ -54,13 +55,13 @@ class PaginationConfig: if from_tok == "END": from_tok = None # For backwards compat. elif from_tok: - from_tok = StreamToken.from_string(from_tok) + from_tok = await StreamToken.from_string(store, from_tok) except Exception: raise SynapseError(400, "'from' parameter is invalid") try: if to_tok: - to_tok = StreamToken.from_string(to_tok) + to_tok = await StreamToken.from_string(store, to_tok) except Exception: raise SynapseError(400, "'to' parameter is invalid") diff --git a/synapse/types.py b/synapse/types.py index 02bcc197ec..bd271f9f16 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,17 @@ import re import string import sys from collections import namedtuple -from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Mapping, + MutableMapping, + Optional, + Tuple, + Type, + TypeVar, +) import attr from signedjson.key import decode_verify_key_bytes @@ -26,6 +36,9 @@ from unpaddedbase64 import decode_base64 from synapse.api.errors import Codes, SynapseError +if TYPE_CHECKING: + from synapse.storage.databases.main import DataStore + # define a version of typing.Collection that works on python 3.5 if sys.version_info[:3] >= (3, 6, 0): from typing import Collection @@ -393,7 +406,7 @@ class RoomStreamToken: stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) @classmethod - def parse(cls, string: str) -> "RoomStreamToken": + async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken": try: if string[0] == "s": return cls(topological=None, stream=int(string[1:])) @@ -428,7 +441,7 @@ class RoomStreamToken: def as_tuple(self) -> Tuple[Optional[int], int]: return (self.topological, self.stream) - def __str__(self) -> str: + async def to_string(self, store: "DataStore") -> str: if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) else: @@ -453,18 +466,32 @@ class StreamToken: START = None # type: StreamToken @classmethod - def from_string(cls, string): + async def from_string(cls, store: "DataStore", string: str) -> "StreamToken": try: keys = string.split(cls._SEPARATOR) while len(keys) < len(attr.fields(cls)): # i.e. old token from before receipt_key keys.append("0") - return cls(RoomStreamToken.parse(keys[0]), *(int(k) for k in keys[1:])) + return cls( + await RoomStreamToken.parse(store, keys[0]), *(int(k) for k in keys[1:]) + ) except Exception: raise SynapseError(400, "Invalid Token") - def to_string(self): - return self._SEPARATOR.join([str(k) for k in attr.astuple(self, recurse=False)]) + async def to_string(self, store: "DataStore") -> str: + return self._SEPARATOR.join( + [ + await self.room_key.to_string(store), + str(self.presence_key), + str(self.typing_key), + str(self.receipt_key), + str(self.account_data_key), + str(self.push_rules_key), + str(self.to_device_key), + str(self.device_list_key), + str(self.groups_key), + ] + ) @property def room_stream_id(self): @@ -493,7 +520,7 @@ class StreamToken: return attr.evolve(self, **{key: new_value}) -StreamToken.START = StreamToken.from_string("s0_0") +StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0) @attr.s(slots=True, frozen=True) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index a3287011e9..0d809d25d5 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -902,16 +902,18 @@ class RoomMessageListTestCase(RoomBase): # Send a first message in the room, which will be removed by the purge. first_event_id = self.helper.send(self.room_id, "message 1")["event_id"] - first_token = str( - self.get_success(store.get_topological_token_for_event(first_event_id)) + first_token = self.get_success( + store.get_topological_token_for_event(first_event_id) ) + first_token_str = self.get_success(first_token.to_string(store)) # Send a second message in the room, which won't be removed, and which we'll # use as the marker to purge events before. second_event_id = self.helper.send(self.room_id, "message 2")["event_id"] - second_token = str( - self.get_success(store.get_topological_token_for_event(second_event_id)) + second_token = self.get_success( + store.get_topological_token_for_event(second_event_id) ) + second_token_str = self.get_success(second_token.to_string(store)) # Send a third event in the room to ensure we don't fall under any edge case # due to our marker being the latest forward extremity in the room. @@ -921,7 +923,11 @@ class RoomMessageListTestCase(RoomBase): request, channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s" - % (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})), + % ( + self.room_id, + second_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), ) self.render(request) self.assertEqual(channel.code, 200, channel.json_body) @@ -936,7 +942,7 @@ class RoomMessageListTestCase(RoomBase): pagination_handler._purge_history( purge_id=purge_id, room_id=self.room_id, - token=second_token, + token=second_token_str, delete_local_events=True, ) ) @@ -946,7 +952,11 @@ class RoomMessageListTestCase(RoomBase): request, channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s" - % (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})), + % ( + self.room_id, + second_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), ) self.render(request) self.assertEqual(channel.code, 200, channel.json_body) @@ -960,7 +970,11 @@ class RoomMessageListTestCase(RoomBase): request, channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s" - % (self.room_id, first_token, json.dumps({"types": [EventTypes.Message]})), + % ( + self.room_id, + first_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), ) self.render(request) self.assertEqual(channel.code, 200, channel.json_body) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 723cd28933..cc1f3c53c5 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -47,12 +47,15 @@ class PurgeTests(HomeserverTestCase): storage = self.hs.get_storage() # Get the topological token - event = str( - self.get_success(store.get_topological_token_for_event(last["event_id"])) + token = self.get_success( + store.get_topological_token_for_event(last["event_id"]) ) + token_str = self.get_success(token.to_string(self.hs.get_datastore())) # Purge everything before this topological token - self.get_success(storage.purge_events.purge_history(self.room_id, event, True)) + self.get_success( + storage.purge_events.purge_history(self.room_id, token_str, True) + ) # 1-3 should fail and last will succeed, meaning that 1-3 are deleted # and last is not. -- cgit 1.5.1