diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2e964ed37e..ac1932a7f9 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -561,6 +561,8 @@ class EventCreationHandler:
expiry_ms=30 * 60 * 1000,
)
+ self._msc3970_enabled = hs.config.experimental.msc3970_enabled
+
async def create_event(
self,
requester: Requester,
@@ -701,9 +703,16 @@ class EventCreationHandler:
if require_consent and not is_exempt:
await self.assert_accepted_privacy_policy(requester)
+ # Save the access token ID, the device ID and the transaction ID in the event
+ # internal metadata. This is useful to determine if we should echo the
+ # transaction_id in events.
+ # See `synapse.events.utils.EventClientSerializer.serialize_event`
if requester.access_token_id is not None:
builder.internal_metadata.token_id = requester.access_token_id
+ if requester.device_id is not None:
+ builder.internal_metadata.device_id = requester.device_id
+
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
@@ -897,12 +906,31 @@ class EventCreationHandler:
Returns:
An event if one could be found, None otherwise.
"""
+
+ if self._msc3970_enabled and requester.device_id:
+ # When MSC3970 is enabled, we lookup for events sent by the same device first,
+ # and fallback to the old behaviour if none were found.
+ existing_event_id = (
+ await self.store.get_event_id_from_transaction_id_and_device_id(
+ room_id,
+ requester.user.to_string(),
+ requester.device_id,
+ txn_id,
+ )
+ )
+ if existing_event_id:
+ return await self.store.get_event(existing_event_id)
+
+ # Pre-MSC3970, we looked up for events that were sent by the same session by
+ # using the access token ID.
if requester.access_token_id:
- existing_event_id = await self.store.get_event_id_from_transaction_id(
- room_id,
- requester.user.to_string(),
- requester.access_token_id,
- txn_id,
+ existing_event_id = (
+ await self.store.get_event_id_from_transaction_id_and_token_id(
+ room_id,
+ requester.user.to_string(),
+ requester.access_token_id,
+ txn_id,
+ )
)
if existing_event_id:
return await self.store.get_event(existing_event_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ec317e6023..ed805d6ec8 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -169,6 +169,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
+ self._msc3970_enabled = hs.config.experimental.msc3970_enabled
+
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
@@ -399,13 +401,30 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# Check if we already have an event with a matching transaction ID. (We
# do this check just before we persist an event as well, but may as well
# do it up front for efficiency.)
- if txn_id and requester.access_token_id:
- existing_event_id = await self.store.get_event_id_from_transaction_id(
- room_id,
- requester.user.to_string(),
- requester.access_token_id,
- txn_id,
- )
+ if txn_id:
+ existing_event_id = None
+ if self._msc3970_enabled and requester.device_id:
+ # When MSC3970 is enabled, we lookup for events sent by the same device
+ # first, and fallback to the old behaviour if none were found.
+ existing_event_id = (
+ await self.store.get_event_id_from_transaction_id_and_device_id(
+ room_id,
+ requester.user.to_string(),
+ requester.device_id,
+ txn_id,
+ )
+ )
+
+ if requester.access_token_id and not existing_event_id:
+ existing_event_id = (
+ await self.store.get_event_id_from_transaction_id_and_token_id(
+ room_id,
+ requester.user.to_string(),
+ requester.access_token_id,
+ txn_id,
+ )
+ )
+
if existing_event_id:
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream
|