diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index faf0770c66..dc32553d0c 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -123,10 +123,6 @@ class EventTypes:
SpaceChild: Final = "m.space.child"
SpaceParent: Final = "m.space.parent"
- MSC2716_INSERTION: Final = "org.matrix.msc2716.insertion"
- MSC2716_BATCH: Final = "org.matrix.msc2716.batch"
- MSC2716_MARKER: Final = "org.matrix.msc2716.marker"
-
Reaction: Final = "m.reaction"
@@ -222,16 +218,6 @@ class EventContentFields:
# Used in m.room.guest_access events.
GUEST_ACCESS: Final = "guest_access"
- # Used on normal messages to indicate they were historically imported after the fact
- MSC2716_HISTORICAL: Final = "org.matrix.msc2716.historical"
- # For "insertion" events to indicate what the next batch ID should be in
- # order to connect to it
- MSC2716_NEXT_BATCH_ID: Final = "next_batch_id"
- # Used on "batch" events to indicate which insertion event it connects to
- MSC2716_BATCH_ID: Final = "batch_id"
- # For "marker" events
- MSC2716_INSERTION_EVENT_REFERENCE: Final = "insertion_event_reference"
-
# The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server"
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index c5c71e242f..25c105a4c8 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -91,11 +91,6 @@ class RoomVersion:
# MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
# m.room.membership event with membership 'knock'.
msc2403_knocking: bool
- # MSC2716: Adds m.room.power_levels -> content.historical field to control
- # whether "insertion", "chunk", "marker" events can be sent
- msc2716_historical: bool
- # MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events
- msc2716_redactions: bool
# MSC3389: Protect relation information from redaction.
msc3389_relation_redactions: bool
# MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of
@@ -130,8 +125,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -153,8 +146,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -176,8 +167,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -199,8 +188,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -222,8 +209,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -245,8 +230,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -268,8 +251,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -291,8 +272,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -314,8 +293,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=False,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -337,8 +314,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -360,8 +335,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=False,
@@ -383,8 +356,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@@ -406,8 +377,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
@@ -415,29 +384,6 @@ class RoomVersions:
msc3931_push_features=(),
msc3989_redaction_rules=False,
)
- MSC2716v4 = RoomVersion(
- "org.matrix.msc2716v4",
- RoomDisposition.UNSTABLE,
- EventFormatVersions.ROOM_V4_PLUS,
- StateResolutionVersions.V2,
- enforce_key_validity=True,
- special_case_aliases_auth=False,
- strict_canonicaljson=True,
- limit_notifications_power_levels=True,
- msc2175_implicit_room_creator=False,
- msc2176_redaction_rules=False,
- msc3083_join_rules=False,
- msc3375_redaction_rules=False,
- msc2403_knocking=True,
- msc2716_historical=True,
- msc2716_redactions=True,
- msc3389_relation_redactions=False,
- msc3787_knock_restricted_join_rule=False,
- msc3667_int_only_power_levels=False,
- msc3821_redaction_rules=False,
- msc3931_push_features=(),
- msc3989_redaction_rules=False,
- )
MSC1767v10 = RoomVersion(
# MSC1767 (Extensible Events) based on room version "10"
"org.matrix.msc1767.10",
@@ -453,8 +399,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
@@ -476,8 +420,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
@@ -500,8 +442,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
- msc2716_historical=False,
- msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
@@ -526,7 +466,6 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
RoomVersions.V9,
RoomVersions.MSC3787,
RoomVersions.V10,
- RoomVersions.MSC2716v4,
RoomVersions.MSC3989,
RoomVersions.MSC3820opt2,
)
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 909ebccf78..7406c3948c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -83,7 +83,6 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.registration import RegistrationWorkerStore
from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
-from synapse.storage.databases.main.room_batch import RoomBatchStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.session import SessionStore
@@ -120,7 +119,6 @@ class GenericWorkerStore(
# the races it creates aren't too bad.
KeyStore,
RoomWorkerStore,
- RoomBatchStore,
DirectoryWorkerStore,
PushRulesWorkerStore,
ApplicationServiceTransactionWorkerStore,
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 1d5b5ded45..8e0f5356b4 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -247,9 +247,6 @@ class ExperimentalConfig(Config):
# MSC3026 (busy presence state)
self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
- # MSC2716 (importing historical messages)
- self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False)
-
# MSC3244 (room version capabilities)
self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True)
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index b4b43ec4d7..3aaf53dfbd 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -339,13 +339,6 @@ def check_state_dependent_auth_rules(
if event.type == EventTypes.Redaction:
check_redaction(event.room_version, event, auth_dict)
- if (
- event.type == EventTypes.MSC2716_INSERTION
- or event.type == EventTypes.MSC2716_BATCH
- or event.type == EventTypes.MSC2716_MARKER
- ):
- check_historical(event.room_version, event, auth_dict)
-
logger.debug("Allowing! %s", event)
@@ -365,7 +358,6 @@ LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
RoomVersions.V9,
RoomVersions.MSC3787,
RoomVersions.V10,
- RoomVersions.MSC2716v4,
RoomVersions.MSC1767v10,
}
@@ -823,38 +815,6 @@ def check_redaction(
raise AuthError(403, "You don't have permission to redact events")
-def check_historical(
- room_version_obj: RoomVersion,
- event: "EventBase",
- auth_events: StateMap["EventBase"],
-) -> None:
- """Check whether the event sender is allowed to send historical related
- events like "insertion", "batch", and "marker".
-
- Returns:
- None
-
- Raises:
- AuthError if the event sender is not allowed to send historical related events
- ("insertion", "batch", and "marker").
- """
- # Ignore the auth checks in room versions that do not support historical
- # events
- if not room_version_obj.msc2716_historical:
- return
-
- user_level = get_user_power_level(event.user_id, auth_events)
-
- historical_level = get_named_level(auth_events, "historical", 100)
-
- if user_level < historical_level:
- raise UnstableSpecAuthError(
- 403,
- 'You don\'t have permission to send send historical related events ("insertion", "batch", and "marker")',
- errcode=Codes.INSUFFICIENT_POWER,
- )
-
-
def _check_power_levels(
room_version_obj: RoomVersion,
event: "EventBase",
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index de7e5be42b..75b62adb33 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -198,7 +198,6 @@ class _EventInternalMetadata:
soft_failed: DictProperty[bool] = DictProperty("soft_failed")
proactively_send: DictProperty[bool] = DictProperty("proactively_send")
redacted: DictProperty[bool] = DictProperty("redacted")
- historical: DictProperty[bool] = DictProperty("historical")
txn_id: DictProperty[str] = DictProperty("txn_id")
"""The transaction ID, if it was set when the event was created."""
@@ -288,14 +287,6 @@ class _EventInternalMetadata:
"""
return self._dict.get("redacted", False)
- def is_historical(self) -> bool:
- """Whether this is a historical message.
- This is used by the batchsend historical message endpoint and
- is needed to and mark the event as backfilled and skip some checks
- like push notifications.
- """
- return self._dict.get("historical", False)
-
def is_notifiable(self) -> bool:
"""Whether this event can trigger a push notification"""
return not self.is_outlier() or self.is_out_of_band_membership()
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index e7e8225b8e..a43498ed4d 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
import attr
from immutabledict import immutabledict
@@ -107,33 +107,32 @@ class EventContext(UnpersistedEventContextBase):
state_delta_due_to_event: If `state_group` and `state_group_before_event` are not None
then this is the delta of the state between the two groups.
- prev_group: If it is known, ``state_group``'s prev_group. Note that this being
- None does not necessarily mean that ``state_group`` does not have
- a prev_group!
+    state_group_deltas: If not empty, a dict describing the state deltas between
+        pairs of state groups.
- If the event is a state event, this is normally the same as
- ``state_group_before_event``.
+        Each key is a tuple of two integers: the initial state group and the final
+        state group. The corresponding value is a state map representing the state
+        delta between these two state groups.
- If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
- will always also be ``None``.
+        The dictionary is expected to have at most two entries, with state groups of:
- Note that this *not* (necessarily) the state group associated with
- ``_prev_state_ids``.
+        1. The state group before the event, and the state group after the event.
+        2. The state group preceding the state group before the event, and the
+        state group before the event.
- delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
- and ``state_group``.
+ This information is collected and stored as part of an optimization for persisting
+ events.
partial_state: if True, we may be storing this event with a temporary,
incomplete state.
"""
_storage: "StorageControllers"
+ state_group_deltas: Dict[Tuple[int, int], StateMap[str]]
rejected: Optional[str] = None
_state_group: Optional[int] = None
state_group_before_event: Optional[int] = None
_state_delta_due_to_event: Optional[StateMap[str]] = None
- prev_group: Optional[int] = None
- delta_ids: Optional[StateMap[str]] = None
app_service: Optional[ApplicationService] = None
partial_state: bool = False
@@ -145,16 +144,14 @@ class EventContext(UnpersistedEventContextBase):
state_group_before_event: Optional[int],
state_delta_due_to_event: Optional[StateMap[str]],
partial_state: bool,
- prev_group: Optional[int] = None,
- delta_ids: Optional[StateMap[str]] = None,
+ state_group_deltas: Dict[Tuple[int, int], StateMap[str]],
) -> "EventContext":
return EventContext(
storage=storage,
state_group=state_group,
state_group_before_event=state_group_before_event,
state_delta_due_to_event=state_delta_due_to_event,
- prev_group=prev_group,
- delta_ids=delta_ids,
+ state_group_deltas=state_group_deltas,
partial_state=partial_state,
)
@@ -163,7 +160,7 @@ class EventContext(UnpersistedEventContextBase):
storage: "StorageControllers",
) -> "EventContext":
"""Return an EventContext instance suitable for persisting an outlier event"""
- return EventContext(storage=storage)
+ return EventContext(storage=storage, state_group_deltas={})
async def persist(self, event: EventBase) -> "EventContext":
return self
@@ -183,13 +180,15 @@ class EventContext(UnpersistedEventContextBase):
"state_group": self._state_group,
"state_group_before_event": self.state_group_before_event,
"rejected": self.rejected,
- "prev_group": self.prev_group,
+ "state_group_deltas": _encode_state_group_delta(self.state_group_deltas),
"state_delta_due_to_event": _encode_state_dict(
self._state_delta_due_to_event
),
- "delta_ids": _encode_state_dict(self.delta_ids),
"app_service_id": self.app_service.id if self.app_service else None,
"partial_state": self.partial_state,
+            # Add dummy delta_ids and prev_group for backwards compatibility.
+ "delta_ids": None,
+ "prev_group": None,
}
@staticmethod
@@ -204,17 +203,24 @@ class EventContext(UnpersistedEventContextBase):
Returns:
The event context.
"""
+        # Workaround for backwards/forwards compatibility: if the input doesn't have
+        # a value for "state_group_deltas", just assign an empty dict.
+ state_group_deltas = input.get("state_group_deltas", None)
+ if state_group_deltas:
+ state_group_deltas = _decode_state_group_delta(state_group_deltas)
+ else:
+ state_group_deltas = {}
+
context = EventContext(
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
storage=storage,
state_group=input["state_group"],
state_group_before_event=input["state_group_before_event"],
- prev_group=input["prev_group"],
+ state_group_deltas=state_group_deltas,
state_delta_due_to_event=_decode_state_dict(
input["state_delta_due_to_event"]
),
- delta_ids=_decode_state_dict(input["delta_ids"]),
rejected=input["rejected"],
partial_state=input.get("partial_state", False),
)
@@ -349,7 +355,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
_storage: "StorageControllers"
state_group_before_event: Optional[int]
state_group_after_event: Optional[int]
- state_delta_due_to_event: Optional[dict]
+ state_delta_due_to_event: Optional[StateMap[str]]
prev_group_for_state_group_before_event: Optional[int]
delta_ids_to_state_group_before_event: Optional[StateMap[str]]
partial_state: bool
@@ -380,26 +386,16 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
events_and_persisted_context = []
for event, unpersisted_context in amended_events_and_context:
- if event.is_state():
- context = EventContext(
- storage=unpersisted_context._storage,
- state_group=unpersisted_context.state_group_after_event,
- state_group_before_event=unpersisted_context.state_group_before_event,
- state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
- partial_state=unpersisted_context.partial_state,
- prev_group=unpersisted_context.state_group_before_event,
- delta_ids=unpersisted_context.state_delta_due_to_event,
- )
- else:
- context = EventContext(
- storage=unpersisted_context._storage,
- state_group=unpersisted_context.state_group_after_event,
- state_group_before_event=unpersisted_context.state_group_before_event,
- state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
- partial_state=unpersisted_context.partial_state,
- prev_group=unpersisted_context.prev_group_for_state_group_before_event,
- delta_ids=unpersisted_context.delta_ids_to_state_group_before_event,
- )
+ state_group_deltas = unpersisted_context._build_state_group_deltas()
+
+ context = EventContext(
+ storage=unpersisted_context._storage,
+ state_group=unpersisted_context.state_group_after_event,
+ state_group_before_event=unpersisted_context.state_group_before_event,
+ state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
+ partial_state=unpersisted_context.partial_state,
+ state_group_deltas=state_group_deltas,
+ )
events_and_persisted_context.append((event, context))
return events_and_persisted_context
@@ -452,11 +448,11 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
# if the event isn't a state event the state group doesn't change
if not self.state_delta_due_to_event:
- state_group_after_event = self.state_group_before_event
+ self.state_group_after_event = self.state_group_before_event
# otherwise if it is a state event we need to get a state group for it
else:
- state_group_after_event = await self._storage.state.store_state_group(
+ self.state_group_after_event = await self._storage.state.store_state_group(
event.event_id,
event.room_id,
prev_group=self.state_group_before_event,
@@ -464,16 +460,81 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
current_state_ids=None,
)
+ state_group_deltas = self._build_state_group_deltas()
+
return EventContext.with_state(
storage=self._storage,
- state_group=state_group_after_event,
+ state_group=self.state_group_after_event,
state_group_before_event=self.state_group_before_event,
state_delta_due_to_event=self.state_delta_due_to_event,
+ state_group_deltas=state_group_deltas,
partial_state=self.partial_state,
- prev_group=self.state_group_before_event,
- delta_ids=self.state_delta_due_to_event,
)
+    def _build_state_group_deltas(self) -> Dict[Tuple[int, int], StateMap[str]]:
+ """
+ Collect deltas between the state groups associated with this context
+ """
+ state_group_deltas = {}
+
+        # If we know the state group before the event and after the event, add them
+        # and the state delta between them to state_group_deltas.
+ if self.state_group_before_event and self.state_group_after_event:
+ # if we have the state groups we should have the delta
+ assert self.state_delta_due_to_event is not None
+ state_group_deltas[
+ (
+ self.state_group_before_event,
+ self.state_group_after_event,
+ )
+ ] = self.state_delta_due_to_event
+
+        # The state group before the event may also have a state group which precedes
+        # it. If we have both it and the state group before the event, add them and
+        # the state delta between them to state_group_deltas.
+ if (
+ self.prev_group_for_state_group_before_event
+ and self.state_group_before_event
+ ):
+ # if we have both state groups we should have the delta between them
+ assert self.delta_ids_to_state_group_before_event is not None
+ state_group_deltas[
+ (
+ self.prev_group_for_state_group_before_event,
+ self.state_group_before_event,
+ )
+ ] = self.delta_ids_to_state_group_before_event
+
+ return state_group_deltas
+
+
+def _encode_state_group_delta(
+ state_group_delta: Dict[Tuple[int, int], StateMap[str]]
+) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]:
+ if not state_group_delta:
+ return []
+
+ state_group_delta_encoded = []
+ for key, value in state_group_delta.items():
+ state_group_delta_encoded.append((key[0], key[1], _encode_state_dict(value)))
+
+ return state_group_delta_encoded
+
+
+def _decode_state_group_delta(
+ input: List[Tuple[int, int, List[Tuple[str, str, str]]]]
+) -> Dict[Tuple[int, int], StateMap[str]]:
+ if not input:
+ return {}
+
+ state_group_deltas = {}
+ for state_group_1, state_group_2, state_dict in input:
+ state_map = _decode_state_dict(state_dict)
+ assert state_map is not None
+ state_group_deltas[(state_group_1, state_group_2)] = state_map
+
+ return state_group_deltas
+
def _encode_state_dict(
state_dict: Optional[StateMap[str]],
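
For illustration, a minimal sketch of the shape `state_group_deltas` is expected to take and of the round trip through the new encode/decode helpers. The state group IDs and event IDs below are made up, and the snippet assumes a checkout of this branch (the helpers are private to `synapse.events.snapshot`):

```python
from synapse.events.snapshot import (
    _decode_state_group_delta,
    _encode_state_group_delta,
)

# At most two entries: (prev_group -> state_group_before_event) and
# (state_group_before_event -> state_group_after_event). All IDs here are
# illustrative.
state_group_deltas = {
    (100, 101): {("m.room.member", "@alice:example.com"): "$join_event_id"},
    (101, 102): {("m.room.topic", ""): "$topic_event_id"},
}

# The encoder flattens the dict into JSON-friendly
# (start_group, end_group, encoded_state_dict) triples ...
encoded = _encode_state_group_delta(state_group_deltas)

# ... and the decoder reverses it. Decoded state maps come back as
# immutabledicts, so convert before comparing with the plain dicts above.
decoded = _decode_state_group_delta(encoded)
assert {key: dict(value) for key, value in decoded.items()} == state_group_deltas
```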
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index e7b7b78b84..a55efcca56 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -164,21 +164,12 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
if room_version.msc2176_redaction_rules:
add_fields("invite")
- if room_version.msc2716_historical:
- add_fields("historical")
-
elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
add_fields("aliases")
elif event_type == EventTypes.RoomHistoryVisibility:
add_fields("history_visibility")
elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules:
add_fields("redacts")
- elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_INSERTION:
- add_fields(EventContentFields.MSC2716_NEXT_BATCH_ID)
- elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_BATCH:
- add_fields(EventContentFields.MSC2716_BATCH_ID)
- elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
- add_fields(EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE)
# Protect the rel_type and event_id fields under the m.relates_to field.
if room_version.msc3389_relation_redactions:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index a2cf3a96c6..e5359ca558 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -260,7 +260,9 @@ class FederationClient(FederationBase):
use_unstable = False
for user_id, one_time_keys in query.items():
for device_id, algorithms in one_time_keys.items():
- if any(count > 1 for count in algorithms.values()):
+            # If more than one key is requested for a device, attempt to use
+            # the unstable endpoint; the stable endpoint can only claim a
+            # single key per device.
+ if sum(algorithms.values()) > 1:
use_unstable = True
if algorithms:
# For the stable query, choose only the first algorithm.
@@ -296,6 +298,7 @@ class FederationClient(FederationBase):
else:
logger.debug("Skipping unstable claim client keys API")
+        # TODO: Potentially attempt multiple queries and combine the results?
return await self.transport_layer.claim_client_keys(
user, destination, content, timeout
)
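
As a self-contained illustration of when the reworked check switches to the unstable endpoint (the user, device, and algorithm names below are made up):

```python
# One-time-key claim queries are shaped as
# {user_id: {device_id: {algorithm: count}}}.
query = {
    "@alice:example.com": {
        # A single key of a single algorithm: the stable endpoint suffices.
        "DEVICE_A": {"signed_curve25519": 1},
        # Two algorithms with one key each, i.e. more than one key in total.
        # The old check (any(count > 1 ...)) missed this case and stayed on
        # the stable endpoint, which can only claim a single key per device;
        # the new check (sum(...) > 1) switches to the unstable endpoint.
        "DEVICE_B": {"signed_curve25519": 1, "made.up.algorithm": 1},
    }
}

for user_id, one_time_keys in query.items():
    for device_id, algorithms in one_time_keys.items():
        endpoint = "unstable" if sum(algorithms.values()) > 1 else "stable"
        print(f"{user_id}/{device_id}: {endpoint}")
```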
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9425b32507..61fa3b30af 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1016,7 +1016,9 @@ class FederationServer(FederationBase):
for user_id, device_keys in result.items():
for device_id, keys in device_keys.items():
for key_id, key in keys.items():
- json_result.setdefault(user_id, {})[device_id] = {key_id: key}
+ json_result.setdefault(user_id, {}).setdefault(device_id, {})[
+ key_id
+ ] = key
logger.info(
"Claimed one-time-keys: %s",
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index f3bdc5a4d2..97abbdee18 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -109,10 +109,8 @@ was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmissio
If a remote server is unreachable over federation, we back off from that server,
with an exponentially-increasing retry interval.
-Whilst we don't automatically retry after the interval, we prevent making new attempts
-until such time as the back-off has cleared.
-Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
-loop resumes and empties the queue by making federation requests.
+We automatically retry once the retry interval has expired: the logic that does
+so runs roughly once a minute.
If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
unbounded growth) and Catch-Up Mode is entered.
@@ -145,7 +143,6 @@ from prometheus_client import Counter
from typing_extensions import Literal
from twisted.internet import defer
-from twisted.internet.interfaces import IDelayedCall
import synapse.metrics
from synapse.api.presence import UserPresenceState
@@ -184,14 +181,18 @@ sent_pdus_destination_dist_total = Counter(
"Total number of PDUs queued for sending across all destinations",
)
-# Time (in s) after Synapse's startup that we will begin to wake up destinations
-# that have catch-up outstanding.
-CATCH_UP_STARTUP_DELAY_SEC = 15
+# Time (in s) to wait before trying to wake up destinations that have
+# catch-up outstanding. This is also the delay applied at startup before
+# the first attempt.
+# Please note that rate limiting still applies, so while the loop is
+# executed every X seconds the destinations may not be woken up because
+# they are being rate limited after previous failed attempts.
+WAKEUP_RETRY_PERIOD_SEC = 60
# Time (in s) to wait in between waking up each destination, i.e. one destination
-# will be woken up every <x> seconds after Synapse's startup until we have woken
-# every destination has outstanding catch-up.
-CATCH_UP_STARTUP_INTERVAL_SEC = 5
+# will be woken up every <x> seconds until we have woken every destination
+# that has outstanding catch-up.
+WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
class AbstractFederationSender(metaclass=abc.ABCMeta):
@@ -415,12 +416,10 @@ class FederationSender(AbstractFederationSender):
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)
- # wake up destinations that have outstanding PDUs to be caught up
- self._catchup_after_startup_timer: Optional[
- IDelayedCall
- ] = self.clock.call_later(
- CATCH_UP_STARTUP_DELAY_SEC,
+ # Regularly wake up destinations that have outstanding PDUs to be caught up
+ self.clock.looping_call(
run_as_background_process,
+ WAKEUP_RETRY_PERIOD_SEC * 1000.0,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)
@@ -966,7 +965,6 @@ class FederationSender(AbstractFederationSender):
if not destinations_to_wake:
# finished waking all destinations!
- self._catchup_after_startup_timer = None
break
last_processed = destinations_to_wake[-1]
@@ -983,4 +981,4 @@ class FederationSender(AbstractFederationSender):
last_processed,
)
self.wake_destination(destination)
- await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
+ await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
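
A rough standalone sketch of the scheduling change, in plain Twisted rather than Synapse's `Clock` wrapper (whose `looping_call` takes the interval in milliseconds, hence the `* 1000.0` above):

```python
from twisted.internet import reactor, task

def wake_destinations_needing_catchup() -> None:
    # In Synapse this walks destinations with outstanding catch-up and wakes
    # them, still subject to per-destination back-off.
    print("waking destinations needing catch-up")

# Before: a single wake-up pass, 15 s after startup. Destinations whose
# back-off expired later were only retried when new traffic arrived for them.
reactor.callLater(15, wake_destinations_needing_catchup)

# After: a periodic pass every 60 s, so destinations whose retry interval has
# expired are picked up without needing a new PDU or EDU to trigger it.
loop = task.LoopingCall(wake_destinations_needing_catchup)
loop.start(60, now=False)

reactor.run()
```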
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b7b5e21020..cc5ed97730 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -105,14 +105,12 @@ backfill_processing_before_timer = Histogram(
)
+# TODO: We can refactor this away now that there is only one backfill point again
class _BackfillPointType(Enum):
# a regular backwards extremity (ie, an event which we don't yet have, but which
# is referred to by other events in the DAG)
BACKWARDS_EXTREMITY = enum.auto()
- # an MSC2716 "insertion event"
- INSERTION_PONT = enum.auto()
-
@attr.s(slots=True, auto_attribs=True, frozen=True)
class _BackfillPoint:
@@ -273,32 +271,10 @@ class FederationHandler:
)
]
- insertion_events_to_be_backfilled: List[_BackfillPoint] = []
- if self.hs.config.experimental.msc2716_enabled:
- insertion_events_to_be_backfilled = [
- _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
- for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
- room_id=room_id,
- current_depth=current_depth,
- # We only need to end up with 5 extremities combined with
- # the backfill points to make the `/backfill` request ...
- # (see the other comment above for more context).
- limit=50,
- )
- ]
- logger.debug(
- "_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s",
- backwards_extremities,
- insertion_events_to_be_backfilled,
- )
-
# we now have a list of potential places to backpaginate from. We prefer to
# start with the most recent (ie, max depth), so let's sort the list.
sorted_backfill_points: List[_BackfillPoint] = sorted(
- itertools.chain(
- backwards_extremities,
- insertion_events_to_be_backfilled,
- ),
+ backwards_extremities,
key=lambda e: -int(e.depth),
)
@@ -411,10 +387,7 @@ class FederationHandler:
# event but not anything before it. This would require looking at the
# state *before* the event, ignoring the special casing certain event
# types have.
- if bp.type == _BackfillPointType.INSERTION_PONT:
- event_ids_to_check = [bp.event_id]
- else:
- event_ids_to_check = await self.store.get_successor_events(bp.event_id)
+ event_ids_to_check = await self.store.get_successor_events(bp.event_id)
events_to_check = await self.store.get_events_as_list(
event_ids_to_check,
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 42141d3670..d32d224d56 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -601,18 +601,6 @@ class FederationEventHandler:
room_id, [(event, context)]
)
- # If we're joining the room again, check if there is new marker
- # state indicating that there is new history imported somewhere in
- # the DAG. Multiple markers can exist in the current state with
- # unique state_keys.
- #
- # Do this after the state from the remote join was persisted (via
- # `persist_events_and_notify`). Otherwise we can run into a
- # situation where the create event doesn't exist yet in the
- # `current_state_events`
- for e in state:
- await self._handle_marker_event(origin, e)
-
return stream_id_after_persist
async def update_state_for_partial_state_event(
@@ -915,13 +903,6 @@ class FederationEventHandler:
)
)
- # We construct the event lists in source order from `/backfill` response because
- # it's a) easiest, but also b) the order in which we process things matters for
- # MSC2716 historical batches because many historical events are all at the same
- # `depth` and we rely on the tenuous sort that the other server gave us and hope
- # they're doing their best. The brittle nature of this ordering for historical
- # messages over federation is one of the reasons why we don't want to continue
- # on MSC2716 until we have online topological ordering.
events_with_failed_pull_attempts, fresh_events = partition(
new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
)
@@ -1460,8 +1441,6 @@ class FederationEventHandler:
await self._run_push_actions_and_persist_event(event, context, backfilled)
- await self._handle_marker_event(origin, event)
-
if backfilled or context.rejected:
return
@@ -1559,94 +1538,6 @@ class FederationEventHandler:
except Exception:
logger.exception("Failed to resync device for %s", sender)
- @trace
- async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
- """Handles backfilling the insertion event when we receive a marker
- event that points to one.
-
- Args:
- origin: Origin of the event. Will be called to get the insertion event
- marker_event: The event to process
- """
-
- if marker_event.type != EventTypes.MSC2716_MARKER:
- # Not a marker event
- return
-
- if marker_event.rejected_reason is not None:
- # Rejected event
- return
-
- # Skip processing a marker event if the room version doesn't
- # support it or the event is not from the room creator.
- room_version = await self._store.get_room_version(marker_event.room_id)
- create_event = await self._store.get_create_event_for_room(marker_event.room_id)
- if not room_version.msc2175_implicit_room_creator:
- room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
- else:
- room_creator = create_event.sender
- if not room_version.msc2716_historical and (
- not self._config.experimental.msc2716_enabled
- or marker_event.sender != room_creator
- ):
- return
-
- logger.debug("_handle_marker_event: received %s", marker_event)
-
- insertion_event_id = marker_event.content.get(
- EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
- )
-
- if insertion_event_id is None:
- # Nothing to retrieve then (invalid marker)
- return
-
- already_seen_insertion_event = await self._store.have_seen_event(
- marker_event.room_id, insertion_event_id
- )
- if already_seen_insertion_event:
- # No need to process a marker again if we have already seen the
- # insertion event that it was pointing to
- return
-
- logger.debug(
- "_handle_marker_event: backfilling insertion event %s", insertion_event_id
- )
-
- await self._get_events_and_persist(
- origin,
- marker_event.room_id,
- [insertion_event_id],
- )
-
- insertion_event = await self._store.get_event(
- insertion_event_id, allow_none=True
- )
- if insertion_event is None:
- logger.warning(
- "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
- origin,
- insertion_event_id,
- marker_event.event_id,
- )
- return
-
- logger.debug(
- "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
- insertion_event,
- marker_event,
- )
-
- await self._store.insert_insertion_extremity(
- insertion_event_id, marker_event.room_id
- )
-
- logger.debug(
- "_handle_marker_event: insertion extremity added for %s from marker event %s",
- insertion_event,
- marker_event,
- )
-
async def backfill_event_id(
self, destinations: List[str], room_id: str, event_id: str
) -> PulledPduInfo:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0b61c2272b..4292b47037 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -60,7 +60,6 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
- MutableStateMap,
PersistedEventPosition,
Requester,
RoomAlias,
@@ -573,7 +572,6 @@ class EventCreationHandler:
state_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
- historical: bool = False,
depth: Optional[int] = None,
state_map: Optional[StateMap[str]] = None,
for_batch: bool = False,
@@ -599,7 +597,7 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -614,13 +612,10 @@ class EventCreationHandler:
If non-None, prev_event_ids must also be provided.
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is with insertion events which float at
- the beginning of a historical batch and don't have any `prev_events` to
- derive from; we add all of these state events as the explicit state so the
- rest of the historical batch can inherit the same state and state_group.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
require_consent: Whether to check if the requester has
consented to the privacy policy.
@@ -629,10 +624,6 @@ class EventCreationHandler:
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
- historical: Indicates whether the message is being inserted
- back in time around some existing events. This is used to skip
- a few checks and mark the event as backfilled.
-
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@@ -717,8 +708,6 @@ class EventCreationHandler:
builder.internal_metadata.outlier = outlier
- builder.internal_metadata.historical = historical
-
event, unpersisted_context = await self.create_new_client_event(
builder=builder,
requester=requester,
@@ -947,7 +936,6 @@ class EventCreationHandler:
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
outlier: bool = False,
- historical: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, int]:
"""
@@ -961,19 +949,16 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
from the database.
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is with insertion events which float at
- the beginning of a historical batch and don't have any `prev_events` to
- derive from; we add all of these state events as the explicit state so the
- rest of the historical batch can inherit the same state and state_group.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
ratelimit: Whether to rate limit this send.
txn_id: The transaction ID.
ignore_shadow_ban: True if shadow-banned users should be allowed to
@@ -981,9 +966,6 @@ class EventCreationHandler:
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
- historical: Indicates whether the message is being inserted
- back in time around some existing events. This is used to skip
- a few checks and mark the event as backfilled.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@@ -1053,7 +1035,6 @@ class EventCreationHandler:
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
outlier=outlier,
- historical=historical,
depth=depth,
)
context = await unpersisted_context.persist(event)
@@ -1145,7 +1126,7 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -1158,13 +1139,10 @@ class EventCreationHandler:
based on the room state at the prev_events.
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is with insertion events which float at
- the beginning of a historical batch and don't have any `prev_events` to
- derive from; we add all of these state events as the explicit state so the
- rest of the historical batch can inherit the same state and state_group.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
@@ -1261,52 +1239,6 @@ class EventCreationHandler:
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = True
context = EventContext.for_outlier(self._storage_controllers)
- elif (
- event.type == EventTypes.MSC2716_INSERTION
- and state_event_ids
- and builder.internal_metadata.is_historical()
- ):
- # Add explicit state to the insertion event so it has state to derive
- # from even though it's floating with no `prev_events`. The rest of
- # the batch can derive from this state and state_group.
- #
- # TODO(faster_joins): figure out how this works, and make sure that the
- # old state is complete.
- # https://github.com/matrix-org/synapse/issues/13003
- metadata = await self.store.get_metadata_for_events(state_event_ids)
-
- state_map_for_event: MutableStateMap[str] = {}
- for state_id in state_event_ids:
- data = metadata.get(state_id)
- if data is None:
- # We're trying to persist a new historical batch of events
- # with the given state, e.g. via
- # `RoomBatchSendEventRestServlet`. The state can be inferred
- # by Synapse or set directly by the client.
- #
- # Either way, we should have persisted all the state before
- # getting here.
- raise Exception(
- f"State event {state_id} not found in DB,"
- " Synapse should have persisted it before using it."
- )
-
- if data.state_key is None:
- raise Exception(
- f"Trying to set non-state event {state_id} as state"
- )
-
- state_map_for_event[(data.event_type, data.state_key)] = state_id
-
- # TODO(faster_joins): check how MSC2716 works and whether we can have
- # partial state here
- # https://github.com/matrix-org/synapse/issues/13003
- context = await self.state.calculate_context_info(
- event,
- state_ids_before_event=state_map_for_event,
- partial_state=False,
- )
-
else:
context = await self.state.calculate_context_info(event)
@@ -1876,28 +1808,6 @@ class EventCreationHandler:
403, "Redacting server ACL events is not permitted"
)
- # Add a little safety stop-gap to prevent people from trying to
- # redact MSC2716 related events when they're in a room version
- # which does not support it yet. We allow people to use MSC2716
- # events in existing room versions but only from the room
- # creator since it does not require any changes to the auth
- # rules and in effect, the redaction algorithm . In the
- # supported room version, we add the `historical` power level to
- # auth the MSC2716 related events and adjust the redaction
- # algorthim to keep the `historical` field around (redacting an
- # event should only strip fields which don't affect the
- # structural protocol level).
- is_msc2716_event = (
- original_event.type == EventTypes.MSC2716_INSERTION
- or original_event.type == EventTypes.MSC2716_BATCH
- or original_event.type == EventTypes.MSC2716_MARKER
- )
- if not room_version_obj.msc2716_historical and is_msc2716_event:
- raise AuthError(
- 403,
- "Redacting MSC2716 events is not supported in this room version",
- )
-
event_types = event_auth.auth_types_for_event(event.room_version, event)
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types(event_types)
@@ -1935,58 +1845,12 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
- if event.type == EventTypes.MSC2716_INSERTION:
- room_version = await self.store.get_room_version_id(event.room_id)
- room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
- create_event = await self.store.get_create_event_for_room(event.room_id)
- if not room_version_obj.msc2175_implicit_room_creator:
- room_creator = create_event.content.get(
- EventContentFields.ROOM_CREATOR
- )
- else:
- room_creator = create_event.sender
-
- # Only check an insertion event if the room version
- # supports it or the event is from the room creator.
- if room_version_obj.msc2716_historical or (
- self.config.experimental.msc2716_enabled
- and event.sender == room_creator
- ):
- next_batch_id = event.content.get(
- EventContentFields.MSC2716_NEXT_BATCH_ID
- )
- conflicting_insertion_event_id = None
- if next_batch_id:
- conflicting_insertion_event_id = (
- await self.store.get_insertion_event_id_by_batch_id(
- event.room_id, next_batch_id
- )
- )
- if conflicting_insertion_event_id is not None:
- # The current insertion event that we're processing is invalid
- # because an insertion event already exists in the room with the
- # same next_batch_id. We can't allow multiple because the batch
- # pointing will get weird, e.g. we can't determine which insertion
- # event the batch event is pointing to.
- raise SynapseError(
- HTTPStatus.BAD_REQUEST,
- "Another insertion event already exists with the same next_batch_id",
- errcode=Codes.INVALID_PARAM,
- )
-
- # Mark any `m.historical` messages as backfilled so they don't appear
- # in `/sync` and have the proper decrementing `stream_ordering` as we import
- backfilled = False
- if event.internal_metadata.is_historical():
- backfilled = True
-
assert self._storage_controllers.persistence is not None
(
persisted_events,
max_stream_token,
) = await self._storage_controllers.persistence.persist_events(
- events_and_context, backfilled=backfilled
+ events_and_context,
)
events_and_pos = []
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d5257acb7d..19b8728db9 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -40,6 +40,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+# How many single-event gaps we tolerate returning in a `/messages` response before we
+# backfill and try to fill in the history. This is an arbitrarily picked number, so feel
+# free to tune it in the future.
+BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
+
@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
@@ -486,35 +491,35 @@ class PaginationHandler:
room_id, room_token.stream
)
- if not use_admin_priviledge and membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
-
- # This is only None if the room is world_readable, in which
- # case "JOIN" would have been returned.
- assert member_event_id
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ if (
+ pagin_config.direction == Direction.BACKWARDS
+ and not use_admin_priviledge
+ and membership == Membership.LEAVE
+ ):
+ # This is only None if the room is world_readable, in which case
+ # "Membership.JOIN" would have been returned and we should never hit
+ # this branch.
+ assert member_event_id
+
+ leave_token = await self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ assert leave_token.topological is not None
- leave_token = await self.store.get_topological_token_for_event(
- member_event_id
+ if leave_token.topological < curr_topo:
+ from_token = from_token.copy_and_replace(
+ StreamKeyType.ROOM, leave_token
)
- assert leave_token.topological is not None
-
- if leave_token.topological < curr_topo:
- from_token = from_token.copy_and_replace(
- StreamKeyType.ROOM, leave_token
- )
-
- await self.hs.get_federation_handler().maybe_backfill(
- room_id,
- curr_topo,
- limit=pagin_config.limit,
- )
to_room_key = None
if pagin_config.to_token:
to_room_key = pagin_config.to_token.room_key
+ # Initially fetch the events from the database. With any luck, we can return
+ # these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
@@ -524,6 +529,94 @@ class PaginationHandler:
event_filter=event_filter,
)
+ if pagin_config.direction == Direction.BACKWARDS:
+ # We use a `Set` because there can be multiple events at a given depth
+            # and we only care about looking at the unique continuum of depths to
+ # find gaps.
+ event_depths: Set[int] = {event.depth for event in events}
+ sorted_event_depths = sorted(event_depths)
+
+ # Inspect the depths of the returned events to see if there are any gaps
+ found_big_gap = False
+ number_of_gaps = 0
+ previous_event_depth = (
+ sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
+ )
+ for event_depth in sorted_event_depths:
+ # We don't expect a negative depth but we'll just deal with it in
+ # any case by taking the absolute value to get the true gap between
+ # any two integers.
+ depth_gap = abs(event_depth - previous_event_depth)
+ # A `depth_gap` of 1 is a normal continuous chain to the next event
+ # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+ # also possible there is no event at a given depth but we can't ever
+ # know that for sure)
+ if depth_gap > 1:
+ number_of_gaps += 1
+
+                    # We only tolerate a small number of single-event gaps in the
+ # returned events because those are most likely just events we've
+ # failed to pull in the past. Anything longer than that is probably
+ # a sign that we're missing a decent chunk of history and we should
+ # try to backfill it.
+ #
+ # XXX: It's possible we could tolerate longer gaps if we checked
+                    # that a given event's `prev_events` are ones that have failed
+                    # pull attempts, and we could just treat it like a dead branch of
+                    # history for now, or at least something that we don't need to
+                    # block the client on to try pulling.
+ #
+ # XXX: If we had something like MSC3871 to indicate gaps in the
+ # timeline to the client, we could also get away with any sized gap
+ # and just have the client refetch the holes as they see fit.
+ if depth_gap > 2:
+ found_big_gap = True
+ break
+ previous_event_depth = event_depth
+
+ # Backfill in the foreground if we found a big gap, have too many holes,
+ # or we don't have enough events to fill the limit that the client asked
+ # for.
+ missing_too_many_events = (
+ number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+ )
+ not_enough_events_to_fill_response = len(events) < pagin_config.limit
+ if (
+ found_big_gap
+ or missing_too_many_events
+ or not_enough_events_to_fill_response
+ ):
+ did_backfill = (
+ await self.hs.get_federation_handler().maybe_backfill(
+ room_id,
+ curr_topo,
+ limit=pagin_config.limit,
+ )
+ )
+
+ # If we did backfill something, refetch the events from the database to
+ # catch anything new that might have been added since we last fetched.
+ if did_backfill:
+ events, next_key = await self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
+ event_filter=event_filter,
+ )
+ else:
+ # Otherwise, we can backfill in the background for eventual
+ # consistency's sake but we don't need to block the client waiting
+ # for a costly federation call and processing.
+ run_as_background_process(
+ "maybe_backfill_in_the_background",
+ self.hs.get_federation_handler().maybe_backfill,
+ room_id,
+ curr_topo,
+ limit=pagin_config.limit,
+ )
+
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
# if no events are returned from pagination, that implies
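
To make the gap heuristic in pagination.py easier to follow, here is a self-contained sketch of the same logic over a made-up set of depths. A depth gap of exactly 2 counts as a tolerated single-event hole; anything wider is a big gap that triggers a foreground backfill:

```python
from typing import Set, Tuple

def inspect_depth_gaps(event_depths: Set[int]) -> Tuple[bool, int]:
    """Mirrors the inline heuristic: returns (found_big_gap, number_of_gaps)."""
    sorted_event_depths = sorted(event_depths)
    number_of_gaps = 0
    found_big_gap = False
    previous = sorted_event_depths[0] if sorted_event_depths else 0
    for depth in sorted_event_depths:
        depth_gap = abs(depth - previous)
        if depth_gap > 1:
            number_of_gaps += 1
        if depth_gap > 2:
            found_big_gap = True
            break
        previous = depth
    return found_big_gap, number_of_gaps

# Depths 1..3 are contiguous, 5 leaves a single-event hole (gap of 2) and 9
# leaves a big gap (gap of 4), so backfill would run in the foreground here.
assert inspect_depth_gaps({1, 2, 3, 5, 9}) == (True, 2)
```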
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
deleted file mode 100644
index bf9df60218..0000000000
--- a/synapse/handlers/room_batch.py
+++ /dev/null
@@ -1,466 +0,0 @@
-import logging
-from typing import TYPE_CHECKING, List, Tuple
-
-from synapse.api.constants import EventContentFields, EventTypes
-from synapse.appservice import ApplicationService
-from synapse.http.servlet import assert_params_in_dict
-from synapse.types import JsonDict, Requester, UserID, create_requester
-from synapse.util.stringutils import random_string
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class RoomBatchHandler:
- def __init__(self, hs: "HomeServer"):
- self.hs = hs
- self.store = hs.get_datastores().main
- self._state_storage_controller = hs.get_storage_controllers().state
- self.event_creation_handler = hs.get_event_creation_handler()
- self.room_member_handler = hs.get_room_member_handler()
- self.auth = hs.get_auth()
-
- async def inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int:
- """Finds the depth which would sort it after the most-recent
- prev_event_id but before the successors of those events. If no
- successors are found, we assume it's an historical extremity part of the
- current batch and use the same depth of the prev_event_ids.
-
- Args:
- prev_event_ids: List of prev event IDs
-
- Returns:
- Inherited depth
- """
- (
- most_recent_prev_event_id,
- most_recent_prev_event_depth,
- ) = await self.store.get_max_depth_of(prev_event_ids)
-
- # We want to insert the historical event after the `prev_event` but before the successor event
- #
- # We inherit depth from the successor event instead of the `prev_event`
- # because events returned from `/messages` are first sorted by `topological_ordering`
- # which is just the `depth` and then tie-break with `stream_ordering`.
- #
- # We mark these inserted historical events as "backfilled" which gives them a
- # negative `stream_ordering`. If we use the same depth as the `prev_event`,
- # then our historical event will tie-break and be sorted before the `prev_event`
- # when it should come after.
- #
- # We want to use the successor event depth so they appear after `prev_event` because
- # it has a larger `depth` but before the successor event because the `stream_ordering`
- # is negative before the successor event.
- assert most_recent_prev_event_id is not None
- successor_event_ids = await self.store.get_successor_events(
- most_recent_prev_event_id
- )
-
- # If we can't find any successor events, then it's a forward extremity of
- # historical messages and we can just inherit from the previous historical
- # event which we can already assume has the correct depth where we want
- # to insert into.
- if not successor_event_ids:
- depth = most_recent_prev_event_depth
- else:
- (
- _,
- oldest_successor_depth,
- ) = await self.store.get_min_depth_of(successor_event_ids)
-
- depth = oldest_successor_depth
-
- return depth
-
- def create_insertion_event_dict(
- self, sender: str, room_id: str, origin_server_ts: int
- ) -> JsonDict:
- """Creates an event dict for an "insertion" event with the proper fields
- and a random batch ID.
-
- Args:
- sender: The event author MXID
- room_id: The room ID that the event belongs to
- origin_server_ts: Timestamp when the event was sent
-
- Returns:
- The new event dictionary to insert.
- """
-
- next_batch_id = random_string(8)
- insertion_event = {
- "type": EventTypes.MSC2716_INSERTION,
- "sender": sender,
- "room_id": room_id,
- "content": {
- EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
- EventContentFields.MSC2716_HISTORICAL: True,
- },
- "origin_server_ts": origin_server_ts,
- }
-
- return insertion_event
-
- async def create_requester_for_user_id_from_app_service(
- self, user_id: str, app_service: ApplicationService
- ) -> Requester:
- """Creates a new requester for the given user_id
- and validates that the app service is allowed to control
- the given user.
-
- Args:
- user_id: The author MXID that the app service is controlling
- app_service: The app service that controls the user
-
- Returns:
- Requester object
- """
-
- await self.auth.validate_appservice_can_control_user_id(app_service, user_id)
-
- return create_requester(user_id, app_service=app_service)
-
- async def get_most_recent_full_state_ids_from_event_id_list(
- self, event_ids: List[str]
- ) -> List[str]:
- """Find the most recent event_id and grab the full state at that event.
- We will use this as a base to auth our historical messages against.
-
- Args:
- event_ids: List of event ID's to look at
-
- Returns:
- List of event ID's
- """
-
- (
- most_recent_event_id,
- _,
- ) = await self.store.get_max_depth_of(event_ids)
- # mapping from (type, state_key) -> state_event_id
- assert most_recent_event_id is not None
- prev_state_map = await self._state_storage_controller.get_state_ids_for_event(
- most_recent_event_id
- )
- # List of state event ID's
- full_state_ids = list(prev_state_map.values())
-
- return full_state_ids
-
- async def persist_state_events_at_start(
- self,
- state_events_at_start: List[JsonDict],
- room_id: str,
- initial_state_event_ids: List[str],
- app_service_requester: Requester,
- ) -> List[str]:
- """Takes all `state_events_at_start` event dictionaries and creates/persists
- them in a floating state event chain which don't resolve into the current room
- state. They are floating because they reference no prev_events which disconnects
- them from the normal DAG.
-
- Args:
- state_events_at_start:
- room_id: Room where you want the events persisted in.
- initial_state_event_ids:
- The base set of state for the historical batch which the floating
- state chain will derive from. This should probably be the state
- from the `prev_event` defined by `/batch_send?prev_event_id=$abc`.
- app_service_requester: The requester of an application service.
-
- Returns:
- List of state event ID's we just persisted
- """
- assert app_service_requester.app_service
-
- state_event_ids_at_start = []
- state_event_ids = initial_state_event_ids.copy()
-
- # Make the state events float off on their own by specifying no
- # prev_events for the first one in the chain so we don't have a bunch of
- # `@mxid joined the room` noise between each batch.
- prev_event_ids_for_state_chain: List[str] = []
-
- for index, state_event in enumerate(state_events_at_start):
- assert_params_in_dict(
- state_event, ["type", "origin_server_ts", "content", "sender"]
- )
-
- logger.debug(
- "RoomBatchSendEventRestServlet inserting state_event=%s", state_event
- )
-
- event_dict = {
- "type": state_event["type"],
- "origin_server_ts": state_event["origin_server_ts"],
- "content": state_event["content"],
- "room_id": room_id,
- "sender": state_event["sender"],
- "state_key": state_event["state_key"],
- }
-
- # Mark all events as historical
- event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
-
- # TODO: This is pretty much the same as some other code to handle inserting state in this file
- if event_dict["type"] == EventTypes.Member:
- membership = event_dict["content"].get("membership", None)
- event_id, _ = await self.room_member_handler.update_membership(
- await self.create_requester_for_user_id_from_app_service(
- state_event["sender"], app_service_requester.app_service
- ),
- target=UserID.from_string(event_dict["state_key"]),
- room_id=room_id,
- action=membership,
- content=event_dict["content"],
- historical=True,
- # Only the first event in the state chain should be floating.
- # The rest should hang off each other in a chain.
- allow_no_prev_events=index == 0,
- prev_event_ids=prev_event_ids_for_state_chain,
- # The first event in the state chain is floating with no
- # `prev_events` which means it can't derive state from
- # anywhere automatically. So we need to set some state
- # explicitly.
- #
- # Make sure to use a copy of this list because we modify it
- # later in the loop here. Otherwise it will be the same
- # reference and also update in the event when we append
- # later.
- state_event_ids=state_event_ids.copy(),
- )
- else:
- (
- event,
- _,
- ) = await self.event_creation_handler.create_and_send_nonmember_event(
- await self.create_requester_for_user_id_from_app_service(
- state_event["sender"], app_service_requester.app_service
- ),
- event_dict,
- historical=True,
- # Only the first event in the state chain should be floating.
- # The rest should hang off each other in a chain.
- allow_no_prev_events=index == 0,
- prev_event_ids=prev_event_ids_for_state_chain,
- # The first event in the state chain is floating with no
- # `prev_events` which means it can't derive state from
- # anywhere automatically. So we need to set some state
- # explicitly.
- #
- # Make sure to use a copy of this list because we modify it
- # later in the loop here. Otherwise it will be the same
- # reference and also update in the event when we append later.
- state_event_ids=state_event_ids.copy(),
- )
- event_id = event.event_id
-
- state_event_ids_at_start.append(event_id)
- state_event_ids.append(event_id)
- # Connect all the state in a floating chain
- prev_event_ids_for_state_chain = [event_id]
-
- return state_event_ids_at_start
-
- async def persist_historical_events(
- self,
- events_to_create: List[JsonDict],
- room_id: str,
- inherited_depth: int,
- initial_state_event_ids: List[str],
- app_service_requester: Requester,
- ) -> List[str]:
- """Create and persists all events provided sequentially. Handles the
- complexity of creating events in chronological order so they can
- reference each other by prev_event but still persists in
- reverse-chronoloical order so they have the correct
- (topological_ordering, stream_ordering) and sort correctly from
- /messages.
-
- Args:
- events_to_create: List of historical events to create in JSON
- dictionary format.
- room_id: Room where you want the events persisted in.
- inherited_depth: The depth to create the events at (you will
- probably by calling inherit_depth_from_prev_ids(...)).
- initial_state_event_ids:
- This is used to set explicit state for the insertion event at
- the start of the historical batch since it's floating with no
- prev_events to derive state from automatically.
- app_service_requester: The requester of an application service.
-
- Returns:
- List of persisted event IDs
- """
- assert app_service_requester.app_service
-
- # We expect the first event in a historical batch to be an insertion event
- assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
- # We expect the last event in a historical batch to be an batch event
- assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
-
- # Make the historical event chain float off on its own by specifying no
- # prev_events for the first event in the chain which causes the HS to
- # ask for the state at the start of the batch later.
- prev_event_ids: List[str] = []
-
- event_ids = []
- events_to_persist = []
- for index, ev in enumerate(events_to_create):
- assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
-
- assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
- ev["sender"],
- )
-
- event_dict = {
- "type": ev["type"],
- "origin_server_ts": ev["origin_server_ts"],
- "content": ev["content"],
- "room_id": room_id,
- "sender": ev["sender"], # requester.user.to_string(),
- "prev_events": prev_event_ids.copy(),
- }
-
- # Mark all events as historical
- event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
-
- event, unpersisted_context = await self.event_creation_handler.create_event(
- await self.create_requester_for_user_id_from_app_service(
- ev["sender"], app_service_requester.app_service
- ),
- event_dict,
- # Only the first event (which is the insertion event) in the
- # chain should be floating. The rest should hang off each other
- # in a chain.
- allow_no_prev_events=index == 0,
- prev_event_ids=event_dict.get("prev_events"),
- # Since the first event (which is the insertion event) in the
- # chain is floating with no `prev_events`, it can't derive state
- # from anywhere automatically. So we need to set some state
- # explicitly.
- state_event_ids=initial_state_event_ids if index == 0 else None,
- historical=True,
- depth=inherited_depth,
- )
- context = await unpersisted_context.persist(event)
- assert context._state_group
-
- # Normally this is done when persisting the event but we have to
- # pre-emptively do it here because we create all the events first,
- # then persist them in another pass below. And we want to share
- # state_groups across the whole batch so this lookup needs to work
- # for the next event in the batch in this loop.
- await self.store.store_state_group_id_for_event_id(
- event_id=event.event_id,
- state_group_id=context._state_group,
- )
-
- logger.debug(
- "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s",
- event,
- prev_event_ids,
- )
-
- events_to_persist.append((event, context))
- event_id = event.event_id
-
- event_ids.append(event_id)
- prev_event_ids = [event_id]
-
- # Persist events in reverse-chronological order so they have the
- # correct stream_ordering as they are backfilled (which decrements).
- # Events are sorted by (topological_ordering, stream_ordering)
- # where topological_ordering is just depth.
- for event, context in reversed(events_to_persist):
- # This call can't raise `PartialStateConflictError` since we forbid
- # use of the historical batch API during partial state
- await self.event_creation_handler.handle_new_client_event(
- await self.create_requester_for_user_id_from_app_service(
- event.sender, app_service_requester.app_service
- ),
- events_and_context=[(event, context)],
- )
-
- return event_ids
-
- async def handle_batch_of_events(
- self,
- events_to_create: List[JsonDict],
- room_id: str,
- batch_id_to_connect_to: str,
- inherited_depth: int,
- initial_state_event_ids: List[str],
- app_service_requester: Requester,
- ) -> Tuple[List[str], str]:
- """
- Handles creating and persisting all of the historical events as well as
- insertion and batch meta events to make the batch navigable in the DAG.
-
- Args:
- events_to_create: List of historical events to create in JSON
- dictionary format.
- room_id: Room where you want the events created in.
- batch_id_to_connect_to: The batch_id from the insertion event you
- want this batch to connect to.
- inherited_depth: The depth to create the events at (you will
- probably by calling inherit_depth_from_prev_ids(...)).
- initial_state_event_ids:
- This is used to set explicit state for the insertion event at
- the start of the historical batch since it's floating with no
- prev_events to derive state from automatically. This should
- probably be the state from the `prev_event` defined by
- `/batch_send?prev_event_id=$abc` plus the outcome of
- `persist_state_events_at_start`
- app_service_requester: The requester of an application service.
-
- Returns:
- Tuple containing a list of created events and the next_batch_id
- """
-
- # Connect this current batch to the insertion event from the previous batch
- last_event_in_batch = events_to_create[-1]
- batch_event = {
- "type": EventTypes.MSC2716_BATCH,
- "sender": app_service_requester.user.to_string(),
- "room_id": room_id,
- "content": {
- EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to,
- EventContentFields.MSC2716_HISTORICAL: True,
- },
- # Since the batch event is put at the end of the batch,
- # where the newest-in-time event is, copy the origin_server_ts from
- # the last event we're inserting
- "origin_server_ts": last_event_in_batch["origin_server_ts"],
- }
- # Add the batch event to the end of the batch (newest-in-time)
- events_to_create.append(batch_event)
-
- # Add an "insertion" event to the start of each batch (next to the oldest-in-time
- # event in the batch) so the next batch can be connected to this one.
- insertion_event = self.create_insertion_event_dict(
- sender=app_service_requester.user.to_string(),
- room_id=room_id,
- # Since the insertion event is put at the start of the batch,
- # where the oldest-in-time event is, copy the origin_server_ts from
- # the first event we're inserting
- origin_server_ts=events_to_create[0]["origin_server_ts"],
- )
- next_batch_id = insertion_event["content"][
- EventContentFields.MSC2716_NEXT_BATCH_ID
- ]
- # Prepend the insertion event to the start of the batch (oldest-in-time)
- events_to_create = [insertion_event] + events_to_create
-
- # Create and persist all of the historical events
- event_ids = await self.persist_historical_events(
- events_to_create=events_to_create,
- room_id=room_id,
- inherited_depth=inherited_depth,
- initial_state_event_ids=initial_state_event_ids,
- app_service_requester=app_service_requester,
- )
-
- return event_ids, next_batch_id
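
An aside on the depth logic deleted above: `/messages` sorts by `(topological_ordering, stream_ordering)`, where `topological_ordering` is just depth, and backfilled events get a negative `stream_ordering` — which is why the handler inherited the successor's depth rather than the `prev_event`'s. A toy illustration (values invented):

    # (depth, stream_ordering, label), sorted the way /messages sorts events.
    events = [
        (10, 5, "prev_event"),
        (11, -3, "historical (successor depth, negative stream_ordering)"),
        (11, 7, "successor"),
    ]
    # The historical event tie-breaks on its negative stream_ordering and lands
    # between prev_event and successor. Had it reused the prev_event's depth,
    # (10, -3) would have sorted *before* (10, 5) -- the wrong side.
    print([label for _, _, label in sorted(events)])
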
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index af0ca5c26d..82e4fa7363 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -362,7 +362,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content: Optional[dict] = None,
require_consent: bool = True,
outlier: bool = False,
- historical: bool = False,
origin_server_ts: Optional[int] = None,
) -> Tuple[str, int]:
"""
@@ -378,16 +377,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids: The event IDs to use as the prev events
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is the historical `state_events_at_start`;
- since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
- have any `state_ids` set and therefore can't derive any state even though the
- prev_events are set so we need to set them ourself via this argument.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@@ -400,9 +396,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
- historical: Indicates whether the message is being inserted
- back in time around some existing events. This is used to skip
- a few checks and mark the event as backfilled.
origin_server_ts: The origin_server_ts to use if a new event is created. Uses
the current timestamp if set to None.
@@ -477,7 +470,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
depth=depth,
require_consent=require_consent,
outlier=outlier,
- historical=historical,
)
context = await unpersisted_context.persist(event)
prev_state_ids = await context.get_prev_state_ids(
@@ -585,7 +577,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room: bool = False,
require_consent: bool = True,
outlier: bool = False,
- historical: bool = False,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
@@ -610,22 +601,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
- historical: Indicates whether the message is being inserted
- back in time around some existing events. This is used to skip
- a few checks and mark the event as backfilled.
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids: The event IDs to use as the prev events
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is the historical `state_events_at_start`;
- since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
- have any `state_ids` set and therefore can't derive any state even though the
- prev_events are set so we need to set them ourself via this argument.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@@ -667,7 +652,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room=new_room,
require_consent=require_consent,
outlier=outlier,
- historical=historical,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
@@ -691,7 +675,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room: bool = False,
require_consent: bool = True,
outlier: bool = False,
- historical: bool = False,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
@@ -718,22 +701,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
- historical: Indicates whether the message is being inserted
- back in time around some existing events. This is used to skip
- a few checks and mark the event as backfilled.
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids: The event IDs to use as the prev events
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is the historical `state_events_at_start`;
- since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
- have any `state_ids` set and therefore can't derive any state even though the
- prev_events are set so we need to set them ourself via this argument.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@@ -877,7 +854,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content=content,
require_consent=require_consent,
outlier=outlier,
- historical=historical,
origin_server_ts=origin_server_ts,
)
@@ -1498,7 +1474,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# put the server which owns the alias at the front of the server list.
if room_alias.domain in servers:
servers.remove(room_alias.domain)
- servers.insert(0, room_alias.domain)
+ servers.insert(0, room_alias.domain)
return RoomID.from_string(room_id), servers
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 7e8cf31682..91a24efcd0 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -51,8 +51,10 @@ logger = logging.getLogger(__name__)
@implementer(IAgent)
class MatrixFederationAgent:
"""An Agent-like thing which provides a `request` method which correctly
- handles resolving matrix server names when using matrix://. Handles standard
- https URIs as normal.
+ handles resolving matrix server names when using `matrix-federation://`. Handles
+ standard https URIs as normal. The `matrix-federation://` scheme is internal to
+ Synapse and we purposely want to avoid colliding with the `matrix://` URL scheme
+ which is now specced.
Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
@@ -167,14 +169,14 @@ class MatrixFederationAgent:
# There must be a valid hostname.
assert parsed_uri.hostname
- # If this is a matrix:// URI check if the server has delegated matrix
+ # If this is a matrix-federation:// URI, check if the server has delegated matrix
# traffic using well-known delegation.
#
# We have to do this here and not in the endpoint as we need to rewrite
# the host header with the delegated server name.
delegated_server = None
if (
- parsed_uri.scheme == b"matrix"
+ parsed_uri.scheme == b"matrix-federation"
and not _is_ip_literal(parsed_uri.hostname)
and not parsed_uri.port
):
@@ -250,7 +252,7 @@ class MatrixHostnameEndpointFactory:
@implementer(IStreamClientEndpoint)
class MatrixHostnameEndpoint:
- """An endpoint that resolves matrix:// URLs using Matrix server name
+ """An endpoint that resolves matrix-federation:// URLs using Matrix server name
resolution (i.e. via SRV). Does not check for well-known delegation.
Args:
@@ -379,7 +381,7 @@ class MatrixHostnameEndpoint:
connect to.
"""
- if self._parsed_uri.scheme != b"matrix":
+ if self._parsed_uri.scheme != b"matrix-federation":
return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]
# Note: We don't do well-known lookup as that needs to have happened
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index abb5ae5815..fc0101808d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -174,7 +174,14 @@ class MatrixFederationRequest:
# The object is frozen so we can pre-compute this.
uri = urllib.parse.urlunparse(
- (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+ (
+ b"matrix-federation",
+ destination_bytes,
+ path_bytes,
+ None,
+ query_bytes,
+ b"",
+ )
)
object.__setattr__(self, "uri", uri)
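
The new scheme is easy to sanity-check with the standard library, since `urlunparse` accepts the same bytes 6-tuple used above (destination and path invented):

    import urllib.parse

    uri = urllib.parse.urlunparse(
        (
            b"matrix-federation",
            b"remote.example.org:8448",
            b"/_matrix/federation/v1/version",
            None,
            b"",
            b"",
        )
    )
    # b'matrix-federation://remote.example.org:8448/_matrix/federation/v1/version'
    print(uri)
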
diff --git a/synapse/media/_base.py b/synapse/media/_base.py
index ef8334ae25..20cb8b9010 100644
--- a/synapse/media/_base.py
+++ b/synapse/media/_base.py
@@ -152,6 +152,9 @@ def add_file_headers(
content_type = media_type
request.setHeader(b"Content-Type", content_type.encode("UTF-8"))
+
+ # Use a Content-Disposition of attachment to force download of media.
+ disposition = "attachment"
if upload_name:
# RFC6266 section 4.1 [1] defines both `filename` and `filename*`.
#
@@ -173,11 +176,17 @@ def add_file_headers(
# correctly interpret those as of 0.99.2 and (b) they are a bit of a pain and we
# may as well just do the filename* version.
if _can_encode_filename_as_token(upload_name):
- disposition = "inline; filename=%s" % (upload_name,)
+ disposition = "%s; filename=%s" % (
+ disposition,
+ upload_name,
+ )
else:
- disposition = "inline; filename*=utf-8''%s" % (_quote(upload_name),)
+ disposition = "%s; filename*=utf-8''%s" % (
+ disposition,
+ _quote(upload_name),
+ )
- request.setHeader(b"Content-Disposition", disposition.encode("ascii"))
+ request.setHeader(b"Content-Disposition", disposition.encode("ascii"))
# cache for at least a day.
# XXX: we might want to turn this off for data we don't want to
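
With this change both filename branches emit an `attachment` disposition. The RFC 6266 `filename*` branch can be reproduced with the standard library, assuming `_quote` behaves like `urllib.parse.quote` (filename invented):

    from urllib.parse import quote

    disposition = "attachment"
    upload_name = "résumé.pdf"  # not encodable as a plain RFC 2616 token

    header = "%s; filename*=utf-8''%s" % (disposition, quote(upload_name))
    print(header)  # attachment; filename*=utf-8''r%C3%A9sum%C3%A9.pdf
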
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 33002cc0f2..67377c647b 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -322,7 +322,6 @@ class BulkPushRuleEvaluator:
) -> None:
if (
not event.internal_metadata.is_notifiable()
- or event.internal_metadata.is_historical()
or event.room_id in self.hs.config.server.rooms_to_exclude_from_sync
):
# Push rules for events that aren't notifiable can't be processed by this and
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index 88b52c26a0..735cef0aed 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -41,12 +41,7 @@ def format_push_rules_for_user(
rulearray.append(template_rule)
- for type_key in ("pattern", "value"):
- type_value = template_rule.pop(f"{type_key}_type", None)
- if type_value == "user_id":
- template_rule[type_key] = user.to_string()
- elif type_value == "user_localpart":
- template_rule[type_key] = user.localpart
+ _convert_type_to_value(template_rule, user)
template_rule["enabled"] = enabled
@@ -63,19 +58,20 @@ def format_push_rules_for_user(
for c in template_rule["conditions"]:
c.pop("_cache_key", None)
- pattern_type = c.pop("pattern_type", None)
- if pattern_type == "user_id":
- c["pattern"] = user.to_string()
- elif pattern_type == "user_localpart":
- c["pattern"] = user.localpart
-
- sender_type = c.pop("sender_type", None)
- if sender_type == "user_id":
- c["sender"] = user.to_string()
+ _convert_type_to_value(c, user)
return rules
+def _convert_type_to_value(rule_or_cond: Dict[str, Any], user: UserID) -> None:
+ for type_key in ("pattern", "value"):
+ type_value = rule_or_cond.pop(f"{type_key}_type", None)
+ if type_value == "user_id":
+ rule_or_cond[type_key] = user.to_string()
+ elif type_value == "user_localpart":
+ rule_or_cond[type_key] = user.localpart
+
+
def _add_empty_priority_class_arrays(d: Dict[str, list]) -> Dict[str, list]:
for pc in PRIORITY_CLASS_MAP.keys():
d[pc] = []
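
The extracted helper is small enough to demonstrate standalone; a self-contained sketch with a minimal stand-in for `UserID` (the real class lives in `synapse.types`):

    from typing import Any, Dict

    class FakeUser:
        # Minimal stand-in for synapse.types.UserID.
        def __init__(self, localpart: str, domain: str) -> None:
            self.localpart = localpart
            self._domain = domain

        def to_string(self) -> str:
            return f"@{self.localpart}:{self._domain}"

    def _convert_type_to_value(rule_or_cond: Dict[str, Any], user: FakeUser) -> None:
        for type_key in ("pattern", "value"):
            type_value = rule_or_cond.pop(f"{type_key}_type", None)
            if type_value == "user_id":
                rule_or_cond[type_key] = user.to_string()
            elif type_value == "user_localpart":
                rule_or_cond[type_key] = user.localpart

    cond = {"kind": "event_match", "key": "content.body", "pattern_type": "user_localpart"}
    _convert_type_to_value(cond, FakeUser("alice", "example.org"))
    print(cond)  # {'kind': 'event_match', 'key': 'content.body', 'pattern': 'alice'}
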
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 1af8d99d20..df0845edb2 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -48,7 +48,6 @@ from synapse.rest.client import (
rendezvous,
report_event,
room,
- room_batch,
room_keys,
room_upgrade_rest_servlet,
sendtodevice,
@@ -132,7 +131,6 @@ class ClientRestResource(JsonResource):
user_directory.register_servlets(hs, client_resource)
if is_main_process:
room_upgrade_rest_servlet.register_servlets(hs, client_resource)
- room_batch.register_servlets(hs, client_resource)
capabilities.register_servlets(hs, client_resource)
if is_main_process:
account_validity.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
deleted file mode 100644
index 69f85112d8..0000000000
--- a/synapse/rest/client/room_batch.py
+++ /dev/null
@@ -1,254 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import re
-from http import HTTPStatus
-from typing import TYPE_CHECKING, Tuple
-
-from synapse.api.constants import EventContentFields
-from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.http.server import HttpServer
-from synapse.http.servlet import (
- RestServlet,
- assert_params_in_dict,
- parse_json_object_from_request,
- parse_string,
- parse_strings_from_args,
-)
-from synapse.http.site import SynapseRequest
-from synapse.types import JsonDict
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class RoomBatchSendEventRestServlet(RestServlet):
- """
- API endpoint which can insert a batch of events historically back in time
- next to the given `prev_event`.
-
- `batch_id` comes from `next_batch_id `in the response of the batch send
- endpoint and is derived from the "insertion" events added to each batch.
- It's not required for the first batch send.
-
- `state_events_at_start` is used to define the historical state events
- needed to auth the events like join events. These events will float
- outside of the normal DAG as outlier's and won't be visible in the chat
- history which also allows us to insert multiple batches without having a bunch
- of `@mxid joined the room` noise between each batch.
-
- `events` is chronological list of events you want to insert.
- There is a reverse-chronological constraint on batches so once you insert
- some messages, you can only insert older ones after that.
- tldr; Insert batches from your most recent history -> oldest history.
-
- POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event_id=<eventID>&batch_id=<batchID>
- {
- "events": [ ... ],
- "state_events_at_start": [ ... ]
- }
- """
-
- PATTERNS = (
- re.compile(
- "^/_matrix/client/unstable/org.matrix.msc2716"
- "/rooms/(?P<room_id>[^/]*)/batch_send$"
- ),
- )
- CATEGORY = "Client API requests"
-
- def __init__(self, hs: "HomeServer"):
- super().__init__()
- self.store = hs.get_datastores().main
- self.event_creation_handler = hs.get_event_creation_handler()
- self.auth = hs.get_auth()
- self.room_batch_handler = hs.get_room_batch_handler()
-
- async def on_POST(
- self, request: SynapseRequest, room_id: str
- ) -> Tuple[int, JsonDict]:
- requester = await self.auth.get_user_by_req(request, allow_guest=False)
-
- if not requester.app_service:
- raise AuthError(
- HTTPStatus.FORBIDDEN,
- "Only application services can use the /batchsend endpoint",
- )
-
- body = parse_json_object_from_request(request)
- assert_params_in_dict(body, ["state_events_at_start", "events"])
-
- assert request.args is not None
- prev_event_ids_from_query = parse_strings_from_args(
- request.args, "prev_event_id"
- )
- batch_id_from_query = parse_string(request, "batch_id")
-
- if prev_event_ids_from_query is None:
- raise SynapseError(
- HTTPStatus.BAD_REQUEST,
- "prev_event query parameter is required when inserting historical messages back in time",
- errcode=Codes.MISSING_PARAM,
- )
-
- if await self.store.is_partial_state_room(room_id):
- raise SynapseError(
- HTTPStatus.BAD_REQUEST,
- "Cannot insert history batches until we have fully joined the room",
- errcode=Codes.UNABLE_DUE_TO_PARTIAL_STATE,
- )
-
- # Verify the batch_id_from_query corresponds to an actual insertion event
- # and have the batch connected.
- if batch_id_from_query:
- corresponding_insertion_event_id = (
- await self.store.get_insertion_event_id_by_batch_id(
- room_id, batch_id_from_query
- )
- )
- if corresponding_insertion_event_id is None:
- raise SynapseError(
- HTTPStatus.BAD_REQUEST,
- "No insertion event corresponds to the given ?batch_id",
- errcode=Codes.INVALID_PARAM,
- )
-
- # Make sure that the prev_event_ids exist and aren't outliers - ie, they are
- # regular parts of the room DAG where we know the state.
- non_outlier_prev_events = await self.store.have_events_in_timeline(
- prev_event_ids_from_query
- )
- for prev_event_id in prev_event_ids_from_query:
- if prev_event_id not in non_outlier_prev_events:
- raise SynapseError(
- HTTPStatus.BAD_REQUEST,
- "prev_event %s does not exist, or is an outlier" % (prev_event_id,),
- errcode=Codes.INVALID_PARAM,
- )
-
- # For the event we are inserting next to (`prev_event_ids_from_query`),
- # find the most recent state events that allowed that message to be
- # sent. We will use that as a base to auth our historical messages
- # against.
- state_event_ids = await self.room_batch_handler.get_most_recent_full_state_ids_from_event_id_list(
- prev_event_ids_from_query
- )
-
- state_event_ids_at_start = []
- # Create and persist all of the state events that float off on their own
- # before the batch. These will most likely be all of the invite/member
- # state events used to auth the upcoming historical messages.
- if body["state_events_at_start"]:
- state_event_ids_at_start = (
- await self.room_batch_handler.persist_state_events_at_start(
- state_events_at_start=body["state_events_at_start"],
- room_id=room_id,
- initial_state_event_ids=state_event_ids,
- app_service_requester=requester,
- )
- )
- # Update our ongoing auth event ID list with all of the new state we
- # just created
- state_event_ids.extend(state_event_ids_at_start)
-
- inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids(
- prev_event_ids_from_query
- )
-
- events_to_create = body["events"]
-
- # Figure out which batch to connect to. If they passed in
- # batch_id_from_query let's use it. The batch ID passed in comes
- # from the batch_id in the "insertion" event from the previous batch.
- last_event_in_batch = events_to_create[-1]
- base_insertion_event = None
- if batch_id_from_query:
- batch_id_to_connect_to = batch_id_from_query
- # Otherwise, create an insertion event to act as a starting point.
- #
- # We don't always have an insertion event to start hanging more history
- # off of (ideally there would be one in the main DAG, but that's not the
- # case if we're wanting to add history to e.g. existing rooms without
- # an insertion event), in which case we just create a new insertion event
- # that can then get pointed to by a "marker" event later.
- else:
- base_insertion_event_dict = (
- self.room_batch_handler.create_insertion_event_dict(
- sender=requester.user.to_string(),
- room_id=room_id,
- origin_server_ts=last_event_in_batch["origin_server_ts"],
- )
- )
- base_insertion_event_dict["prev_events"] = prev_event_ids_from_query.copy()
-
- (
- base_insertion_event,
- _,
- ) = await self.event_creation_handler.create_and_send_nonmember_event(
- await self.room_batch_handler.create_requester_for_user_id_from_app_service(
- base_insertion_event_dict["sender"],
- requester.app_service,
- ),
- base_insertion_event_dict,
- prev_event_ids=base_insertion_event_dict.get("prev_events"),
- # Also set the explicit state here because we want to resolve
- # any `state_events_at_start` here too. It's not strictly
- # necessary to accomplish anything but if someone asks for the
- # state at this point, we probably want to show them the
- # historical state that was part of this batch.
- state_event_ids=state_event_ids,
- historical=True,
- depth=inherited_depth,
- )
-
- batch_id_to_connect_to = base_insertion_event.content[
- EventContentFields.MSC2716_NEXT_BATCH_ID
- ]
-
- # Create and persist all of the historical events as well as insertion
- # and batch meta events to make the batch navigable in the DAG.
- event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
- events_to_create=events_to_create,
- room_id=room_id,
- batch_id_to_connect_to=batch_id_to_connect_to,
- inherited_depth=inherited_depth,
- initial_state_event_ids=state_event_ids,
- app_service_requester=requester,
- )
-
- insertion_event_id = event_ids[0]
- batch_event_id = event_ids[-1]
- historical_event_ids = event_ids[1:-1]
-
- response_dict = {
- "state_event_ids": state_event_ids_at_start,
- "event_ids": historical_event_ids,
- "next_batch_id": next_batch_id,
- "insertion_event_id": insertion_event_id,
- "batch_event_id": batch_event_id,
- }
- if base_insertion_event is not None:
- response_dict["base_insertion_event_id"] = base_insertion_event.event_id
-
- return HTTPStatus.OK, response_dict
-
-
-def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
- msc2716_enabled = hs.config.experimental.msc2716_enabled
-
- if msc2716_enabled:
- RoomBatchSendEventRestServlet(hs).register(http_server)
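
For reference, the response shape the deleted servlet produced, reconstructed from the code above (all IDs invented):

    response_dict = {
        "state_event_ids": ["$memberA", "$memberB"],  # the floating state chain
        "event_ids": ["$hist1", "$hist2"],            # the historical messages
        "next_batch_id": "abcdefgh",
        "insertion_event_id": "$insertion",
        "batch_event_id": "$batch",
        # Only present when no ?batch_id was supplied and a base insertion
        # event had to be created:
        "base_insertion_event_id": "$base_insertion",
    }
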
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 1910648755..95400ba570 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -102,8 +102,6 @@ class VersionsRestServlet(RestServlet):
"org.matrix.msc2285.stable": True, # TODO: Remove when MSC2285 becomes a part of the spec
# Supports filtering of /publicRooms by room type as per MSC3827
"org.matrix.msc3827.stable": True,
- # Adds support for importing historical messages as per MSC2716
- "org.matrix.msc2716": self.config.experimental.msc2716_enabled,
# Adds support for thread relations, per MSC3440.
"org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above
# Support for thread read receipts & notification counts.
diff --git a/synapse/server.py b/synapse/server.py
index 0f36ef69cb..b72b76a38b 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -91,7 +91,6 @@ from synapse.handlers.room import (
RoomShutdownHandler,
TimestampLookupHandler,
)
-from synapse.handlers.room_batch import RoomBatchHandler
from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import (
RoomForgetterHandler,
@@ -493,10 +492,6 @@ class HomeServer(metaclass=abc.ABCMeta):
return RoomCreationHandler(self)
@cache_in_self
- def get_room_batch_handler(self) -> RoomBatchHandler:
- return RoomBatchHandler(self)
-
- @cache_in_self
def get_room_shutdown_handler(self) -> RoomShutdownHandler:
return RoomShutdownHandler(self)
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f1d2c71c91..35c0680365 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -839,9 +839,8 @@ class EventsPersistenceStorageController:
"group" % (ev.event_id,)
)
continue
-
- if ctx.prev_group:
- state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
+ if ctx.state_group_deltas:
+ state_group_deltas.update(ctx.state_group_deltas)
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
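
The shape of the mapping being merged above, sketched with invented IDs (the key/value types are assumed from the surrounding code, not quoted from it):

    from typing import Dict, Tuple

    # (prev_state_group, state_group) -> {(event_type, state_key): event_id}
    state_group_deltas: Dict[Tuple[int, int], Dict[Tuple[str, str], str]] = {
        (101, 102): {("m.room.member", "@alice:example.org"): "$join_event_id"},
        (102, 103): {("m.room.topic", ""): "$topic_event_id"},
    }
    # Merging each context's own mapping (rather than recording a single
    # prev_group -> state_group edge) preserves every hop of the chain.
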
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 10fa6c4802..7e49ae11bc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1529,7 +1529,7 @@ class DatabasePool:
# Lock the table just once, to prevent it being done once per row.
# Note that, according to Postgres' documentation, once obtained,
# the lock is held for the remainder of the current transaction.
- self.engine.lock_table(txn, "user_ips")
+ self.engine.lock_table(txn, table)
for keyv, valv in zip(key_values, value_values):
_keys = dict(zip(key_names, keyv))
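
The one-line fix above stops `simple_upsert_many` from always locking `user_ips` and instead locks the table actually being written to. A sketch of the bug class (names and SQL illustrative; the table name here is a trusted constant, not user input):

    def simple_upsert_many_txn(txn, table: str, rows) -> None:
        # Buggy version locked one hardcoded table regardless of target:
        #   txn.execute("LOCK TABLE user_ips IN EXCLUSIVE MODE")
        # Fixed version locks the target table, once, before the row loop:
        txn.execute(f"LOCK TABLE {table} IN EXCLUSIVE MODE")
        for row in rows:
            ...  # per-row upsert
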
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 0032a92f49..3a10c265c9 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -61,7 +61,6 @@ from .registration import RegistrationStore
from .rejections import RejectionsStore
from .relations import RelationsStore
from .room import RoomStore
-from .room_batch import RoomBatchStore
from .roommember import RoomMemberStore
from .search import SearchStore
from .session import SessionStore
@@ -87,7 +86,6 @@ class DataStore(
DeviceStore,
RoomMemberStore,
RoomStore,
- RoomBatchStore,
RegistrationStore,
ProfileStore,
PresenceStore,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 2681917d0b..8b6e3c1dc7 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -31,7 +31,7 @@ from typing import (
import attr
from prometheus_client import Counter, Gauge
-from synapse.api.constants import MAX_DEPTH, EventTypes
+from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
@@ -891,124 +891,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id,
)
- @trace
- async def get_insertion_event_backward_extremities_in_room(
- self,
- room_id: str,
- current_depth: int,
- limit: int,
- ) -> List[Tuple[str, int]]:
- """
- Get the insertion events we know about that we haven't backfilled yet
- along with the approximate depth. Only returns insertion events that are
- at a depth lower than or equal to the `current_depth`. Sorted by depth,
- highest to lowest (descending) so the closest events to the
- `current_depth` are first in the list.
-
- We ignore insertion events that are newer than the user's current scroll
- position (ie, those with depth greater than `current_depth`) as:
- 1. we don't really care about getting events that have happened
- after our current position; and
- 2. by the nature of paginating and scrolling back, we have likely
- previously tried and failed to backfill from that insertion event, so
- to avoid getting "stuck" requesting the same backfill repeatedly
- we drop those insertion event.
-
- Args:
- room_id: Room where we want to find the oldest events
- current_depth: The depth at the user's current scrollback position
- limit: The max number of insertion event extremities to return
-
- Returns:
- List of (event_id, depth) tuples. Sorted by depth, highest to lowest
- (descending) so the closest events to the `current_depth` are first
- in the list.
- """
-
- def get_insertion_event_backward_extremities_in_room_txn(
- txn: LoggingTransaction, room_id: str
- ) -> List[Tuple[str, int]]:
- if isinstance(self.database_engine, PostgresEngine):
- least_function = "LEAST"
- elif isinstance(self.database_engine, Sqlite3Engine):
- least_function = "MIN"
- else:
- raise RuntimeError("Unknown database engine")
-
- sql = f"""
- SELECT
- insertion_event_extremity.event_id, event.depth
- /* We only want insertion events that are also marked as backwards extremities */
- FROM insertion_event_extremities AS insertion_event_extremity
- /* Get the depth of the insertion event from the events table */
- INNER JOIN events AS event USING (event_id)
- /**
- * We use this info to make sure we don't retry to use a backfill point
- * if we've already attempted to backfill from it recently.
- */
- LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
- ON
- failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
- AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
- WHERE
- insertion_event_extremity.room_id = ?
- /**
- * We only want extremities that are older than or at
- * the same position of the given `current_depth` (where older
- * means less than the given depth) because we're looking backwards
- * from the `current_depth` when backfilling.
- *
- * current_depth (ignore events that come after this, ignore 2-4)
- * |
- * ▼
- * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
- */
- AND event.depth <= ? /* current_depth */
- /**
- * Exponential back-off (up to the upper bound) so we don't retry the
- * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc
- *
- * We use `1 << n` as a power of 2 equivalent for compatibility
- * with older SQLites. The left shift equivalent only works with
- * powers of 2 because left shift is a binary operation (base-2).
- * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
- */
- AND (
- failed_backfill_attempt_info.event_id IS NULL
- OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
- (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
- * ? /* step */
- )
- )
- /**
- * Sort from highest (closest to the `current_depth`) to the lowest depth
- * because the closest are most relevant to backfill from first.
- * Then tie-break on alphabetical order of the event_ids so we get a
- * consistent ordering which is nice when asserting things in tests.
- */
- ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
- LIMIT ?
- """
-
- txn.execute(
- sql,
- (
- room_id,
- current_depth,
- self._clock.time_msec(),
- BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
- BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
- limit,
- ),
- )
- return cast(List[Tuple[str, int]], txn.fetchall())
-
- return await self.db_pool.runInteraction(
- "get_insertion_event_backward_extremities_in_room",
- get_insertion_event_backward_extremities_in_room_txn,
- room_id,
- )
-
async def get_max_depth_of(
self, event_ids: Collection[str]
) -> Tuple[Optional[str], int]:
@@ -1280,50 +1162,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return event_ids
- def _get_connected_batch_event_backfill_results_txn(
- self, txn: LoggingTransaction, insertion_event_id: str, limit: int
- ) -> List[BackfillQueueNavigationItem]:
- """
- Find any batch connections of a given insertion event.
- A batch event points at a insertion event via:
- batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
-
- Args:
- txn: The database transaction to use
- insertion_event_id: The event ID to navigate from. We will find
- batch events that point back at this insertion event.
- limit: Max number of event ID's to query for and return
-
- Returns:
- List of batch events that the backfill queue can process
- """
- batch_connection_query = """
- SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
- /* Find the batch that connects to the given insertion event */
- INNER JOIN batch_events AS c
- ON i.next_batch_id = c.batch_id
- /* Get the depth of the batch start event from the events table */
- INNER JOIN events AS e ON c.event_id = e.event_id
- /* Find an insertion event which matches the given event_id */
- WHERE i.event_id = ?
- LIMIT ?
- """
-
- # Find any batch connections for the given insertion event
- txn.execute(
- batch_connection_query,
- (insertion_event_id, limit),
- )
- return [
- BackfillQueueNavigationItem(
- depth=row[0],
- stream_ordering=row[1],
- event_id=row[2],
- type=row[3],
- )
- for row in txn
- ]
-
def _get_connected_prev_event_backfill_results_txn(
self, txn: LoggingTransaction, event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
@@ -1472,40 +1310,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_id_results.add(event_id)
- # Try and find any potential historical batches of message history.
- if self.hs.config.experimental.msc2716_enabled:
- # We need to go and try to find any batch events connected
- # to a given insertion event (by batch_id). If we find any, we'll
- # add them to the queue and navigate up the DAG like normal in the
- # next iteration of the loop.
- if event_type == EventTypes.MSC2716_INSERTION:
- # Find any batch connections for the given insertion event
- connected_batch_event_backfill_results = (
- self._get_connected_batch_event_backfill_results_txn(
- txn, event_id, limit - len(event_id_results)
- )
- )
- logger.debug(
- "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
- room_id,
- connected_batch_event_backfill_results,
- )
- for (
- connected_batch_event_backfill_item
- ) in connected_batch_event_backfill_results:
- if (
- connected_batch_event_backfill_item.event_id
- not in event_id_results
- ):
- queue.put(
- (
- -connected_batch_event_backfill_item.depth,
- -connected_batch_event_backfill_item.stream_ordering,
- connected_batch_event_backfill_item.event_id,
- connected_batch_event_backfill_item.type,
- )
- )
-
# Now we just look up the DAG by prev_events as normal
connected_prev_event_backfill_results = (
self._get_connected_prev_event_backfill_results_txn(
@@ -1748,19 +1552,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
- @trace
- async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
- await self.db_pool.simple_upsert(
- table="insertion_event_extremities",
- keyvalues={"event_id": event_id},
- values={
- "event_id": event_id,
- "room_id": room_id,
- },
- insertion_values={},
- desc="insert_insertion_extremity",
- )
-
async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:
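
The exponential back-off arithmetic in the SQL just removed translates directly to Python. For illustration, assuming a one-hour step and a cap of four doubling steps (the real values are the BACKFILL_EVENT_EXPONENTIAL_BACKOFF_* constants referenced above):

    STEP_MS = 60 * 60 * 1000  # assumed step size in milliseconds
    MAX_DOUBLING_STEPS = 4    # assumed cap on doublings

    def next_retry_ts(last_attempt_ts: int, num_attempts: int) -> int:
        # `1 << n` is the power-of-two equivalent of 2**n, used in the SQL
        # for compatibility with older SQLites.
        return last_attempt_ts + (1 << min(num_attempts, MAX_DOUBLING_STEPS)) * STEP_MS

    # -> [(1, 2), (2, 4), (3, 8), (4, 16), (5, 16)]  (attempts, hours to wait)
    print([(n, next_retry_ts(0, n) // (60 * 60 * 1000)) for n in range(1, 6)])
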
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e2e6eb479f..5c9db7554e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1664,9 +1664,6 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
- self._handle_insertion_event(txn, event)
- self._handle_batch_event(txn, event)
-
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@@ -1729,13 +1726,22 @@ class PersistEventsStore:
if not row["rejects"] and not row["redacts"]:
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
- async def prefill() -> None:
+ async def external_prefill() -> None:
+ for cache_entry in to_prefill:
+ await self.store._get_event_cache.set_external(
+ (cache_entry.event.event_id,), cache_entry
+ )
+
+ def local_prefill() -> None:
for cache_entry in to_prefill:
- await self.store._get_event_cache.set(
+ self.store._get_event_cache.set_local(
(cache_entry.event.event_id,), cache_entry
)
- txn.async_call_after(prefill)
+ # The order these are called in here matters less than knowing that, once
+ # the transaction has finished, the async_call_after callbacks run before
+ # the call_after ones.
+ txn.async_call_after(external_prefill)
+ txn.call_after(local_prefill)
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
assert event.redacts is not None
@@ -1918,128 +1924,6 @@ class PersistEventsStore:
),
)
- def _handle_insertion_event(
- self, txn: LoggingTransaction, event: EventBase
- ) -> None:
- """Handles keeping track of insertion events and edges/connections.
- Part of MSC2716.
-
- Args:
- txn: The database transaction object
- event: The event to process
- """
-
- if event.type != EventTypes.MSC2716_INSERTION:
- # Not a insertion event
- return
-
- # Skip processing an insertion event if the room version doesn't
- # support it or the event is not from the room creator.
- room_version = self.store.get_room_version_txn(txn, event.room_id)
- room_creator = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="rooms",
- keyvalues={"room_id": event.room_id},
- retcol="creator",
- allow_none=True,
- )
- if not room_version.msc2716_historical and (
- not self.hs.config.experimental.msc2716_enabled
- or event.sender != room_creator
- ):
- return
-
- next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
- if next_batch_id is None:
- # Invalid insertion event without next batch ID
- return
-
- logger.debug(
- "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
- )
-
- # Keep track of the insertion event and the batch ID
- self.db_pool.simple_insert_txn(
- txn,
- table="insertion_events",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "next_batch_id": next_batch_id,
- },
- )
-
- # Insert an edge for every prev_event connection
- for prev_event_id in event.prev_event_ids():
- self.db_pool.simple_insert_txn(
- txn,
- table="insertion_event_edges",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "insertion_prev_event_id": prev_event_id,
- },
- )
-
- def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None:
- """Handles inserting the batch edges/connections between the batch event
- and an insertion event. Part of MSC2716.
-
- Args:
- txn: The database transaction object
- event: The event to process
- """
-
- if event.type != EventTypes.MSC2716_BATCH:
- # Not a batch event
- return
-
- # Skip processing a batch event if the room version doesn't
- # support it or the event is not from the room creator.
- room_version = self.store.get_room_version_txn(txn, event.room_id)
- room_creator = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="rooms",
- keyvalues={"room_id": event.room_id},
- retcol="creator",
- allow_none=True,
- )
- if not room_version.msc2716_historical and (
- not self.hs.config.experimental.msc2716_enabled
- or event.sender != room_creator
- ):
- return
-
- batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
- if batch_id is None:
- # Invalid batch event without a batch ID
- return
-
- logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
-
- # Keep track of the insertion event and the batch ID
- self.db_pool.simple_insert_txn(
- txn,
- table="batch_events",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "batch_id": batch_id,
- },
- )
-
- # When we receive an event with a `batch_id` referencing the
- # `next_batch_id` of the insertion event, we can remove it from the
- # `insertion_event_extremities` table.
- sql = """
- DELETE FROM insertion_event_extremities WHERE event_id IN (
- SELECT event_id FROM insertion_events
- WHERE next_batch_id = ?
- )
- """
-
- txn.execute(sql, (batch_id,))
-
def _handle_redact_relations(
self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
) -> None:
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d93ffc4efa..7e7648c951 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -883,7 +883,7 @@ class EventsWorkerStore(SQLBaseStore):
async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
"""
- Invalidates an event in the asyncronous get event cache, which may be remote.
+ Invalidates an event in the asynchronous get event cache, which may be remote.
Arguments:
event_id: the event ID to invalidate
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
deleted file mode 100644
index 131f357d04..0000000000
--- a/synapse/storage/databases/main/room_batch.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2021 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Optional
-
-from synapse.storage._base import SQLBaseStore
-
-
-class RoomBatchStore(SQLBaseStore):
- async def get_insertion_event_id_by_batch_id(
- self, room_id: str, batch_id: str
- ) -> Optional[str]:
- """Retrieve a insertion event ID.
-
- Args:
- batch_id: The batch ID of the insertion event to retrieve.
-
- Returns:
- The event_id of an insertion event, or None if there is no known
- insertion event for the given insertion event.
- """
- return await self.db_pool.simple_select_one_onecol(
- table="insertion_events",
- keyvalues={"room_id": room_id, "next_batch_id": batch_id},
- retcol="event_id",
- allow_none=True,
- )
-
- async def store_state_group_id_for_event_id(
- self, event_id: str, state_group_id: int
- ) -> None:
- await self.db_pool.simple_upsert(
- table="event_to_state_groups",
- keyvalues={"event_id": event_id},
- values={"state_group": state_group_id, "event_id": event_id},
- )
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 7ea0c4c36b..9f3b8741c1 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -116,6 +116,11 @@ class Clock:
Waits `msec` initially before calling `f` for the first time.
+ If the function given to `looping_call` returns an awaitable/deferred, the next
+ call isn't scheduled until the returned awaitable has finished. This behaviour
+ comes from this function being a thin wrapper around
+ `twisted.internet.task.LoopingCall`.
+
Note that the function will be called with no logcontext, so if it is anything
other than trivial, you probably want to wrap it in run_as_background_process.
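
A small Twisted sketch of the behaviour documented above (interval and delay invented):

    from twisted.internet import defer, reactor, task

    def slow_tick() -> "defer.Deferred[None]":
        # Because a Deferred is returned, LoopingCall won't schedule the next
        # call until it fires, so iterations never overlap.
        return task.deferLater(reactor, 2.0, lambda: None)

    loop = task.LoopingCall(slow_tick)
    loop.start(5.0)  # call roughly every 5 seconds
    reactor.run()
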
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 6137c85e10..be6554319a 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -842,7 +842,13 @@ class AsyncLruCache(Generic[KT, VT]):
return self._lru_cache.get(key, update_metrics=update_metrics)
async def set(self, key: KT, value: VT) -> None:
- self._lru_cache.set(key, value)
+ # This will add the entries in the correct order: local first, external second.
+ self.set_local(key, value)
+ await self.set_external(key, value)
+
+ async def set_external(self, key: KT, value: VT) -> None:
+ # This method should add an entry to any configured external cache; here it is a no-op.
+ pass
def set_local(self, key: KT, value: VT) -> None:
self._lru_cache.set(key, value)
|