summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py134
-rw-r--r--synapse/handlers/room.py5
-rw-r--r--synapse/handlers/room_member.py29
3 files changed, 78 insertions, 90 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 00513fbf37..3e9a22e8f3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -635,61 +635,6 @@ class EventCreationHandler:
         msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
 
-    async def send_nonmember_event(
-        self,
-        requester: Requester,
-        event: EventBase,
-        context: EventContext,
-        ratelimit: bool = True,
-        ignore_shadow_ban: bool = False,
-    ) -> int:
-        """
-        Persists and notifies local clients and federation of an event.
-
-        Args:
-            requester: The requester sending the event.
-            event: The event to send.
-            context: The context of the event.
-            ratelimit: Whether to rate limit this send.
-            ignore_shadow_ban: True if shadow-banned users should be allowed to
-                send this event.
-
-        Return:
-            The stream_id of the persisted event.
-
-        Raises:
-            ShadowBanError if the requester has been shadow-banned.
-        """
-        if event.type == EventTypes.Member:
-            raise SynapseError(
-                500, "Tried to send member event through non-member codepath"
-            )
-
-        if not ignore_shadow_ban and requester.shadow_banned:
-            # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
-            raise ShadowBanError()
-
-        user = UserID.from_string(event.sender)
-
-        assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
-
-        if event.is_state():
-            prev_event = await self.deduplicate_state_event(event, context)
-            if prev_event is not None:
-                logger.info(
-                    "Not bothering to persist state event %s duplicated by %s",
-                    event.event_id,
-                    prev_event.event_id,
-                )
-                # we know it was persisted, so must have a stream ordering
-                assert prev_event.internal_metadata.stream_ordering
-                return prev_event.internal_metadata.stream_ordering
-
-        return await self.handle_new_client_event(
-            requester=requester, event=event, context=context, ratelimit=ratelimit
-        )
-
     async def deduplicate_state_event(
         self, event: EventBase, context: EventContext
     ) -> Optional[EventBase]:
@@ -730,7 +675,7 @@ class EventCreationHandler:
         """
         Creates an event, then sends it.
 
-        See self.create_event and self.send_nonmember_event.
+        See self.create_event and self.handle_new_client_event.
 
         Args:
             requester: The requester sending the event.
@@ -740,9 +685,19 @@ class EventCreationHandler:
             ignore_shadow_ban: True if shadow-banned users should be allowed to
                 send this event.
 
+        Returns:
+            The event, and its stream ordering (if state event deduplication happened,
+            the previous, duplicate event).
+
         Raises:
             ShadowBanError if the requester has been shadow-banned.
         """
+
+        if event_dict["type"] == EventTypes.Member:
+            raise SynapseError(
+                500, "Tried to send member event through non-member codepath"
+            )
+
         if not ignore_shadow_ban and requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
             await self.clock.sleep(random.randint(1, 10))
@@ -758,20 +713,27 @@ class EventCreationHandler:
                 requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
             )
 
+            assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+                event.sender,
+            )
+
             spam_error = self.spam_checker.check_event_for_spam(event)
             if spam_error:
                 if not isinstance(spam_error, str):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
-            stream_id = await self.send_nonmember_event(
-                requester,
-                event,
-                context,
+            ev = await self.handle_new_client_event(
+                requester=requester,
+                event=event,
+                context=context,
                 ratelimit=ratelimit,
                 ignore_shadow_ban=ignore_shadow_ban,
             )
-        return event, stream_id
+
+        # we know it was persisted, so must have a stream ordering
+        assert ev.internal_metadata.stream_ordering
+        return ev, ev.internal_metadata.stream_ordering
 
     @measure_func("create_new_client_event")
     async def create_new_client_event(
@@ -845,8 +807,11 @@ class EventCreationHandler:
         context: EventContext,
         ratelimit: bool = True,
         extra_users: List[UserID] = [],
-    ) -> int:
-        """Processes a new event. This includes checking auth, persisting it,
+        ignore_shadow_ban: bool = False,
+    ) -> EventBase:
+        """Processes a new event.
+
+        This includes deduplicating, checking auth, persisting,
         notifying users, sending to remote servers, etc.
 
         If called from a worker will hit out to the master process for final
@@ -859,10 +824,39 @@ class EventCreationHandler:
             ratelimit
             extra_users: Any extra users to notify about event
 
+            ignore_shadow_ban: True if shadow-banned users should be allowed to
+                send this event.
+
         Return:
-            The stream_id of the persisted event.
+            If the event was deduplicated, the previous, duplicate, event. Otherwise,
+            `event`.
+
+        Raises:
+            ShadowBanError if the requester has been shadow-banned.
         """
 
+        # we don't apply shadow-banning to membership events here. Invites are blocked
+        # higher up the stack, and we allow shadow-banned users to send join and leave
+        # events as normal.
+        if (
+            event.type != EventTypes.Member
+            and not ignore_shadow_ban
+            and requester.shadow_banned
+        ):
+            # We randomly sleep a bit just to annoy the requester.
+            await self.clock.sleep(random.randint(1, 10))
+            raise ShadowBanError()
+
+        if event.is_state():
+            prev_event = await self.deduplicate_state_event(event, context)
+            if prev_event is not None:
+                logger.info(
+                    "Not bothering to persist state event %s duplicated by %s",
+                    event.event_id,
+                    prev_event.event_id,
+                )
+                return prev_event
+
         if event.is_state() and (event.type, event.state_key) == (
             EventTypes.Create,
             "",
@@ -917,13 +911,13 @@ class EventCreationHandler:
                 )
                 stream_id = result["stream_id"]
                 event.internal_metadata.stream_ordering = stream_id
-                return stream_id
+                return event
 
             stream_id = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
-            return stream_id
+            return event
         except Exception:
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
@@ -1234,8 +1228,12 @@ class EventCreationHandler:
 
                 # Since this is a dummy-event it is OK if it is sent by a
                 # shadow-banned user.
-                await self.send_nonmember_event(
-                    requester, event, context, ratelimit=False, ignore_shadow_ban=True,
+                await self.handle_new_client_event(
+                    requester=requester,
+                    event=event,
+                    context=context,
+                    ratelimit=False,
+                    ignore_shadow_ban=True,
                 )
                 return True
             except ConsentNotGivenError:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f14f791586..d0530a446c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -185,6 +185,7 @@ class RoomCreationHandler(BaseHandler):
             ShadowBanError if the requester is shadow-banned.
         """
         user_id = requester.user.to_string()
+        assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,)
 
         # start by allocating a new room id
         r = await self.store.get_room(old_room_id)
@@ -229,8 +230,8 @@ class RoomCreationHandler(BaseHandler):
         )
 
         # now send the tombstone
-        await self.event_creation_handler.send_nonmember_event(
-            requester, tombstone_event, tombstone_context
+        await self.event_creation_handler.handle_new_client_event(
+            requester=requester, event=tombstone_event, context=tombstone_context,
         )
 
         old_room_state = await tombstone_context.get_current_state_ids()
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 13b749b7cb..fd8114a64d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -188,16 +188,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             require_consent=require_consent,
         )
 
-        # Check if this event matches the previous membership event for the user.
-        duplicate = await self.event_creation_handler.deduplicate_state_event(
-            event, context
-        )
-        if duplicate is not None:
-            # Discard the new event since this membership change is a no-op.
-            # we know it was persisted, so must have a stream ordering.
-            assert duplicate.internal_metadata.stream_ordering
-            return duplicate.event_id, duplicate.internal_metadata.stream_ordering
-
         prev_state_ids = await context.get_prev_state_ids()
 
         prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
@@ -222,7 +212,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                         retry_after_ms=int(1000 * (time_allowed - time_now_s))
                     )
 
-        stream_id = await self.event_creation_handler.handle_new_client_event(
+        result_event = await self.event_creation_handler.handle_new_client_event(
             requester, event, context, extra_users=[target], ratelimit=ratelimit,
         )
 
@@ -232,7 +222,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 if prev_member_event.membership == Membership.JOIN:
                     await self._user_left_room(target, room_id)
 
-        return event.event_id, stream_id
+        # we know it was persisted, so should have a stream ordering
+        assert result_event.internal_metadata.stream_ordering
+        return result_event.event_id, result_event.internal_metadata.stream_ordering
 
     async def copy_room_tags_and_direct_to_room(
         self, old_room_id, new_room_id, user_id
@@ -673,12 +665,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         else:
             requester = types.create_requester(target_user)
 
-        prev_event = await self.event_creation_handler.deduplicate_state_event(
-            event, context
-        )
-        if prev_event is not None:
-            return
-
         prev_state_ids = await context.get_prev_state_ids()
         if event.membership == Membership.JOIN:
             if requester.is_guest:
@@ -1186,10 +1172,13 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         context = await self.state_handler.compute_event_context(event)
         context.app_service = requester.app_service
-        stream_id = await self.event_creation_handler.handle_new_client_event(
+        result_event = await self.event_creation_handler.handle_new_client_event(
             requester, event, context, extra_users=[UserID.from_string(target_user)],
         )
-        return event.event_id, stream_id
+        # we know it was persisted, so must have a stream ordering
+        assert result_event.internal_metadata.stream_ordering
+
+        return result_event.event_id, result_event.internal_metadata.stream_ordering
 
     async def _user_left_room(self, target: UserID, room_id: str) -> None:
         """Implements RoomMemberHandler._user_left_room