diff --git a/changelog.d/8463.misc b/changelog.d/8463.misc
new file mode 100644
index 0000000000..040c9bb90f
--- /dev/null
+++ b/changelog.d/8463.misc
@@ -0,0 +1 @@
+Reduce inconsistencies between codepaths for membership and non-membership events.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 00513fbf37..3e9a22e8f3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -635,61 +635,6 @@ class EventCreationHandler:
msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
- async def send_nonmember_event(
- self,
- requester: Requester,
- event: EventBase,
- context: EventContext,
- ratelimit: bool = True,
- ignore_shadow_ban: bool = False,
- ) -> int:
- """
- Persists and notifies local clients and federation of an event.
-
- Args:
- requester: The requester sending the event.
- event: The event to send.
- context: The context of the event.
- ratelimit: Whether to rate limit this send.
- ignore_shadow_ban: True if shadow-banned users should be allowed to
- send this event.
-
- Return:
- The stream_id of the persisted event.
-
- Raises:
- ShadowBanError if the requester has been shadow-banned.
- """
- if event.type == EventTypes.Member:
- raise SynapseError(
- 500, "Tried to send member event through non-member codepath"
- )
-
- if not ignore_shadow_ban and requester.shadow_banned:
- # We randomly sleep a bit just to annoy the requester.
- await self.clock.sleep(random.randint(1, 10))
- raise ShadowBanError()
-
- user = UserID.from_string(event.sender)
-
- assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
-
- if event.is_state():
- prev_event = await self.deduplicate_state_event(event, context)
- if prev_event is not None:
- logger.info(
- "Not bothering to persist state event %s duplicated by %s",
- event.event_id,
- prev_event.event_id,
- )
- # we know it was persisted, so must have a stream ordering
- assert prev_event.internal_metadata.stream_ordering
- return prev_event.internal_metadata.stream_ordering
-
- return await self.handle_new_client_event(
- requester=requester, event=event, context=context, ratelimit=ratelimit
- )
-
async def deduplicate_state_event(
self, event: EventBase, context: EventContext
) -> Optional[EventBase]:
@@ -730,7 +675,7 @@ class EventCreationHandler:
"""
Creates an event, then sends it.
- See self.create_event and self.send_nonmember_event.
+ See self.create_event and self.handle_new_client_event.
Args:
requester: The requester sending the event.
@@ -740,9 +685,19 @@ class EventCreationHandler:
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
+ Returns:
+ The event, and its stream ordering (if state event deduplication happened,
+ the previous, duplicate event).
+
Raises:
ShadowBanError if the requester has been shadow-banned.
"""
+
+ if event_dict["type"] == EventTypes.Member:
+ raise SynapseError(
+ 500, "Tried to send member event through non-member codepath"
+ )
+
if not ignore_shadow_ban and requester.shadow_banned:
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
@@ -758,20 +713,27 @@ class EventCreationHandler:
requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
)
+ assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+ event.sender,
+ )
+
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, str):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
- stream_id = await self.send_nonmember_event(
- requester,
- event,
- context,
+ ev = await self.handle_new_client_event(
+ requester=requester,
+ event=event,
+ context=context,
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)
- return event, stream_id
+
+ # we know it was persisted, so must have a stream ordering
+ assert ev.internal_metadata.stream_ordering
+ return ev, ev.internal_metadata.stream_ordering
@measure_func("create_new_client_event")
async def create_new_client_event(
@@ -845,8 +807,11 @@ class EventCreationHandler:
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
- ) -> int:
- """Processes a new event. This includes checking auth, persisting it,
+ ignore_shadow_ban: bool = False,
+ ) -> EventBase:
+ """Processes a new event.
+
+ This includes deduplicating, checking auth, persisting,
notifying users, sending to remote servers, etc.
If called from a worker will hit out to the master process for final
@@ -859,10 +824,39 @@ class EventCreationHandler:
ratelimit
extra_users: Any extra users to notify about event
+ ignore_shadow_ban: True if shadow-banned users should be allowed to
+ send this event.
+
Return:
- The stream_id of the persisted event.
+ If the event was deduplicated, the previous, duplicate, event. Otherwise,
+ `event`.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
"""
+ # we don't apply shadow-banning to membership events here. Invites are blocked
+ # higher up the stack, and we allow shadow-banned users to send join and leave
+ # events as normal.
+ if (
+ event.type != EventTypes.Member
+ and not ignore_shadow_ban
+ and requester.shadow_banned
+ ):
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
+ if event.is_state():
+ prev_event = await self.deduplicate_state_event(event, context)
+ if prev_event is not None:
+ logger.info(
+ "Not bothering to persist state event %s duplicated by %s",
+ event.event_id,
+ prev_event.event_id,
+ )
+ return prev_event
+
if event.is_state() and (event.type, event.state_key) == (
EventTypes.Create,
"",
@@ -917,13 +911,13 @@ class EventCreationHandler:
)
stream_id = result["stream_id"]
event.internal_metadata.stream_ordering = stream_id
- return stream_id
+ return event
stream_id = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
- return stream_id
+ return event
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
@@ -1234,8 +1228,12 @@ class EventCreationHandler:
# Since this is a dummy-event it is OK if it is sent by a
# shadow-banned user.
- await self.send_nonmember_event(
- requester, event, context, ratelimit=False, ignore_shadow_ban=True,
+ await self.handle_new_client_event(
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=False,
+ ignore_shadow_ban=True,
)
return True
except ConsentNotGivenError:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f14f791586..d0530a446c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -185,6 +185,7 @@ class RoomCreationHandler(BaseHandler):
ShadowBanError if the requester is shadow-banned.
"""
user_id = requester.user.to_string()
+ assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,)
# start by allocating a new room id
r = await self.store.get_room(old_room_id)
@@ -229,8 +230,8 @@ class RoomCreationHandler(BaseHandler):
)
# now send the tombstone
- await self.event_creation_handler.send_nonmember_event(
- requester, tombstone_event, tombstone_context
+ await self.event_creation_handler.handle_new_client_event(
+ requester=requester, event=tombstone_event, context=tombstone_context,
)
old_room_state = await tombstone_context.get_current_state_ids()
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 13b749b7cb..fd8114a64d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -188,16 +188,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent=require_consent,
)
- # Check if this event matches the previous membership event for the user.
- duplicate = await self.event_creation_handler.deduplicate_state_event(
- event, context
- )
- if duplicate is not None:
- # Discard the new event since this membership change is a no-op.
- # we know it was persisted, so must have a stream ordering.
- assert duplicate.internal_metadata.stream_ordering
- return duplicate.event_id, duplicate.internal_metadata.stream_ordering
-
prev_state_ids = await context.get_prev_state_ids()
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
@@ -222,7 +212,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)
- stream_id = await self.event_creation_handler.handle_new_client_event(
+ result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)
@@ -232,7 +222,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)
- return event.event_id, stream_id
+ # we know it was persisted, so should have a stream ordering
+ assert result_event.internal_metadata.stream_ordering
+ return result_event.event_id, result_event.internal_metadata.stream_ordering
async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
@@ -673,12 +665,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
else:
requester = types.create_requester(target_user)
- prev_event = await self.event_creation_handler.deduplicate_state_event(
- event, context
- )
- if prev_event is not None:
- return
-
prev_state_ids = await context.get_prev_state_ids()
if event.membership == Membership.JOIN:
if requester.is_guest:
@@ -1186,10 +1172,13 @@ class RoomMemberMasterHandler(RoomMemberHandler):
context = await self.state_handler.compute_event_context(event)
context.app_service = requester.app_service
- stream_id = await self.event_creation_handler.handle_new_client_event(
+ result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
- return event.event_id, stream_id
+ # we know it was persisted, so must have a stream ordering
+ assert result_event.internal_metadata.stream_ordering
+
+ return result_event.event_id, result_event.internal_metadata.stream_ordering
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index cb7c0ed51a..702c6aa089 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -413,7 +413,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
)
)
self.get_success(
- event_creation_handler.send_nonmember_event(requester, event, context)
+ event_creation_handler.handle_new_client_event(requester, event, context)
)
# Register a second user, which won't be be in the room (or even have an invite)
diff --git a/tests/unittest.py b/tests/unittest.py
index 82ede9de34..5c87f6097e 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -608,7 +608,9 @@ class HomeserverTestCase(TestCase):
if soft_failed:
event.internal_metadata.soft_failed = True
- self.get_success(event_creator.send_nonmember_event(requester, event, context))
+ self.get_success(
+ event_creator.handle_new_client_event(requester, event, context)
+ )
return event.event_id
|