diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index cf8017b0db..fcfdaa074f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -872,6 +872,36 @@ class EventCreationHandler:
return prev_event
return None
+ async def get_event_from_transaction(
+ self,
+ requester: Requester,
+ txn_id: str,
+ room_id: str,
+ ) -> Optional[EventBase]:
+ """For the given transaction ID and room ID, check if there is a matching event.
+ If so, fetch it and return it.
+
+ Args:
+ requester: The requester making the request in the context of which we want
+ to fetch the event.
+ txn_id: The transaction ID.
+ room_id: The room ID.
+
+ Returns:
+ An event if one could be found, None otherwise.
+ """
+ if requester.access_token_id:
+ existing_event_id = await self.store.get_event_id_from_transaction_id(
+ room_id,
+ requester.user.to_string(),
+ requester.access_token_id,
+ txn_id,
+ )
+ if existing_event_id:
+ return await self.store.get_event(existing_event_id)
+
+ return None
+
async def create_and_send_nonmember_event(
self,
requester: Requester,
@@ -951,18 +981,17 @@ class EventCreationHandler:
# extremities to pile up, which in turn leads to state resolution
# taking longer.
async with self.limiter.queue(event_dict["room_id"]):
- if txn_id and requester.access_token_id:
- existing_event_id = await self.store.get_event_id_from_transaction_id(
- event_dict["room_id"],
- requester.user.to_string(),
- requester.access_token_id,
- txn_id,
+ if txn_id:
+ event = await self.get_event_from_transaction(
+ requester, txn_id, event_dict["room_id"]
)
- if existing_event_id:
- event = await self.store.get_event(existing_event_id)
+ if event:
# we know it was persisted, so must have a stream ordering
assert event.internal_metadata.stream_ordering
- return event, event.internal_metadata.stream_ordering
+ return (
+ event,
+ event.internal_metadata.stream_ordering,
+ )
event, context = await self.create_event(
requester,
@@ -1421,17 +1450,9 @@ class EventCreationHandler:
a room that has been un-partial stated.
"""
- for event, context in events_and_context:
- # Skip push notification actions for historical messages
- # because we don't want to notify people about old history back in time.
- # The historical messages also do not have the proper `context.current_state_ids`
- # and `state_groups` because they have `prev_events` that aren't persisted yet
- # (historical messages persisted in reverse-chronological order).
- if not event.internal_metadata.is_historical():
- with opentracing.start_active_span("calculate_push_actions"):
- await self._bulk_push_rule_evaluator.action_for_event_by_user(
- event, context
- )
+ await self._bulk_push_rule_evaluator.action_for_events_by_user(
+ events_and_context
+ )
try:
# If we're a worker we need to hit out to the master.
|