From d936371b698ea3085472ee83ae9a88ea7832280e Mon Sep 17 00:00:00 2001 From: Sorunome Date: Wed, 9 Jun 2021 20:39:51 +0200 Subject: Implement knock feature (#6739) This PR aims to implement the knock feature as proposed in https://github.com/matrix-org/matrix-doc/pull/2403 Signed-off-by: Sorunome mail@sorunome.de Signed-off-by: Andrew Morgan andrewm@element.io --- synapse/config/experimental.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse/config/experimental.py') diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 6ebce4b2f7..37668079e7 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.config._base import Config from synapse.types import JsonDict @@ -29,3 +30,9 @@ class ExperimentalConfig(Config): # MSC3026 (busy presence state) self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool + + # MSC2403 (room knocking) + self.msc2403_enabled = experimental.get("msc2403_enabled", False) # type: bool + if self.msc2403_enabled: + # Enable the MSC2403 unstable room version + KNOWN_ROOM_VERSIONS[RoomVersions.MSC2403.identifier] = RoomVersions.MSC2403 -- cgit 1.5.1 From 9e5ab6dd581389271b817d256e2fca113614a080 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Jun 2021 07:45:14 -0400 Subject: Remove the experimental flag for knocking and use stable prefixes / endpoints. (#10167) * Room version 7 for knocking. * Stable prefixes and endpoints (both client and federation) for knocking. * Removes the experimental configuration flag. --- changelog.d/10167.feature | 1 + synapse/api/constants.py | 4 ++-- synapse/api/room_versions.py | 7 ++++--- synapse/config/experimental.py | 7 ------- synapse/federation/federation_client.py | 9 ++------- synapse/federation/transport/client.py | 27 +++------------------------ synapse/federation/transport/server.py | 22 ++-------------------- synapse/handlers/federation.py | 6 ++---- synapse/handlers/room_member.py | 5 +---- synapse/rest/__init__.py | 5 +---- synapse/rest/client/v2_alpha/knock.py | 6 ++---- tests/federation/transport/test_knocking.py | 22 +++++++++------------- tests/rest/client/v2_alpha/test_sync.py | 8 ++++---- 13 files changed, 33 insertions(+), 96 deletions(-) create mode 100644 changelog.d/10167.feature (limited to 'synapse/config/experimental.py') diff --git a/changelog.d/10167.feature b/changelog.d/10167.feature new file mode 100644 index 0000000000..9c41140194 --- /dev/null +++ b/changelog.d/10167.feature @@ -0,0 +1 @@ +Implement "room knocking" as per [MSC2403](https://github.com/matrix-org/matrix-doc/pull/2403). Contributed by Sorunome and anoa. \ No newline at end of file diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 8d5b2177d2..3940da5c88 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -41,7 +41,7 @@ class Membership: INVITE = "invite" JOIN = "join" - KNOCK = "xyz.amorgan.knock" + KNOCK = "knock" LEAVE = "leave" BAN = "ban" LIST = (INVITE, JOIN, KNOCK, LEAVE, BAN) @@ -58,7 +58,7 @@ class PresenceState: class JoinRules: PUBLIC = "public" - KNOCK = "xyz.amorgan.knock" + KNOCK = "knock" INVITE = "invite" PRIVATE = "private" # As defined for MSC3083. diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index 3349f399ba..f6c1c97b40 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -180,9 +180,9 @@ class RoomVersions: msc3083_join_rules=True, msc2403_knocking=False, ) - MSC2403 = RoomVersion( - "xyz.amorgan.knock", - RoomDisposition.UNSTABLE, + V7 = RoomVersion( + "7", + RoomDisposition.STABLE, EventFormatVersions.V3, StateResolutionVersions.V2, enforce_key_validity=True, @@ -206,6 +206,7 @@ KNOWN_ROOM_VERSIONS = { RoomVersions.V6, RoomVersions.MSC2176, RoomVersions.MSC3083, + RoomVersions.V7, ) # Note that we do not include MSC2043 here unless it is enabled in the config. } # type: Dict[str, RoomVersion] diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 37668079e7..6ebce4b2f7 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.config._base import Config from synapse.types import JsonDict @@ -30,9 +29,3 @@ class ExperimentalConfig(Config): # MSC3026 (busy presence state) self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool - - # MSC2403 (room knocking) - self.msc2403_enabled = experimental.get("msc2403_enabled", False) # type: bool - if self.msc2403_enabled: - # Enable the MSC2403 unstable room version - KNOWN_ROOM_VERSIONS[RoomVersions.MSC2403.identifier] = RoomVersions.MSC2403 diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 03ec14ce87..ed09c6af1f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -90,7 +90,6 @@ class FederationClient(FederationBase): self._clock.looping_call(self._clear_tried_cache, 60 * 1000) self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() - self._msc2403_enabled = hs.config.experimental.msc2403_enabled self.hostname = hs.hostname self.signing_key = hs.signing_key @@ -621,11 +620,7 @@ class FederationClient(FederationBase): SynapseError: if the chosen remote server returns a 300/400 code, or no servers successfully handle the request. """ - valid_memberships = {Membership.JOIN, Membership.LEAVE} - - # Allow knocking if the feature is enabled - if self._msc2403_enabled: - valid_memberships.add(Membership.KNOCK) + valid_memberships = {Membership.JOIN, Membership.LEAVE, Membership.KNOCK} if membership not in valid_memberships: raise RuntimeError( @@ -989,7 +984,7 @@ class FederationClient(FederationBase): return await self._do_send_knock(destination, pdu) return await self._try_destination_list( - "xyz.amorgan.knock/send_knock", destinations, send_request + "send_knock", destinations, send_request ) async def _do_send_knock(self, destination: str, pdu: EventBase) -> JsonDict: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index af0c679ed9..c9e7c57461 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -47,7 +47,6 @@ class TransportLayerClient: def __init__(self, hs): self.server_name = hs.hostname self.client = hs.get_federation_http_client() - self._msc2403_enabled = hs.config.experimental.msc2403_enabled @log_function def get_room_state_ids(self, destination, room_id, event_id): @@ -221,29 +220,14 @@ class TransportLayerClient: Fails with ``FederationDeniedError`` if the remote destination is not in our federation whitelist """ - valid_memberships = {Membership.JOIN, Membership.LEAVE} - - # Allow knocking if the feature is enabled - if self._msc2403_enabled: - valid_memberships.add(Membership.KNOCK) + valid_memberships = {Membership.JOIN, Membership.LEAVE, Membership.KNOCK} if membership not in valid_memberships: raise RuntimeError( "make_membership_event called with membership='%s', must be one of %s" % (membership, ",".join(valid_memberships)) ) - - # Knock currently uses an unstable prefix - if membership == Membership.KNOCK: - # Create a path in the form of /unstable/xyz.amorgan.knock/make_knock/... - path = _create_path( - FEDERATION_UNSTABLE_PREFIX + "/xyz.amorgan.knock", - "/make_knock/%s/%s", - room_id, - user_id, - ) - else: - path = _create_v1_path("/make_%s/%s/%s", membership, room_id, user_id) + path = _create_v1_path("/make_%s/%s/%s", membership, room_id, user_id) ignore_backoff = False retry_on_dns_fail = False @@ -366,12 +350,7 @@ class TransportLayerClient: The list of state events may be empty. """ - path = _create_path( - FEDERATION_UNSTABLE_PREFIX + "/xyz.amorgan.knock", - "/send_knock/%s/%s", - room_id, - event_id, - ) + path = _create_v1_path("/send_knock/%s/%s", room_id, event_id) return await self.client.put_json( destination=destination, path=path, data=content diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index fe5fb6bee7..16d740cf58 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -567,8 +567,6 @@ class FederationV2SendLeaveServlet(BaseFederationServerServlet): class FederationMakeKnockServlet(BaseFederationServerServlet): PATH = "/make_knock/(?P[^/]*)/(?P[^/]*)" - PREFIX = FEDERATION_UNSTABLE_PREFIX + "/xyz.amorgan.knock" - async def on_GET(self, origin, content, query, room_id, user_id): try: # Retrieve the room versions the remote homeserver claims to support @@ -585,8 +583,6 @@ class FederationMakeKnockServlet(BaseFederationServerServlet): class FederationV1SendKnockServlet(BaseFederationServerServlet): PATH = "/send_knock/(?P[^/]*)/(?P[^/]*)" - PREFIX = FEDERATION_UNSTABLE_PREFIX + "/xyz.amorgan.knock" - async def on_PUT(self, origin, content, query, room_id, event_id): content = await self.handler.on_send_knock_request(origin, content, room_id) return 200, content @@ -1610,6 +1606,8 @@ FEDERATION_SERVLET_CLASSES = ( FederationVersionServlet, RoomComplexityServlet, FederationSpaceSummaryServlet, + FederationV1SendKnockServlet, + FederationMakeKnockServlet, ) # type: Tuple[Type[BaseFederationServlet], ...] OPENID_SERVLET_CLASSES = ( @@ -1652,12 +1650,6 @@ GROUP_ATTESTATION_SERVLET_CLASSES = ( ) # type: Tuple[Type[BaseFederationServlet], ...] -MSC2403_SERVLET_CLASSES = ( - FederationV1SendKnockServlet, - FederationMakeKnockServlet, -) - - DEFAULT_SERVLET_GROUPS = ( "federation", "room_list", @@ -1700,16 +1692,6 @@ def register_servlets( server_name=hs.hostname, ).register(resource) - # Register msc2403 (knocking) servlets if the feature is enabled - if hs.config.experimental.msc2403_enabled: - for servletclass in MSC2403_SERVLET_CLASSES: - servletclass( - hs=hs, - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - if "openid" in servlet_groups: for servletclass in OPENID_SERVLET_CLASSES: servletclass( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6647063485..b3a93212f1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2009,8 +2009,7 @@ class FederationHandler(BaseHandler): """ if get_domain_from_id(user_id) != origin: logger.info( - "Get /xyz.amorgan.knock/make_knock request for user %r" - "from different origin %s, ignoring", + "Get /make_knock request for user %r from different origin %s, ignoring", user_id, origin, ) @@ -2077,8 +2076,7 @@ class FederationHandler(BaseHandler): if get_domain_from_id(event.sender) != origin: logger.info( - "Got /xyz.amorgan.knock/send_knock request for user %r " - "from different origin %s", + "Got /send_knock request for user %r from different origin %s", event.sender, origin, ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index c26963b1e1..a49a61a34c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -707,10 +707,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): knock.event_id, txn_id, requester, content ) - elif ( - self.config.experimental.msc2403_enabled - and effective_membership_state == Membership.KNOCK - ): + elif effective_membership_state == Membership.KNOCK: if not is_host_in_room: # The knock needs to be sent over federation instead remote_room_hosts.append(get_domain_from_id(room_id)) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 138411ad19..d29f2fea5e 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -121,10 +121,7 @@ class ClientRestResource(JsonResource): account_validity.register_servlets(hs, client_resource) relations.register_servlets(hs, client_resource) password_policy.register_servlets(hs, client_resource) - - # Register msc2403 (knocking) servlets if the feature is enabled - if hs.config.experimental.msc2403_enabled: - knock.register_servlets(hs, client_resource) + knock.register_servlets(hs, client_resource) # moving to /_synapse/admin admin.register_servlets_for_client_rest_resource(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/knock.py b/synapse/rest/client/v2_alpha/knock.py index f046bf9cb3..7d1bc40658 100644 --- a/synapse/rest/client/v2_alpha/knock.py +++ b/synapse/rest/client/v2_alpha/knock.py @@ -39,12 +39,10 @@ logger = logging.getLogger(__name__) class KnockRoomAliasServlet(RestServlet): """ - POST /xyz.amorgan.knock/{roomIdOrAlias} + POST /knock/{roomIdOrAlias} """ - PATTERNS = client_patterns( - "/xyz.amorgan.knock/(?P[^/]*)", releases=() - ) + PATTERNS = client_patterns("/knock/(?P[^/]*)") def __init__(self, hs: "HomeServer"): super().__init__() diff --git a/tests/federation/transport/test_knocking.py b/tests/federation/transport/test_knocking.py index 121aa88cfa..8c215d50f2 100644 --- a/tests/federation/transport/test_knocking.py +++ b/tests/federation/transport/test_knocking.py @@ -25,9 +25,6 @@ from synapse.types import RoomAlias from tests.test_utils import event_injection from tests.unittest import FederatingHomeserverTestCase, TestCase, override_config -# An identifier to use while MSC2304 is not in a stable release of the spec -KNOCK_UNSTABLE_IDENTIFIER = "xyz.amorgan.knock" - class KnockingStrippedStateEventHelperMixin(TestCase): def send_example_state_events_to_room( @@ -61,7 +58,7 @@ class KnockingStrippedStateEventHelperMixin(TestCase): self.get_success( event_injection.inject_event( hs, - room_version=RoomVersions.MSC2403.identifier, + room_version=RoomVersions.V7.identifier, room_id=room_id, sender=sender, type="com.example.secret", @@ -121,7 +118,7 @@ class KnockingStrippedStateEventHelperMixin(TestCase): self.get_success( event_injection.inject_event( hs, - room_version=RoomVersions.MSC2403.identifier, + room_version=RoomVersions.V7.identifier, room_id=room_id, sender=sender, type=event_type, @@ -135,7 +132,7 @@ class KnockingStrippedStateEventHelperMixin(TestCase): room_state[EventTypes.Create] = { "content": { "creator": sender, - "room_version": RoomVersions.MSC2403.identifier, + "room_version": RoomVersions.V7.identifier, }, "state_key": "", } @@ -232,7 +229,7 @@ class FederationKnockingTestCase( room_id = self.helper.create_room_as( "u1", is_public=False, - room_version=RoomVersions.MSC2403.identifier, + room_version=RoomVersions.V7.identifier, tok=user_token, ) @@ -243,14 +240,13 @@ class FederationKnockingTestCase( channel = self.make_request( "GET", - "/_matrix/federation/unstable/%s/make_knock/%s/%s?ver=%s" + "/_matrix/federation/v1/make_knock/%s/%s?ver=%s" % ( - KNOCK_UNSTABLE_IDENTIFIER, room_id, fake_knocking_user_id, # Inform the remote that we support the room version of the room we're # knocking on - RoomVersions.MSC2403.identifier, + RoomVersions.V7.identifier, ), ) self.assertEquals(200, channel.code, channel.result) @@ -275,7 +271,7 @@ class FederationKnockingTestCase( self.clock, self.hs.hostname, self.hs.signing_key, - room_version=RoomVersions.MSC2403, + room_version=RoomVersions.V7, event_dict=knock_event, ) @@ -287,8 +283,8 @@ class FederationKnockingTestCase( # Send the signed knock event into the room channel = self.make_request( "PUT", - "/_matrix/federation/unstable/%s/send_knock/%s/%s" - % (KNOCK_UNSTABLE_IDENTIFIER, room_id, signed_knock_event.event_id), + "/_matrix/federation/v1/send_knock/%s/%s" + % (room_id, signed_knock_event.event_id), signed_knock_event_json, ) self.assertEquals(200, channel.code, channel.result) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index be5737e420..b52f78ba69 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -333,7 +333,7 @@ class SyncKnockTestCase( self.room_id = self.helper.create_room_as( self.user_id, is_public=False, - room_version="xyz.amorgan.knock", + room_version="7", tok=self.tok, ) @@ -363,7 +363,7 @@ class SyncKnockTestCase( # Knock on a room channel = self.make_request( "POST", - "/_matrix/client/unstable/xyz.amorgan.knock/%s" % (self.room_id,), + "/_matrix/client/r0/knock/%s" % (self.room_id,), b"{}", self.knocker_tok, ) @@ -371,7 +371,7 @@ class SyncKnockTestCase( # We expect to see the knock event in the stripped room state later self.expected_room_state[EventTypes.Member] = { - "content": {"membership": "xyz.amorgan.knock", "displayname": "knocker"}, + "content": {"membership": "knock", "displayname": "knocker"}, "state_key": "@knocker:test", } @@ -384,7 +384,7 @@ class SyncKnockTestCase( self.assertEqual(channel.code, 200, channel.json_body) # Extract the stripped room state events from /sync - knock_entry = channel.json_body["rooms"]["xyz.amorgan.knock"] + knock_entry = channel.json_body["rooms"]["knock"] room_state_events = knock_entry[self.room_id]["knock_state"]["events"] # Validate that the knock membership event came last -- cgit 1.5.1 From 96f6293de51c2fcf530bb6ca3705cf596c19656f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 22 Jun 2021 04:02:53 -0500 Subject: Add endpoints for backfilling history (MSC2716) (#9247) Work on https://github.com/matrix-org/matrix-doc/pull/2716 --- changelog.d/9247.feature | 1 + scripts-dev/complement.sh | 2 +- synapse/api/auth.py | 7 +- synapse/api/constants.py | 15 ++ synapse/config/experimental.py | 3 + synapse/events/__init__.py | 9 + synapse/events/builder.py | 17 +- synapse/handlers/message.py | 104 +++++++- synapse/handlers/room_member.py | 90 +++++++ synapse/rest/client/v1/room.py | 288 ++++++++++++++++++++- synapse/storage/databases/main/event_federation.py | 50 +++- tests/handlers/test_presence.py | 4 +- tests/replication/test_federation_sender_shard.py | 4 +- tests/storage/test_redaction.py | 13 +- 14 files changed, 584 insertions(+), 23 deletions(-) create mode 100644 changelog.d/9247.feature (limited to 'synapse/config/experimental.py') diff --git a/changelog.d/9247.feature b/changelog.d/9247.feature new file mode 100644 index 0000000000..c687acf102 --- /dev/null +++ b/changelog.d/9247.feature @@ -0,0 +1 @@ +Add experimental support for backfilling history into rooms ([MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716)). diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 0043964673..ba060104c3 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then fi # Run the tests! -go test -v -tags synapse_blacklist,msc2946,msc3083 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests +go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests diff --git a/synapse/api/auth.py b/synapse/api/auth.py index cf4333a923..edf1b918eb 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -92,11 +92,8 @@ class Auth: async def check_from_context( self, room_version: str, event, context, do_sig_check=True ) -> None: - prev_state_ids = await context.get_prev_state_ids() - auth_events_ids = self.compute_auth_events( - event, prev_state_ids, for_verification=True - ) - auth_events_by_id = await self.store.get_events(auth_events_ids) + auth_event_ids = event.auth_event_ids() + auth_events_by_id = await self.store.get_events(auth_event_ids) auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()} room_version_obj = KNOWN_ROOM_VERSIONS[room_version] diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 6c3958f7ab..414e4c019a 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -119,6 +119,9 @@ class EventTypes: SpaceChild = "m.space.child" SpaceParent = "m.space.parent" + MSC2716_INSERTION = "org.matrix.msc2716.insertion" + MSC2716_MARKER = "org.matrix.msc2716.marker" + class ToDeviceEventTypes: RoomKeyRequest = "m.room_key_request" @@ -185,6 +188,18 @@ class EventContentFields: # cf https://github.com/matrix-org/matrix-doc/pull/1772 ROOM_TYPE = "type" + # Used on normal messages to indicate they were historically imported after the fact + MSC2716_HISTORICAL = "org.matrix.msc2716.historical" + # For "insertion" events + MSC2716_NEXT_CHUNK_ID = "org.matrix.msc2716.next_chunk_id" + # Used on normal message events to indicate where the chunk connects to + MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id" + # For "marker" events + MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion" + MSC2716_MARKER_INSERTION_PREV_EVENTS = ( + "org.matrix.msc2716.marker.insertion_prev_events" + ) + class RoomEncryptionAlgorithms: MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2" diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 6ebce4b2f7..7fb1f7021f 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -29,3 +29,6 @@ class ExperimentalConfig(Config): # MSC3026 (busy presence state) self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool + + # MSC2716 (backfill existing history) + self.msc2716_enabled = experimental.get("msc2716_enabled", False) # type: bool diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index c8b52cbc7a..0cb9c1cc1e 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -119,6 +119,7 @@ class _EventInternalMetadata: redacted = DictProperty("redacted") # type: bool txn_id = DictProperty("txn_id") # type: str token_id = DictProperty("token_id") # type: str + historical = DictProperty("historical") # type: bool # XXX: These are set by StreamWorkerStore._set_before_and_after. # I'm pretty sure that these are never persisted to the database, so shouldn't @@ -204,6 +205,14 @@ class _EventInternalMetadata: """ return self._dict.get("redacted", False) + def is_historical(self) -> bool: + """Whether this is a historical message. + This is used by the batchsend historical message endpoint and + is needed to and mark the event as backfilled and skip some checks + like push notifications. + """ + return self._dict.get("historical", False) + class EventBase(metaclass=abc.ABCMeta): @property diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 5793553a88..81bf8615b7 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging from typing import Any, Dict, List, Optional, Tuple, Union import attr @@ -33,6 +34,8 @@ from synapse.types import EventID, JsonDict from synapse.util import Clock from synapse.util.stringutils import random_string +logger = logging.getLogger(__name__) + @attr.s(slots=True, cmp=False, frozen=True) class EventBuilder: @@ -100,6 +103,7 @@ class EventBuilder: self, prev_event_ids: List[str], auth_event_ids: Optional[List[str]], + depth: Optional[int] = None, ) -> EventBase: """Transform into a fully signed and hashed event @@ -108,6 +112,9 @@ class EventBuilder: auth_event_ids: The event IDs to use as the auth events. Should normally be set to None, which will cause them to be calculated based on the room state at the prev_events. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. Returns: The signed and hashed event. @@ -131,8 +138,14 @@ class EventBuilder: auth_events = auth_event_ids prev_events = prev_event_ids - old_depth = await self._store.get_max_depth_of(prev_event_ids) - depth = old_depth + 1 + # Otherwise, progress the depth as normal + if depth is None: + ( + _, + most_recent_prev_event_depth, + ) = await self._store.get_max_depth_of(prev_event_ids) + + depth = most_recent_prev_event_depth + 1 # we cap depth of generated events, to ensure that they are not # rejected by other servers (and so that they can be persisted in diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4d2255bdf1..db12abd59d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -482,6 +482,9 @@ class EventCreationHandler: prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, require_consent: bool = True, + outlier: bool = False, + historical: bool = False, + depth: Optional[int] = None, ) -> Tuple[EventBase, EventContext]: """ Given a dict from a client, create a new event. @@ -508,6 +511,14 @@ class EventCreationHandler: require_consent: Whether to check if the requester has consented to the privacy policy. + + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. + Raises: ResourceLimitError if server is blocked to some resource being exceeded @@ -563,11 +574,36 @@ class EventCreationHandler: if txn_id is not None: builder.internal_metadata.txn_id = txn_id + builder.internal_metadata.outlier = outlier + + builder.internal_metadata.historical = historical + + # Strip down the auth_event_ids to only what we need to auth the event. + # For example, we don't need extra m.room.member that don't match event.sender + if auth_event_ids is not None: + temp_event = await builder.build( + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + depth=depth, + ) + auth_events = await self.store.get_events_as_list(auth_event_ids) + # Create a StateMap[str] + auth_event_state_map = { + (e.type, e.state_key): e.event_id for e in auth_events + } + # Actually strip down and use the necessary auth events + auth_event_ids = self.auth.compute_auth_events( + event=temp_event, + current_state_ids=auth_event_state_map, + for_verification=False, + ) + event, context = await self.create_new_client_event( builder=builder, requester=requester, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, + depth=depth, ) # In an ideal world we wouldn't need the second part of this condition. However, @@ -724,9 +760,13 @@ class EventCreationHandler: self, requester: Requester, event_dict: dict, + prev_event_ids: Optional[List[str]] = None, + auth_event_ids: Optional[List[str]] = None, ratelimit: bool = True, txn_id: Optional[str] = None, ignore_shadow_ban: bool = False, + outlier: bool = False, + depth: Optional[int] = None, ) -> Tuple[EventBase, int]: """ Creates an event, then sends it. @@ -736,10 +776,24 @@ class EventCreationHandler: Args: requester: The requester sending the event. event_dict: An entire event. + prev_event_ids: + The event IDs to use as the prev events. + Should normally be left as None to automatically request them + from the database. + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. ratelimit: Whether to rate limit this send. txn_id: The transaction ID. ignore_shadow_ban: True if shadow-banned users should be allowed to send this event. + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. Returns: The event, and its stream ordering (if deduplication happened, @@ -779,7 +833,13 @@ class EventCreationHandler: return event, event.internal_metadata.stream_ordering event, context = await self.create_event( - requester, event_dict, txn_id=txn_id + requester, + event_dict, + txn_id=txn_id, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + outlier=outlier, + depth=depth, ) assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( @@ -811,6 +871,7 @@ class EventCreationHandler: requester: Optional[Requester] = None, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, + depth: Optional[int] = None, ) -> Tuple[EventBase, EventContext]: """Create a new event for a local client @@ -828,6 +889,10 @@ class EventCreationHandler: Should normally be left as None, which will cause them to be calculated based on the room state at the prev_events. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. + Returns: Tuple of created event, context """ @@ -851,9 +916,24 @@ class EventCreationHandler: ), "Attempting to create an event with no prev_events" event = await builder.build( - prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + depth=depth, ) - context = await self.state.compute_event_context(event) + + old_state = None + + # Pass on the outlier property from the builder to the event + # after it is created + if builder.internal_metadata.outlier: + event.internal_metadata.outlier = builder.internal_metadata.outlier + + # Calculate the state for outliers that pass in their own `auth_event_ids` + if auth_event_ids: + old_state = await self.store.get_events_as_list(auth_event_ids) + + context = await self.state.compute_event_context(event, old_state=old_state) + if requester: context.app_service = requester.app_service @@ -1018,7 +1098,13 @@ class EventCreationHandler: the arguments. """ - await self.action_generator.handle_push_actions_for_event(event, context) + # Skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). + if not event.internal_metadata.is_historical(): + await self.action_generator.handle_push_actions_for_event(event, context) try: # If we're a worker we need to hit out to the master. @@ -1317,13 +1403,21 @@ class EventCreationHandler: if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") + # Mark any `m.historical` messages as backfilled so they don't appear + # in `/sync` and have the proper decrementing `stream_ordering` as we import + backfilled = False + if event.internal_metadata.is_historical(): + backfilled = True + # Note that this returns the event that was persisted, which may not be # the same as we passed in if it was deduplicated due transaction IDs. ( event, event_pos, max_stream_token, - ) = await self.storage.persistence.persist_event(event, context=context) + ) = await self.storage.persistence.persist_event( + event, context=context, backfilled=backfilled + ) if self._ephemeral_events_enabled: # If there's an expiry timestamp on the event, schedule its expiry. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index a49a61a34c..1192591609 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -257,11 +257,42 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): room_id: str, membership: str, prev_event_ids: List[str], + auth_event_ids: Optional[List[str]] = None, txn_id: Optional[str] = None, ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, + outlier: bool = False, ) -> Tuple[str, int]: + """ + Internal membership update function to get an existing event or create + and persist a new event for the new membership change. + + Args: + requester: + target: + room_id: + membership: + prev_event_ids: The event IDs to use as the prev events + + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. + + txn_id: + ratelimit: + content: + require_consent: + + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + + Returns: + Tuple of event ID and stream ordering position + """ + user_id = target.to_string() if content is None: @@ -298,7 +329,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): }, txn_id=txn_id, prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, require_consent=require_consent, + outlier=outlier, ) prev_state_ids = await context.get_prev_state_ids() @@ -399,6 +432,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, + outlier: bool = False, + prev_event_ids: Optional[List[str]] = None, + auth_event_ids: Optional[List[str]] = None, ) -> Tuple[str, int]: """Update a user's membership in a room. @@ -413,6 +449,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit: Whether to rate limit the request. content: The content of the created event. require_consent: Whether consent is required. + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + prev_event_ids: The event IDs to use as the prev events + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. Returns: A tuple of the new event ID and stream ID. @@ -439,6 +483,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit=ratelimit, content=content, require_consent=require_consent, + outlier=outlier, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, ) return result @@ -455,10 +502,36 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, + outlier: bool = False, + prev_event_ids: Optional[List[str]] = None, + auth_event_ids: Optional[List[str]] = None, ) -> Tuple[str, int]: """Helper for update_membership. Assumes that the membership linearizer is already held for the room. + + Args: + requester: + target: + room_id: + action: + txn_id: + remote_room_hosts: + third_party_signed: + ratelimit: + content: + require_consent: + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + prev_event_ids: The event IDs to use as the prev events + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. + + Returns: + A tuple of the new event ID and stream ID. """ content_specified = bool(content) if content is None: @@ -543,6 +616,21 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if block_invite: raise SynapseError(403, "Invites have been disabled on this server") + if prev_event_ids: + return await self._local_membership_update( + requester=requester, + target=target, + room_id=room_id, + membership=effective_membership_state, + txn_id=txn_id, + ratelimit=ratelimit, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + content=content, + require_consent=require_consent, + outlier=outlier, + ) + latest_event_ids = await self.store.get_prev_events_for_room(room_id) current_state_ids = await self.state_handler.get_current_state_ids( @@ -732,8 +820,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): txn_id=txn_id, ratelimit=ratelimit, prev_event_ids=latest_event_ids, + auth_event_ids=auth_event_ids, content=content, require_consent=require_consent, + outlier=outlier, ) async def transfer_room_state_on_room_upgrade( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 16d087ea60..92ebe838fd 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -19,7 +19,7 @@ import re from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from urllib import parse as urlparse -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.api.errors import ( AuthError, Codes, @@ -266,6 +266,288 @@ class RoomSendEventRestServlet(TransactionRestServlet): ) +class RoomBatchSendEventRestServlet(TransactionRestServlet): + """ + API endpoint which can insert a chunk of events historically back in time + next to the given `prev_event`. + + `chunk_id` comes from `next_chunk_id `in the response of the batch send + endpoint and is derived from the "insertion" events added to each chunk. + It's not required for the first batch send. + + `state_events_at_start` is used to define the historical state events + needed to auth the events like join events. These events will float + outside of the normal DAG as outlier's and won't be visible in the chat + history which also allows us to insert multiple chunks without having a bunch + of `@mxid joined the room` noise between each chunk. + + `events` is chronological chunk/list of events you want to insert. + There is a reverse-chronological constraint on chunks so once you insert + some messages, you can only insert older ones after that. + tldr; Insert chunks from your most recent history -> oldest history. + + POST /_matrix/client/unstable/org.matrix.msc2716/rooms//batch_send?prev_event=&chunk_id= + { + "events": [ ... ], + "state_events_at_start": [ ... ] + } + """ + + PATTERNS = ( + re.compile( + "^/_matrix/client/unstable/org.matrix.msc2716" + "/rooms/(?P[^/]*)/batch_send$" + ), + ) + + def __init__(self, hs): + super().__init__(hs) + self.hs = hs + self.store = hs.get_datastore() + self.state_store = hs.get_storage().state + self.event_creation_handler = hs.get_event_creation_handler() + self.room_member_handler = hs.get_room_member_handler() + self.auth = hs.get_auth() + + async def inherit_depth_from_prev_ids(self, prev_event_ids) -> int: + ( + most_recent_prev_event_id, + most_recent_prev_event_depth, + ) = await self.store.get_max_depth_of(prev_event_ids) + + # We want to insert the historical event after the `prev_event` but before the successor event + # + # We inherit depth from the successor event instead of the `prev_event` + # because events returned from `/messages` are first sorted by `topological_ordering` + # which is just the `depth` and then tie-break with `stream_ordering`. + # + # We mark these inserted historical events as "backfilled" which gives them a + # negative `stream_ordering`. If we use the same depth as the `prev_event`, + # then our historical event will tie-break and be sorted before the `prev_event` + # when it should come after. + # + # We want to use the successor event depth so they appear after `prev_event` because + # it has a larger `depth` but before the successor event because the `stream_ordering` + # is negative before the successor event. + successor_event_ids = await self.store.get_successor_events( + [most_recent_prev_event_id] + ) + + # If we can't find any successor events, then it's a forward extremity of + # historical messages and we can just inherit from the previous historical + # event which we can already assume has the correct depth where we want + # to insert into. + if not successor_event_ids: + depth = most_recent_prev_event_depth + else: + ( + _, + oldest_successor_depth, + ) = await self.store.get_min_depth_of(successor_event_ids) + + depth = oldest_successor_depth + + return depth + + async def on_POST(self, request, room_id): + requester = await self.auth.get_user_by_req(request, allow_guest=False) + + if not requester.app_service: + raise AuthError( + 403, + "Only application services can use the /batchsend endpoint", + ) + + body = parse_json_object_from_request(request) + assert_params_in_dict(body, ["state_events_at_start", "events"]) + + prev_events_from_query = parse_strings_from_args(request.args, "prev_event") + chunk_id_from_query = parse_string(request, "chunk_id", default=None) + + if prev_events_from_query is None: + raise SynapseError( + 400, + "prev_event query parameter is required when inserting historical messages back in time", + errcode=Codes.MISSING_PARAM, + ) + + # For the event we are inserting next to (`prev_events_from_query`), + # find the most recent auth events (derived from state events) that + # allowed that message to be sent. We will use that as a base + # to auth our historical messages against. + ( + most_recent_prev_event_id, + _, + ) = await self.store.get_max_depth_of(prev_events_from_query) + # mapping from (type, state_key) -> state_event_id + prev_state_map = await self.state_store.get_state_ids_for_event( + most_recent_prev_event_id + ) + # List of state event ID's + prev_state_ids = list(prev_state_map.values()) + auth_event_ids = prev_state_ids + + for state_event in body["state_events_at_start"]: + assert_params_in_dict( + state_event, ["type", "origin_server_ts", "content", "sender"] + ) + + logger.debug( + "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s", + state_event, + auth_event_ids, + ) + + event_dict = { + "type": state_event["type"], + "origin_server_ts": state_event["origin_server_ts"], + "content": state_event["content"], + "room_id": room_id, + "sender": state_event["sender"], + "state_key": state_event["state_key"], + } + + # Make the state events float off on their own + fake_prev_event_id = "$" + random_string(43) + + # TODO: This is pretty much the same as some other code to handle inserting state in this file + if event_dict["type"] == EventTypes.Member: + membership = event_dict["content"].get("membership", None) + event_id, _ = await self.room_member_handler.update_membership( + requester, + target=UserID.from_string(event_dict["state_key"]), + room_id=room_id, + action=membership, + content=event_dict["content"], + outlier=True, + prev_event_ids=[fake_prev_event_id], + # Make sure to use a copy of this list because we modify it + # later in the loop here. Otherwise it will be the same + # reference and also update in the event when we append later. + auth_event_ids=auth_event_ids.copy(), + ) + else: + # TODO: Add some complement tests that adds state that is not member joins + # and will use this code path. Maybe we only want to support join state events + # and can get rid of this `else`? + ( + event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + requester, + event_dict, + outlier=True, + prev_event_ids=[fake_prev_event_id], + # Make sure to use a copy of this list because we modify it + # later in the loop here. Otherwise it will be the same + # reference and also update in the event when we append later. + auth_event_ids=auth_event_ids.copy(), + ) + event_id = event.event_id + + auth_event_ids.append(event_id) + + events_to_create = body["events"] + + # If provided, connect the chunk to the last insertion point + # The chunk ID passed in comes from the chunk_id in the + # "insertion" event from the previous chunk. + if chunk_id_from_query: + last_event_in_chunk = events_to_create[-1] + last_event_in_chunk["content"][ + EventContentFields.MSC2716_CHUNK_ID + ] = chunk_id_from_query + + # Add an "insertion" event to the start of each chunk (next to the oldest + # event in the chunk) so the next chunk can be connected to this one. + next_chunk_id = random_string(64) + insertion_event = { + "type": EventTypes.MSC2716_INSERTION, + "sender": requester.user.to_string(), + "content": { + EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id, + EventContentFields.MSC2716_HISTORICAL: True, + }, + # Since the insertion event is put at the start of the chunk, + # where the oldest event is, copy the origin_server_ts from + # the first event we're inserting + "origin_server_ts": events_to_create[0]["origin_server_ts"], + } + # Prepend the insertion event to the start of the chunk + events_to_create = [insertion_event] + events_to_create + + inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query) + + event_ids = [] + prev_event_ids = prev_events_from_query + events_to_persist = [] + for ev in events_to_create: + assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) + + # Mark all events as historical + # This has important semantics within the Synapse internals to backfill properly + ev["content"][EventContentFields.MSC2716_HISTORICAL] = True + + event_dict = { + "type": ev["type"], + "origin_server_ts": ev["origin_server_ts"], + "content": ev["content"], + "room_id": room_id, + "sender": ev["sender"], # requester.user.to_string(), + "prev_events": prev_event_ids.copy(), + } + + event, context = await self.event_creation_handler.create_event( + requester, + event_dict, + prev_event_ids=event_dict.get("prev_events"), + auth_event_ids=auth_event_ids, + historical=True, + depth=inherited_depth, + ) + logger.debug( + "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s", + event, + prev_event_ids, + auth_event_ids, + ) + + assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( + event.sender, + ) + + events_to_persist.append((event, context)) + event_id = event.event_id + + event_ids.append(event_id) + prev_event_ids = [event_id] + + # Persist events in reverse-chronological order so they have the + # correct stream_ordering as they are backfilled (which decrements). + # Events are sorted by (topological_ordering, stream_ordering) + # where topological_ordering is just depth. + for (event, context) in reversed(events_to_persist): + ev = await self.event_creation_handler.handle_new_client_event( + requester=requester, + event=event, + context=context, + ) + + return 200, { + "state_events": auth_event_ids, + "events": event_ids, + "next_chunk_id": next_chunk_id, + } + + def on_GET(self, request, room_id): + return 501, "Not implemented" + + def on_PUT(self, request, room_id): + return self.txns.fetch_or_execute_request( + request, self.on_POST, request, room_id + ) + + # TODO: Needs unit testing for room ID + alias joins class JoinRoomAliasServlet(TransactionRestServlet): def __init__(self, hs): @@ -1054,6 +1336,8 @@ class RoomSpaceSummaryRestServlet(RestServlet): def register_servlets(hs: "HomeServer", http_server, is_worker=False): + msc2716_enabled = hs.config.experimental.msc2716_enabled + RoomStateEventRestServlet(hs).register(http_server) RoomMemberListRestServlet(hs).register(http_server) JoinedRoomMemberListRestServlet(hs).register(http_server) @@ -1061,6 +1345,8 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False): JoinRoomAliasServlet(hs).register(http_server) RoomMembershipRestServlet(hs).register(http_server) RoomSendEventRestServlet(hs).register(http_server) + if msc2716_enabled: + RoomBatchSendEventRestServlet(hs).register(http_server) PublicRoomListRestServlet(hs).register(http_server) RoomStateRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ff81d5cd17..c0ea445550 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -16,6 +16,7 @@ import logging from queue import Empty, PriorityQueue from typing import Collection, Dict, Iterable, List, Set, Tuple +from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError from synapse.events import EventBase from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -670,8 +671,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return dict(txn) - async def get_max_depth_of(self, event_ids: List[str]) -> int: - """Returns the max depth of a set of event IDs + async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]: + """Returns the event ID and depth for the event that has the max depth from a set of event IDs Args: event_ids: The event IDs to calculate the max depth of. @@ -680,14 +681,53 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas table="events", column="event_id", iterable=event_ids, - retcols=("depth",), + retcols=( + "event_id", + "depth", + ), desc="get_max_depth_of", ) if not rows: - return 0 + return None, 0 else: - return max(row["depth"] for row in rows) + max_depth_event_id = "" + current_max_depth = 0 + for row in rows: + if row["depth"] > current_max_depth: + max_depth_event_id = row["event_id"] + current_max_depth = row["depth"] + + return max_depth_event_id, current_max_depth + + async def get_min_depth_of(self, event_ids: List[str]) -> Tuple[str, int]: + """Returns the event ID and depth for the event that has the min depth from a set of event IDs + + Args: + event_ids: The event IDs to calculate the max depth of. + """ + rows = await self.db_pool.simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=( + "event_id", + "depth", + ), + desc="get_min_depth_of", + ) + + if not rows: + return None, 0 + else: + min_depth_event_id = "" + current_min_depth = MAX_DEPTH + for row in rows: + if row["depth"] < current_min_depth: + min_depth_event_id = row["event_id"] + current_min_depth = row["depth"] + + return min_depth_event_id, current_min_depth async def get_prev_events_for_room(self, room_id: str) -> List[str]: """ diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index d90a9fec91..dfb9b3a0fa 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -863,7 +863,9 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): self.store.get_latest_event_ids_in_room(room_id) ) - event = self.get_success(builder.build(prev_event_ids, None)) + event = self.get_success( + builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None) + ) self.get_success(self.federation_handler.on_receive_pdu(hostname, event)) diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 48ab3aa4e3..584da58371 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -224,7 +224,9 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): } builder = factory.for_room_version(room_version, event_dict) - join_event = self.get_success(builder.build(prev_event_ids, None)) + join_event = self.get_success( + builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None) + ) self.get_success(federation.on_send_join_request(remote_server, join_event)) self.replicate() diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index bb31ab756d..dbacce4380 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -232,9 +232,14 @@ class RedactionTestCase(unittest.HomeserverTestCase): self._base_builder = base_builder self._event_id = event_id - async def build(self, prev_event_ids, auth_event_ids): + async def build( + self, + prev_event_ids, + auth_event_ids, + depth: Optional[int] = None, + ): built_event = await self._base_builder.build( - prev_event_ids, auth_event_ids + prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids ) built_event._event_id = self._event_id @@ -251,6 +256,10 @@ class RedactionTestCase(unittest.HomeserverTestCase): def type(self): return self._base_builder.type + @property + def internal_metadata(self): + return self._base_builder.internal_metadata + event_1, context_1 = self.get_success( self.event_creation_handler.create_new_client_event( EventIdManglingBuilder( -- cgit 1.5.1