Diffstat (limited to 'synapse')
-rw-r--r--  synapse/api/constants.py                             |  14
-rw-r--r--  synapse/api/room_versions.py                         |  61
-rw-r--r--  synapse/app/generic_worker.py                        |   2
-rw-r--r--  synapse/config/experimental.py                       |   3
-rw-r--r--  synapse/event_auth.py                                |  40
-rw-r--r--  synapse/events/__init__.py                           |   9
-rw-r--r--  synapse/events/snapshot.py                           | 159
-rw-r--r--  synapse/events/utils.py                              |   9
-rw-r--r--  synapse/federation/federation_client.py             |   5
-rw-r--r--  synapse/federation/federation_server.py             |   4
-rw-r--r--  synapse/federation/sender/__init__.py                |  34
-rw-r--r--  synapse/handlers/federation.py                       |  33
-rw-r--r--  synapse/handlers/federation_event.py                 | 109
-rw-r--r--  synapse/handlers/message.py                          | 168
-rw-r--r--  synapse/handlers/pagination.py                       | 137
-rw-r--r--  synapse/handlers/room_batch.py                       | 466
-rw-r--r--  synapse/handlers/room_member.py                      |  56
-rw-r--r--  synapse/http/federation/matrix_federation_agent.py  |  14
-rw-r--r--  synapse/http/matrixfederationclient.py               |   9
-rw-r--r--  synapse/media/_base.py                               |  15
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py             |   1
-rw-r--r--  synapse/push/clientformat.py                         |  26
-rw-r--r--  synapse/rest/__init__.py                             |   2
-rw-r--r--  synapse/rest/client/room_batch.py                    | 254
-rw-r--r--  synapse/rest/client/versions.py                      |   2
-rw-r--r--  synapse/server.py                                    |   5
-rw-r--r--  synapse/storage/controllers/persist_events.py        |   5
-rw-r--r--  synapse/storage/database.py                          |   2
-rw-r--r--  synapse/storage/databases/main/__init__.py           |   2
-rw-r--r--  synapse/storage/databases/main/event_federation.py   | 211
-rw-r--r--  synapse/storage/databases/main/events.py             | 140
-rw-r--r--  synapse/storage/databases/main/events_worker.py      |   2
-rw-r--r--  synapse/storage/databases/main/room_batch.py         |  47
-rw-r--r--  synapse/util/__init__.py                             |   5
-rw-r--r--  synapse/util/caches/lrucache.py                      |   8
35 files changed, 351 insertions(+), 1708 deletions(-)
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index faf0770c66..dc32553d0c 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -123,10 +123,6 @@ class EventTypes:
     SpaceChild: Final = "m.space.child"
     SpaceParent: Final = "m.space.parent"
 
-    MSC2716_INSERTION: Final = "org.matrix.msc2716.insertion"
-    MSC2716_BATCH: Final = "org.matrix.msc2716.batch"
-    MSC2716_MARKER: Final = "org.matrix.msc2716.marker"
-
     Reaction: Final = "m.reaction"
 
@@ -222,16 +218,6 @@ class EventContentFields:
     # Used in m.room.guest_access events.
     GUEST_ACCESS: Final = "guest_access"
 
-    # Used on normal messages to indicate they were historically imported after the fact
-    MSC2716_HISTORICAL: Final = "org.matrix.msc2716.historical"
-    # For "insertion" events to indicate what the next batch ID should be in
-    # order to connect to it
-    MSC2716_NEXT_BATCH_ID: Final = "next_batch_id"
-    # Used on "batch" events to indicate which insertion event it connects to
-    MSC2716_BATCH_ID: Final = "batch_id"
-    # For "marker" events
-    MSC2716_INSERTION_EVENT_REFERENCE: Final = "insertion_event_reference"
-
     # The authorising user for joining a restricted room.
     AUTHORISING_USER: Final = "join_authorised_via_users_server"
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index c5c71e242f..25c105a4c8 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -91,11 +86,6 @@ class RoomVersion:
     # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
     # m.room.membership event with membership 'knock'.
     msc2403_knocking: bool
-    # MSC2716: Adds m.room.power_levels -> content.historical field to control
-    # whether "insertion", "chunk", "marker" events can be sent
-    msc2716_historical: bool
-    # MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events
-    msc2716_redactions: bool
    # MSC3389: Protect relation information from redaction.
    msc3389_relation_redactions: bool
    # MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of
@@ -130,8 +125,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -153,8 +146,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -176,8 +167,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -199,8 +188,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -222,8 +209,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -245,8 +230,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -268,8 +251,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -291,8 +272,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -314,8 +293,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=False,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -337,8 +314,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -360,8 +335,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=False,
@@ -383,8 +356,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -406,8 +377,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=True,
@@ -415,29 +384,6 @@ class RoomVersions:
         msc3931_push_features=(),
         msc3989_redaction_rules=False,
     )
-    MSC2716v4 = RoomVersion(
-        "org.matrix.msc2716v4",
-        RoomDisposition.UNSTABLE,
-        EventFormatVersions.ROOM_V4_PLUS,
-        StateResolutionVersions.V2,
-        enforce_key_validity=True,
-        special_case_aliases_auth=False,
-        strict_canonicaljson=True,
-        limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=True,
-        msc2716_historical=True,
-        msc2716_redactions=True,
-        msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
-        msc3931_push_features=(),
-        msc3989_redaction_rules=False,
-    )
     MSC1767v10 = RoomVersion(
         # MSC1767 (Extensible Events) based on room version "10"
         "org.matrix.msc1767.10",
@@ -453,8 +399,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=True,
@@ -476,8 +420,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=True,
@@ -500,8 +442,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=True,
@@ -526,7 +466,6 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
         RoomVersions.V9,
         RoomVersions.MSC3787,
         RoomVersions.V10,
-        RoomVersions.MSC2716v4,
         RoomVersions.MSC3989,
         RoomVersions.MSC3820opt2,
     )
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 909ebccf78..7406c3948c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -83,7 +83,6 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.registration import RegistrationWorkerStore
 from synapse.storage.databases.main.relations import RelationsWorkerStore
 from synapse.storage.databases.main.room import RoomWorkerStore
-from synapse.storage.databases.main.room_batch import RoomBatchStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.databases.main.search import SearchStore
 from synapse.storage.databases.main.session import SessionStore
@@ -120,7 +119,6 @@ class GenericWorkerStore(
     # the races it creates aren't too bad.
     KeyStore,
     RoomWorkerStore,
-    RoomBatchStore,
     DirectoryWorkerStore,
     PushRulesWorkerStore,
     ApplicationServiceTransactionWorkerStore,
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 1d5b5ded45..8e0f5356b4 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -247,9 +247,6 @@ class ExperimentalConfig(Config):
         # MSC3026 (busy presence state)
         self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
 
-        # MSC2716 (importing historical messages)
-        self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False)
-
         # MSC3244 (room version capabilities)
         self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True)
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index b4b43ec4d7..3aaf53dfbd 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -339,13 +339,6 @@ def check_state_dependent_auth_rules(
     if event.type == EventTypes.Redaction:
         check_redaction(event.room_version, event, auth_dict)
 
-    if (
-        event.type == EventTypes.MSC2716_INSERTION
-        or event.type == EventTypes.MSC2716_BATCH
-        or event.type == EventTypes.MSC2716_MARKER
-    ):
-        check_historical(event.room_version, event, auth_dict)
-
     logger.debug("Allowing! %s", event)
 
@@ -365,7 +358,6 @@ LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
     RoomVersions.V9,
     RoomVersions.MSC3787,
     RoomVersions.V10,
-    RoomVersions.MSC2716v4,
     RoomVersions.MSC1767v10,
 }
 
@@ -823,38 +815,6 @@ def check_redaction(
         raise AuthError(403, "You don't have permission to redact events")
 
 
-def check_historical(
-    room_version_obj: RoomVersion,
-    event: "EventBase",
-    auth_events: StateMap["EventBase"],
-) -> None:
-    """Check whether the event sender is allowed to send historical related
-    events like "insertion", "batch", and "marker".
-
-    Returns:
-        None
-
-    Raises:
-        AuthError if the event sender is not allowed to send historical related events
-        ("insertion", "batch", and "marker").
-    """
-    # Ignore the auth checks in room versions that do not support historical
-    # events
-    if not room_version_obj.msc2716_historical:
-        return
-
-    user_level = get_user_power_level(event.user_id, auth_events)
-
-    historical_level = get_named_level(auth_events, "historical", 100)
-
-    if user_level < historical_level:
-        raise UnstableSpecAuthError(
-            403,
-            'You don\'t have permission to send send historical related events ("insertion", "batch", and "marker")',
-            errcode=Codes.INSUFFICIENT_POWER,
-        )
-
-
 def _check_power_levels(
     room_version_obj: RoomVersion,
     event: "EventBase",
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index de7e5be42b..75b62adb33 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -198,7 +198,6 @@ class _EventInternalMetadata:
     soft_failed: DictProperty[bool] = DictProperty("soft_failed")
     proactively_send: DictProperty[bool] = DictProperty("proactively_send")
     redacted: DictProperty[bool] = DictProperty("redacted")
-    historical: DictProperty[bool] = DictProperty("historical")
     txn_id: DictProperty[str] = DictProperty("txn_id")
     """The transaction ID, if it was set when the event was created."""
 
@@ -288,14 +287,6 @@ class _EventInternalMetadata:
         """
         return self._dict.get("redacted", False)
 
-    def is_historical(self) -> bool:
-        """Whether this is a historical message.
-        This is used by the batchsend historical message endpoint and
-        is needed to and mark the event as backfilled and skip some checks
-        like push notifications.
-        """
-        return self._dict.get("historical", False)
-
     def is_notifiable(self) -> bool:
         """Whether this event can trigger a push notification"""
         return not self.is_outlier() or self.is_out_of_band_membership()
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index e7e8225b8e..a43498ed4d 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
 import attr
 from immutabledict import immutabledict
@@ -107,33 +107,32 @@ class EventContext(UnpersistedEventContextBase):
         state_delta_due_to_event: If `state_group` and `state_group_before_event`
             are not None then this is the delta of the state between the
             two groups.
 
-        prev_group: If it is known, ``state_group``'s prev_group. Note that this being
-            None does not necessarily mean that ``state_group`` does not have
-            a prev_group!
+        state_group_deltas: If not empty, this is a dict mapping pairs of state
+            groups to the state delta between them.
 
-            If the event is a state event, this is normally the same as
-            ``state_group_before_event``.
+            Each key is a tuple of two integers: the initial state group and the
+            final state group. The corresponding value is a state map representing
+            the state delta between these state groups.
 
-            If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
-            will always also be ``None``.
+            The dictionary is expected to have at most two entries, with state
+            groups of:
 
-            Note that this *not* (necessarily) the state group associated with
-            ``_prev_state_ids``.
+            1. The state group before the event and the state group after the
+               event.
+            2. The state group preceding the state group before the event and
+               the state group before the event.
 
-        delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
-            and ``state_group``.
+            This information is collected and stored as part of an optimization
+            for persisting events.
 
         partial_state: if True, we may be storing this event with a temporary,
             incomplete state.
     """
 
     _storage: "StorageControllers"
+    state_group_deltas: Dict[Tuple[int, int], StateMap[str]]
     rejected: Optional[str] = None
     _state_group: Optional[int] = None
     state_group_before_event: Optional[int] = None
     _state_delta_due_to_event: Optional[StateMap[str]] = None
-    prev_group: Optional[int] = None
-    delta_ids: Optional[StateMap[str]] = None
     app_service: Optional[ApplicationService] = None
 
     partial_state: bool = False
@@ -145,16 +144,14 @@ class EventContext(UnpersistedEventContextBase):
         state_group_before_event: Optional[int],
         state_delta_due_to_event: Optional[StateMap[str]],
         partial_state: bool,
-        prev_group: Optional[int] = None,
-        delta_ids: Optional[StateMap[str]] = None,
+        state_group_deltas: Dict[Tuple[int, int], StateMap[str]],
     ) -> "EventContext":
         return EventContext(
             storage=storage,
             state_group=state_group,
             state_group_before_event=state_group_before_event,
             state_delta_due_to_event=state_delta_due_to_event,
-            prev_group=prev_group,
-            delta_ids=delta_ids,
+            state_group_deltas=state_group_deltas,
             partial_state=partial_state,
         )
 
@@ -163,7 +160,7 @@ class EventContext(UnpersistedEventContextBase):
         storage: "StorageControllers",
     ) -> "EventContext":
         """Return an EventContext instance suitable for persisting an outlier event"""
-        return EventContext(storage=storage)
+        return EventContext(storage=storage, state_group_deltas={})
 
     async def persist(self, event: EventBase) -> "EventContext":
         return self
 
@@ -183,13 +180,15 @@ class EventContext(UnpersistedEventContextBase):
             "state_group": self._state_group,
             "state_group_before_event": self.state_group_before_event,
             "rejected": self.rejected,
-            "prev_group": self.prev_group,
+            "state_group_deltas": _encode_state_group_delta(self.state_group_deltas),
             "state_delta_due_to_event": _encode_state_dict(
                 self._state_delta_due_to_event
             ),
-            "delta_ids": _encode_state_dict(self.delta_ids),
             "app_service_id": self.app_service.id if self.app_service else None,
             "partial_state": self.partial_state,
+            # add dummy delta_ids and prev_group for backwards compatibility
+            "delta_ids": None,
+            "prev_group": None,
         }
 
@@ -204,17 +203,24 @@ class EventContext(UnpersistedEventContextBase):
         Returns:
             The event context.
         """
+        # workaround for backwards/forwards compatibility: if the input doesn't have a value
+        # for "state_group_deltas" just assign an empty dict
+        state_group_deltas = input.get("state_group_deltas", None)
+        if state_group_deltas:
+            state_group_deltas = _decode_state_group_delta(state_group_deltas)
+        else:
+            state_group_deltas = {}
+
         context = EventContext(
             # We use the state_group and prev_state_id stuff to pull the
             # current_state_ids out of the DB and construct prev_state_ids.
             storage=storage,
             state_group=input["state_group"],
             state_group_before_event=input["state_group_before_event"],
-            prev_group=input["prev_group"],
+            state_group_deltas=state_group_deltas,
             state_delta_due_to_event=_decode_state_dict(
                 input["state_delta_due_to_event"]
             ),
-            delta_ids=_decode_state_dict(input["delta_ids"]),
             rejected=input["rejected"],
             partial_state=input.get("partial_state", False),
         )
 
@@ -349,7 +355,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
     _storage: "StorageControllers"
     state_group_before_event: Optional[int]
     state_group_after_event: Optional[int]
-    state_delta_due_to_event: Optional[dict]
+    state_delta_due_to_event: Optional[StateMap[str]]
     prev_group_for_state_group_before_event: Optional[int]
     delta_ids_to_state_group_before_event: Optional[StateMap[str]]
     partial_state: bool
 
@@ -380,26 +386,16 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
         events_and_persisted_context = []
         for event, unpersisted_context in amended_events_and_context:
-            if event.is_state():
-                context = EventContext(
-                    storage=unpersisted_context._storage,
-                    state_group=unpersisted_context.state_group_after_event,
-                    state_group_before_event=unpersisted_context.state_group_before_event,
-                    state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
-                    partial_state=unpersisted_context.partial_state,
-                    prev_group=unpersisted_context.state_group_before_event,
-                    delta_ids=unpersisted_context.state_delta_due_to_event,
-                )
-            else:
-                context = EventContext(
-                    storage=unpersisted_context._storage,
-                    state_group=unpersisted_context.state_group_after_event,
-                    state_group_before_event=unpersisted_context.state_group_before_event,
-                    state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
-                    partial_state=unpersisted_context.partial_state,
-                    prev_group=unpersisted_context.prev_group_for_state_group_before_event,
-                    delta_ids=unpersisted_context.delta_ids_to_state_group_before_event,
-                )
+            state_group_deltas = unpersisted_context._build_state_group_deltas()
+
+            context = EventContext(
+                storage=unpersisted_context._storage,
+                state_group=unpersisted_context.state_group_after_event,
+                state_group_before_event=unpersisted_context.state_group_before_event,
+                state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
+                partial_state=unpersisted_context.partial_state,
+                state_group_deltas=state_group_deltas,
+            )
             events_and_persisted_context.append((event, context))
         return events_and_persisted_context
 
@@ -452,11 +448,11 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
         # if the event isn't a state event the state group doesn't change
         if not self.state_delta_due_to_event:
-            state_group_after_event = self.state_group_before_event
+            self.state_group_after_event = self.state_group_before_event
 
         # otherwise if it is a state event we need to get a state group for it
         else:
-            state_group_after_event = await self._storage.state.store_state_group(
+            self.state_group_after_event = await self._storage.state.store_state_group(
                 event.event_id,
                 event.room_id,
                 prev_group=self.state_group_before_event,
@@ -464,16 +460,81 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
                 current_state_ids=None,
             )
 
+        state_group_deltas = self._build_state_group_deltas()
+
         return EventContext.with_state(
             storage=self._storage,
-            state_group=state_group_after_event,
+            state_group=self.state_group_after_event,
             state_group_before_event=self.state_group_before_event,
             state_delta_due_to_event=self.state_delta_due_to_event,
+            state_group_deltas=state_group_deltas,
             partial_state=self.partial_state,
-            prev_group=self.state_group_before_event,
-            delta_ids=self.state_delta_due_to_event,
         )
 
+    def _build_state_group_deltas(self) -> Dict[Tuple[int, int], StateMap]:
+        """
+        Collect deltas between the state groups associated with this context
+        """
+        state_group_deltas = {}
+
+        # if we know the state group before the event and after the event, add them and the
+        # state delta between them to state_group_deltas
+        if self.state_group_before_event and self.state_group_after_event:
+            # if we have the state groups we should have the delta
+            assert self.state_delta_due_to_event is not None
+            state_group_deltas[
+                (
+                    self.state_group_before_event,
+                    self.state_group_after_event,
+                )
+            ] = self.state_delta_due_to_event
+
+        # the state group before the event may also have a state group which precedes it, if
+        # we have that and the state group before the event, add them and the state
+        # delta between them to state_group_deltas
+        if (
+            self.prev_group_for_state_group_before_event
+            and self.state_group_before_event
+        ):
+            # if we have both state groups we should have the delta between them
+            assert self.delta_ids_to_state_group_before_event is not None
+            state_group_deltas[
+                (
+                    self.prev_group_for_state_group_before_event,
+                    self.state_group_before_event,
+                )
+            ] = self.delta_ids_to_state_group_before_event
+
+        return state_group_deltas
+
+
+def _encode_state_group_delta(
+    state_group_delta: Dict[Tuple[int, int], StateMap[str]]
+) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]:
+    if not state_group_delta:
+        return []
+
+    state_group_delta_encoded = []
+    for key, value in state_group_delta.items():
+        state_group_delta_encoded.append((key[0], key[1], _encode_state_dict(value)))
+
+    return state_group_delta_encoded
+
+
+def _decode_state_group_delta(
+    input: List[Tuple[int, int, List[Tuple[str, str, str]]]]
+) -> Dict[Tuple[int, int], StateMap[str]]:
+    if not input:
+        return {}
+
+    state_group_deltas = {}
+    for state_group_1, state_group_2, state_dict in input:
+        state_map = _decode_state_dict(state_dict)
+        assert state_map is not None
+        state_group_deltas[(state_group_1, state_group_2)] = state_map
+
+    return state_group_deltas
+
 
 def _encode_state_dict(
     state_dict: Optional[StateMap[str]],
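For context on the `snapshot.py` change above: the old `prev_group`/`delta_ids` pair could only describe one delta, while `state_group_deltas` can carry both the "before event -> after event" delta and the "prev group -> before event" delta. The following is a minimal standalone sketch of the round-trip serialization (simplified; the real `_encode_state_group_delta`/`_decode_state_group_delta` helpers shown above also route the state map through `_encode_state_dict`, which handles `None`):

```python
from typing import Dict, List, Tuple

# (event_type, state_key) -> event_id, mirroring Synapse's StateMap[str]
StateMap = Dict[Tuple[str, str], str]


def encode_state_group_deltas(
    deltas: Dict[Tuple[int, int], StateMap]
) -> List[Tuple[int, int, List[Tuple[str, str, str]]]]:
    # Flatten each ((prev_group, group) -> state map) entry into a
    # JSON-friendly (prev_group, group, [(type, state_key, event_id), ...]) triple.
    return [
        (prev, curr, [(t, k, e) for (t, k), e in delta.items()])
        for (prev, curr), delta in deltas.items()
    ]


def decode_state_group_deltas(
    encoded: List[Tuple[int, int, List[Tuple[str, str, str]]]]
) -> Dict[Tuple[int, int], StateMap]:
    return {
        (prev, curr): {(t, k): e for t, k, e in triples}
        for prev, curr, triples in encoded
    }


# At most two entries are expected: (group before event -> group after event)
# and (prev group -> group before event). The values here are illustrative.
deltas = {(41, 42): {("m.room.member", "@alice:example.com"): "$join_event_id"}}
assert decode_state_group_deltas(encode_state_group_deltas(deltas)) == deltas
```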
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index e7b7b78b84..a55efcca56 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -164,21 +164,12 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDict:
         if room_version.msc2176_redaction_rules:
             add_fields("invite")
 
-        if room_version.msc2716_historical:
-            add_fields("historical")
-
     elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
         add_fields("aliases")
     elif event_type == EventTypes.RoomHistoryVisibility:
         add_fields("history_visibility")
     elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules:
         add_fields("redacts")
-    elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_INSERTION:
-        add_fields(EventContentFields.MSC2716_NEXT_BATCH_ID)
-    elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_BATCH:
-        add_fields(EventContentFields.MSC2716_BATCH_ID)
-    elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
-        add_fields(EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE)
 
     # Protect the rel_type and event_id fields under the m.relates_to field.
     if room_version.msc3389_relation_redactions:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index a2cf3a96c6..e5359ca558 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -260,7 +260,9 @@ class FederationClient(FederationBase):
         use_unstable = False
         for user_id, one_time_keys in query.items():
             for device_id, algorithms in one_time_keys.items():
-                if any(count > 1 for count in algorithms.values()):
+                # If more than one key is requested in total, attempt to use
+                # the unstable endpoint.
+                if sum(algorithms.values()) > 1:
                     use_unstable = True
                 if algorithms:
                     # For the stable query, choose only the first algorithm.
@@ -296,6 +298,7 @@ class FederationClient(FederationBase):
         else:
             logger.debug("Skipping unstable claim client keys API")
 
+        # TODO Potentially attempt multiple queries and combine the results?
         return await self.transport_layer.claim_client_keys(
             user, destination, content, timeout
         )
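Why the condition changed: `any(count > 1 ...)` missed the case where two different algorithms each request a single key (every individual count is 1, yet the device wants two keys in total, which the stable endpoint cannot deliver). Summing the counts catches that case. A hedged standalone sketch of the check (names are illustrative, not Synapse's actual API):

```python
from typing import Dict

# user_id -> device_id -> algorithm -> number of keys to claim
Query = Dict[str, Dict[str, Dict[str, int]]]


def should_use_unstable_endpoint(query: Query) -> bool:
    """Fall back to the unstable /claim endpoint as soon as any device asks
    for more than one key in total, since the stable endpoint can only
    claim one key per device."""
    for devices in query.values():
        for algorithms in devices.values():
            if sum(algorithms.values()) > 1:
                return True
    return False


# One key for one algorithm: the stable endpoint suffices.
assert not should_use_unstable_endpoint({"@a:hs": {"DEV": {"signed_curve25519": 1}}})
# Two keys for one algorithm: needs the unstable endpoint.
assert should_use_unstable_endpoint({"@a:hs": {"DEV": {"signed_curve25519": 2}}})
# One key each for two algorithms: also needs the unstable endpoint,
# which the old `any(count > 1)` check would have missed.
assert should_use_unstable_endpoint(
    {"@a:hs": {"DEV": {"signed_curve25519": 1, "other_algorithm": 1}}}
)
```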
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9425b32507..61fa3b30af 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1016,7 +1016,9 @@ class FederationServer(FederationBase):
         for user_id, device_keys in result.items():
             for device_id, keys in device_keys.items():
                 for key_id, key in keys.items():
-                    json_result.setdefault(user_id, {})[device_id] = {key_id: key}
+                    json_result.setdefault(user_id, {}).setdefault(device_id, {})[
+                        key_id
+                    ] = key
 
         logger.info(
             "Claimed one-time-keys: %s",
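The hunk above fixes a subtle overwrite bug: the old code assigned a fresh single-entry dict for every key, so a device that claimed several one-time keys only kept the last one. A small self-contained demonstration with toy data (the identifiers are made up for illustration):

```python
claimed = {"@a:hs": {"DEV1": {"alg:k1": "key1", "alg:k2": "key2"}}}

# Old behaviour: a fresh dict is assigned per key_id, so a device that
# claims several one-time keys keeps only the last one.
buggy: dict = {}
for user_id, device_keys in claimed.items():
    for device_id, keys in device_keys.items():
        for key_id, key in keys.items():
            buggy.setdefault(user_id, {})[device_id] = {key_id: key}
assert buggy["@a:hs"]["DEV1"] == {"alg:k2": "key2"}  # first key was lost

# New behaviour: setdefault at both levels merges all keys per device.
fixed: dict = {}
for user_id, device_keys in claimed.items():
    for device_id, keys in device_keys.items():
        for key_id, key in keys.items():
            fixed.setdefault(user_id, {}).setdefault(device_id, {})[key_id] = key
assert fixed["@a:hs"]["DEV1"] == {"alg:k1": "key1", "alg:k2": "key2"}
```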
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index f3bdc5a4d2..97abbdee18 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -109,10 +109,8 @@ was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmission_loop`.
 If a remote server is unreachable over federation, we back off from that server,
 with an exponentially-increasing retry interval.
 
-Whilst we don't automatically retry after the interval, we prevent making new attempts
-until such time as the back-off has cleared.
-Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
-loop resumes and empties the queue by making federation requests.
+We automatically retry after the retry interval expires (roughly, the logic to do so
+being triggered every minute).
 
 If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to
 prevent unbounded growth) and Catch-Up Mode is entered.
 
@@ -145,7 +143,6 @@
 from prometheus_client import Counter
 from typing_extensions import Literal
 
 from twisted.internet import defer
-from twisted.internet.interfaces import IDelayedCall
 
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
 
@@ -184,14 +181,18 @@
 sent_pdus_destination_dist_total = Counter(
     "Total number of PDUs queued for sending across all destinations",
 )
 
-# Time (in s) after Synapse's startup that we will begin to wake up destinations
-# that have catch-up outstanding.
-CATCH_UP_STARTUP_DELAY_SEC = 15
+# Time (in s) to wait before trying to wake up destinations that have
+# catch-up outstanding. This will also be the delay applied at startup
+# before trying the same.
+# Please note that rate limiting still applies, so while the loop is
+# executed every X seconds the destinations may not be woken up because
+# they are being rate limited following previous attempt failures.
+WAKEUP_RETRY_PERIOD_SEC = 60
 
 # Time (in s) to wait in between waking up each destination, i.e. one destination
-# will be woken up every <x> seconds after Synapse's startup until we have woken
-# every destination has outstanding catch-up.
-CATCH_UP_STARTUP_INTERVAL_SEC = 5
+# will be woken up every <x> seconds until we have woken every destination
+# that has outstanding catch-up.
+WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
 
 
 class AbstractFederationSender(metaclass=abc.ABCMeta):
 
@@ -415,12 +416,10 @@ class FederationSender(AbstractFederationSender):
             / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
         )
 
-        # wake up destinations that have outstanding PDUs to be caught up
-        self._catchup_after_startup_timer: Optional[
-            IDelayedCall
-        ] = self.clock.call_later(
-            CATCH_UP_STARTUP_DELAY_SEC,
+        # Regularly wake up destinations that have outstanding PDUs to be caught up
+        self.clock.looping_call(
             run_as_background_process,
+            WAKEUP_RETRY_PERIOD_SEC * 1000.0,
             "wake_destinations_needing_catchup",
             self._wake_destinations_needing_catchup,
         )
 
@@ -966,7 +965,6 @@ class FederationSender(AbstractFederationSender):
             if not destinations_to_wake:
                 # finished waking all destinations!
-                self._catchup_after_startup_timer = None
                 break
 
             last_processed = destinations_to_wake[-1]
 
@@ -983,4 +981,4 @@
                 last_processed,
             )
             self.wake_destination(destination)
-            await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
+            await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
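The structural change above replaces a one-shot, startup-only timer with a recurring loop (Synapse schedules it via its Twisted-based `Clock.looping_call`, with the period given in milliseconds). A rough asyncio stand-in for the shape of the new behaviour, under the assumption that `get_destinations` is some coroutine returning destinations with outstanding catch-up:

```python
import asyncio

WAKEUP_RETRY_PERIOD_SEC = 60
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5


async def wake_destinations_needing_catchup(get_destinations) -> None:
    # Wake destinations one at a time so a large backlog does not produce
    # a thundering herd of outbound federation traffic all at once.
    for destination in await get_destinations():
        print("waking", destination)
        await asyncio.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)


async def wakeup_loop(get_destinations) -> None:
    # The old code armed a single timer shortly after startup; the new code
    # re-runs the check for the lifetime of the process, so a destination
    # whose retry interval has expired gets retried even if no new PDU or
    # EDU arrives for it (per-destination rate limiting still applies).
    while True:
        await wake_destinations_needing_catchup(get_destinations)
        await asyncio.sleep(WAKEUP_RETRY_PERIOD_SEC)
```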
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b7b5e21020..cc5ed97730 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -105,14 +105,12 @@ backfill_processing_before_timer = Histogram(
 )
 
 
+# TODO: We can refactor this away now that there is only one backfill point again
 class _BackfillPointType(Enum):
     # a regular backwards extremity (ie, an event which we don't yet have, but which
     # is referred to by other events in the DAG)
     BACKWARDS_EXTREMITY = enum.auto()
 
-    # an MSC2716 "insertion event"
-    INSERTION_PONT = enum.auto()
-
 
 @attr.s(slots=True, auto_attribs=True, frozen=True)
 class _BackfillPoint:
 
@@ -273,32 +271,10 @@ class FederationHandler:
             )
         ]
 
-        insertion_events_to_be_backfilled: List[_BackfillPoint] = []
-        if self.hs.config.experimental.msc2716_enabled:
-            insertion_events_to_be_backfilled = [
-                _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
-                for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
-                    room_id=room_id,
-                    current_depth=current_depth,
-                    # We only need to end up with 5 extremities combined with
-                    # the backfill points to make the `/backfill` request ...
-                    # (see the other comment above for more context).
-                    limit=50,
-                )
-            ]
-        logger.debug(
-            "_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s",
-            backwards_extremities,
-            insertion_events_to_be_backfilled,
-        )
-
         # we now have a list of potential places to backpaginate from. We prefer to
         # start with the most recent (ie, max depth), so let's sort the list.
         sorted_backfill_points: List[_BackfillPoint] = sorted(
-            itertools.chain(
-                backwards_extremities,
-                insertion_events_to_be_backfilled,
-            ),
+            backwards_extremities,
             key=lambda e: -int(e.depth),
         )
 
@@ -411,10 +387,7 @@ class FederationHandler:
         # event but not anything before it. This would require looking at the
         # state *before* the event, ignoring the special casing certain event
         # types have.
-        if bp.type == _BackfillPointType.INSERTION_PONT:
-            event_ids_to_check = [bp.event_id]
-        else:
-            event_ids_to_check = await self.store.get_successor_events(bp.event_id)
+        event_ids_to_check = await self.store.get_successor_events(bp.event_id)
 
         events_to_check = await self.store.get_events_as_list(
             event_ids_to_check,
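A tiny illustration of the backfill-point ordering that survives the removal above: points are sorted deepest-first, so backfill starts from the most recent part of the DAG. The class below is a simplified stand-in for `_BackfillPoint` (which also carries a `type` field):

```python
import attr


@attr.s(slots=True, auto_attribs=True, frozen=True)
class BackfillPoint:  # simplified stand-in for `_BackfillPoint`
    event_id: str
    depth: int


points = [BackfillPoint("$a", 3), BackfillPoint("$b", 7), BackfillPoint("$c", 5)]
# Deepest (most recent) first, matching `key=lambda e: -int(e.depth)` above.
assert [p.event_id for p in sorted(points, key=lambda p: -p.depth)] == ["$b", "$c", "$a"]
```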
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 42141d3670..d32d224d56 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -601,18 +601,6 @@ class FederationEventHandler:
             room_id, [(event, context)]
         )
 
-        # If we're joining the room again, check if there is new marker
-        # state indicating that there is new history imported somewhere in
-        # the DAG. Multiple markers can exist in the current state with
-        # unique state_keys.
-        #
-        # Do this after the state from the remote join was persisted (via
-        # `persist_events_and_notify`). Otherwise we can run into a
-        # situation where the create event doesn't exist yet in the
-        # `current_state_events`
-        for e in state:
-            await self._handle_marker_event(origin, e)
-
         return stream_id_after_persist
 
     async def update_state_for_partial_state_event(
 
@@ -915,13 +903,6 @@ class FederationEventHandler:
             )
         )
 
-        # We construct the event lists in source order from `/backfill` response because
-        # it's a) easiest, but also b) the order in which we process things matters for
-        # MSC2716 historical batches because many historical events are all at the same
-        # `depth` and we rely on the tenuous sort that the other server gave us and hope
-        # they're doing their best. The brittle nature of this ordering for historical
-        # messages over federation is one of the reasons why we don't want to continue
-        # on MSC2716 until we have online topological ordering.
         events_with_failed_pull_attempts, fresh_events = partition(
             new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
         )
 
@@ -1460,8 +1441,6 @@ class FederationEventHandler:
         await self._run_push_actions_and_persist_event(event, context, backfilled)
 
-        await self._handle_marker_event(origin, event)
-
         if backfilled or context.rejected:
             return
 
@@ -1559,94 +1538,6 @@ class FederationEventHandler:
         except Exception:
             logger.exception("Failed to resync device for %s", sender)
 
-    @trace
-    async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
-        """Handles backfilling the insertion event when we receive a marker
-        event that points to one.
-
-        Args:
-            origin: Origin of the event. Will be called to get the insertion event
-            marker_event: The event to process
-        """
-
-        if marker_event.type != EventTypes.MSC2716_MARKER:
-            # Not a marker event
-            return
-
-        if marker_event.rejected_reason is not None:
-            # Rejected event
-            return
-
-        # Skip processing a marker event if the room version doesn't
-        # support it or the event is not from the room creator.
-        room_version = await self._store.get_room_version(marker_event.room_id)
-        create_event = await self._store.get_create_event_for_room(marker_event.room_id)
-        if not room_version.msc2175_implicit_room_creator:
-            room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
-        else:
-            room_creator = create_event.sender
-        if not room_version.msc2716_historical and (
-            not self._config.experimental.msc2716_enabled
-            or marker_event.sender != room_creator
-        ):
-            return
-
-        logger.debug("_handle_marker_event: received %s", marker_event)
-
-        insertion_event_id = marker_event.content.get(
-            EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
-        )
-
-        if insertion_event_id is None:
-            # Nothing to retrieve then (invalid marker)
-            return
-
-        already_seen_insertion_event = await self._store.have_seen_event(
-            marker_event.room_id, insertion_event_id
-        )
-        if already_seen_insertion_event:
-            # No need to process a marker again if we have already seen the
-            # insertion event that it was pointing to
-            return
-
-        logger.debug(
-            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
-        )
-
-        await self._get_events_and_persist(
-            origin,
-            marker_event.room_id,
-            [insertion_event_id],
-        )
-
-        insertion_event = await self._store.get_event(
-            insertion_event_id, allow_none=True
-        )
-        if insertion_event is None:
-            logger.warning(
-                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
-                origin,
-                insertion_event_id,
-                marker_event.event_id,
-            )
-            return
-
-        logger.debug(
-            "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
-            insertion_event,
-            marker_event,
-        )
-
-        await self._store.insert_insertion_extremity(
-            insertion_event_id, marker_event.room_id
-        )
-
-        logger.debug(
-            "_handle_marker_event: insertion extremity added for %s from marker event %s",
-            insertion_event,
-            marker_event,
-        )
-
     async def backfill_event_id(
         self, destinations: List[str], room_id: str, event_id: str
     ) -> PulledPduInfo:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0b61c2272b..4292b47037 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -60,7 +60,6 @@
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.types import (
-    MutableStateMap,
     PersistedEventPosition,
     Requester,
     RoomAlias,
@@ -573,7 +572,6 @@ class EventCreationHandler:
         state_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
         outlier: bool = False,
-        historical: bool = False,
         depth: Optional[int] = None,
         state_map: Optional[StateMap[str]] = None,
         for_batch: bool = False,
@@ -599,7 +597,7 @@ class EventCreationHandler:
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
 
             prev_event_ids:
                 the forward extremities to use as the prev_events for the new event.
@@ -614,13 +612,10 @@ class EventCreationHandler:
                 If non-None, prev_event_ids must also be provided.
 
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is with insertion events which float at
-                the beginning of a historical batch and don't have any `prev_events` to
-                derive from; we add all of these state events as the explicit state so the
-                rest of the historical batch can inherit the same state and state_group.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
 
             require_consent: Whether to check if the requester has
                 consented to the privacy policy.
@@ -629,10 +624,6 @@ class EventCreationHandler:
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
 
-            historical: Indicates whether the message is being inserted
-                back in time around some existing events. This is used to skip
-                a few checks and mark the event as backfilled.
-
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -717,8 +708,6 @@ class EventCreationHandler:
         builder.internal_metadata.outlier = outlier
 
-        builder.internal_metadata.historical = historical
-
         event, unpersisted_context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
@@ -947,7 +936,6 @@ class EventCreationHandler:
         txn_id: Optional[str] = None,
         ignore_shadow_ban: bool = False,
         outlier: bool = False,
-        historical: bool = False,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, int]:
         """
@@ -961,19 +949,16 @@ class EventCreationHandler:
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
             prev_event_ids:
                 The event IDs to use as the prev events.
                 Should normally be left as None to automatically request them
                 from the database.
 
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is with insertion events which float at
-                the beginning of a historical batch and don't have any `prev_events` to
-                derive from; we add all of these state events as the explicit state so the
-                rest of the historical batch can inherit the same state and state_group.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
 
             ratelimit: Whether to rate limit this send.
             txn_id: The transaction ID.
             ignore_shadow_ban: True if shadow-banned users should be allowed to
@@ -981,9 +966,6 @@ class EventCreationHandler:
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
-            historical: Indicates whether the message is being inserted
-                back in time around some existing events. This is used to skip
-                a few checks and mark the event as backfilled.
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -1053,7 +1035,6 @@ class EventCreationHandler:
                 prev_event_ids=prev_event_ids,
                 state_event_ids=state_event_ids,
                 outlier=outlier,
-                historical=historical,
                 depth=depth,
             )
             context = await unpersisted_context.persist(event)
@@ -1145,7 +1126,7 @@ class EventCreationHandler:
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
 
             prev_event_ids:
                 the forward extremities to use as the prev_events for the new event.
@@ -1158,13 +1139,10 @@ class EventCreationHandler:
                 based on the room state at the prev_events.
 
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is with insertion events which float at
-                the beginning of a historical batch and don't have any `prev_events` to
-                derive from; we add all of these state events as the explicit state so the
-                rest of the historical batch can inherit the same state and state_group.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
 
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
@@ -1261,52 +1239,6 @@ class EventCreationHandler:
         if builder.internal_metadata.outlier:
             event.internal_metadata.outlier = True
             context = EventContext.for_outlier(self._storage_controllers)
-        elif (
-            event.type == EventTypes.MSC2716_INSERTION
-            and state_event_ids
-            and builder.internal_metadata.is_historical()
-        ):
-            # Add explicit state to the insertion event so it has state to derive
-            # from even though it's floating with no `prev_events`. The rest of
-            # the batch can derive from this state and state_group.
-            #
-            # TODO(faster_joins): figure out how this works, and make sure that the
-            # old state is complete.
-            #   https://github.com/matrix-org/synapse/issues/13003
-            metadata = await self.store.get_metadata_for_events(state_event_ids)
-
-            state_map_for_event: MutableStateMap[str] = {}
-            for state_id in state_event_ids:
-                data = metadata.get(state_id)
-                if data is None:
-                    # We're trying to persist a new historical batch of events
-                    # with the given state, e.g. via
-                    # `RoomBatchSendEventRestServlet`. The state can be inferred
-                    # by Synapse or set directly by the client.
-                    #
-                    # Either way, we should have persisted all the state before
-                    # getting here.
-                    raise Exception(
-                        f"State event {state_id} not found in DB,"
-                        " Synapse should have persisted it before using it."
-                    )
-
-                if data.state_key is None:
-                    raise Exception(
-                        f"Trying to set non-state event {state_id} as state"
-                    )
-
-                state_map_for_event[(data.event_type, data.state_key)] = state_id
-
-            # TODO(faster_joins): check how MSC2716 works and whether we can have
-            # partial state here
-            #   https://github.com/matrix-org/synapse/issues/13003
-            context = await self.state.calculate_context_info(
-                event,
-                state_ids_before_event=state_map_for_event,
-                partial_state=False,
-            )
-
         else:
             context = await self.state.calculate_context_info(event)
 
@@ -1876,28 +1808,6 @@ class EventCreationHandler:
                     403, "Redacting server ACL events is not permitted"
                 )
 
-            # Add a little safety stop-gap to prevent people from trying to
-            # redact MSC2716 related events when they're in a room version
-            # which does not support it yet. We allow people to use MSC2716
-            # events in existing room versions but only from the room
-            # creator since it does not require any changes to the auth
-            # rules and in effect, the redaction algorithm . In the
-            # supported room version, we add the `historical` power level to
-            # auth the MSC2716 related events and adjust the redaction
-            # algorthim to keep the `historical` field around (redacting an
-            # event should only strip fields which don't affect the
-            # structural protocol level).
-            is_msc2716_event = (
-                original_event.type == EventTypes.MSC2716_INSERTION
-                or original_event.type == EventTypes.MSC2716_BATCH
-                or original_event.type == EventTypes.MSC2716_MARKER
-            )
-            if not room_version_obj.msc2716_historical and is_msc2716_event:
-                raise AuthError(
-                    403,
-                    "Redacting MSC2716 events is not supported in this room version",
-                )
-
             event_types = event_auth.auth_types_for_event(event.room_version, event)
             prev_state_ids = await context.get_prev_state_ids(
                 StateFilter.from_types(event_types)
@@ -1935,58 +1845,12 @@ class EventCreationHandler:
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-            if event.type == EventTypes.MSC2716_INSERTION:
-                room_version = await self.store.get_room_version_id(event.room_id)
-                room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-                create_event = await self.store.get_create_event_for_room(event.room_id)
-                if not room_version_obj.msc2175_implicit_room_creator:
-                    room_creator = create_event.content.get(
-                        EventContentFields.ROOM_CREATOR
-                    )
-                else:
-                    room_creator = create_event.sender
-
-                # Only check an insertion event if the room version
-                # supports it or the event is from the room creator.
-                if room_version_obj.msc2716_historical or (
-                    self.config.experimental.msc2716_enabled
-                    and event.sender == room_creator
-                ):
-                    next_batch_id = event.content.get(
-                        EventContentFields.MSC2716_NEXT_BATCH_ID
-                    )
-                    conflicting_insertion_event_id = None
-                    if next_batch_id:
-                        conflicting_insertion_event_id = (
-                            await self.store.get_insertion_event_id_by_batch_id(
-                                event.room_id, next_batch_id
-                            )
-                        )
-                    if conflicting_insertion_event_id is not None:
-                        # The current insertion event that we're processing is invalid
-                        # because an insertion event already exists in the room with the
-                        # same next_batch_id. We can't allow multiple because the batch
-                        # pointing will get weird, e.g. we can't determine which insertion
-                        # event the batch event is pointing to.
-                        raise SynapseError(
-                            HTTPStatus.BAD_REQUEST,
-                            "Another insertion event already exists with the same next_batch_id",
-                            errcode=Codes.INVALID_PARAM,
-                        )
-
-        # Mark any `m.historical` messages as backfilled so they don't appear
-        # in `/sync` and have the proper decrementing `stream_ordering` as we import
-        backfilled = False
-        if event.internal_metadata.is_historical():
-            backfilled = True
-
         assert self._storage_controllers.persistence is not None
         (
             persisted_events,
             max_stream_token,
         ) = await self._storage_controllers.persistence.persist_events(
-            events_and_context, backfilled=backfilled
+            events_and_context,
         )
 
         events_and_pos = []
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d5257acb7d..19b8728db9 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -40,6 +40,11 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# How many single event gaps we tolerate returning in a `/messages` response before we
+# backfill and try to fill in the history. This is an arbitrarily picked number so feel
+# free to tune it in the future.
+BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
+
 
 @attr.s(slots=True, auto_attribs=True)
 class PurgeStatus:
 
@@ -486,35 +491,35 @@ class PaginationHandler:
                 room_id, room_token.stream
             )
 
-            if not use_admin_priviledge and membership == Membership.LEAVE:
-                # If they have left the room then clamp the token to be before
-                # they left the room, to save the effort of loading from the
-                # database.
-
-                # This is only None if the room is world_readable, in which
-                # case "JOIN" would have been returned.
-                assert member_event_id
+            # If they have left the room then clamp the token to be before
+            # they left the room, to save the effort of loading from the
+            # database.
+            if (
+                pagin_config.direction == Direction.BACKWARDS
+                and not use_admin_priviledge
+                and membership == Membership.LEAVE
+            ):
+                # This is only None if the room is world_readable, in which case
+                # "Membership.JOIN" would have been returned and we should never hit
+                # this branch.
+                assert member_event_id
 
                 leave_token = await self.store.get_topological_token_for_event(
                     member_event_id
                 )
                 assert leave_token.topological is not None
 
                 if leave_token.topological < curr_topo:
                     from_token = from_token.copy_and_replace(
                         StreamKeyType.ROOM, leave_token
                     )
 
-            await self.hs.get_federation_handler().maybe_backfill(
-                room_id,
-                curr_topo,
-                limit=pagin_config.limit,
-            )
-
         to_room_key = None
         if pagin_config.to_token:
             to_room_key = pagin_config.to_token.room_key
 
+        # Initially fetch the events from the database. With any luck, we can return
+        # these without blocking on backfill (handled below).
         events, next_key = await self.store.paginate_room_events(
             room_id=room_id,
             from_key=from_token.room_key,
@@ -524,6 +529,94 @@ class PaginationHandler:
             event_filter=event_filter,
         )
 
+        if pagin_config.direction == Direction.BACKWARDS:
+            # We use a `Set` because there can be multiple events at a given depth
+            # and we only care about looking at the unique continuum of depths to
+            # find gaps.
+            event_depths: Set[int] = {event.depth for event in events}
+            sorted_event_depths = sorted(event_depths)
+
+            # Inspect the depths of the returned events to see if there are any gaps
+            found_big_gap = False
+            number_of_gaps = 0
+            previous_event_depth = (
+                sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
+            )
+            for event_depth in sorted_event_depths:
+                # We don't expect a negative depth but we'll just deal with it in
+                # any case by taking the absolute value to get the true gap between
+                # any two integers.
+                depth_gap = abs(event_depth - previous_event_depth)
+                # A `depth_gap` of 1 is a normal continuous chain to the next event
+                # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+                # also possible there is no event at a given depth but we can't ever
+                # know that for sure)
+                if depth_gap > 1:
+                    number_of_gaps += 1
+
+                    # We only tolerate a small number of single-event-long gaps in
+                    # the returned events because those are most likely just events
+                    # we've failed to pull in the past. Anything longer than that is
+                    # probably a sign that we're missing a decent chunk of history
+                    # and we should try to backfill it.
+                    #
+                    # XXX: It's possible we could tolerate longer gaps if we checked
+                    # that a given event's `prev_events` is one that has failed pull
+                    # attempts and we could just treat it like a dead branch of
+                    # history for now, or at least something that we don't need to
+                    # block the client on while trying to pull it.
+                    #
+                    # XXX: If we had something like MSC3871 to indicate gaps in the
+                    # timeline to the client, we could also get away with any sized
+                    # gap and just have the client refetch the holes as they see fit.
+                    if depth_gap > 2:
+                        found_big_gap = True
+                        break
+                previous_event_depth = event_depth
+
+            # Backfill in the foreground if we found a big gap, have too many holes,
+            # or we don't have enough events to fill the limit that the client asked
+            # for.
+            missing_too_many_events = (
+                number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+            )
+            not_enough_events_to_fill_response = len(events) < pagin_config.limit
+            if (
+                found_big_gap
+                or missing_too_many_events
+                or not_enough_events_to_fill_response
+            ):
+                did_backfill = await self.hs.get_federation_handler().maybe_backfill(
+                    room_id,
+                    curr_topo,
+                    limit=pagin_config.limit,
+                )
+
+                # If we did backfill something, refetch the events from the database to
+                # catch anything new that might have been added since we last fetched.
+                if did_backfill:
+                    events, next_key = await self.store.paginate_room_events(
+                        room_id=room_id,
+                        from_key=from_token.room_key,
+                        to_key=to_room_key,
+                        direction=pagin_config.direction,
+                        limit=pagin_config.limit,
+                        event_filter=event_filter,
+                    )
+            else:
+                # Otherwise, we can backfill in the background for eventual
+                # consistency's sake but we don't need to block the client waiting
+                # for a costly federation call and processing.
+                run_as_background_process(
+                    "maybe_backfill_in_the_background",
+                    self.hs.get_federation_handler().maybe_backfill,
+                    room_id,
+                    curr_topo,
+                    limit=pagin_config.limit,
+                )
+
         next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
 
         # if no events are returned from pagination, that implies
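The gap heuristic above is self-contained enough to factor out. A minimal sketch of the same logic as a pure function (the function name and return shape are illustrative, not Synapse's API; the handler inlines this in `get_messages`):

```python
from typing import Iterable, Tuple

BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3


def inspect_depth_gaps(depths: Iterable[int]) -> Tuple[int, bool]:
    """Return (number_of_gaps, found_big_gap) for a collection of event
    depths, mirroring the heuristic above: a depth step of 2 is a
    one-event hole, anything larger counts as a "big" gap."""
    sorted_depths = sorted(set(depths))
    number_of_gaps = 0
    found_big_gap = False
    previous = sorted_depths[0] if sorted_depths else 0
    for depth in sorted_depths:
        gap = abs(depth - previous)
        if gap > 1:
            number_of_gaps += 1
            if gap > 2:
                found_big_gap = True
                break
        previous = depth
    return number_of_gaps, found_big_gap


# 1..3 is continuous, 3 -> 5 is a one-event hole, 5 -> 9 is a big gap.
assert inspect_depth_gaps([1, 2, 3, 5, 9]) == (2, True)
```

A backfill is then triggered in the foreground when `found_big_gap` is set, when `number_of_gaps` exceeds the threshold, or when fewer events than the client's limit were found; otherwise backfill happens in the background so the client is not blocked.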
index bf9df60218..0000000000 --- a/synapse/handlers/room_batch.py +++ /dev/null
@@ -1,466 +0,0 @@ -import logging -from typing import TYPE_CHECKING, List, Tuple - -from synapse.api.constants import EventContentFields, EventTypes -from synapse.appservice import ApplicationService -from synapse.http.servlet import assert_params_in_dict -from synapse.types import JsonDict, Requester, UserID, create_requester -from synapse.util.stringutils import random_string - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - - -class RoomBatchHandler: - def __init__(self, hs: "HomeServer"): - self.hs = hs - self.store = hs.get_datastores().main - self._state_storage_controller = hs.get_storage_controllers().state - self.event_creation_handler = hs.get_event_creation_handler() - self.room_member_handler = hs.get_room_member_handler() - self.auth = hs.get_auth() - - async def inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int: - """Finds the depth which would sort it after the most-recent - prev_event_id but before the successors of those events. If no - successors are found, we assume it's an historical extremity part of the - current batch and use the same depth of the prev_event_ids. - - Args: - prev_event_ids: List of prev event IDs - - Returns: - Inherited depth - """ - ( - most_recent_prev_event_id, - most_recent_prev_event_depth, - ) = await self.store.get_max_depth_of(prev_event_ids) - - # We want to insert the historical event after the `prev_event` but before the successor event - # - # We inherit depth from the successor event instead of the `prev_event` - # because events returned from `/messages` are first sorted by `topological_ordering` - # which is just the `depth` and then tie-break with `stream_ordering`. - # - # We mark these inserted historical events as "backfilled" which gives them a - # negative `stream_ordering`. If we use the same depth as the `prev_event`, - # then our historical event will tie-break and be sorted before the `prev_event` - # when it should come after. - # - # We want to use the successor event depth so they appear after `prev_event` because - # it has a larger `depth` but before the successor event because the `stream_ordering` - # is negative before the successor event. - assert most_recent_prev_event_id is not None - successor_event_ids = await self.store.get_successor_events( - most_recent_prev_event_id - ) - - # If we can't find any successor events, then it's a forward extremity of - # historical messages and we can just inherit from the previous historical - # event which we can already assume has the correct depth where we want - # to insert into. - if not successor_event_ids: - depth = most_recent_prev_event_depth - else: - ( - _, - oldest_successor_depth, - ) = await self.store.get_min_depth_of(successor_event_ids) - - depth = oldest_successor_depth - - return depth - - def create_insertion_event_dict( - self, sender: str, room_id: str, origin_server_ts: int - ) -> JsonDict: - """Creates an event dict for an "insertion" event with the proper fields - and a random batch ID. - - Args: - sender: The event author MXID - room_id: The room ID that the event belongs to - origin_server_ts: Timestamp when the event was sent - - Returns: - The new event dictionary to insert. 
- """ - - next_batch_id = random_string(8) - insertion_event = { - "type": EventTypes.MSC2716_INSERTION, - "sender": sender, - "room_id": room_id, - "content": { - EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id, - EventContentFields.MSC2716_HISTORICAL: True, - }, - "origin_server_ts": origin_server_ts, - } - - return insertion_event - - async def create_requester_for_user_id_from_app_service( - self, user_id: str, app_service: ApplicationService - ) -> Requester: - """Creates a new requester for the given user_id - and validates that the app service is allowed to control - the given user. - - Args: - user_id: The author MXID that the app service is controlling - app_service: The app service that controls the user - - Returns: - Requester object - """ - - await self.auth.validate_appservice_can_control_user_id(app_service, user_id) - - return create_requester(user_id, app_service=app_service) - - async def get_most_recent_full_state_ids_from_event_id_list( - self, event_ids: List[str] - ) -> List[str]: - """Find the most recent event_id and grab the full state at that event. - We will use this as a base to auth our historical messages against. - - Args: - event_ids: List of event ID's to look at - - Returns: - List of event ID's - """ - - ( - most_recent_event_id, - _, - ) = await self.store.get_max_depth_of(event_ids) - # mapping from (type, state_key) -> state_event_id - assert most_recent_event_id is not None - prev_state_map = await self._state_storage_controller.get_state_ids_for_event( - most_recent_event_id - ) - # List of state event ID's - full_state_ids = list(prev_state_map.values()) - - return full_state_ids - - async def persist_state_events_at_start( - self, - state_events_at_start: List[JsonDict], - room_id: str, - initial_state_event_ids: List[str], - app_service_requester: Requester, - ) -> List[str]: - """Takes all `state_events_at_start` event dictionaries and creates/persists - them in a floating state event chain which don't resolve into the current room - state. They are floating because they reference no prev_events which disconnects - them from the normal DAG. - - Args: - state_events_at_start: - room_id: Room where you want the events persisted in. - initial_state_event_ids: - The base set of state for the historical batch which the floating - state chain will derive from. This should probably be the state - from the `prev_event` defined by `/batch_send?prev_event_id=$abc`. - app_service_requester: The requester of an application service. - - Returns: - List of state event ID's we just persisted - """ - assert app_service_requester.app_service - - state_event_ids_at_start = [] - state_event_ids = initial_state_event_ids.copy() - - # Make the state events float off on their own by specifying no - # prev_events for the first one in the chain so we don't have a bunch of - # `@mxid joined the room` noise between each batch. 
- prev_event_ids_for_state_chain: List[str] = [] - - for index, state_event in enumerate(state_events_at_start): - assert_params_in_dict( - state_event, ["type", "origin_server_ts", "content", "sender"] - ) - - logger.debug( - "RoomBatchSendEventRestServlet inserting state_event=%s", state_event - ) - - event_dict = { - "type": state_event["type"], - "origin_server_ts": state_event["origin_server_ts"], - "content": state_event["content"], - "room_id": room_id, - "sender": state_event["sender"], - "state_key": state_event["state_key"], - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - # TODO: This is pretty much the same as some other code to handle inserting state in this file - if event_dict["type"] == EventTypes.Member: - membership = event_dict["content"].get("membership", None) - event_id, _ = await self.room_member_handler.update_membership( - await self.create_requester_for_user_id_from_app_service( - state_event["sender"], app_service_requester.app_service - ), - target=UserID.from_string(event_dict["state_key"]), - room_id=room_id, - action=membership, - content=event_dict["content"], - historical=True, - # Only the first event in the state chain should be floating. - # The rest should hang off each other in a chain. - allow_no_prev_events=index == 0, - prev_event_ids=prev_event_ids_for_state_chain, - # The first event in the state chain is floating with no - # `prev_events` which means it can't derive state from - # anywhere automatically. So we need to set some state - # explicitly. - # - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append - # later. - state_event_ids=state_event_ids.copy(), - ) - else: - ( - event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self.create_requester_for_user_id_from_app_service( - state_event["sender"], app_service_requester.app_service - ), - event_dict, - historical=True, - # Only the first event in the state chain should be floating. - # The rest should hang off each other in a chain. - allow_no_prev_events=index == 0, - prev_event_ids=prev_event_ids_for_state_chain, - # The first event in the state chain is floating with no - # `prev_events` which means it can't derive state from - # anywhere automatically. So we need to set some state - # explicitly. - # - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append later. - state_event_ids=state_event_ids.copy(), - ) - event_id = event.event_id - - state_event_ids_at_start.append(event_id) - state_event_ids.append(event_id) - # Connect all the state in a floating chain - prev_event_ids_for_state_chain = [event_id] - - return state_event_ids_at_start - - async def persist_historical_events( - self, - events_to_create: List[JsonDict], - room_id: str, - inherited_depth: int, - initial_state_event_ids: List[str], - app_service_requester: Requester, - ) -> List[str]: - """Create and persist all events provided sequentially. Handles the - complexity of creating events in chronological order so they can - reference each other by prev_event but still persists in - reverse-chronological order so they have the correct - (topological_ordering, stream_ordering) and sort correctly from - /messages.
- - Args: - events_to_create: List of historical events to create in JSON - dictionary format. - room_id: Room where you want the events persisted. - inherited_depth: The depth to create the events at (you will - probably get this by calling inherit_depth_from_prev_ids(...)). - initial_state_event_ids: - This is used to set explicit state for the insertion event at - the start of the historical batch since it's floating with no - prev_events to derive state from automatically. - app_service_requester: The requester of an application service. - - Returns: - List of persisted event IDs - """ - assert app_service_requester.app_service - - # We expect the first event in a historical batch to be an insertion event - assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION - # We expect the last event in a historical batch to be a batch event - assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH - - # Make the historical event chain float off on its own by specifying no - # prev_events for the first event in the chain which causes the HS to - # ask for the state at the start of the batch later. - prev_event_ids: List[str] = [] - - event_ids = [] - events_to_persist = [] - for index, ev in enumerate(events_to_create): - assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) - - assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % ( - ev["sender"], - ) - - event_dict = { - "type": ev["type"], - "origin_server_ts": ev["origin_server_ts"], - "content": ev["content"], - "room_id": room_id, - "sender": ev["sender"], # requester.user.to_string(), - "prev_events": prev_event_ids.copy(), - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - event, unpersisted_context = await self.event_creation_handler.create_event( - await self.create_requester_for_user_id_from_app_service( - ev["sender"], app_service_requester.app_service - ), - event_dict, - # Only the first event (which is the insertion event) in the - # chain should be floating. The rest should hang off each other - # in a chain. - allow_no_prev_events=index == 0, - prev_event_ids=event_dict.get("prev_events"), - # Since the first event (which is the insertion event) in the - # chain is floating with no `prev_events`, it can't derive state - # from anywhere automatically. So we need to set some state - # explicitly. - state_event_ids=initial_state_event_ids if index == 0 else None, - historical=True, - depth=inherited_depth, - ) - context = await unpersisted_context.persist(event) - assert context._state_group - - # Normally this is done when persisting the event but we have to - # pre-emptively do it here because we create all the events first, - # then persist them in another pass below. And we want to share - # state_groups across the whole batch so this lookup needs to work - # for the next event in the batch in this loop. - await self.store.store_state_group_id_for_event_id( - event_id=event.event_id, - state_group_id=context._state_group, - ) - - logger.debug( - "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s", - event, - prev_event_ids, - ) - - events_to_persist.append((event, context)) - event_id = event.event_id - - event_ids.append(event_id) - prev_event_ids = [event_id] - - # Persist events in reverse-chronological order so they have the - # correct stream_ordering as they are backfilled (which decrements).
- # Events are sorted by (topological_ordering, stream_ordering) - # where topological_ordering is just depth. - for event, context in reversed(events_to_persist): - # This call can't raise `PartialStateConflictError` since we forbid - # use of the historical batch API during partial state - await self.event_creation_handler.handle_new_client_event( - await self.create_requester_for_user_id_from_app_service( - event.sender, app_service_requester.app_service - ), - events_and_context=[(event, context)], - ) - - return event_ids - - async def handle_batch_of_events( - self, - events_to_create: List[JsonDict], - room_id: str, - batch_id_to_connect_to: str, - inherited_depth: int, - initial_state_event_ids: List[str], - app_service_requester: Requester, - ) -> Tuple[List[str], str]: - """ - Handles creating and persisting all of the historical events as well as - insertion and batch meta events to make the batch navigable in the DAG. - - Args: - events_to_create: List of historical events to create in JSON - dictionary format. - room_id: Room where you want the events created. - batch_id_to_connect_to: The batch_id from the insertion event you - want this batch to connect to. - inherited_depth: The depth to create the events at (you will - probably get this by calling inherit_depth_from_prev_ids(...)). - initial_state_event_ids: - This is used to set explicit state for the insertion event at - the start of the historical batch since it's floating with no - prev_events to derive state from automatically. This should - probably be the state from the `prev_event` defined by - `/batch_send?prev_event_id=$abc` plus the outcome of - `persist_state_events_at_start` - app_service_requester: The requester of an application service. - - Returns: - Tuple containing a list of created events and the next_batch_id - """ - - # Connect this current batch to the insertion event from the previous batch - last_event_in_batch = events_to_create[-1] - batch_event = { - "type": EventTypes.MSC2716_BATCH, - "sender": app_service_requester.user.to_string(), - "room_id": room_id, - "content": { - EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, - EventContentFields.MSC2716_HISTORICAL: True, - }, - # Since the batch event is put at the end of the batch, - # where the newest-in-time event is, copy the origin_server_ts from - # the last event we're inserting - "origin_server_ts": last_event_in_batch["origin_server_ts"], - } - # Add the batch event to the end of the batch (newest-in-time) - events_to_create.append(batch_event) - - # Add an "insertion" event to the start of each batch (next to the oldest-in-time - # event in the batch) so the next batch can be connected to this one.
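The comments above describe how each batch was book-ended: a "batch" event appended at the newest-in-time edge, an "insertion" event prepended at the oldest-in-time edge, and the whole thing persisted newest-first. A hedged sketch of that assembly (the helper name is illustrative; the ordering logic follows the deleted handler):

from typing import List

def assemble_batch(events: List[dict], insertion: dict, batch: dict) -> List[dict]:
    # Newest-in-time last: the batch event closes this batch, while the
    # insertion event opens it so the *next* (older) batch can attach to it.
    ordered = [insertion] + events + [batch]
    # Persist in reverse-chronological order so backfill's decrementing
    # stream_ordering lines up with the (depth, stream_ordering) sort.
    return list(reversed(ordered))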
- insertion_event = self.create_insertion_event_dict( - sender=app_service_requester.user.to_string(), - room_id=room_id, - # Since the insertion event is put at the start of the batch, - # where the oldest-in-time event is, copy the origin_server_ts from - # the first event we're inserting - origin_server_ts=events_to_create[0]["origin_server_ts"], - ) - next_batch_id = insertion_event["content"][ - EventContentFields.MSC2716_NEXT_BATCH_ID - ] - # Prepend the insertion event to the start of the batch (oldest-in-time) - events_to_create = [insertion_event] + events_to_create - - # Create and persist all of the historical events - event_ids = await self.persist_historical_events( - events_to_create=events_to_create, - room_id=room_id, - inherited_depth=inherited_depth, - initial_state_event_ids=initial_state_event_ids, - app_service_requester=app_service_requester, - ) - - return event_ids, next_batch_id diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index af0ca5c26d..82e4fa7363 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -362,7 +362,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): content: Optional[dict] = None, require_consent: bool = True, outlier: bool = False, - historical: bool = False, origin_server_ts: Optional[int] = None, ) -> Tuple[str, int]: """ @@ -378,16 +377,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): allow_no_prev_events: Whether to allow this event to be created with an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: The event IDs to use as the prev events state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint. One use case is the historical `state_events_at_start`; - since each is marked as an `outlier`, the `EventContext.for_outlier()` won't - have any `state_ids` set and therefore can't derive any state even though the - prev_events are set so we need to set them ourself via this argument. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -400,9 +396,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. - historical: Indicates whether the message is being inserted - back in time around some existing events. This is used to skip - a few checks and mark the event as backfilled. origin_server_ts: The origin_server_ts to use if a new event is created. Uses the current timestamp if set to None. @@ -477,7 +470,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): depth=depth, require_consent=require_consent, outlier=outlier, - historical=historical, ) context = await unpersisted_context.persist(event) prev_state_ids = await context.get_prev_state_ids( @@ -585,7 +577,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): new_room: bool = False, require_consent: bool = True, outlier: bool = False, - historical: bool = False, allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, state_event_ids: Optional[List[str]] = None, @@ -610,22 +601,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. - historical: Indicates whether the message is being inserted - back in time around some existing events. This is used to skip - a few checks and mark the event as backfilled. allow_no_prev_events: Whether to allow this event to be created with an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: The event IDs to use as the prev events state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint.
One use case is the historical `state_events_at_start`; - since each is marked as an `outlier`, the `EventContext.for_outlier()` won't - have any `state_ids` set and therefore can't derive any state even though the - prev_events are set so we need to set them ourself via this argument. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -667,7 +652,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): new_room=new_room, require_consent=require_consent, outlier=outlier, - historical=historical, allow_no_prev_events=allow_no_prev_events, prev_event_ids=prev_event_ids, state_event_ids=state_event_ids, @@ -691,7 +675,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): new_room: bool = False, require_consent: bool = True, outlier: bool = False, - historical: bool = False, allow_no_prev_events: bool = False, prev_event_ids: Optional[List[str]] = None, state_event_ids: Optional[List[str]] = None, @@ -718,22 +701,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. - historical: Indicates whether the message is being inserted - back in time around some existing events. This is used to skip - a few checks and mark the event as backfilled. allow_no_prev_events: Whether to allow this event to be created with an empty list of prev_events. Normally this is prohibited just because most events should have a prev_event and we should only use this in special - cases like MSC2716. + cases (previously useful for MSC2716). prev_event_ids: The event IDs to use as the prev events state_event_ids: - The full state at a given event. This is used particularly by the MSC2716 - /batch_send endpoint. One use case is the historical `state_events_at_start`; - since each is marked as an `outlier`, the `EventContext.for_outlier()` won't - have any `state_ids` set and therefore can't derive any state even though the - prev_events are set so we need to set them ourself via this argument. - This should normally be left as None, which will cause the auth_event_ids - to be calculated based on the room state at the prev_events. + The full state at a given event. This was previously used particularly + by the MSC2716 /batch_send endpoint. This should normally be left as + None, which will cause the auth_event_ids to be calculated based on the + room state at the prev_events. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -877,7 +854,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): content=content, require_consent=require_consent, outlier=outlier, - historical=historical, origin_server_ts=origin_server_ts, ) @@ -1498,7 +1474,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # put the server which owns the alias at the front of the server list.
if room_alias.domain in servers:
    servers.remove(room_alias.domain)
-        servers.insert(0, room_alias.domain)
+    servers.insert(0, room_alias.domain)
return RoomID.from_string(room_id), servers diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 7e8cf31682..91a24efcd0 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py
@@ -51,8 +51,10 @@ logger = logging.getLogger(__name__) @implementer(IAgent) class MatrixFederationAgent: """An Agent-like thing which provides a `request` method which correctly - handles resolving matrix server names when using matrix://. Handles standard - https URIs as normal. + handles resolving matrix server names when using `matrix-federation://`. Handles + standard https URIs as normal. The `matrix-federation://` scheme is internal to + Synapse and we purposely want to avoid colliding with the `matrix://` URL scheme + which is now specced. Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.) @@ -167,14 +169,14 @@ class MatrixFederationAgent: # There must be a valid hostname. assert parsed_uri.hostname - # If this is a matrix:// URI check if the server has delegated matrix + # If this is a matrix-federation:// URI check if the server has delegated matrix # traffic using well-known delegation. # # We have to do this here and not in the endpoint as we need to rewrite # the host header with the delegated server name. delegated_server = None if ( - parsed_uri.scheme == b"matrix" + parsed_uri.scheme == b"matrix-federation" and not _is_ip_literal(parsed_uri.hostname) and not parsed_uri.port ): @@ -250,7 +252,7 @@ class MatrixHostnameEndpointFactory: @implementer(IStreamClientEndpoint) class MatrixHostnameEndpoint: - """An endpoint that resolves matrix:// URLs using Matrix server name + """An endpoint that resolves matrix-federation:// URLs using Matrix server name resolution (i.e. via SRV). Does not check for well-known delegation. Args: @@ -379,7 +381,7 @@ class MatrixHostnameEndpoint: connect to. """ - if self._parsed_uri.scheme != b"matrix": + if self._parsed_uri.scheme != b"matrix-federation": return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)] # Note: We don't do well-known lookup as that needs to have happened diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
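The agent docstring above explains why Synapse now uses an internal `matrix-federation://` scheme. A minimal sketch of the pre-delegation check the hunk performs, rewritten standalone (the function name and the crude IP-literal test are illustrative simplifications, not Synapse's API):

from urllib.parse import urlparse

def needs_wellknown_lookup(uri: str) -> bool:
    parsed = urlparse(uri)
    # Only the internal federation scheme, with no explicit port, is
    # eligible for .well-known delegation.
    if parsed.scheme != "matrix-federation" or parsed.port is not None:
        return False
    host = parsed.hostname or ""
    # Crude IP-literal check for illustration; Synapse uses a proper parser.
    return not host.replace(".", "").replace(":", "").isdigit()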
index abb5ae5815..fc0101808d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py
@@ -174,7 +174,14 @@ class MatrixFederationRequest: # The object is frozen so we can pre-compute this. uri = urllib.parse.urlunparse( - (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") + ( + b"matrix-federation", + destination_bytes, + path_bytes, + None, + query_bytes, + b"", + ) ) object.__setattr__(self, "uri", uri) diff --git a/synapse/media/_base.py b/synapse/media/_base.py
index ef8334ae25..20cb8b9010 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py
@@ -152,6 +152,9 @@ def add_file_headers( content_type = media_type request.setHeader(b"Content-Type", content_type.encode("UTF-8")) + + # Use a Content-Disposition of attachment to force download of media. + disposition = "attachment" if upload_name: # RFC6266 section 4.1 [1] defines both `filename` and `filename*`. # @@ -173,11 +176,17 @@ # correctly interpret those as of 0.99.2 and (b) they are a bit of a pain and we # may as well just do the filename* version. if _can_encode_filename_as_token(upload_name): - disposition = "inline; filename=%s" % (upload_name,) + disposition = "%s; filename=%s" % ( + disposition, + upload_name, + ) else: - disposition = "inline; filename*=utf-8''%s" % (_quote(upload_name),) + disposition = "%s; filename*=utf-8''%s" % ( + disposition, + _quote(upload_name), + )
-        request.setHeader(b"Content-Disposition", disposition.encode("ascii"))
+    request.setHeader(b"Content-Disposition", disposition.encode("ascii"))
# cache for at least a day. # XXX: we might want to turn this off for data we don't want to diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
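The media hunk above switches served files from `inline` to `attachment` and keeps the RFC6266 `filename`/`filename*` split. A hedged sketch of the resulting header values (`build_disposition` and the ASCII test are illustrative stand-ins for the real `_can_encode_filename_as_token` check):

from typing import Optional
from urllib.parse import quote

def build_disposition(upload_name: Optional[str]) -> str:
    disposition = "attachment"
    if upload_name:
        if upload_name.isascii() and " " not in upload_name:
            disposition = "%s; filename=%s" % (disposition, upload_name)
        else:
            # filename* carries UTF-8, percent-encoded per RFC 5987.
            disposition = "%s; filename*=utf-8''%s" % (disposition, quote(upload_name))
    return disposition

# build_disposition("cat.png")    -> "attachment; filename=cat.png"
# build_disposition("résumé.pdf") -> "attachment; filename*=utf-8''r%C3%A9sum%C3%A9.pdf"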
index 33002cc0f2..67377c647b 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -322,7 +322,6 @@ class BulkPushRuleEvaluator: ) -> None: if ( not event.internal_metadata.is_notifiable() - or event.internal_metadata.is_historical() or event.room_id in self.hs.config.server.rooms_to_exclude_from_sync ): # Push rules for events that aren't notifiable can't be processed by this and diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index 88b52c26a0..735cef0aed 100644 --- a/synapse/push/clientformat.py +++ b/synapse/push/clientformat.py
@@ -41,12 +41,7 @@ def format_push_rules_for_user( rulearray.append(template_rule) - for type_key in ("pattern", "value"): - type_value = template_rule.pop(f"{type_key}_type", None) - if type_value == "user_id": - template_rule[type_key] = user.to_string() - elif type_value == "user_localpart": - template_rule[type_key] = user.localpart + _convert_type_to_value(template_rule, user) template_rule["enabled"] = enabled @@ -63,19 +58,20 @@ def format_push_rules_for_user( for c in template_rule["conditions"]: c.pop("_cache_key", None) - pattern_type = c.pop("pattern_type", None) - if pattern_type == "user_id": - c["pattern"] = user.to_string() - elif pattern_type == "user_localpart": - c["pattern"] = user.localpart - - sender_type = c.pop("sender_type", None) - if sender_type == "user_id": - c["sender"] = user.to_string() + _convert_type_to_value(c, user) return rules +def _convert_type_to_value(rule_or_cond: Dict[str, Any], user: UserID) -> None: + for type_key in ("pattern", "value"): + type_value = rule_or_cond.pop(f"{type_key}_type", None) + if type_value == "user_id": + rule_or_cond[type_key] = user.to_string() + elif type_value == "user_localpart": + rule_or_cond[type_key] = user.localpart + + def _add_empty_priority_class_arrays(d: Dict[str, list]) -> Dict[str, list]: for pc in PRIORITY_CLASS_MAP.keys(): d[pc] = [] diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
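The refactor above deduplicates the `*_type` to concrete-value substitution into `_convert_type_to_value`. An illustrative, self-contained version of the transformation it performs (plain strings replace the `UserID` object; the sample rule is made up):

from typing import Any, Dict

def convert_type_to_value(rule_or_cond: Dict[str, Any], user_id: str, localpart: str) -> None:
    # Same logic as the helper added above, kept standalone for the sketch.
    for type_key in ("pattern", "value"):
        type_value = rule_or_cond.pop(f"{type_key}_type", None)
        if type_value == "user_id":
            rule_or_cond[type_key] = user_id
        elif type_value == "user_localpart":
            rule_or_cond[type_key] = localpart

cond = {"kind": "event_match", "key": "content.body", "pattern_type": "user_localpart"}
convert_type_to_value(cond, "@alice:example.org", "alice")
assert cond["pattern"] == "alice"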
index 1af8d99d20..df0845edb2 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py
@@ -48,7 +48,6 @@ from synapse.rest.client import ( rendezvous, report_event, room, - room_batch, room_keys, room_upgrade_rest_servlet, sendtodevice, @@ -132,7 +131,6 @@ class ClientRestResource(JsonResource): user_directory.register_servlets(hs, client_resource) if is_main_process: room_upgrade_rest_servlet.register_servlets(hs, client_resource) - room_batch.register_servlets(hs, client_resource) capabilities.register_servlets(hs, client_resource) if is_main_process: account_validity.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py deleted file mode 100644
index 69f85112d8..0000000000 --- a/synapse/rest/client/room_batch.py +++ /dev/null
@@ -1,254 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import re -from http import HTTPStatus -from typing import TYPE_CHECKING, Tuple - -from synapse.api.constants import EventContentFields -from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.http.server import HttpServer -from synapse.http.servlet import ( - RestServlet, - assert_params_in_dict, - parse_json_object_from_request, - parse_string, - parse_strings_from_args, -) -from synapse.http.site import SynapseRequest -from synapse.types import JsonDict - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - - -class RoomBatchSendEventRestServlet(RestServlet): - """ - API endpoint which can insert a batch of events historically back in time - next to the given `prev_event`. - - `batch_id` comes from `next_batch_id `in the response of the batch send - endpoint and is derived from the "insertion" events added to each batch. - It's not required for the first batch send. - - `state_events_at_start` is used to define the historical state events - needed to auth the events like join events. These events will float - outside of the normal DAG as outlier's and won't be visible in the chat - history which also allows us to insert multiple batches without having a bunch - of `@mxid joined the room` noise between each batch. - - `events` is chronological list of events you want to insert. - There is a reverse-chronological constraint on batches so once you insert - some messages, you can only insert older ones after that. - tldr; Insert batches from your most recent history -> oldest history. - - POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event_id=<eventID>&batch_id=<batchID> - { - "events": [ ... ], - "state_events_at_start": [ ... 
] - } - """ - - PATTERNS = ( - re.compile( - "^/_matrix/client/unstable/org.matrix.msc2716" - "/rooms/(?P<room_id>[^/]*)/batch_send$" - ), - ) - CATEGORY = "Client API requests" - - def __init__(self, hs: "HomeServer"): - super().__init__() - self.store = hs.get_datastores().main - self.event_creation_handler = hs.get_event_creation_handler() - self.auth = hs.get_auth() - self.room_batch_handler = hs.get_room_batch_handler() - - async def on_POST( - self, request: SynapseRequest, room_id: str - ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=False) - - if not requester.app_service: - raise AuthError( - HTTPStatus.FORBIDDEN, - "Only application services can use the /batchsend endpoint", - ) - - body = parse_json_object_from_request(request) - assert_params_in_dict(body, ["state_events_at_start", "events"]) - - assert request.args is not None - prev_event_ids_from_query = parse_strings_from_args( - request.args, "prev_event_id" - ) - batch_id_from_query = parse_string(request, "batch_id") - - if prev_event_ids_from_query is None: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "prev_event query parameter is required when inserting historical messages back in time", - errcode=Codes.MISSING_PARAM, - ) - - if await self.store.is_partial_state_room(room_id): - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Cannot insert history batches until we have fully joined the room", - errcode=Codes.UNABLE_DUE_TO_PARTIAL_STATE, - ) - - # Verify the batch_id_from_query corresponds to an actual insertion event - # and have the batch connected. - if batch_id_from_query: - corresponding_insertion_event_id = ( - await self.store.get_insertion_event_id_by_batch_id( - room_id, batch_id_from_query - ) - ) - if corresponding_insertion_event_id is None: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "No insertion event corresponds to the given ?batch_id", - errcode=Codes.INVALID_PARAM, - ) - - # Make sure that the prev_event_ids exist and aren't outliers - ie, they are - # regular parts of the room DAG where we know the state. - non_outlier_prev_events = await self.store.have_events_in_timeline( - prev_event_ids_from_query - ) - for prev_event_id in prev_event_ids_from_query: - if prev_event_id not in non_outlier_prev_events: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "prev_event %s does not exist, or is an outlier" % (prev_event_id,), - errcode=Codes.INVALID_PARAM, - ) - - # For the event we are inserting next to (`prev_event_ids_from_query`), - # find the most recent state events that allowed that message to be - # sent. We will use that as a base to auth our historical messages - # against. - state_event_ids = await self.room_batch_handler.get_most_recent_full_state_ids_from_event_id_list( - prev_event_ids_from_query - ) - - state_event_ids_at_start = [] - # Create and persist all of the state events that float off on their own - # before the batch. These will most likely be all of the invite/member - # state events used to auth the upcoming historical messages. 
- if body["state_events_at_start"]: - state_event_ids_at_start = ( - await self.room_batch_handler.persist_state_events_at_start( - state_events_at_start=body["state_events_at_start"], - room_id=room_id, - initial_state_event_ids=state_event_ids, - app_service_requester=requester, - ) - ) - # Update our ongoing auth event ID list with all of the new state we - # just created - state_event_ids.extend(state_event_ids_at_start) - - inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids( - prev_event_ids_from_query - ) - - events_to_create = body["events"] - - # Figure out which batch to connect to. If they passed in - # batch_id_from_query let's use it. The batch ID passed in comes - # from the batch_id in the "insertion" event from the previous batch. - last_event_in_batch = events_to_create[-1] - base_insertion_event = None - if batch_id_from_query: - batch_id_to_connect_to = batch_id_from_query - # Otherwise, create an insertion event to act as a starting point. - # - # We don't always have an insertion event to start hanging more history - # off of (ideally there would be one in the main DAG, but that's not the - # case if we're wanting to add history to e.g. existing rooms without - # an insertion event), in which case we just create a new insertion event - # that can then get pointed to by a "marker" event later. - else: - base_insertion_event_dict = ( - self.room_batch_handler.create_insertion_event_dict( - sender=requester.user.to_string(), - room_id=room_id, - origin_server_ts=last_event_in_batch["origin_server_ts"], - ) - ) - base_insertion_event_dict["prev_events"] = prev_event_ids_from_query.copy() - - ( - base_insertion_event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self.room_batch_handler.create_requester_for_user_id_from_app_service( - base_insertion_event_dict["sender"], - requester.app_service, - ), - base_insertion_event_dict, - prev_event_ids=base_insertion_event_dict.get("prev_events"), - # Also set the explicit state here because we want to resolve - # any `state_events_at_start` here too. It's not strictly - # necessary to accomplish anything but if someone asks for the - # state at this point, we probably want to show them the - # historical state that was part of this batch. - state_event_ids=state_event_ids, - historical=True, - depth=inherited_depth, - ) - - batch_id_to_connect_to = base_insertion_event.content[ - EventContentFields.MSC2716_NEXT_BATCH_ID - ] - - # Create and persist all of the historical events as well as insertion - # and batch meta events to make the batch navigable in the DAG. 
- event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events( - events_to_create=events_to_create, - room_id=room_id, - batch_id_to_connect_to=batch_id_to_connect_to, - inherited_depth=inherited_depth, - initial_state_event_ids=state_event_ids, - app_service_requester=requester, - ) - - insertion_event_id = event_ids[0] - batch_event_id = event_ids[-1] - historical_event_ids = event_ids[1:-1] - - response_dict = { - "state_event_ids": state_event_ids_at_start, - "event_ids": historical_event_ids, - "next_batch_id": next_batch_id, - "insertion_event_id": insertion_event_id, - "batch_event_id": batch_event_id, - } - if base_insertion_event is not None: - response_dict["base_insertion_event_id"] = base_insertion_event.event_id - - return HTTPStatus.OK, response_dict - - -def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - msc2716_enabled = hs.config.experimental.msc2716_enabled - - if msc2716_enabled: - RoomBatchSendEventRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 1910648755..95400ba570 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py
@@ -102,8 +102,6 @@ class VersionsRestServlet(RestServlet): "org.matrix.msc2285.stable": True, # TODO: Remove when MSC2285 becomes a part of the spec # Supports filtering of /publicRooms by room type as per MSC3827 "org.matrix.msc3827.stable": True, - # Adds support for importing historical messages as per MSC2716 - "org.matrix.msc2716": self.config.experimental.msc2716_enabled, # Adds support for thread relations, per MSC3440. "org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above # Support for thread read receipts & notification counts. diff --git a/synapse/server.py b/synapse/server.py
index 0f36ef69cb..b72b76a38b 100644 --- a/synapse/server.py +++ b/synapse/server.py
@@ -91,7 +91,6 @@ from synapse.handlers.room import ( RoomShutdownHandler, TimestampLookupHandler, ) -from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import ( RoomForgetterHandler, @@ -493,10 +492,6 @@ class HomeServer(metaclass=abc.ABCMeta): return RoomCreationHandler(self) @cache_in_self - def get_room_batch_handler(self) -> RoomBatchHandler: - return RoomBatchHandler(self) - - @cache_in_self def get_room_shutdown_handler(self) -> RoomShutdownHandler: return RoomShutdownHandler(self) diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f1d2c71c91..35c0680365 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py
@@ -839,9 +839,8 @@ class EventsPersistenceStorageController: "group" % (ev.event_id,) ) continue - - if ctx.prev_group: - state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids + if ctx.state_group_deltas: + state_group_deltas.update(ctx.state_group_deltas) # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can diff --git a/synapse/storage/database.py b/synapse/storage/database.py
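The persistence hunk above replaces the single `prev_group`/`delta_ids` pair with a `state_group_deltas` mapping on the event context. A hedged illustration of the shape being merged (IDs and events are made up):

# {(prev_state_group, state_group): {(event_type, state_key): event_id}}
state_group_deltas = {}
ctx_deltas = {
    (101, 102): {("m.room.member", "@alice:example.org"): "$alice_joins"},
}
state_group_deltas.update(ctx_deltas)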
index 10fa6c4802..7e49ae11bc 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py
@@ -1529,7 +1529,7 @@ class DatabasePool: # Lock the table just once, to prevent it being done once per row. # Note that, according to Postgres' documentation, once obtained, # the lock is held for the remainder of the current transaction. - self.engine.lock_table(txn, "user_ips") + self.engine.lock_table(txn, table) for keyv, valv in zip(key_values, value_values): _keys = dict(zip(key_names, keyv)) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 0032a92f49..3a10c265c9 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py
@@ -61,7 +61,6 @@ from .registration import RegistrationStore from .rejections import RejectionsStore from .relations import RelationsStore from .room import RoomStore -from .room_batch import RoomBatchStore from .roommember import RoomMemberStore from .search import SearchStore from .session import SessionStore @@ -87,7 +86,6 @@ class DataStore( DeviceStore, RoomMemberStore, RoomStore, - RoomBatchStore, RegistrationStore, ProfileStore, PresenceStore, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 2681917d0b..8b6e3c1dc7 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -31,7 +31,7 @@ from typing import ( import attr from prometheus_client import Counter, Gauge -from synapse.api.constants import MAX_DEPTH, EventTypes +from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict @@ -891,124 +891,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id, ) - @trace - async def get_insertion_event_backward_extremities_in_room( - self, - room_id: str, - current_depth: int, - limit: int, - ) -> List[Tuple[str, int]]: - """ - Get the insertion events we know about that we haven't backfilled yet - along with the approximate depth. Only returns insertion events that are - at a depth lower than or equal to the `current_depth`. Sorted by depth, - highest to lowest (descending) so the closest events to the - `current_depth` are first in the list. - - We ignore insertion events that are newer than the user's current scroll - position (ie, those with depth greater than `current_depth`) as: - 1. we don't really care about getting events that have happened - after our current position; and - 2. by the nature of paginating and scrolling back, we have likely - previously tried and failed to backfill from that insertion event, so - to avoid getting "stuck" requesting the same backfill repeatedly - we drop those insertion events. - - Args: - room_id: Room where we want to find the oldest events - current_depth: The depth at the user's current scrollback position - limit: The max number of insertion event extremities to return - - Returns: - List of (event_id, depth) tuples. Sorted by depth, highest to lowest - (descending) so the closest events to the `current_depth` are first - in the list. - """ - - def get_insertion_event_backward_extremities_in_room_txn( - txn: LoggingTransaction, room_id: str - ) -> List[Tuple[str, int]]: - if isinstance(self.database_engine, PostgresEngine): - least_function = "LEAST" - elif isinstance(self.database_engine, Sqlite3Engine): - least_function = "MIN" - else: - raise RuntimeError("Unknown database engine") - - sql = f""" - SELECT - insertion_event_extremity.event_id, event.depth - /* We only want insertion events that are also marked as backwards extremities */ - FROM insertion_event_extremities AS insertion_event_extremity - /* Get the depth of the insertion event from the events table */ - INNER JOIN events AS event USING (event_id) - /** - * We use this info to make sure we don't retry to use a backfill point - * if we've already attempted to backfill from it recently. - */ - LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info - ON - failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id - AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id - WHERE - insertion_event_extremity.room_id = ? - /** - * We only want extremities that are older than or at - * the same position of the given `current_depth` (where older - * means less than the given depth) because we're looking backwards - * from the `current_depth` when backfilling. - * - * current_depth (ignore events that come after this, ignore 2-4) - * | - * ▼ - * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time> - */ - AND event.depth <= ? /* current_depth */ - /** - * Exponential back-off (up to the upper bound) so we don't retry the - * same backfill point over and over. ex.
2hr, 4hr, 8hr, 16hr, etc - * - * We use `1 << n` as a power of 2 equivalent for compatibility - * with older SQLites. The left shift equivalent only works with - * powers of 2 because left shift is a binary operation (base-2). - * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. - */ - AND ( - failed_backfill_attempt_info.event_id IS NULL - OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + ( - (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */)) - * ? /* step */ - ) - ) - /** - * Sort from highest (closest to the `current_depth`) to the lowest depth - * because the closest are most relevant to backfill from first. - * Then tie-break on alphabetical order of the event_ids so we get a - * consistent ordering which is nice when asserting things in tests. - */ - ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC - LIMIT ? - """ - - txn.execute( - sql, - ( - room_id, - current_depth, - self._clock.time_msec(), - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS, - limit, - ), - ) - return cast(List[Tuple[str, int]], txn.fetchall()) - - return await self.db_pool.runInteraction( - "get_insertion_event_backward_extremities_in_room", - get_insertion_event_backward_extremities_in_room_txn, - room_id, - ) - async def get_max_depth_of( self, event_ids: Collection[str] ) -> Tuple[Optional[str], int]: @@ -1280,50 +1162,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return event_ids - def _get_connected_batch_event_backfill_results_txn( - self, txn: LoggingTransaction, insertion_event_id: str, limit: int - ) -> List[BackfillQueueNavigationItem]: - """ - Find any batch connections of a given insertion event. - A batch event points at an insertion event via: - batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID] - - Args: - txn: The database transaction to use - insertion_event_id: The event ID to navigate from. We will find - batch events that point back at this insertion event. - limit: Max number of event ID's to query for and return - - Returns: - List of batch events that the backfill queue can process - """ - batch_connection_query = """ - SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i - /* Find the batch that connects to the given insertion event */ - INNER JOIN batch_events AS c - ON i.next_batch_id = c.batch_id - /* Get the depth of the batch start event from the events table */ - INNER JOIN events AS e ON c.event_id = e.event_id - /* Find an insertion event which matches the given event_id */ - WHERE i.event_id = ? - LIMIT ? - """ - - # Find any batch connections for the given insertion event - txn.execute( - batch_connection_query, - (insertion_event_id, limit), - ) - return [ - BackfillQueueNavigationItem( - depth=row[0], - stream_ordering=row[1], - event_id=row[2], - type=row[3], - ) - for row in txn - ] - def _get_connected_prev_event_backfill_results_txn( - self, txn: LoggingTransaction, event_id: str, limit: int - ) -> List[BackfillQueueNavigationItem]: @@ -1472,40 +1310,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas event_id_results.add(event_id) - # Try and find any potential historical batches of message history. - if self.hs.config.experimental.msc2716_enabled: - # We need to go and try to find any batch events connected - # to a given insertion event (by batch_id).
If we find any, we'll - # add them to the queue and navigate up the DAG like normal in the - # next iteration of the loop. - if event_type == EventTypes.MSC2716_INSERTION: - # Find any batch connections for the given insertion event - connected_batch_event_backfill_results = ( - self._get_connected_batch_event_backfill_results_txn( - txn, event_id, limit - len(event_id_results) - ) - ) - logger.debug( - "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s", - room_id, - connected_batch_event_backfill_results, - ) - for ( - connected_batch_event_backfill_item - ) in connected_batch_event_backfill_results: - if ( - connected_batch_event_backfill_item.event_id - not in event_id_results - ): - queue.put( - ( - -connected_batch_event_backfill_item.depth, - -connected_batch_event_backfill_item.stream_ordering, - connected_batch_event_backfill_item.event_id, - connected_batch_event_backfill_item.type, - ) - ) - # Now we just look up the DAG by prev_events as normal connected_prev_event_backfill_results = ( self._get_connected_prev_event_backfill_results_txn( @@ -1748,19 +1552,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas _delete_old_forward_extrem_cache_txn, ) - @trace - async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None: - await self.db_pool.simple_upsert( - table="insertion_event_extremities", - keyvalues={"event_id": event_id}, - values={ - "event_id": event_id, - "room_id": room_id, - }, - insertion_values={}, - desc="insert_insertion_extremity", - ) - async def insert_received_event_to_staging( self, origin: str, event: EventBase ) -> None: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
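The removed query above (like its surviving prev-event counterpart) rate-limits retries with `(1 << min(num_attempts, max_steps)) * step`. The same backoff horizon computed in Python (the constant values are illustrative; the real ones are the `BACKFILL_EVENT_EXPONENTIAL_BACKOFF_*` constants in `event_federation.py`):

STEP_MS = 10 * 60 * 1000   # illustrative base step
MAX_DOUBLING_STEPS = 8     # illustrative cap

def next_allowed_attempt_ms(last_attempt_ts: int, num_attempts: int) -> int:
    # 1 << n == 2**n, written as a shift for parity with the SQL, which
    # uses a shift for older SQLite compatibility.
    backoff = (1 << min(num_attempts, MAX_DOUBLING_STEPS)) * STEP_MS
    return last_attempt_ts + backoff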
index e2e6eb479f..5c9db7554e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -1664,9 +1664,6 @@ class PersistEventsStore: self._handle_event_relations(txn, event) - self._handle_insertion_event(txn, event) - self._handle_batch_event(txn, event) - # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) if labels: @@ -1729,13 +1726,22 @@ class PersistEventsStore: if not row["rejects"] and not row["redacts"]: to_prefill.append(EventCacheEntry(event=event, redacted_event=None)) - async def prefill() -> None: + async def external_prefill() -> None: + for cache_entry in to_prefill: + await self.store._get_event_cache.set_external( + (cache_entry.event.event_id,), cache_entry + ) + + def local_prefill() -> None: for cache_entry in to_prefill: - await self.store._get_event_cache.set( + self.store._get_event_cache.set_local( (cache_entry.event.event_id,), cache_entry ) - txn.async_call_after(prefill) + # The order these are called here is not as important as knowing that after the + # transaction is finished, the async_call_after will run before the call_after. + txn.async_call_after(external_prefill) + txn.call_after(local_prefill) def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None: assert event.redacts is not None @@ -1918,128 +1924,6 @@ class PersistEventsStore: ), ) - def _handle_insertion_event( - self, txn: LoggingTransaction, event: EventBase - ) -> None: - """Handles keeping track of insertion events and edges/connections. - Part of MSC2716. - - Args: - txn: The database transaction object - event: The event to process - """ - - if event.type != EventTypes.MSC2716_INSERTION: - # Not an insertion event - return - - # Skip processing an insertion event if the room version doesn't - # support it or the event is not from the room creator. - room_version = self.store.get_room_version_txn(txn, event.room_id) - room_creator = self.db_pool.simple_select_one_onecol_txn( - txn, - table="rooms", - keyvalues={"room_id": event.room_id}, - retcol="creator", - allow_none=True, - ) - if not room_version.msc2716_historical and ( - not self.hs.config.experimental.msc2716_enabled - or event.sender != room_creator - ): - return - - next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID) - if next_batch_id is None: - # Invalid insertion event without next batch ID - return - - logger.debug( - "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event - ) - - # Keep track of the insertion event and the batch ID - self.db_pool.simple_insert_txn( - txn, - table="insertion_events", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "next_batch_id": next_batch_id, - }, - ) - - # Insert an edge for every prev_event connection - for prev_event_id in event.prev_event_ids(): - self.db_pool.simple_insert_txn( - txn, - table="insertion_event_edges", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "insertion_prev_event_id": prev_event_id, - }, - ) - - def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None: - """Handles inserting the batch edges/connections between the batch event - and an insertion event. Part of MSC2716. - - Args: - txn: The database transaction object - event: The event to process - """ - - if event.type != EventTypes.MSC2716_BATCH: - # Not a batch event - return - - # Skip processing a batch event if the room version doesn't - # support it or the event is not from the room creator.
- room_version = self.store.get_room_version_txn(txn, event.room_id) - room_creator = self.db_pool.simple_select_one_onecol_txn( - txn, - table="rooms", - keyvalues={"room_id": event.room_id}, - retcol="creator", - allow_none=True, - ) - if not room_version.msc2716_historical and ( - not self.hs.config.experimental.msc2716_enabled - or event.sender != room_creator - ): - return - - batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID) - if batch_id is None: - # Invalid batch event without a batch ID - return - - logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event) - - # Keep track of the insertion event and the batch ID - self.db_pool.simple_insert_txn( - txn, - table="batch_events", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "batch_id": batch_id, - }, - ) - - # When we receive an event with a `batch_id` referencing the - # `next_batch_id` of the insertion event, we can remove it from the - # `insertion_event_extremities` table. - sql = """ - DELETE FROM insertion_event_extremities WHERE event_id IN ( - SELECT event_id FROM insertion_events - WHERE next_batch_id = ? - ) - """ - - txn.execute(sql, (batch_id,)) - def _handle_redact_relations( self, txn: LoggingTransaction, room_id: str, redacted_event_id: str ) -> None: diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
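The two removed handlers above maintained the MSC2716 linkage: a batch event's `batch_id` had to match some insertion event's `next_batch_id`. A toy illustration of the connection the deleted tables indexed (event contents are made up):

insertion_event_content = {"next_batch_id": "abc123", "historical": True}
batch_event_content = {"batch_id": "abc123", "historical": True}

def batches_connect(insertion: dict, batch: dict) -> bool:
    # Mirrors the join the removed SQL performed between the
    # insertion_events and batch_events tables.
    return insertion.get("next_batch_id") == batch.get("batch_id")

assert batches_connect(insertion_event_content, batch_event_content)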
index d93ffc4efa..7e7648c951 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -883,7 +883,7 @@ class EventsWorkerStore(SQLBaseStore): async def _invalidate_async_get_event_cache(self, event_id: str) -> None: """ - Invalidates an event in the asyncronous get event cache, which may be remote. + Invalidates an event in the asynchronous get event cache, which may be remote. Arguments: event_id: the event ID to invalidate diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py deleted file mode 100644
index 131f357d04..0000000000 --- a/synapse/storage/databases/main/room_batch.py +++ /dev/null
@@ -1,47 +0,0 @@ -# Copyright 2021 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional - -from synapse.storage._base import SQLBaseStore - - -class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_id_by_batch_id( - self, room_id: str, batch_id: str - ) -> Optional[str]: - """Retrieve an insertion event ID. - - Args: - batch_id: The batch ID of the insertion event to retrieve. - - Returns: - The event_id of an insertion event, or None if there is no known - insertion event for the given batch ID. - """ - return await self.db_pool.simple_select_one_onecol( - table="insertion_events", - keyvalues={"room_id": room_id, "next_batch_id": batch_id}, - retcol="event_id", - allow_none=True, - ) - - async def store_state_group_id_for_event_id( - self, event_id: str, state_group_id: int - ) -> None: - await self.db_pool.simple_upsert( - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - values={"state_group": state_group_id, "event_id": event_id}, - ) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 7ea0c4c36b..9f3b8741c1 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py
@@ -116,6 +116,11 @@ class Clock: Waits `msec` initially before calling `f` for the first time. + If the function given to `looping_call` returns an awaitable/deferred, the next + call isn't scheduled until after the returned awaitable has finished. We get + this functionality thanks to this function being a thin wrapper around + `twisted.internet.task.LoopingCall`. + Note that the function will be called with no logcontext, so if it is anything other than trivial, you probably want to wrap it in run_as_background_process. diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
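The docstring added above documents behaviour inherited from Twisted: when the scheduled function returns a Deferred or awaitable, `LoopingCall` waits for it to fire before scheduling the next run. A standalone sketch using Twisted directly (the 5-second interval and the body of `poll` are illustrative):

from twisted.internet import defer, task

async def poll() -> None:
    # Long-running work; the next iteration isn't scheduled until the
    # Deferred wrapping this coroutine fires.
    ...

def start_polling(reactor) -> task.LoopingCall:
    call = task.LoopingCall(lambda: defer.ensureDeferred(poll()))
    call.clock = reactor
    call.start(5.0, now=False)  # wait one interval before the first call
    return call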
index 6137c85e10..be6554319a 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py
@@ -842,7 +842,13 @@ class AsyncLruCache(Generic[KT, VT]): return self._lru_cache.get(key, update_metrics=update_metrics) async def set(self, key: KT, value: VT) -> None: - self._lru_cache.set(key, value) + # This will add the entries in the correct order: local first, external second. + self.set_local(key, value) + await self.set_external(key, value) + + async def set_external(self, key: KT, value: VT) -> None: + # This method should add an entry to any configured external cache; in this base class it is a no-op. + pass def set_local(self, key: KT, value: VT) -> None: self._lru_cache.set(key, value)
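The ordering above (populate the local tier before the external one) means a concurrent reader that misses locally can still find the entry once the slower tier is written. A hedged sketch of a subclass wiring `set_external` to an external store (`external` here is a stand-in for e.g. a Redis-backed cache, not Synapse's class; the constructor pass-through is an assumption):

class ReplicatedLruCache(AsyncLruCache[str, bytes]):
    def __init__(self, external, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._external = external  # illustrative external cache with an async set()

    async def set_external(self, key: str, value: bytes) -> None:
        # Runs after set_local() inside set(), so the fast local tier is
        # populated before we pay for the network round-trip.
        await self._external.set(key, value)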