diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0b61c2272b..d0d4626ed6 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -53,6 +53,7 @@ from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
+from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -60,7 +61,6 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
- MutableStateMap,
PersistedEventPosition,
Requester,
RoomAlias,
@@ -379,7 +379,7 @@ class MessageHandler:
"""
expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
- if type(expiry_ts) is not int or event.is_state():
+ if type(expiry_ts) is not int or event.is_state(): # noqa: E721
return
# _schedule_expiry_for_event won't actually schedule anything if there's already
@@ -486,6 +486,7 @@ class EventCreationHandler:
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()
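+        # Cross-worker locks, used below so that events are not sent into a room
+        # while that room is being purged.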
+ self._worker_lock_handler = hs.get_worker_locks_handler()
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
@@ -560,8 +561,6 @@ class EventCreationHandler:
expiry_ms=30 * 60 * 1000,
)
- self._msc3970_enabled = hs.config.experimental.msc3970_enabled
-
async def create_event(
self,
requester: Requester,
@@ -573,7 +572,6 @@ class EventCreationHandler:
state_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
- historical: bool = False,
depth: Optional[int] = None,
state_map: Optional[StateMap[str]] = None,
for_batch: bool = False,
@@ -599,7 +597,7 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -614,13 +612,10 @@ class EventCreationHandler:
If non-None, prev_event_ids must also be provided.
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is with insertion events which float at
- the beginning of a historical batch and don't have any `prev_events` to
- derive from; we add all of these state events as the explicit state so the
- rest of the historical batch can inherit the same state and state_group.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
require_consent: Whether to check if the requester has
consented to the privacy policy.
@@ -629,10 +624,6 @@ class EventCreationHandler:
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
- historical: Indicates whether the message is being inserted
- back in time around some existing events. This is used to skip
- a few checks and mark the event as backfilled.
-
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@@ -702,13 +693,9 @@ class EventCreationHandler:
if require_consent and not is_exempt:
await self.assert_accepted_privacy_policy(requester)
- # Save the access token ID, the device ID and the transaction ID in the event
- # internal metadata. This is useful to determine if we should echo the
- # transaction_id in events.
+ # Save the device ID and the transaction ID in the event internal metadata.
+ # This is useful to determine if we should echo the transaction_id in events.
# See `synapse.events.utils.EventClientSerializer.serialize_event`
- if requester.access_token_id is not None:
- builder.internal_metadata.token_id = requester.access_token_id
-
if requester.device_id is not None:
builder.internal_metadata.device_id = requester.device_id
@@ -717,8 +704,6 @@ class EventCreationHandler:
builder.internal_metadata.outlier = outlier
- builder.internal_metadata.historical = historical
-
event, unpersisted_context = await self.create_new_client_event(
builder=builder,
requester=requester,
@@ -749,7 +734,7 @@ class EventCreationHandler:
prev_event_id = state_map.get((EventTypes.Member, event.sender))
else:
prev_state_ids = await unpersisted_context.get_prev_state_ids(
- StateFilter.from_types([(EventTypes.Member, None)])
+ StateFilter.from_types([(EventTypes.Member, event.sender)])
)
prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
prev_event = (
@@ -839,13 +824,13 @@ class EventCreationHandler:
u = await self.store.get_user_by_id(user_id)
assert u is not None
- if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
+ if u.user_type in (UserTypes.SUPPORT, UserTypes.BOT):
# support and bot users are not required to consent
return
- if u["appservice_id"] is not None:
+ if u.appservice_id is not None:
# users registered by an appservice are exempt
return
- if u["consent_version"] == self.config.consent.user_consent_version:
+ if u.consent_version == self.config.consent.user_consent_version:
return
consent_uri = self._consent_uri_builder.build_user_consent_uri(user.localpart)
@@ -871,7 +856,7 @@ class EventCreationHandler:
return None
prev_state_ids = await context.get_prev_state_ids(
- StateFilter.from_types([(event.type, None)])
+ StateFilter.from_types([(event.type, event.state_key)])
)
prev_event_id = prev_state_ids.get((event.type, event.state_key))
if not prev_event_id:
@@ -887,14 +872,13 @@ class EventCreationHandler:
return prev_event
return None
- async def get_event_from_transaction(
+ async def get_event_id_from_transaction(
self,
requester: Requester,
txn_id: str,
room_id: str,
- ) -> Optional[EventBase]:
- """For the given transaction ID and room ID, check if there is a matching event.
- If so, fetch it and return it.
+ ) -> Optional[str]:
+ """For the given transaction ID and room ID, check if there is a matching event ID.
Args:
requester: The requester making the request in the context of which we want
@@ -903,12 +887,12 @@ class EventCreationHandler:
room_id: The room ID.
Returns:
- An event if one could be found, None otherwise.
+ An event ID if one could be found, None otherwise.
"""
+ existing_event_id = None
- if self._msc3970_enabled and requester.device_id:
- # When MSC3970 is enabled, we lookup for events sent by the same device first,
- # and fallback to the old behaviour if none were found.
+ # According to the spec, transactions are scoped to a user's device ID.
+ if requester.device_id:
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_device_id(
room_id,
@@ -918,22 +902,33 @@ class EventCreationHandler:
)
)
if existing_event_id:
- return await self.store.get_event(existing_event_id)
+ return existing_event_id
- # Pre-MSC3970, we looked up for events that were sent by the same session by
- # using the access token ID.
- if requester.access_token_id:
- existing_event_id = (
- await self.store.get_event_id_from_transaction_id_and_token_id(
- room_id,
- requester.user.to_string(),
- requester.access_token_id,
- txn_id,
- )
- )
- if existing_event_id:
- return await self.store.get_event(existing_event_id)
+ return existing_event_id
+
+ async def get_event_from_transaction(
+ self,
+ requester: Requester,
+ txn_id: str,
+ room_id: str,
+ ) -> Optional[EventBase]:
+ """For the given transaction ID and room ID, check if there is a matching event.
+ If so, fetch it and return it.
+
+ Args:
+ requester: The requester making the request in the context of which we want
+ to fetch the event.
+ txn_id: The transaction ID.
+ room_id: The room ID.
+ Returns:
+ An event if one could be found, None otherwise.
+ """
+ existing_event_id = await self.get_event_id_from_transaction(
+ requester, txn_id, room_id
+ )
+ if existing_event_id:
+ return await self.store.get_event(existing_event_id)
return None
async def create_and_send_nonmember_event(
@@ -947,7 +942,6 @@ class EventCreationHandler:
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
outlier: bool = False,
- historical: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, int]:
"""
@@ -961,19 +955,16 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
from the database.
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is with insertion events which float at
- the beginning of a historical batch and don't have any `prev_events` to
- derive from; we add all of these state events as the explicit state so the
- rest of the historical batch can inherit the same state and state_group.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
ratelimit: Whether to rate limit this send.
txn_id: The transaction ID.
ignore_shadow_ban: True if shadow-banned users should be allowed to
@@ -981,9 +972,6 @@ class EventCreationHandler:
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
- historical: Indicates whether the message is being inserted
- back in time around some existing events. This is used to skip
- a few checks and mark the event as backfilled.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@@ -1028,6 +1016,37 @@ class EventCreationHandler:
event.internal_metadata.stream_ordering,
)
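+        # Take the "new event during purge" lock as a reader: event sends can run
+        # concurrently with each other, but not while a purge or delete of this
+        # room holds the write side of the lock.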
+ async with self._worker_lock_handler.acquire_read_write_lock(
+ NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
+ ):
+ return await self._create_and_send_nonmember_event_locked(
+ requester=requester,
+ event_dict=event_dict,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ ratelimit=ratelimit,
+ txn_id=txn_id,
+ ignore_shadow_ban=ignore_shadow_ban,
+ outlier=outlier,
+ depth=depth,
+ )
+
+ async def _create_and_send_nonmember_event_locked(
+ self,
+ requester: Requester,
+ event_dict: dict,
+ allow_no_prev_events: bool = False,
+ prev_event_ids: Optional[List[str]] = None,
+ state_event_ids: Optional[List[str]] = None,
+ ratelimit: bool = True,
+ txn_id: Optional[str] = None,
+ ignore_shadow_ban: bool = False,
+ outlier: bool = False,
+ depth: Optional[int] = None,
+ ) -> Tuple[EventBase, int]:
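+        """Body of `create_and_send_nonmember_event`; expects the caller to
+        already hold the purge lock for the room. See that method for the
+        meaning of the arguments."""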
+ room_id = event_dict["room_id"]
+
# If we don't have any prev event IDs specified then we need to
# check that the host is in the room (as otherwise populating the
# prev events will fail), at which point we may as well check the
@@ -1053,7 +1072,6 @@ class EventCreationHandler:
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
outlier=outlier,
- historical=historical,
depth=depth,
)
context = await unpersisted_context.persist(event)
@@ -1145,7 +1163,7 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
- cases like MSC2716.
+ cases (previously useful for MSC2716).
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -1158,13 +1176,10 @@ class EventCreationHandler:
based on the room state at the prev_events.
state_event_ids:
- The full state at a given event. This is used particularly by the MSC2716
- /batch_send endpoint. One use case is with insertion events which float at
- the beginning of a historical batch and don't have any `prev_events` to
- derive from; we add all of these state events as the explicit state so the
- rest of the historical batch can inherit the same state and state_group.
- This should normally be left as None, which will cause the auth_event_ids
- to be calculated based on the room state at the prev_events.
+ The full state at a given event. This was previously used particularly
+ by the MSC2716 /batch_send endpoint. This should normally be left as
+ None, which will cause the auth_event_ids to be calculated based on the
+ room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
@@ -1261,52 +1276,6 @@ class EventCreationHandler:
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = True
context = EventContext.for_outlier(self._storage_controllers)
- elif (
- event.type == EventTypes.MSC2716_INSERTION
- and state_event_ids
- and builder.internal_metadata.is_historical()
- ):
- # Add explicit state to the insertion event so it has state to derive
- # from even though it's floating with no `prev_events`. The rest of
- # the batch can derive from this state and state_group.
- #
- # TODO(faster_joins): figure out how this works, and make sure that the
- # old state is complete.
- # https://github.com/matrix-org/synapse/issues/13003
- metadata = await self.store.get_metadata_for_events(state_event_ids)
-
- state_map_for_event: MutableStateMap[str] = {}
- for state_id in state_event_ids:
- data = metadata.get(state_id)
- if data is None:
- # We're trying to persist a new historical batch of events
- # with the given state, e.g. via
- # `RoomBatchSendEventRestServlet`. The state can be inferred
- # by Synapse or set directly by the client.
- #
- # Either way, we should have persisted all the state before
- # getting here.
- raise Exception(
- f"State event {state_id} not found in DB,"
- " Synapse should have persisted it before using it."
- )
-
- if data.state_key is None:
- raise Exception(
- f"Trying to set non-state event {state_id} as state"
- )
-
- state_map_for_event[(data.event_type, data.state_key)] = state_id
-
- # TODO(faster_joins): check how MSC2716 works and whether we can have
- # partial state here
- # https://github.com/matrix-org/synapse/issues/13003
- context = await self.state.calculate_context_info(
- event,
- state_ids_before_event=state_map_for_event,
- partial_state=False,
- )
-
else:
context = await self.state.calculate_context_info(event)
@@ -1488,23 +1457,23 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
- event, context = events_and_context[0]
+ #
+ # Note: mypy gets confused if we inline `deferreds` into the gather_results
+ # call below and check with twisted#11770. Some kind of bug in mypy's deduction?
+ deferreds = (
+ run_in_background(
+ self._persist_events,
+ requester=requester,
+ events_and_context=events_and_context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ ),
+ run_in_background(
+ self.cache_joined_hosts_for_events, events_and_context
+ ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
+ )
result, _ = await make_deferred_yieldable(
- gather_results(
- (
- run_in_background(
- self._persist_events,
- requester=requester,
- events_and_context=events_and_context,
- ratelimit=ratelimit,
- extra_users=extra_users,
- ),
- run_in_background(
- self.cache_joined_hosts_for_events, events_and_context
- ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
- ),
- consumeErrors=True,
- )
+ gather_results(deferreds, consumeErrors=True)
).addErrback(unwrapFirstError)
return result
@@ -1633,12 +1602,11 @@ class EventCreationHandler:
if state_entry.state_group in self._external_cache_joined_hosts_updates:
return
- state = await state_entry.get_state(
- self._storage_controllers.state, StateFilter.all()
- )
with opentracing.start_active_span("get_joined_hosts"):
- joined_hosts = await self.store.get_joined_hosts(
- event.room_id, state, state_entry
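+        # The state storage controller can compute the joined hosts from the
+        # state entry itself, so we no longer need to load the full room state here.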
+ joined_hosts = (
+ await self._storage_controllers.state.get_joined_hosts(
+ event.room_id, state_entry
+ )
)
# Note that the expiry times must be larger than the expiry time in
@@ -1758,6 +1726,11 @@ class EventCreationHandler:
event.event_id, event.room_id
)
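+        # A new m.room.server_acl event makes the cached server ACL for this room
+        # stale, so drop the cache entry and let the next lookup recompute it.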
+ if event.type == EventTypes.ServerACL:
+ self._storage_controllers.state.get_server_acl_for_room.invalidate(
+ (event.room_id,)
+ )
+
await self._maybe_kick_guest_users(event, context)
if event.type == EventTypes.CanonicalAlias:
@@ -1876,28 +1849,6 @@ class EventCreationHandler:
403, "Redacting server ACL events is not permitted"
)
- # Add a little safety stop-gap to prevent people from trying to
- # redact MSC2716 related events when they're in a room version
- # which does not support it yet. We allow people to use MSC2716
- # events in existing room versions but only from the room
- # creator since it does not require any changes to the auth
- # rules and in effect, the redaction algorithm . In the
- # supported room version, we add the `historical` power level to
- # auth the MSC2716 related events and adjust the redaction
- # algorthim to keep the `historical` field around (redacting an
- # event should only strip fields which don't affect the
- # structural protocol level).
- is_msc2716_event = (
- original_event.type == EventTypes.MSC2716_INSERTION
- or original_event.type == EventTypes.MSC2716_BATCH
- or original_event.type == EventTypes.MSC2716_MARKER
- )
- if not room_version_obj.msc2716_historical and is_msc2716_event:
- raise AuthError(
- 403,
- "Redacting MSC2716 events is not supported in this room version",
- )
-
event_types = event_auth.auth_types_for_event(event.room_version, event)
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types(event_types)
@@ -1935,58 +1886,12 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
- if event.type == EventTypes.MSC2716_INSERTION:
- room_version = await self.store.get_room_version_id(event.room_id)
- room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
- create_event = await self.store.get_create_event_for_room(event.room_id)
- if not room_version_obj.msc2175_implicit_room_creator:
- room_creator = create_event.content.get(
- EventContentFields.ROOM_CREATOR
- )
- else:
- room_creator = create_event.sender
-
- # Only check an insertion event if the room version
- # supports it or the event is from the room creator.
- if room_version_obj.msc2716_historical or (
- self.config.experimental.msc2716_enabled
- and event.sender == room_creator
- ):
- next_batch_id = event.content.get(
- EventContentFields.MSC2716_NEXT_BATCH_ID
- )
- conflicting_insertion_event_id = None
- if next_batch_id:
- conflicting_insertion_event_id = (
- await self.store.get_insertion_event_id_by_batch_id(
- event.room_id, next_batch_id
- )
- )
- if conflicting_insertion_event_id is not None:
- # The current insertion event that we're processing is invalid
- # because an insertion event already exists in the room with the
- # same next_batch_id. We can't allow multiple because the batch
- # pointing will get weird, e.g. we can't determine which insertion
- # event the batch event is pointing to.
- raise SynapseError(
- HTTPStatus.BAD_REQUEST,
- "Another insertion event already exists with the same next_batch_id",
- errcode=Codes.INVALID_PARAM,
- )
-
- # Mark any `m.historical` messages as backfilled so they don't appear
- # in `/sync` and have the proper decrementing `stream_ordering` as we import
- backfilled = False
- if event.internal_metadata.is_historical():
- backfilled = True
-
assert self._storage_controllers.persistence is not None
(
persisted_events,
max_stream_token,
) = await self._storage_controllers.persistence.persist_events(
- events_and_context, backfilled=backfilled
+ events_and_context,
)
events_and_pos = []
@@ -2004,7 +1909,10 @@ class EventCreationHandler:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_as_background_process(
- "bump_presence_active_time", self._bump_active_time, requester.user
+ "bump_presence_active_time",
+ self._bump_active_time,
+ requester.user,
+ requester.device_id,
)
async def _notify() -> None:
@@ -2041,10 +1949,10 @@ class EventCreationHandler:
logger.info("maybe_kick_guest_users %r", current_state)
await self.hs.get_room_member_handler().kick_guest_users(current_state)
- async def _bump_active_time(self, user: UserID) -> None:
+ async def _bump_active_time(self, user: UserID, device_id: Optional[str]) -> None:
try:
presence = self.hs.get_presence_handler()
- await presence.bump_presence_active_time(user)
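+            # Pass the device ID through so presence activity can be attributed
+            # to the device that sent the event.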
+ await presence.bump_presence_active_time(user, device_id)
except Exception:
logger.exception("Error bumping presence active time")
@@ -2060,7 +1968,10 @@ class EventCreationHandler:
)
for room_id in room_ids:
- dummy_event_sent = await self._send_dummy_event_for_room(room_id)
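+            # As above, take the purge lock as a reader so we don't insert a
+            # dummy event into a room that is currently being purged.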
+ async with self._worker_lock_handler.acquire_read_write_lock(
+ NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
+ ):
+ dummy_event_sent = await self._send_dummy_event_for_room(room_id)
if not dummy_event_sent:
# Did not find a valid user in the room, so remove from future attempts
|