diff options
author | Eric Eastwood <erice@element.io> | 2023-06-16 14:12:24 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-16 14:12:24 -0500 |
commit | 0f02f0b4da92229e88e27a92ea3bfa523457bfc1 (patch) | |
tree | 32d565a9d015b96f8836c384e290ee903fab1e86 | |
parent | Don't always lock "user_ips" table when performing non-native upsert (#15788) (diff) | |
download | synapse-0f02f0b4da92229e88e27a92ea3bfa523457bfc1.tar.xz |
Remove experimental MSC2716 implementation to incrementally import history into existing rooms (#15748)
Context for why we're removing the implementation: - https://github.com/matrix-org/matrix-spec-proposals/pull/2716#issuecomment-1487441010 - https://github.com/matrix-org/matrix-spec-proposals/pull/2716#issuecomment-1504262734 Anyone wanting to continue MSC2716, should also address these leftover tasks: https://github.com/matrix-org/synapse/issues/10737 Closes https://github.com/matrix-org/synapse/issues/10737 in the fact that it is not longer necessary to track those things.
28 files changed, 36 insertions, 2103 deletions
diff --git a/changelog.d/15748.removal b/changelog.d/15748.removal new file mode 100644 index 0000000000..dcb9780178 --- /dev/null +++ b/changelog.d/15748.removal @@ -0,0 +1 @@ +Remove experimental [MSC2716](https://github.com/matrix-org/matrix-spec-proposals/pull/2716) implementation to incrementally import history into existing rooms. diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index 63acf86a46..2b11b487f6 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -92,8 +92,6 @@ allow_device_name_lookup_over_federation: true ## Experimental Features ## experimental_features: - # Enable history backfilling support - msc2716_enabled: true # client-side support for partial state in /send_join responses faster_joins: true # Enable support for polls diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 87a740e3d4..62fb88daab 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -244,7 +244,6 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "^/_matrix/client/(api/v1|r0|v3|unstable)/join/", "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", - "^/_matrix/client/(v1|unstable/org.matrix.msc2716)/rooms/.*/batch_send", ], "shared_extra_conf": {}, "worker_extra_conf": "", diff --git a/docs/workers.md b/docs/workers.md index 991814c0bc..735128762a 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -232,7 +232,6 @@ information. ^/_matrix/client/v1/rooms/.*/hierarchy$ ^/_matrix/client/(v1|unstable)/rooms/.*/relations/ ^/_matrix/client/v1/rooms/.*/threads$ - ^/_matrix/client/unstable/org.matrix.msc2716/rooms/.*/batch_send$ ^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$ ^/_matrix/client/(r0|v3|unstable)/account/3pid$ ^/_matrix/client/(r0|v3|unstable)/account/whoami$ diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 131f26234e..24b83cfeb6 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -246,10 +246,6 @@ else else export PASS_SYNAPSE_COMPLEMENT_DATABASE=sqlite fi - - # The tests for importing historical messages (MSC2716) - # only pass with monoliths, currently. - test_tags="$test_tags,msc2716" fi if [[ -n "$ASYNCIO_REACTOR" ]]; then diff --git a/synapse/api/constants.py b/synapse/api/constants.py index faf0770c66..dc32553d0c 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -123,10 +123,6 @@ class EventTypes: SpaceChild: Final = "m.space.child" SpaceParent: Final = "m.space.parent" - MSC2716_INSERTION: Final = "org.matrix.msc2716.insertion" - MSC2716_BATCH: Final = "org.matrix.msc2716.batch" - MSC2716_MARKER: Final = "org.matrix.msc2716.marker" - Reaction: Final = "m.reaction" @@ -222,16 +218,6 @@ class EventContentFields: # Used in m.room.guest_access events. GUEST_ACCESS: Final = "guest_access" - # Used on normal messages to indicate they were historically imported after the fact - MSC2716_HISTORICAL: Final = "org.matrix.msc2716.historical" - # For "insertion" events to indicate what the next batch ID should be in - # order to connect to it - MSC2716_NEXT_BATCH_ID: Final = "next_batch_id" - # Used on "batch" events to indicate which insertion event it connects to - MSC2716_BATCH_ID: Final = "batch_id" - # For "marker" events - MSC2716_INSERTION_EVENT_REFERENCE: Final = "insertion_event_reference" - # The authorising user for joining a restricted room. AUTHORISING_USER: Final = "join_authorised_via_users_server" diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index c5c71e242f..25c105a4c8 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -91,11 +91,6 @@ class RoomVersion: # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending # m.room.membership event with membership 'knock'. msc2403_knocking: bool - # MSC2716: Adds m.room.power_levels -> content.historical field to control - # whether "insertion", "chunk", "marker" events can be sent - msc2716_historical: bool - # MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events - msc2716_redactions: bool # MSC3389: Protect relation information from redaction. msc3389_relation_redactions: bool # MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of @@ -130,8 +125,6 @@ class RoomVersions: msc3083_join_rules=False, msc3375_redaction_rules=False, msc2403_knocking=False, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -153,8 +146,6 @@ class RoomVersions: msc3083_join_rules=False, msc3375_redaction_rules=False, msc2403_knocking=False, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -176,8 +167,6 @@ class RoomVersions: msc3083_join_rules=False, msc3375_redaction_rules=False, msc2403_knocking=False, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -199,8 +188,6 @@ class RoomVersions: msc3083_join_rules=False, msc3375_redaction_rules=False, msc2403_knocking=False, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -222,8 +209,6 @@ class RoomVersions: msc3083_join_rules=False, msc3375_redaction_rules=False, msc2403_knocking=False, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -245,8 +230,6 @@ class RoomVersions: msc3083_join_rules=False, msc3375_redaction_rules=False, msc2403_knocking=False, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -268,8 +251,6 @@ class RoomVersions: msc3083_join_rules=False, msc3375_redaction_rules=False, msc2403_knocking=False, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -291,8 +272,6 @@ class RoomVersions: msc3083_join_rules=False, msc3375_redaction_rules=False, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -314,8 +293,6 @@ class RoomVersions: msc3083_join_rules=True, msc3375_redaction_rules=False, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -337,8 +314,6 @@ class RoomVersions: msc3083_join_rules=True, msc3375_redaction_rules=True, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -360,8 +335,6 @@ class RoomVersions: msc3083_join_rules=True, msc3375_redaction_rules=True, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=True, msc3667_int_only_power_levels=False, @@ -383,8 +356,6 @@ class RoomVersions: msc3083_join_rules=True, msc3375_redaction_rules=True, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, @@ -406,8 +377,6 @@ class RoomVersions: msc3083_join_rules=True, msc3375_redaction_rules=True, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=True, msc3667_int_only_power_levels=True, @@ -415,29 +384,6 @@ class RoomVersions: msc3931_push_features=(), msc3989_redaction_rules=False, ) - MSC2716v4 = RoomVersion( - "org.matrix.msc2716v4", - RoomDisposition.UNSTABLE, - EventFormatVersions.ROOM_V4_PLUS, - StateResolutionVersions.V2, - enforce_key_validity=True, - special_case_aliases_auth=False, - strict_canonicaljson=True, - limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=True, - msc2716_historical=True, - msc2716_redactions=True, - msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, - msc3931_push_features=(), - msc3989_redaction_rules=False, - ) MSC1767v10 = RoomVersion( # MSC1767 (Extensible Events) based on room version "10" "org.matrix.msc1767.10", @@ -453,8 +399,6 @@ class RoomVersions: msc3083_join_rules=True, msc3375_redaction_rules=True, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=True, msc3667_int_only_power_levels=True, @@ -476,8 +420,6 @@ class RoomVersions: msc3083_join_rules=True, msc3375_redaction_rules=True, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=True, msc3667_int_only_power_levels=True, @@ -500,8 +442,6 @@ class RoomVersions: msc3083_join_rules=True, msc3375_redaction_rules=True, msc2403_knocking=True, - msc2716_historical=False, - msc2716_redactions=False, msc3389_relation_redactions=False, msc3787_knock_restricted_join_rule=True, msc3667_int_only_power_levels=True, @@ -526,7 +466,6 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = { RoomVersions.V9, RoomVersions.MSC3787, RoomVersions.V10, - RoomVersions.MSC2716v4, RoomVersions.MSC3989, RoomVersions.MSC3820opt2, ) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 909ebccf78..7406c3948c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -83,7 +83,6 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.registration import RegistrationWorkerStore from synapse.storage.databases.main.relations import RelationsWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore -from synapse.storage.databases.main.room_batch import RoomBatchStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.databases.main.search import SearchStore from synapse.storage.databases.main.session import SessionStore @@ -120,7 +119,6 @@ class GenericWorkerStore( # the races it creates aren't too bad. KeyStore, RoomWorkerStore, - RoomBatchStore, DirectoryWorkerStore, PushRulesWorkerStore, ApplicationServiceTransactionWorkerStore, diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 1d5b5ded45..8e0f5356b4 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -247,9 +247,6 @@ class ExperimentalConfig(Config): # MSC3026 (busy presence state) self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False) - # MSC2716 (importing historical messages) - self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False) - # MSC3244 (room version capabilities) self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True) diff --git a/synapse/event_auth.py b/synapse/event_auth.py index b4b43ec4d7..3aaf53dfbd 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -339,13 +339,6 @@ def check_state_dependent_auth_rules( if event.type == EventTypes.Redaction: check_redaction(event.room_version, event, auth_dict) - if ( - event.type == EventTypes.MSC2716_INSERTION - or event.type == EventTypes.MSC2716_BATCH - or event.type == EventTypes.MSC2716_MARKER - ): - check_historical(event.room_version, event, auth_dict) - logger.debug("Allowing! %s", event) @@ -365,7 +358,6 @@ LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = { RoomVersions.V9, RoomVersions.MSC3787, RoomVersions.V10, - RoomVersions.MSC2716v4, RoomVersions.MSC1767v10, } @@ -823,38 +815,6 @@ def check_redaction( raise AuthError(403, "You don't have permission to redact events") -def check_historical( - room_version_obj: RoomVersion, - event: "EventBase", - auth_events: StateMap["EventBase"], -) -> None: - """Check whether the event sender is allowed to send historical related - events like "insertion", "batch", and "marker". - - Returns: - None - - Raises: - AuthError if the event sender is not allowed to send historical related events - ("insertion", "batch", and "marker"). - """ - # Ignore the auth checks in room versions that do not support historical - # events - if not room_version_obj.msc2716_historical: - return - - user_level = get_user_power_level(event.user_id, auth_events) - - historical_level = get_named_level(auth_events, "historical", 100) - - if user_level < historical_level: - raise UnstableSpecAuthError( - 403, - 'You don\'t have permission to send send historical related events ("insertion", "batch", and "marker")', - errcode=Codes.INSUFFICIENT_POWER, - ) - - def _check_power_levels( room_version_obj: RoomVersion, event: "EventBase", diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index de7e5be42b..75b62adb33 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -198,7 +198,6 @@ class _EventInternalMetadata: soft_failed: DictProperty[bool] = DictProperty("soft_failed") proactively_send: DictProperty[bool] = DictProperty("proactively_send") redacted: DictProperty[bool] = DictProperty("redacted") - historical: DictProperty[bool] = DictProperty("historical") txn_id: DictProperty[str] = DictProperty("txn_id") """The transaction ID, if it was set when the event was created.""" @@ -288,14 +287,6 @@ class _EventInternalMetadata: """ return self._dict.get("redacted", False) - def is_historical(self) -> bool: - """Whether this is a historical message. - This is used by the batchsend historical message endpoint and - is needed to and mark the event as backfilled and skip some checks - like push notifications. - """ - return self._dict.get("historical", False) - def is_notifiable(self) -> bool: """Whether this event can trigger a push notification""" return not self.is_outlier() or self.is_out_of_band_membership() diff --git a/synapse/events/utils.py b/synapse/events/utils.py index e7b7b78b84..a55efcca56 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -164,21 +164,12 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic if room_version.msc2176_redaction_rules: add_fields("invite") - if room_version.msc2716_historical: - add_fields("historical") - elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth: add_fields("aliases") elif event_type == EventTypes.RoomHistoryVisibility: add_fields("history_visibility") elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules: add_fields("redacts") - elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_INSERTION: - add_fields(EventContentFields.MSC2716_NEXT_BATCH_ID) - elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_BATCH: - add_fields(EventContentFields.MSC2716_BATCH_ID) - elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER: - add_fields(EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE) # Protect the rel_type and event_id fields under the m.relates_to field. if room_version.msc3389_relation_redactions: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b7b5e21020..cc5ed97730 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -105,14 +105,12 @@ backfill_processing_before_timer = Histogram( ) +# TODO: We can refactor this away now that there is only one backfill point again class _BackfillPointType(Enum): # a regular backwards extremity (ie, an event which we don't yet have, but which # is referred to by other events in the DAG) BACKWARDS_EXTREMITY = enum.auto() - # an MSC2716 "insertion event" - INSERTION_PONT = enum.auto() - @attr.s(slots=True, auto_attribs=True, frozen=True) class _BackfillPoint: @@ -273,32 +271,10 @@ class FederationHandler: ) ] - insertion_events_to_be_backfilled: List[_BackfillPoint] = [] - if self.hs.config.experimental.msc2716_enabled: - insertion_events_to_be_backfilled = [ - _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT) - for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room( - room_id=room_id, - current_depth=current_depth, - # We only need to end up with 5 extremities combined with - # the backfill points to make the `/backfill` request ... - # (see the other comment above for more context). - limit=50, - ) - ] - logger.debug( - "_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s", - backwards_extremities, - insertion_events_to_be_backfilled, - ) - # we now have a list of potential places to backpaginate from. We prefer to # start with the most recent (ie, max depth), so let's sort the list. sorted_backfill_points: List[_BackfillPoint] = sorted( - itertools.chain( - backwards_extremities, - insertion_events_to_be_backfilled, - ), + backwards_extremities, key=lambda e: -int(e.depth), ) @@ -411,10 +387,7 @@ class FederationHandler: # event but not anything before it. This would require looking at the # state *before* the event, ignoring the special casing certain event # types have. - if bp.type == _BackfillPointType.INSERTION_PONT: - event_ids_to_check = [bp.event_id] - else: - event_ids_to_check = await self.store.get_successor_events(bp.event_id) + event_ids_to_check = await self.store.get_successor_events(bp.event_id) events_to_check = await self.store.get_events_as_list( event_ids_to_check, diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 42141d3670..d32d224d56 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -601,18 +601,6 @@ class FederationEventHandler: room_id, [(event, context)] ) - # If we're joining the room again, check if there is new marker - # state indicating that there is new history imported somewhere in - # the DAG. Multiple markers can exist in the current state with - # unique state_keys. - # - # Do this after the state from the remote join was persisted (via - # `persist_events_and_notify`). Otherwise we can run into a - # situation where the create event doesn't exist yet in the - # `current_state_events` - for e in state: - await self._handle_marker_event(origin, e) - return stream_id_after_persist async def update_state_for_partial_state_event( @@ -915,13 +903,6 @@ class FederationEventHandler: ) ) - # We construct the event lists in source order from `/backfill` response because - # it's a) easiest, but also b) the order in which we process things matters for - # MSC2716 historical batches because many historical events are all at the same - # `depth` and we rely on the tenuous sort that the other server gave us and hope - # they're doing their best. The brittle nature of this ordering for historical - # messages over federation is one of the reasons why we don't want to continue - # on MSC2716 until we have online topological ordering. events_with_failed_pull_attempts, fresh_events = partition( new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts ) @@ -1460,8 +1441,6 @@ class FederationEventHandler: await self._run_push_actions_and_persist_event(event, context, backfilled) - await self._handle_marker_event(origin, event) - if backfilled or context.rejected: return @@ -1559,94 +1538,6 @@ class FederationEventHandler: except Exception: logger.exception("Failed to resync device for %s", sender) - @trace - async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None: - """Handles backfilling the insertion event when we receive a marker - event that points to one. - - Args: - origin: Origin of the event. Will be called to get the insertion event - marker_event: The event to process - """ - - if marker_event.type != EventTypes.MSC2716_MARKER: - # Not a marker event - return - - if marker_event.rejected_reason is not None: - # Rejected event - return - - # Skip processing a marker event if the room version doesn't - # support it or the event is not from the room creator. - room_version = await self._store.get_room_version(marker_event.room_id) - create_event = await self._store.get_create_event_for_room(marker_event.room_id) - if not room_version.msc2175_implicit_room_creator: - room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) - else: - room_creator = create_event.sender - if not room_version.msc2716_historical and ( - not self._config.experimental.msc2716_enabled - or marker_event.sender != room_creator - ): - return - - logger.debug("_handle_marker_event: received %s", marker_event) - - insertion_event_id = marker_event.content.get( - EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE - ) - - if insertion_event_id is None: - # Nothing to retrieve then (invalid marker) - return - - already_seen_insertion_event = await self._store.have_seen_event( - marker_event.room_id, insertion_event_id - ) - if already_seen_insertion_event: - # No need to process a marker again if we have already seen the - # insertion event that it was pointing to - return - - logger.debug( - "_handle_marker_event: backfilling insertion event %s", insertion_event_id - ) - - await self._get_events_and_persist( - origin, - marker_event.room_id, - [insertion_event_id], - ) - - insertion_event = await self._store.get_event( - insertion_event_id, allow_none=True - ) - if insertion_event is None: - logger.warning( - "_handle_marker_event: server %s didn't return insertion event %s for marker %s", - origin, - insertion_event_id, - marker_event.event_id, - ) - return - - logger.debug( - "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s", - insertion_event, - marker_event, - ) - - await self._store.insert_insertion_extremity( - insertion_event_id, marker_event.room_id - ) - - logger.debug( - "_handle_marker_event: insertion extremity added for %s from marker event %s", - insertion_event, - marker_event, - ) - async def backfill_event_id( self, destinations: List[str], room_id: str, event_id: str ) -> PulledPduInfo: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0b61c2272b..4292b47037 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -60,7 +60,6 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.replication.http.send_events import ReplicationSendEventsRestServlet from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( - MutableStateMap, PersistedEventPosition, Requester, RoomAlias, @@ -573,7 +572,6 @@ class EventCreationHandler: state_event_ids: Optional[List[str]] = None, require_consent: bool = True, outlier: bool = False, - historical: bool = False, depth: Optional[int] = None, state_map: Optional[StateMap[str]] = None, for_batch: bool = False, @@ -599,7 +597,7 @@ class EventCreationHandler: allow_no_prev_events: Whether to allow this event to be created an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: the forward extremities to use as the prev_events for the new event. @@ -614,13 +612,10 @@ class EventCreationHandler: If non-None, prev_event_ids must also be provided. state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint. One use case is with insertion events which float at - the beginning of a historical batch and don't have any `prev_events` to - derive from; we add all of these state events as the explicit state so the - rest of the historical batch can inherit the same state and state_group. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. require_consent: Whether to check if the requester has consented to the privacy policy. @@ -629,10 +624,6 @@ class EventCreationHandler: it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. - historical: Indicates whether the message is being inserted - back in time around some existing events. This is used to skip - a few checks and mark the event as backfilled. - depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -717,8 +708,6 @@ class EventCreationHandler: builder.internal_metadata.outlier = outlier - builder.internal_metadata.historical = historical - event, unpersisted_context = await self.create_new_client_event( builder=builder, requester=requester, @@ -947,7 +936,6 @@ class EventCreationHandler: txn_id: Optional[str] = None, ignore_shadow_ban: bool = False, outlier: bool = False, - historical: bool = False, depth: Optional[int] = None, ) -> Tuple[EventBase, int]: """ @@ -961,19 +949,16 @@ class EventCreationHandler: allow_no_prev_events: Whether to allow this event to be created an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: The event IDs to use as the prev events. Should normally be left as None to automatically request them from the database. state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint. One use case is with insertion events which float at - the beginning of a historical batch and don't have any `prev_events` to - derive from; we add all of these state events as the explicit state so the - rest of the historical batch can inherit the same state and state_group. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. ratelimit: Whether to rate limit this send. txn_id: The transaction ID. ignore_shadow_ban: True if shadow-banned users should be allowed to @@ -981,9 +966,6 @@ class EventCreationHandler: outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. - historical: Indicates whether the message is being inserted - back in time around some existing events. This is used to skip - a few checks and mark the event as backfilled. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -1053,7 +1035,6 @@ class EventCreationHandler: prev_event_ids=prev_event_ids, state_event_ids=state_event_ids, outlier=outlier, - historical=historical, depth=depth, ) context = await unpersisted_context.persist(event) @@ -1145,7 +1126,7 @@ class EventCreationHandler: allow_no_prev_events: Whether to allow this event to be created an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: the forward extremities to use as the prev_events for the new event. @@ -1158,13 +1139,10 @@ class EventCreationHandler: based on the room state at the prev_events. state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint. One use case is with insertion events which float at - the beginning of a historical batch and don't have any `prev_events` to - derive from; we add all of these state events as the explicit state so the - rest of the historical batch can inherit the same state and state_group. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated @@ -1261,52 +1239,6 @@ class EventCreationHandler: if builder.internal_metadata.outlier: event.internal_metadata.outlier = True context = EventContext.for_outlier(self._storage_controllers) - elif ( - event.type == EventTypes.MSC2716_INSERTION - and state_event_ids - and builder.internal_metadata.is_historical() - ): - # Add explicit state to the insertion event so it has state to derive - # from even though it's floating with no `prev_events`. The rest of - # the batch can derive from this state and state_group. - # - # TODO(faster_joins): figure out how this works, and make sure that the - # old state is complete. - # https://github.com/matrix-org/synapse/issues/13003 - metadata = await self.store.get_metadata_for_events(state_event_ids) - - state_map_for_event: MutableStateMap[str] = {} - for state_id in state_event_ids: - data = metadata.get(state_id) - if data is None: - # We're trying to persist a new historical batch of events - # with the given state, e.g. via - # `RoomBatchSendEventRestServlet`. The state can be inferred - # by Synapse or set directly by the client. - # - # Either way, we should have persisted all the state before - # getting here. - raise Exception( - f"State event {state_id} not found in DB," - " Synapse should have persisted it before using it." - ) - - if data.state_key is None: - raise Exception( - f"Trying to set non-state event {state_id} as state" - ) - - state_map_for_event[(data.event_type, data.state_key)] = state_id - - # TODO(faster_joins): check how MSC2716 works and whether we can have - # partial state here - # https://github.com/matrix-org/synapse/issues/13003 - context = await self.state.calculate_context_info( - event, - state_ids_before_event=state_map_for_event, - partial_state=False, - ) - else: context = await self.state.calculate_context_info(event) @@ -1876,28 +1808,6 @@ class EventCreationHandler: 403, "Redacting server ACL events is not permitted" ) - # Add a little safety stop-gap to prevent people from trying to - # redact MSC2716 related events when they're in a room version - # which does not support it yet. We allow people to use MSC2716 - # events in existing room versions but only from the room - # creator since it does not require any changes to the auth - # rules and in effect, the redaction algorithm . In the - # supported room version, we add the `historical` power level to - # auth the MSC2716 related events and adjust the redaction - # algorthim to keep the `historical` field around (redacting an - # event should only strip fields which don't affect the - # structural protocol level). - is_msc2716_event = ( - original_event.type == EventTypes.MSC2716_INSERTION - or original_event.type == EventTypes.MSC2716_BATCH - or original_event.type == EventTypes.MSC2716_MARKER - ) - if not room_version_obj.msc2716_historical and is_msc2716_event: - raise AuthError( - 403, - "Redacting MSC2716 events is not supported in this room version", - ) - event_types = event_auth.auth_types_for_event(event.room_version, event) prev_state_ids = await context.get_prev_state_ids( StateFilter.from_types(event_types) @@ -1935,58 +1845,12 @@ class EventCreationHandler: if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") - if event.type == EventTypes.MSC2716_INSERTION: - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - - create_event = await self.store.get_create_event_for_room(event.room_id) - if not room_version_obj.msc2175_implicit_room_creator: - room_creator = create_event.content.get( - EventContentFields.ROOM_CREATOR - ) - else: - room_creator = create_event.sender - - # Only check an insertion event if the room version - # supports it or the event is from the room creator. - if room_version_obj.msc2716_historical or ( - self.config.experimental.msc2716_enabled - and event.sender == room_creator - ): - next_batch_id = event.content.get( - EventContentFields.MSC2716_NEXT_BATCH_ID - ) - conflicting_insertion_event_id = None - if next_batch_id: - conflicting_insertion_event_id = ( - await self.store.get_insertion_event_id_by_batch_id( - event.room_id, next_batch_id - ) - ) - if conflicting_insertion_event_id is not None: - # The current insertion event that we're processing is invalid - # because an insertion event already exists in the room with the - # same next_batch_id. We can't allow multiple because the batch - # pointing will get weird, e.g. we can't determine which insertion - # event the batch event is pointing to. - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Another insertion event already exists with the same next_batch_id", - errcode=Codes.INVALID_PARAM, - ) - - # Mark any `m.historical` messages as backfilled so they don't appear - # in `/sync` and have the proper decrementing `stream_ordering` as we import - backfilled = False - if event.internal_metadata.is_historical(): - backfilled = True - assert self._storage_controllers.persistence is not None ( persisted_events, max_stream_token, ) = await self._storage_controllers.persistence.persist_events( - events_and_context, backfilled=backfilled + events_and_context, ) events_and_pos = [] diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py deleted file mode 100644 index bf9df60218..0000000000 --- a/synapse/handlers/room_batch.py +++ /dev/null @@ -1,466 +0,0 @@ -import logging -from typing import TYPE_CHECKING, List, Tuple - -from synapse.api.constants import EventContentFields, EventTypes -from synapse.appservice import ApplicationService -from synapse.http.servlet import assert_params_in_dict -from synapse.types import JsonDict, Requester, UserID, create_requester -from synapse.util.stringutils import random_string - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - - -class RoomBatchHandler: - def __init__(self, hs: "HomeServer"): - self.hs = hs - self.store = hs.get_datastores().main - self._state_storage_controller = hs.get_storage_controllers().state - self.event_creation_handler = hs.get_event_creation_handler() - self.room_member_handler = hs.get_room_member_handler() - self.auth = hs.get_auth() - - async def inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int: - """Finds the depth which would sort it after the most-recent - prev_event_id but before the successors of those events. If no - successors are found, we assume it's an historical extremity part of the - current batch and use the same depth of the prev_event_ids. - - Args: - prev_event_ids: List of prev event IDs - - Returns: - Inherited depth - """ - ( - most_recent_prev_event_id, - most_recent_prev_event_depth, - ) = await self.store.get_max_depth_of(prev_event_ids) - - # We want to insert the historical event after the `prev_event` but before the successor event - # - # We inherit depth from the successor event instead of the `prev_event` - # because events returned from `/messages` are first sorted by `topological_ordering` - # which is just the `depth` and then tie-break with `stream_ordering`. - # - # We mark these inserted historical events as "backfilled" which gives them a - # negative `stream_ordering`. If we use the same depth as the `prev_event`, - # then our historical event will tie-break and be sorted before the `prev_event` - # when it should come after. - # - # We want to use the successor event depth so they appear after `prev_event` because - # it has a larger `depth` but before the successor event because the `stream_ordering` - # is negative before the successor event. - assert most_recent_prev_event_id is not None - successor_event_ids = await self.store.get_successor_events( - most_recent_prev_event_id - ) - - # If we can't find any successor events, then it's a forward extremity of - # historical messages and we can just inherit from the previous historical - # event which we can already assume has the correct depth where we want - # to insert into. - if not successor_event_ids: - depth = most_recent_prev_event_depth - else: - ( - _, - oldest_successor_depth, - ) = await self.store.get_min_depth_of(successor_event_ids) - - depth = oldest_successor_depth - - return depth - - def create_insertion_event_dict( - self, sender: str, room_id: str, origin_server_ts: int - ) -> JsonDict: - """Creates an event dict for an "insertion" event with the proper fields - and a random batch ID. - - Args: - sender: The event author MXID - room_id: The room ID that the event belongs to - origin_server_ts: Timestamp when the event was sent - - Returns: - The new event dictionary to insert. - """ - - next_batch_id = random_string(8) - insertion_event = { - "type": EventTypes.MSC2716_INSERTION, - "sender": sender, - "room_id": room_id, - "content": { - EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id, - EventContentFields.MSC2716_HISTORICAL: True, - }, - "origin_server_ts": origin_server_ts, - } - - return insertion_event - - async def create_requester_for_user_id_from_app_service( - self, user_id: str, app_service: ApplicationService - ) -> Requester: - """Creates a new requester for the given user_id - and validates that the app service is allowed to control - the given user. - - Args: - user_id: The author MXID that the app service is controlling - app_service: The app service that controls the user - - Returns: - Requester object - """ - - await self.auth.validate_appservice_can_control_user_id(app_service, user_id) - - return create_requester(user_id, app_service=app_service) - - async def get_most_recent_full_state_ids_from_event_id_list( - self, event_ids: List[str] - ) -> List[str]: - """Find the most recent event_id and grab the full state at that event. - We will use this as a base to auth our historical messages against. - - Args: - event_ids: List of event ID's to look at - - Returns: - List of event ID's - """ - - ( - most_recent_event_id, - _, - ) = await self.store.get_max_depth_of(event_ids) - # mapping from (type, state_key) -> state_event_id - assert most_recent_event_id is not None - prev_state_map = await self._state_storage_controller.get_state_ids_for_event( - most_recent_event_id - ) - # List of state event ID's - full_state_ids = list(prev_state_map.values()) - - return full_state_ids - - async def persist_state_events_at_start( - self, - state_events_at_start: List[JsonDict], - room_id: str, - initial_state_event_ids: List[str], - app_service_requester: Requester, - ) -> List[str]: - """Takes all `state_events_at_start` event dictionaries and creates/persists - them in a floating state event chain which don't resolve into the current room - state. They are floating because they reference no prev_events which disconnects - them from the normal DAG. - - Args: - state_events_at_start: - room_id: Room where you want the events persisted in. - initial_state_event_ids: - The base set of state for the historical batch which the floating - state chain will derive from. This should probably be the state - from the `prev_event` defined by `/batch_send?prev_event_id=$abc`. - app_service_requester: The requester of an application service. - - Returns: - List of state event ID's we just persisted - """ - assert app_service_requester.app_service - - state_event_ids_at_start = [] - state_event_ids = initial_state_event_ids.copy() - - # Make the state events float off on their own by specifying no - # prev_events for the first one in the chain so we don't have a bunch of - # `@mxid joined the room` noise between each batch. - prev_event_ids_for_state_chain: List[str] = [] - - for index, state_event in enumerate(state_events_at_start): - assert_params_in_dict( - state_event, ["type", "origin_server_ts", "content", "sender"] - ) - - logger.debug( - "RoomBatchSendEventRestServlet inserting state_event=%s", state_event - ) - - event_dict = { - "type": state_event["type"], - "origin_server_ts": state_event["origin_server_ts"], - "content": state_event["content"], - "room_id": room_id, - "sender": state_event["sender"], - "state_key": state_event["state_key"], - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - # TODO: This is pretty much the same as some other code to handle inserting state in this file - if event_dict["type"] == EventTypes.Member: - membership = event_dict["content"].get("membership", None) - event_id, _ = await self.room_member_handler.update_membership( - await self.create_requester_for_user_id_from_app_service( - state_event["sender"], app_service_requester.app_service - ), - target=UserID.from_string(event_dict["state_key"]), - room_id=room_id, - action=membership, - content=event_dict["content"], - historical=True, - # Only the first event in the state chain should be floating. - # The rest should hang off each other in a chain. - allow_no_prev_events=index == 0, - prev_event_ids=prev_event_ids_for_state_chain, - # The first event in the state chain is floating with no - # `prev_events` which means it can't derive state from - # anywhere automatically. So we need to set some state - # explicitly. - # - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append - # later. - state_event_ids=state_event_ids.copy(), - ) - else: - ( - event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self.create_requester_for_user_id_from_app_service( - state_event["sender"], app_service_requester.app_service - ), - event_dict, - historical=True, - # Only the first event in the state chain should be floating. - # The rest should hang off each other in a chain. - allow_no_prev_events=index == 0, - prev_event_ids=prev_event_ids_for_state_chain, - # The first event in the state chain is floating with no - # `prev_events` which means it can't derive state from - # anywhere automatically. So we need to set some state - # explicitly. - # - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append later. - state_event_ids=state_event_ids.copy(), - ) - event_id = event.event_id - - state_event_ids_at_start.append(event_id) - state_event_ids.append(event_id) - # Connect all the state in a floating chain - prev_event_ids_for_state_chain = [event_id] - - return state_event_ids_at_start - - async def persist_historical_events( - self, - events_to_create: List[JsonDict], - room_id: str, - inherited_depth: int, - initial_state_event_ids: List[str], - app_service_requester: Requester, - ) -> List[str]: - """Create and persists all events provided sequentially. Handles the - complexity of creating events in chronological order so they can - reference each other by prev_event but still persists in - reverse-chronoloical order so they have the correct - (topological_ordering, stream_ordering) and sort correctly from - /messages. - - Args: - events_to_create: List of historical events to create in JSON - dictionary format. - room_id: Room where you want the events persisted in. - inherited_depth: The depth to create the events at (you will - probably by calling inherit_depth_from_prev_ids(...)). - initial_state_event_ids: - This is used to set explicit state for the insertion event at - the start of the historical batch since it's floating with no - prev_events to derive state from automatically. - app_service_requester: The requester of an application service. - - Returns: - List of persisted event IDs - """ - assert app_service_requester.app_service - - # We expect the first event in a historical batch to be an insertion event - assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION - # We expect the last event in a historical batch to be an batch event - assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH - - # Make the historical event chain float off on its own by specifying no - # prev_events for the first event in the chain which causes the HS to - # ask for the state at the start of the batch later. - prev_event_ids: List[str] = [] - - event_ids = [] - events_to_persist = [] - for index, ev in enumerate(events_to_create): - assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) - - assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % ( - ev["sender"], - ) - - event_dict = { - "type": ev["type"], - "origin_server_ts": ev["origin_server_ts"], - "content": ev["content"], - "room_id": room_id, - "sender": ev["sender"], # requester.user.to_string(), - "prev_events": prev_event_ids.copy(), - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - event, unpersisted_context = await self.event_creation_handler.create_event( - await self.create_requester_for_user_id_from_app_service( - ev["sender"], app_service_requester.app_service - ), - event_dict, - # Only the first event (which is the insertion event) in the - # chain should be floating. The rest should hang off each other - # in a chain. - allow_no_prev_events=index == 0, - prev_event_ids=event_dict.get("prev_events"), - # Since the first event (which is the insertion event) in the - # chain is floating with no `prev_events`, it can't derive state - # from anywhere automatically. So we need to set some state - # explicitly. - state_event_ids=initial_state_event_ids if index == 0 else None, - historical=True, - depth=inherited_depth, - ) - context = await unpersisted_context.persist(event) - assert context._state_group - - # Normally this is done when persisting the event but we have to - # pre-emptively do it here because we create all the events first, - # then persist them in another pass below. And we want to share - # state_groups across the whole batch so this lookup needs to work - # for the next event in the batch in this loop. - await self.store.store_state_group_id_for_event_id( - event_id=event.event_id, - state_group_id=context._state_group, - ) - - logger.debug( - "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s", - event, - prev_event_ids, - ) - - events_to_persist.append((event, context)) - event_id = event.event_id - - event_ids.append(event_id) - prev_event_ids = [event_id] - - # Persist events in reverse-chronological order so they have the - # correct stream_ordering as they are backfilled (which decrements). - # Events are sorted by (topological_ordering, stream_ordering) - # where topological_ordering is just depth. - for event, context in reversed(events_to_persist): - # This call can't raise `PartialStateConflictError` since we forbid - # use of the historical batch API during partial state - await self.event_creation_handler.handle_new_client_event( - await self.create_requester_for_user_id_from_app_service( - event.sender, app_service_requester.app_service - ), - events_and_context=[(event, context)], - ) - - return event_ids - - async def handle_batch_of_events( - self, - events_to_create: List[JsonDict], - room_id: str, - batch_id_to_connect_to: str, - inherited_depth: int, - initial_state_event_ids: List[str], - app_service_requester: Requester, - ) -> Tuple[List[str], str]: - """ - Handles creating and persisting all of the historical events as well as - insertion and batch meta events to make the batch navigable in the DAG. - - Args: - events_to_create: List of historical events to create in JSON - dictionary format. - room_id: Room where you want the events created in. - batch_id_to_connect_to: The batch_id from the insertion event you - want this batch to connect to. - inherited_depth: The depth to create the events at (you will - probably by calling inherit_depth_from_prev_ids(...)). - initial_state_event_ids: - This is used to set explicit state for the insertion event at - the start of the historical batch since it's floating with no - prev_events to derive state from automatically. This should - probably be the state from the `prev_event` defined by - `/batch_send?prev_event_id=$abc` plus the outcome of - `persist_state_events_at_start` - app_service_requester: The requester of an application service. - - Returns: - Tuple containing a list of created events and the next_batch_id - """ - - # Connect this current batch to the insertion event from the previous batch - last_event_in_batch = events_to_create[-1] - batch_event = { - "type": EventTypes.MSC2716_BATCH, - "sender": app_service_requester.user.to_string(), - "room_id": room_id, - "content": { - EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, - EventContentFields.MSC2716_HISTORICAL: True, - }, - # Since the batch event is put at the end of the batch, - # where the newest-in-time event is, copy the origin_server_ts from - # the last event we're inserting - "origin_server_ts": last_event_in_batch["origin_server_ts"], - } - # Add the batch event to the end of the batch (newest-in-time) - events_to_create.append(batch_event) - - # Add an "insertion" event to the start of each batch (next to the oldest-in-time - # event in the batch) so the next batch can be connected to this one. - insertion_event = self.create_insertion_event_dict( - sender=app_service_requester.user.to_string(), - room_id=room_id, - # Since the insertion event is put at the start of the batch, - # where the oldest-in-time event is, copy the origin_server_ts from - # the first event we're inserting - origin_server_ts=events_to_create[0]["origin_server_ts"], - ) - next_batch_id = insertion_event["content"][ - EventContentFields.MSC2716_NEXT_BATCH_ID - ] - # Prepend the insertion event to the start of the batch (oldest-in-time) - events_to_create = [insertion_event] + events_to_create - - # Create and persist all of the historical events - event_ids = await self.persist_historical_events( - events_to_create=events_to_create, - room_id=room_id, - inherited_depth=inherited_depth, - initial_state_event_ids=initial_state_event_ids, - app_service_requester=app_service_requester, - ) - - return event_ids, next_batch_id diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 55df34bd06..82e4fa7363 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -362,7 +362,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): content: Optional[dict] = None, require_consent: bool = True, outlier: bool = False, - historical: bool = False, origin_server_ts: Optional[int] = None, ) -> Tuple[str, int]: """ @@ -378,16 +377,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): allow_no_prev_events: Whether to allow this event to be created an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: The event IDs to use as the prev events state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint. One use case is the historical `state_events_at_start`; - since each is marked as an `outlier`, the `EventContext.for_outlier()` won't - have any `state_ids` set and therefore can't derive any state even though the - prev_events are set so we need to set them ourself via this argument. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -400,9 +396,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. - historical: Indicates whether the message is being inserted - back in time around some existing events. This is used to skip - a few checks and mark the event as backfilled. origin_server_ts: The origin_server_ts to use if a new event is created. Uses the current timestamp if set to None. @@ -477,7 +470,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): depth=depth, require_consent=require_consent, outlier=outlier, - historical=historical, ) context = await unpersisted_context.persist(event) prev_state_ids = await context.get_prev_state_ids( @@ -585,7 +577,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): new_room: bool = False, require_consent: bool = True, outlier: bool = False, - historical: bool = False, allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, state_event_ids: Optional[List[str]] = None, @@ -610,22 +601,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. - historical: Indicates whether the message is being inserted - back in time around some existing events. This is used to skip - a few checks and mark the event as backfilled. allow_no_prev_events: Whether to allow this event to be created an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: The event IDs to use as the prev events state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint. One use case is the historical `state_events_at_start`; - since each is marked as an `outlier`, the `EventContext.for_outlier()` won't - have any `state_ids` set and therefore can't derive any state even though the - prev_events are set so we need to set them ourself via this argument. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -667,7 +652,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): new_room=new_room, require_consent=require_consent, outlier=outlier, - historical=historical, allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, state_event_ids=state_event_ids, @@ -691,7 +675,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): new_room: bool = False, require_consent: bool = True, outlier: bool = False, - historical: bool = False, allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, state_event_ids: Optional[List[str]] = None, @@ -718,22 +701,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. - historical: Indicates whether the message is being inserted - back in time around some existing events. This is used to skip - a few checks and mark the event as backfilled. allow_no_prev_events: Whether to allow this event to be created an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: The event IDs to use as the prev events state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint. One use case is the historical `state_events_at_start`; - since each is marked as an `outlier`, the `EventContext.for_outlier()` won't - have any `state_ids` set and therefore can't derive any state even though the - prev_events are set so we need to set them ourself via this argument. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -877,7 +854,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): content=content, require_consent=require_consent, outlier=outlier, - historical=historical, origin_server_ts=origin_server_ts, ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 33002cc0f2..67377c647b 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -322,7 +322,6 @@ class BulkPushRuleEvaluator: ) -> None: if ( not event.internal_metadata.is_notifiable() - or event.internal_metadata.is_historical() or event.room_id in self.hs.config.server.rooms_to_exclude_from_sync ): # Push rules for events that aren't notifiable can't be processed by this and diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 1af8d99d20..df0845edb2 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -48,7 +48,6 @@ from synapse.rest.client import ( rendezvous, report_event, room, - room_batch, room_keys, room_upgrade_rest_servlet, sendtodevice, @@ -132,7 +131,6 @@ class ClientRestResource(JsonResource): user_directory.register_servlets(hs, client_resource) if is_main_process: room_upgrade_rest_servlet.register_servlets(hs, client_resource) - room_batch.register_servlets(hs, client_resource) capabilities.register_servlets(hs, client_resource) if is_main_process: account_validity.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py deleted file mode 100644 index 69f85112d8..0000000000 --- a/synapse/rest/client/room_batch.py +++ /dev/null @@ -1,254 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import re -from http import HTTPStatus -from typing import TYPE_CHECKING, Tuple - -from synapse.api.constants import EventContentFields -from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.http.server import HttpServer -from synapse.http.servlet import ( - RestServlet, - assert_params_in_dict, - parse_json_object_from_request, - parse_string, - parse_strings_from_args, -) -from synapse.http.site import SynapseRequest -from synapse.types import JsonDict - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - - -class RoomBatchSendEventRestServlet(RestServlet): - """ - API endpoint which can insert a batch of events historically back in time - next to the given `prev_event`. - - `batch_id` comes from `next_batch_id `in the response of the batch send - endpoint and is derived from the "insertion" events added to each batch. - It's not required for the first batch send. - - `state_events_at_start` is used to define the historical state events - needed to auth the events like join events. These events will float - outside of the normal DAG as outlier's and won't be visible in the chat - history which also allows us to insert multiple batches without having a bunch - of `@mxid joined the room` noise between each batch. - - `events` is chronological list of events you want to insert. - There is a reverse-chronological constraint on batches so once you insert - some messages, you can only insert older ones after that. - tldr; Insert batches from your most recent history -> oldest history. - - POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event_id=<eventID>&batch_id=<batchID> - { - "events": [ ... ], - "state_events_at_start": [ ... ] - } - """ - - PATTERNS = ( - re.compile( - "^/_matrix/client/unstable/org.matrix.msc2716" - "/rooms/(?P<room_id>[^/]*)/batch_send$" - ), - ) - CATEGORY = "Client API requests" - - def __init__(self, hs: "HomeServer"): - super().__init__() - self.store = hs.get_datastores().main - self.event_creation_handler = hs.get_event_creation_handler() - self.auth = hs.get_auth() - self.room_batch_handler = hs.get_room_batch_handler() - - async def on_POST( - self, request: SynapseRequest, room_id: str - ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=False) - - if not requester.app_service: - raise AuthError( - HTTPStatus.FORBIDDEN, - "Only application services can use the /batchsend endpoint", - ) - - body = parse_json_object_from_request(request) - assert_params_in_dict(body, ["state_events_at_start", "events"]) - - assert request.args is not None - prev_event_ids_from_query = parse_strings_from_args( - request.args, "prev_event_id" - ) - batch_id_from_query = parse_string(request, "batch_id") - - if prev_event_ids_from_query is None: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "prev_event query parameter is required when inserting historical messages back in time", - errcode=Codes.MISSING_PARAM, - ) - - if await self.store.is_partial_state_room(room_id): - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Cannot insert history batches until we have fully joined the room", - errcode=Codes.UNABLE_DUE_TO_PARTIAL_STATE, - ) - - # Verify the batch_id_from_query corresponds to an actual insertion event - # and have the batch connected. - if batch_id_from_query: - corresponding_insertion_event_id = ( - await self.store.get_insertion_event_id_by_batch_id( - room_id, batch_id_from_query - ) - ) - if corresponding_insertion_event_id is None: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "No insertion event corresponds to the given ?batch_id", - errcode=Codes.INVALID_PARAM, - ) - - # Make sure that the prev_event_ids exist and aren't outliers - ie, they are - # regular parts of the room DAG where we know the state. - non_outlier_prev_events = await self.store.have_events_in_timeline( - prev_event_ids_from_query - ) - for prev_event_id in prev_event_ids_from_query: - if prev_event_id not in non_outlier_prev_events: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "prev_event %s does not exist, or is an outlier" % (prev_event_id,), - errcode=Codes.INVALID_PARAM, - ) - - # For the event we are inserting next to (`prev_event_ids_from_query`), - # find the most recent state events that allowed that message to be - # sent. We will use that as a base to auth our historical messages - # against. - state_event_ids = await self.room_batch_handler.get_most_recent_full_state_ids_from_event_id_list( - prev_event_ids_from_query - ) - - state_event_ids_at_start = [] - # Create and persist all of the state events that float off on their own - # before the batch. These will most likely be all of the invite/member - # state events used to auth the upcoming historical messages. - if body["state_events_at_start"]: - state_event_ids_at_start = ( - await self.room_batch_handler.persist_state_events_at_start( - state_events_at_start=body["state_events_at_start"], - room_id=room_id, - initial_state_event_ids=state_event_ids, - app_service_requester=requester, - ) - ) - # Update our ongoing auth event ID list with all of the new state we - # just created - state_event_ids.extend(state_event_ids_at_start) - - inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids( - prev_event_ids_from_query - ) - - events_to_create = body["events"] - - # Figure out which batch to connect to. If they passed in - # batch_id_from_query let's use it. The batch ID passed in comes - # from the batch_id in the "insertion" event from the previous batch. - last_event_in_batch = events_to_create[-1] - base_insertion_event = None - if batch_id_from_query: - batch_id_to_connect_to = batch_id_from_query - # Otherwise, create an insertion event to act as a starting point. - # - # We don't always have an insertion event to start hanging more history - # off of (ideally there would be one in the main DAG, but that's not the - # case if we're wanting to add history to e.g. existing rooms without - # an insertion event), in which case we just create a new insertion event - # that can then get pointed to by a "marker" event later. - else: - base_insertion_event_dict = ( - self.room_batch_handler.create_insertion_event_dict( - sender=requester.user.to_string(), - room_id=room_id, - origin_server_ts=last_event_in_batch["origin_server_ts"], - ) - ) - base_insertion_event_dict["prev_events"] = prev_event_ids_from_query.copy() - - ( - base_insertion_event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self.room_batch_handler.create_requester_for_user_id_from_app_service( - base_insertion_event_dict["sender"], - requester.app_service, - ), - base_insertion_event_dict, - prev_event_ids=base_insertion_event_dict.get("prev_events"), - # Also set the explicit state here because we want to resolve - # any `state_events_at_start` here too. It's not strictly - # necessary to accomplish anything but if someone asks for the - # state at this point, we probably want to show them the - # historical state that was part of this batch. - state_event_ids=state_event_ids, - historical=True, - depth=inherited_depth, - ) - - batch_id_to_connect_to = base_insertion_event.content[ - EventContentFields.MSC2716_NEXT_BATCH_ID - ] - - # Create and persist all of the historical events as well as insertion - # and batch meta events to make the batch navigable in the DAG. - event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events( - events_to_create=events_to_create, - room_id=room_id, - batch_id_to_connect_to=batch_id_to_connect_to, - inherited_depth=inherited_depth, - initial_state_event_ids=state_event_ids, - app_service_requester=requester, - ) - - insertion_event_id = event_ids[0] - batch_event_id = event_ids[-1] - historical_event_ids = event_ids[1:-1] - - response_dict = { - "state_event_ids": state_event_ids_at_start, - "event_ids": historical_event_ids, - "next_batch_id": next_batch_id, - "insertion_event_id": insertion_event_id, - "batch_event_id": batch_event_id, - } - if base_insertion_event is not None: - response_dict["base_insertion_event_id"] = base_insertion_event.event_id - - return HTTPStatus.OK, response_dict - - -def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - msc2716_enabled = hs.config.experimental.msc2716_enabled - - if msc2716_enabled: - RoomBatchSendEventRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 1910648755..95400ba570 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -102,8 +102,6 @@ class VersionsRestServlet(RestServlet): "org.matrix.msc2285.stable": True, # TODO: Remove when MSC2285 becomes a part of the spec # Supports filtering of /publicRooms by room type as per MSC3827 "org.matrix.msc3827.stable": True, - # Adds support for importing historical messages as per MSC2716 - "org.matrix.msc2716": self.config.experimental.msc2716_enabled, # Adds support for thread relations, per MSC3440. "org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above # Support for thread read receipts & notification counts. diff --git a/synapse/server.py b/synapse/server.py index 0f36ef69cb..b72b76a38b 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -91,7 +91,6 @@ from synapse.handlers.room import ( RoomShutdownHandler, TimestampLookupHandler, ) -from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import ( RoomForgetterHandler, @@ -493,10 +492,6 @@ class HomeServer(metaclass=abc.ABCMeta): return RoomCreationHandler(self) @cache_in_self - def get_room_batch_handler(self) -> RoomBatchHandler: - return RoomBatchHandler(self) - - @cache_in_self def get_room_shutdown_handler(self) -> RoomShutdownHandler: return RoomShutdownHandler(self) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 0032a92f49..3a10c265c9 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -61,7 +61,6 @@ from .registration import RegistrationStore from .rejections import RejectionsStore from .relations import RelationsStore from .room import RoomStore -from .room_batch import RoomBatchStore from .roommember import RoomMemberStore from .search import SearchStore from .session import SessionStore @@ -87,7 +86,6 @@ class DataStore( DeviceStore, RoomMemberStore, RoomStore, - RoomBatchStore, RegistrationStore, ProfileStore, PresenceStore, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 2681917d0b..8b6e3c1dc7 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -31,7 +31,7 @@ from typing import ( import attr from prometheus_client import Counter, Gauge -from synapse.api.constants import MAX_DEPTH, EventTypes +from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict @@ -891,124 +891,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id, ) - @trace - async def get_insertion_event_backward_extremities_in_room( - self, - room_id: str, - current_depth: int, - limit: int, - ) -> List[Tuple[str, int]]: - """ - Get the insertion events we know about that we haven't backfilled yet - along with the approximate depth. Only returns insertion events that are - at a depth lower than or equal to the `current_depth`. Sorted by depth, - highest to lowest (descending) so the closest events to the - `current_depth` are first in the list. - - We ignore insertion events that are newer than the user's current scroll - position (ie, those with depth greater than `current_depth`) as: - 1. we don't really care about getting events that have happened - after our current position; and - 2. by the nature of paginating and scrolling back, we have likely - previously tried and failed to backfill from that insertion event, so - to avoid getting "stuck" requesting the same backfill repeatedly - we drop those insertion event. - - Args: - room_id: Room where we want to find the oldest events - current_depth: The depth at the user's current scrollback position - limit: The max number of insertion event extremities to return - - Returns: - List of (event_id, depth) tuples. Sorted by depth, highest to lowest - (descending) so the closest events to the `current_depth` are first - in the list. - """ - - def get_insertion_event_backward_extremities_in_room_txn( - txn: LoggingTransaction, room_id: str - ) -> List[Tuple[str, int]]: - if isinstance(self.database_engine, PostgresEngine): - least_function = "LEAST" - elif isinstance(self.database_engine, Sqlite3Engine): - least_function = "MIN" - else: - raise RuntimeError("Unknown database engine") - - sql = f""" - SELECT - insertion_event_extremity.event_id, event.depth - /* We only want insertion events that are also marked as backwards extremities */ - FROM insertion_event_extremities AS insertion_event_extremity - /* Get the depth of the insertion event from the events table */ - INNER JOIN events AS event USING (event_id) - /** - * We use this info to make sure we don't retry to use a backfill point - * if we've already attempted to backfill from it recently. - */ - LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info - ON - failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id - AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id - WHERE - insertion_event_extremity.room_id = ? - /** - * We only want extremities that are older than or at - * the same position of the given `current_depth` (where older - * means less than the given depth) because we're looking backwards - * from the `current_depth` when backfilling. - * - * current_depth (ignore events that come after this, ignore 2-4) - * | - * ▼ - * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time> - */ - AND event.depth <= ? /* current_depth */ - /** - * Exponential back-off (up to the upper bound) so we don't retry the - * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc - * - * We use `1 << n` as a power of 2 equivalent for compatibility - * with older SQLites. The left shift equivalent only works with - * powers of 2 because left shift is a binary operation (base-2). - * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. - */ - AND ( - failed_backfill_attempt_info.event_id IS NULL - OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + ( - (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */)) - * ? /* step */ - ) - ) - /** - * Sort from highest (closest to the `current_depth`) to the lowest depth - * because the closest are most relevant to backfill from first. - * Then tie-break on alphabetical order of the event_ids so we get a - * consistent ordering which is nice when asserting things in tests. - */ - ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC - LIMIT ? - """ - - txn.execute( - sql, - ( - room_id, - current_depth, - self._clock.time_msec(), - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS, - limit, - ), - ) - return cast(List[Tuple[str, int]], txn.fetchall()) - - return await self.db_pool.runInteraction( - "get_insertion_event_backward_extremities_in_room", - get_insertion_event_backward_extremities_in_room_txn, - room_id, - ) - async def get_max_depth_of( self, event_ids: Collection[str] ) -> Tuple[Optional[str], int]: @@ -1280,50 +1162,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return event_ids - def _get_connected_batch_event_backfill_results_txn( - self, txn: LoggingTransaction, insertion_event_id: str, limit: int - ) -> List[BackfillQueueNavigationItem]: - """ - Find any batch connections of a given insertion event. - A batch event points at a insertion event via: - batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID] - - Args: - txn: The database transaction to use - insertion_event_id: The event ID to navigate from. We will find - batch events that point back at this insertion event. - limit: Max number of event ID's to query for and return - - Returns: - List of batch events that the backfill queue can process - """ - batch_connection_query = """ - SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i - /* Find the batch that connects to the given insertion event */ - INNER JOIN batch_events AS c - ON i.next_batch_id = c.batch_id - /* Get the depth of the batch start event from the events table */ - INNER JOIN events AS e ON c.event_id = e.event_id - /* Find an insertion event which matches the given event_id */ - WHERE i.event_id = ? - LIMIT ? - """ - - # Find any batch connections for the given insertion event - txn.execute( - batch_connection_query, - (insertion_event_id, limit), - ) - return [ - BackfillQueueNavigationItem( - depth=row[0], - stream_ordering=row[1], - event_id=row[2], - type=row[3], - ) - for row in txn - ] - def _get_connected_prev_event_backfill_results_txn( self, txn: LoggingTransaction, event_id: str, limit: int ) -> List[BackfillQueueNavigationItem]: @@ -1472,40 +1310,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas event_id_results.add(event_id) - # Try and find any potential historical batches of message history. - if self.hs.config.experimental.msc2716_enabled: - # We need to go and try to find any batch events connected - # to a given insertion event (by batch_id). If we find any, we'll - # add them to the queue and navigate up the DAG like normal in the - # next iteration of the loop. - if event_type == EventTypes.MSC2716_INSERTION: - # Find any batch connections for the given insertion event - connected_batch_event_backfill_results = ( - self._get_connected_batch_event_backfill_results_txn( - txn, event_id, limit - len(event_id_results) - ) - ) - logger.debug( - "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s", - room_id, - connected_batch_event_backfill_results, - ) - for ( - connected_batch_event_backfill_item - ) in connected_batch_event_backfill_results: - if ( - connected_batch_event_backfill_item.event_id - not in event_id_results - ): - queue.put( - ( - -connected_batch_event_backfill_item.depth, - -connected_batch_event_backfill_item.stream_ordering, - connected_batch_event_backfill_item.event_id, - connected_batch_event_backfill_item.type, - ) - ) - # Now we just look up the DAG by prev_events as normal connected_prev_event_backfill_results = ( self._get_connected_prev_event_backfill_results_txn( @@ -1748,19 +1552,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas _delete_old_forward_extrem_cache_txn, ) - @trace - async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None: - await self.db_pool.simple_upsert( - table="insertion_event_extremities", - keyvalues={"event_id": event_id}, - values={ - "event_id": event_id, - "room_id": room_id, - }, - insertion_values={}, - desc="insert_insertion_extremity", - ) - async def insert_received_event_to_staging( self, origin: str, event: EventBase ) -> None: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 44af3357af..5c9db7554e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1664,9 +1664,6 @@ class PersistEventsStore: self._handle_event_relations(txn, event) - self._handle_insertion_event(txn, event) - self._handle_batch_event(txn, event) - # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) if labels: @@ -1927,128 +1924,6 @@ class PersistEventsStore: ), ) - def _handle_insertion_event( - self, txn: LoggingTransaction, event: EventBase - ) -> None: - """Handles keeping track of insertion events and edges/connections. - Part of MSC2716. - - Args: - txn: The database transaction object - event: The event to process - """ - - if event.type != EventTypes.MSC2716_INSERTION: - # Not a insertion event - return - - # Skip processing an insertion event if the room version doesn't - # support it or the event is not from the room creator. - room_version = self.store.get_room_version_txn(txn, event.room_id) - room_creator = self.db_pool.simple_select_one_onecol_txn( - txn, - table="rooms", - keyvalues={"room_id": event.room_id}, - retcol="creator", - allow_none=True, - ) - if not room_version.msc2716_historical and ( - not self.hs.config.experimental.msc2716_enabled - or event.sender != room_creator - ): - return - - next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID) - if next_batch_id is None: - # Invalid insertion event without next batch ID - return - - logger.debug( - "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event - ) - - # Keep track of the insertion event and the batch ID - self.db_pool.simple_insert_txn( - txn, - table="insertion_events", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "next_batch_id": next_batch_id, - }, - ) - - # Insert an edge for every prev_event connection - for prev_event_id in event.prev_event_ids(): - self.db_pool.simple_insert_txn( - txn, - table="insertion_event_edges", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "insertion_prev_event_id": prev_event_id, - }, - ) - - def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None: - """Handles inserting the batch edges/connections between the batch event - and an insertion event. Part of MSC2716. - - Args: - txn: The database transaction object - event: The event to process - """ - - if event.type != EventTypes.MSC2716_BATCH: - # Not a batch event - return - - # Skip processing a batch event if the room version doesn't - # support it or the event is not from the room creator. - room_version = self.store.get_room_version_txn(txn, event.room_id) - room_creator = self.db_pool.simple_select_one_onecol_txn( - txn, - table="rooms", - keyvalues={"room_id": event.room_id}, - retcol="creator", - allow_none=True, - ) - if not room_version.msc2716_historical and ( - not self.hs.config.experimental.msc2716_enabled - or event.sender != room_creator - ): - return - - batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID) - if batch_id is None: - # Invalid batch event without a batch ID - return - - logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event) - - # Keep track of the insertion event and the batch ID - self.db_pool.simple_insert_txn( - txn, - table="batch_events", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "batch_id": batch_id, - }, - ) - - # When we receive an event with a `batch_id` referencing the - # `next_batch_id` of the insertion event, we can remove it from the - # `insertion_event_extremities` table. - sql = """ - DELETE FROM insertion_event_extremities WHERE event_id IN ( - SELECT event_id FROM insertion_events - WHERE next_batch_id = ? - ) - """ - - txn.execute(sql, (batch_id,)) - def _handle_redact_relations( self, txn: LoggingTransaction, room_id: str, redacted_event_id: str ) -> None: diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py deleted file mode 100644 index 131f357d04..0000000000 --- a/synapse/storage/databases/main/room_batch.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright 2021 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional - -from synapse.storage._base import SQLBaseStore - - -class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_id_by_batch_id( - self, room_id: str, batch_id: str - ) -> Optional[str]: - """Retrieve a insertion event ID. - - Args: - batch_id: The batch ID of the insertion event to retrieve. - - Returns: - The event_id of an insertion event, or None if there is no known - insertion event for the given insertion event. - """ - return await self.db_pool.simple_select_one_onecol( - table="insertion_events", - keyvalues={"room_id": room_id, "next_batch_id": batch_id}, - retcol="event_id", - allow_none=True, - ) - - async def store_state_group_id_for_event_id( - self, event_id: str, state_group_id: int - ) -> None: - await self.db_pool.simple_upsert( - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - values={"state_group": state_group_id, "event_id": event_id}, - ) diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py deleted file mode 100644 index 9d5cb60d16..0000000000 --- a/tests/rest/client/test_room_batch.py +++ /dev/null @@ -1,302 +0,0 @@ -import logging -from typing import List, Tuple -from unittest.mock import Mock, patch - -from twisted.test.proto_helpers import MemoryReactor - -from synapse.api.constants import EventContentFields, EventTypes -from synapse.appservice import ApplicationService -from synapse.rest import admin -from synapse.rest.client import login, register, room, room_batch, sync -from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken -from synapse.util import Clock - -from tests import unittest - -logger = logging.getLogger(__name__) - - -def _create_join_state_events_for_batch_send_request( - virtual_user_ids: List[str], - insert_time: int, -) -> List[JsonDict]: - return [ - { - "type": EventTypes.Member, - "sender": virtual_user_id, - "origin_server_ts": insert_time, - "content": { - "membership": "join", - "displayname": "display-name-for-%s" % (virtual_user_id,), - }, - "state_key": virtual_user_id, - } - for virtual_user_id in virtual_user_ids - ] - - -def _create_message_events_for_batch_send_request( - virtual_user_id: str, insert_time: int, count: int -) -> List[JsonDict]: - return [ - { - "type": EventTypes.Message, - "sender": virtual_user_id, - "origin_server_ts": insert_time, - "content": { - "msgtype": "m.text", - "body": "Historical %d" % (i), - EventContentFields.MSC2716_HISTORICAL: True, - }, - } - for i in range(count) - ] - - -class RoomBatchTestCase(unittest.HomeserverTestCase): - """Test importing batches of historical messages.""" - - servlets = [ - admin.register_servlets_for_client_rest_resource, - room_batch.register_servlets, - room.register_servlets, - register.register_servlets, - login.register_servlets, - sync.register_servlets, - ] - - def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - config = self.default_config() - - self.appservice = ApplicationService( - token="i_am_an_app_service", - id="1234", - namespaces={"users": [{"regex": r"@as_user.*", "exclusive": True}]}, - # Note: this user does not have to match the regex above - sender="@as_main:test", - ) - - mock_load_appservices = Mock(return_value=[self.appservice]) - with patch( - "synapse.storage.databases.main.appservice.load_appservices", - mock_load_appservices, - ): - hs = self.setup_test_homeserver(config=config) - return hs - - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.clock = clock - self._storage_controllers = hs.get_storage_controllers() - - self.virtual_user_id, _ = self.register_appservice_user( - "as_user_potato", self.appservice.token - ) - - def _create_test_room(self) -> Tuple[str, str, str, str]: - room_id = self.helper.create_room_as( - self.appservice.sender, tok=self.appservice.token - ) - - res_a = self.helper.send_event( - room_id=room_id, - type=EventTypes.Message, - content={ - "msgtype": "m.text", - "body": "A", - }, - tok=self.appservice.token, - ) - event_id_a = res_a["event_id"] - - res_b = self.helper.send_event( - room_id=room_id, - type=EventTypes.Message, - content={ - "msgtype": "m.text", - "body": "B", - }, - tok=self.appservice.token, - ) - event_id_b = res_b["event_id"] - - res_c = self.helper.send_event( - room_id=room_id, - type=EventTypes.Message, - content={ - "msgtype": "m.text", - "body": "C", - }, - tok=self.appservice.token, - ) - event_id_c = res_c["event_id"] - - return room_id, event_id_a, event_id_b, event_id_c - - @unittest.override_config({"experimental_features": {"msc2716_enabled": True}}) - def test_same_state_groups_for_whole_historical_batch(self) -> None: - """Make sure that when using the `/batch_send` endpoint to import a - bunch of historical messages, it re-uses the same `state_group` across - the whole batch. This is an easy optimization to make sure we're getting - right because the state for the whole batch is contained in - `state_events_at_start` and can be shared across everything. - """ - - time_before_room = int(self.clock.time_msec()) - room_id, event_id_a, _, _ = self._create_test_room() - - channel = self.make_request( - "POST", - "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s" - % (room_id, event_id_a), - content={ - "events": _create_message_events_for_batch_send_request( - self.virtual_user_id, time_before_room, 3 - ), - "state_events_at_start": _create_join_state_events_for_batch_send_request( - [self.virtual_user_id], time_before_room - ), - }, - access_token=self.appservice.token, - ) - self.assertEqual(channel.code, 200, channel.result) - - # Get the historical event IDs that we just imported - historical_event_ids = channel.json_body["event_ids"] - self.assertEqual(len(historical_event_ids), 3) - - # Fetch the state_groups - state_group_map = self.get_success( - self._storage_controllers.state.get_state_groups_ids( - room_id, historical_event_ids - ) - ) - - # We expect all of the historical events to be using the same state_group - # so there should only be a single state_group here! - self.assertEqual( - len(state_group_map.keys()), - 1, - "Expected a single state_group to be returned by saw state_groups=%s" - % (state_group_map.keys(),), - ) - - @unittest.override_config({"experimental_features": {"msc2716_enabled": True}}) - def test_sync_while_batch_importing(self) -> None: - """ - Make sure that /sync correctly returns full room state when a user joins - during ongoing batch backfilling. - See: https://github.com/matrix-org/synapse/issues/12281 - """ - # Create user who will be invited & join room - user_id = self.register_user("beep", "test") - user_tok = self.login("beep", "test") - - time_before_room = int(self.clock.time_msec()) - - # Create a room with some events - room_id, _, _, _ = self._create_test_room() - # Invite the user - self.helper.invite( - room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id - ) - - # Create another room, send a bunch of events to advance the stream token - other_room_id = self.helper.create_room_as( - self.appservice.sender, tok=self.appservice.token - ) - for _ in range(5): - self.helper.send_event( - room_id=other_room_id, - type=EventTypes.Message, - content={"msgtype": "m.text", "body": "C"}, - tok=self.appservice.token, - ) - - # Join the room as the normal user - self.helper.join(room_id, user_id, tok=user_tok) - - # Create an event to hang the historical batch from - In order to see - # the failure case originally reported in #12281, the historical batch - # must be hung from the most recent event in the room so the base - # insertion event ends up with the highest `topogological_ordering` - # (`depth`) in the room but will have a negative `stream_ordering` - # because it's a `historical` event. Previously, when assembling the - # `state` for the `/sync` response, the bugged logic would sort by - # `topological_ordering` descending and pick up the base insertion - # event because it has a negative `stream_ordering` below the given - # pagination token. Now we properly sort by `stream_ordering` - # descending which puts `historical` events with a negative - # `stream_ordering` way at the bottom and aren't selected as expected. - response = self.helper.send_event( - room_id=room_id, - type=EventTypes.Message, - content={ - "msgtype": "m.text", - "body": "C", - }, - tok=self.appservice.token, - ) - event_to_hang_id = response["event_id"] - - channel = self.make_request( - "POST", - "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s" - % (room_id, event_to_hang_id), - content={ - "events": _create_message_events_for_batch_send_request( - self.virtual_user_id, time_before_room, 3 - ), - "state_events_at_start": _create_join_state_events_for_batch_send_request( - [self.virtual_user_id], time_before_room - ), - }, - access_token=self.appservice.token, - ) - self.assertEqual(channel.code, 200, channel.result) - - # Now we need to find the invite + join events stream tokens so we can sync between - main_store = self.hs.get_datastores().main - events, next_key = self.get_success( - main_store.get_recent_events_for_room( - room_id, - 50, - end_token=main_store.get_room_max_token(), - ), - ) - invite_event_position = None - for event in events: - if ( - event.type == "m.room.member" - and event.content["membership"] == "invite" - ): - invite_event_position = self.get_success( - main_store.get_topological_token_for_event(event.event_id) - ) - break - - assert invite_event_position is not None, "No invite event found" - - # Remove the topological order from the token by re-creating w/stream only - invite_event_position = RoomStreamToken(None, invite_event_position.stream) - - # Sync everything after this token - since_token = self.get_success(invite_event_position.to_string(main_store)) - sync_response = self.make_request( - "GET", - f"/sync?since={since_token}", - access_token=user_tok, - ) - - # Assert that, for this room, the user was considered to have joined and thus - # receives the full state history - state_event_types = [ - event["type"] - for event in sync_response.json_body["rooms"]["join"][room_id]["state"][ - "events" - ] - ] - - assert ( - "m.room.create" in state_event_types - ), "Missing room full state in sync response" diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 4b8d8328d7..0f3b0744f1 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -20,7 +20,6 @@ from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor -from synapse.api.constants import EventTypes from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, @@ -924,216 +923,6 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"]) - def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo: - """ - Sets up a room with various insertion event backward extremities to test - backfill functions against. - - Returns: - _BackfillSetupInfo including the `room_id` to test against and - `depth_map` of events in the room - """ - room_id = "!backfill-room-test:some-host" - - depth_map: Dict[str, int] = { - "1": 1, - "2": 2, - "insertion_eventA": 3, - "3": 4, - "insertion_eventB": 5, - "4": 6, - "5": 7, - } - - def populate_db(txn: LoggingTransaction) -> None: - # Insert the room to satisfy the foreign key constraint of - # `event_failed_pull_attempts` - self.store.db_pool.simple_insert_txn( - txn, - "rooms", - { - "room_id": room_id, - "creator": "room_creator_user_id", - "is_public": True, - "room_version": "6", - }, - ) - - # Insert our server events - stream_ordering = 0 - for event_id, depth in depth_map.items(): - self.store.db_pool.simple_insert_txn( - txn, - table="events", - values={ - "event_id": event_id, - "type": EventTypes.MSC2716_INSERTION - if event_id.startswith("insertion_event") - else "test_regular_type", - "room_id": room_id, - "depth": depth, - "topological_ordering": depth, - "stream_ordering": stream_ordering, - "processed": True, - "outlier": False, - }, - ) - - if event_id.startswith("insertion_event"): - self.store.db_pool.simple_insert_txn( - txn, - table="insertion_event_extremities", - values={ - "event_id": event_id, - "room_id": room_id, - }, - ) - - stream_ordering += 1 - - self.get_success( - self.store.db_pool.runInteraction( - "_setup_room_for_insertion_backfill_tests_populate_db", - populate_db, - ) - ) - - return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map) - - def test_get_insertion_event_backward_extremities_in_room(self) -> None: - """ - Test to make sure only insertion event backward extremities that are - older and come before the `current_depth` are returned. - """ - setup_info = self._setup_room_for_insertion_backfill_tests() - room_id = setup_info.room_id - depth_map = setup_info.depth_map - - # Try at "insertion_eventB" - backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room( - room_id, depth_map["insertion_eventB"], limit=100 - ) - ) - backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - self.assertEqual(backfill_event_ids, ["insertion_eventB", "insertion_eventA"]) - - # Try at "insertion_eventA" - backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room( - room_id, depth_map["insertion_eventA"], limit=100 - ) - ) - backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - # Event "2" has a depth of 2 but is not included here because we only - # know the approximate depth of 5 from our event "3". - self.assertListEqual(backfill_event_ids, ["insertion_eventA"]) - - def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted( - self, - ) -> None: - """ - Test to make sure that insertion events we have attempted to backfill - (and within backoff timeout duration) do not show up as an event to - backfill again. - """ - setup_info = self._setup_room_for_insertion_backfill_tests() - room_id = setup_info.room_id - depth_map = setup_info.depth_map - - # Record some attempts to backfill these events which will make - # `get_insertion_event_backward_extremities_in_room` exclude them - # because we haven't passed the backoff interval. - self.get_success( - self.store.record_event_failed_pull_attempt( - room_id, "insertion_eventA", "fake cause" - ) - ) - - # No time has passed since we attempted to backfill ^ - - # Try at "insertion_eventB" - backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room( - room_id, depth_map["insertion_eventB"], limit=100 - ) - ) - backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - # Only the backfill points that we didn't record earlier exist here. - self.assertEqual(backfill_event_ids, ["insertion_eventB"]) - - def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration( - self, - ) -> None: - """ - Test to make sure after we fake attempt to backfill event - "insertion_eventA" many times, we can see retry and see the - "insertion_eventA" again after the backoff timeout duration has - exceeded. - """ - setup_info = self._setup_room_for_insertion_backfill_tests() - room_id = setup_info.room_id - depth_map = setup_info.depth_map - - # Record some attempts to backfill these events which will make - # `get_backfill_points_in_room` exclude them because we - # haven't passed the backoff interval. - self.get_success( - self.store.record_event_failed_pull_attempt( - room_id, "insertion_eventB", "fake cause" - ) - ) - self.get_success( - self.store.record_event_failed_pull_attempt( - room_id, "insertion_eventA", "fake cause" - ) - ) - self.get_success( - self.store.record_event_failed_pull_attempt( - room_id, "insertion_eventA", "fake cause" - ) - ) - self.get_success( - self.store.record_event_failed_pull_attempt( - room_id, "insertion_eventA", "fake cause" - ) - ) - self.get_success( - self.store.record_event_failed_pull_attempt( - room_id, "insertion_eventA", "fake cause" - ) - ) - - # Now advance time by 2 hours and we should only be able to see - # "insertion_eventB" because we have waited long enough for the single - # attempt (2^1 hours) but we still shouldn't see "insertion_eventA" - # because we haven't waited long enough for this many attempts. - self.reactor.advance(datetime.timedelta(hours=2).total_seconds()) - - # Try at "insertion_eventA" and make sure that "insertion_eventA" is not - # in the list because we've already attempted many times - backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room( - room_id, depth_map["insertion_eventA"], limit=100 - ) - ) - backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - self.assertEqual(backfill_event_ids, []) - - # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and - # see if we can now backfill it - self.reactor.advance(datetime.timedelta(hours=20).total_seconds()) - - # Try at "insertion_eventA" again after we advanced enough time and we - # should see "insertion_eventA" again - backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room( - room_id, depth_map["insertion_eventA"], limit=100 - ) - ) - backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - self.assertEqual(backfill_event_ids, ["insertion_eventA"]) - def test_get_event_ids_with_failed_pull_attempts(self) -> None: """ Test to make sure we properly get event_ids based on whether they have any |