diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3a9183e0b0..415c0935ed 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -59,6 +59,7 @@ from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
if TYPE_CHECKING:
+ from synapse.events.third_party_rules import ThirdPartyEventRules
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -393,27 +394,31 @@ class EventCreationHandler:
self.action_generator = hs.get_action_generator()
self.spam_checker = hs.get_spam_checker()
- self.third_party_event_rules = hs.get_third_party_event_rules()
+ self.third_party_event_rules = (
+ self.hs.get_third_party_event_rules()
+ ) # type: ThirdPartyEventRules
self._block_events_without_consent_error = (
self.config.block_events_without_consent_error
)
+ # we need to construct a ConsentURIBuilder here, as it checks that the necessary
+ # config options, but *only* if we have a configuration for which we are
+ # going to need it.
+ if self._block_events_without_consent_error:
+ self._consent_uri_builder = ConsentURIBuilder(self.config)
+
# Rooms which should be excluded from dummy insertion. (For instance,
# those without local users who can send events into the room).
#
# map from room id to time-of-last-attempt.
#
self._rooms_to_exclude_from_dummy_event_insertion = {} # type: Dict[str, int]
-
- # we need to construct a ConsentURIBuilder here, as it checks that the necessary
- # config options, but *only* if we have a configuration for which we are
- # going to need it.
- if self._block_events_without_consent_error:
- self._consent_uri_builder = ConsentURIBuilder(self.config)
+ # The number of forward extremeities before a dummy event is sent.
+ self._dummy_events_threshold = hs.config.dummy_events_threshold
if (
- not self.config.worker_app
+ self.config.run_background_tasks
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
@@ -428,8 +433,6 @@ class EventCreationHandler:
self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
- self._dummy_events_threshold = hs.config.dummy_events_threshold
-
async def create_event(
self,
requester: Requester,
@@ -635,59 +638,6 @@ class EventCreationHandler:
msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
- async def send_nonmember_event(
- self,
- requester: Requester,
- event: EventBase,
- context: EventContext,
- ratelimit: bool = True,
- ignore_shadow_ban: bool = False,
- ) -> int:
- """
- Persists and notifies local clients and federation of an event.
-
- Args:
- requester: The requester sending the event.
- event: The event to send.
- context: The context of the event.
- ratelimit: Whether to rate limit this send.
- ignore_shadow_ban: True if shadow-banned users should be allowed to
- send this event.
-
- Return:
- The stream_id of the persisted event.
-
- Raises:
- ShadowBanError if the requester has been shadow-banned.
- """
- if event.type == EventTypes.Member:
- raise SynapseError(
- 500, "Tried to send member event through non-member codepath"
- )
-
- if not ignore_shadow_ban and requester.shadow_banned:
- # We randomly sleep a bit just to annoy the requester.
- await self.clock.sleep(random.randint(1, 10))
- raise ShadowBanError()
-
- user = UserID.from_string(event.sender)
-
- assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
-
- if event.is_state():
- prev_event = await self.deduplicate_state_event(event, context)
- if prev_event is not None:
- logger.info(
- "Not bothering to persist state event %s duplicated by %s",
- event.event_id,
- prev_event.event_id,
- )
- return await self.store.get_stream_id_for_event(prev_event.event_id)
-
- return await self.handle_new_client_event(
- requester=requester, event=event, context=context, ratelimit=ratelimit
- )
-
async def deduplicate_state_event(
self, event: EventBase, context: EventContext
) -> Optional[EventBase]:
@@ -728,7 +678,7 @@ class EventCreationHandler:
"""
Creates an event, then sends it.
- See self.create_event and self.send_nonmember_event.
+ See self.create_event and self.handle_new_client_event.
Args:
requester: The requester sending the event.
@@ -738,9 +688,19 @@ class EventCreationHandler:
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
+ Returns:
+ The event, and its stream ordering (if deduplication happened,
+ the previous, duplicate event).
+
Raises:
ShadowBanError if the requester has been shadow-banned.
"""
+
+ if event_dict["type"] == EventTypes.Member:
+ raise SynapseError(
+ 500, "Tried to send member event through non-member codepath"
+ )
+
if not ignore_shadow_ban and requester.shadow_banned:
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
@@ -752,24 +712,44 @@ class EventCreationHandler:
# extremities to pile up, which in turn leads to state resolution
# taking longer.
with (await self.limiter.queue(event_dict["room_id"])):
+ if txn_id and requester.access_token_id:
+ existing_event_id = await self.store.get_event_id_from_transaction_id(
+ event_dict["room_id"],
+ requester.user.to_string(),
+ requester.access_token_id,
+ txn_id,
+ )
+ if existing_event_id:
+ event = await self.store.get_event(existing_event_id)
+ # we know it was persisted, so must have a stream ordering
+ assert event.internal_metadata.stream_ordering
+ return event, event.internal_metadata.stream_ordering
+
event, context = await self.create_event(
requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
)
+ assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+ event.sender,
+ )
+
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, str):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
- stream_id = await self.send_nonmember_event(
- requester,
- event,
- context,
+ ev = await self.handle_new_client_event(
+ requester=requester,
+ event=event,
+ context=context,
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)
- return event, stream_id
+
+ # we know it was persisted, so must have a stream ordering
+ assert ev.internal_metadata.stream_ordering
+ return ev, ev.internal_metadata.stream_ordering
@measure_func("create_new_client_event")
async def create_new_client_event(
@@ -843,8 +823,11 @@ class EventCreationHandler:
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
- ) -> int:
- """Processes a new event. This includes checking auth, persisting it,
+ ignore_shadow_ban: bool = False,
+ ) -> EventBase:
+ """Processes a new event.
+
+ This includes deduplicating, checking auth, persisting,
notifying users, sending to remote servers, etc.
If called from a worker will hit out to the master process for final
@@ -857,10 +840,39 @@ class EventCreationHandler:
ratelimit
extra_users: Any extra users to notify about event
+ ignore_shadow_ban: True if shadow-banned users should be allowed to
+ send this event.
+
Return:
- The stream_id of the persisted event.
+ If the event was deduplicated, the previous, duplicate, event. Otherwise,
+ `event`.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
"""
+ # we don't apply shadow-banning to membership events here. Invites are blocked
+ # higher up the stack, and we allow shadow-banned users to send join and leave
+ # events as normal.
+ if (
+ event.type != EventTypes.Member
+ and not ignore_shadow_ban
+ and requester.shadow_banned
+ ):
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
+ if event.is_state():
+ prev_event = await self.deduplicate_state_event(event, context)
+ if prev_event is not None:
+ logger.info(
+ "Not bothering to persist state event %s duplicated by %s",
+ event.event_id,
+ prev_event.event_id,
+ )
+ return prev_event
+
if event.is_state() and (event.type, event.state_key) == (
EventTypes.Create,
"",
@@ -914,14 +926,24 @@ class EventCreationHandler:
extra_users=extra_users,
)
stream_id = result["stream_id"]
- event.internal_metadata.stream_ordering = stream_id
- return stream_id
-
- stream_id = await self.persist_and_notify_client_event(
+ event_id = result["event_id"]
+ if event_id != event.event_id:
+ # If we get a different event back then it means that its
+ # been de-duplicated, so we replace the given event with the
+ # one already persisted.
+ event = await self.store.get_event(event_id)
+ else:
+ # If we newly persisted the event then we need to update its
+ # stream_ordering entry manually (as it was persisted on
+ # another worker).
+ event.internal_metadata.stream_ordering = stream_id
+ return event
+
+ event = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
- return stream_id
+ return event
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
@@ -966,11 +988,16 @@ class EventCreationHandler:
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
- ) -> int:
+ ) -> EventBase:
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.
This should only be run on the instance in charge of persisting events.
+
+ Returns:
+ The persisted event. This may be different than the given event if
+ it was de-duplicated (e.g. because we had already persisted an
+ event with the same transaction ID.)
"""
assert self.storage.persistence is not None
assert self._events_shard_config.should_handle(
@@ -1018,7 +1045,7 @@ class EventCreationHandler:
# Check the alias is currently valid (if it has changed).
room_alias_str = event.content.get("alias", None)
- directory_handler = self.hs.get_handlers().directory_handler
+ directory_handler = self.hs.get_directory_handler()
if room_alias_str and room_alias_str != original_alias:
await self._validate_canonical_alias(
directory_handler, room_alias_str, event.room_id
@@ -1044,7 +1071,7 @@ class EventCreationHandler:
directory_handler, alias_str, event.room_id
)
- federation_handler = self.hs.get_handlers().federation_handler
+ federation_handler = self.hs.get_federation_handler()
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.INVITE:
@@ -1138,9 +1165,13 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
- event_pos, max_stream_token = await self.storage.persistence.persist_event(
- event, context=context
- )
+ # Note that this returns the event that was persisted, which may not be
+ # the same as we passed in if it was deduplicated due transaction IDs.
+ (
+ event,
+ event_pos,
+ max_stream_token,
+ ) = await self.storage.persistence.persist_event(event, context=context)
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
@@ -1161,7 +1192,7 @@ class EventCreationHandler:
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
- return event_pos.stream
+ return event
async def _bump_active_time(self, user: UserID) -> None:
try:
@@ -1232,7 +1263,7 @@ class EventCreationHandler:
# Since this is a dummy-event it is OK if it is sent by a
# shadow-banned user.
- await self.send_nonmember_event(
+ await self.handle_new_client_event(
requester, event, context, ratelimit=False, ignore_shadow_ban=True,
)
return True
|