summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py236
1 files changed, 121 insertions, 115 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c13abd402d..5891939bb1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -636,59 +636,6 @@ class EventCreationHandler:
         msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
 
-    async def send_nonmember_event(
-        self,
-        requester: Requester,
-        event: EventBase,
-        context: EventContext,
-        ratelimit: bool = True,
-        ignore_shadow_ban: bool = False,
-    ) -> int:
-        """
-        Persists and notifies local clients and federation of an event.
-
-        Args:
-            requester: The requester sending the event.
-            event: The event to send.
-            context: The context of the event.
-            ratelimit: Whether to rate limit this send.
-            ignore_shadow_ban: True if shadow-banned users should be allowed to
-                send this event.
-
-        Return:
-            The stream_id of the persisted event.
-
-        Raises:
-            ShadowBanError if the requester has been shadow-banned.
-        """
-        if event.type == EventTypes.Member:
-            raise SynapseError(
-                500, "Tried to send member event through non-member codepath"
-            )
-
-        if not ignore_shadow_ban and requester.shadow_banned:
-            # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
-            raise ShadowBanError()
-
-        user = UserID.from_string(event.sender)
-
-        assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
-
-        if event.is_state():
-            prev_event = await self.deduplicate_state_event(event, context)
-            if prev_event is not None:
-                logger.info(
-                    "Not bothering to persist state event %s duplicated by %s",
-                    event.event_id,
-                    prev_event.event_id,
-                )
-                return await self.store.get_stream_id_for_event(prev_event.event_id)
-
-        return await self.handle_new_client_event(
-            requester=requester, event=event, context=context, ratelimit=ratelimit
-        )
-
     async def deduplicate_state_event(
         self, event: EventBase, context: EventContext
     ) -> Optional[EventBase]:
@@ -729,7 +676,7 @@ class EventCreationHandler:
         """
         Creates an event, then sends it.
 
-        See self.create_event and self.send_nonmember_event.
+        See self.create_event and self.handle_new_client_event.
 
         Args:
             requester: The requester sending the event.
@@ -739,9 +686,19 @@ class EventCreationHandler:
             ignore_shadow_ban: True if shadow-banned users should be allowed to
                 send this event.
 
+        Returns:
+            The event, and its stream ordering (if state event deduplication happened,
+            the previous, duplicate event).
+
         Raises:
             ShadowBanError if the requester has been shadow-banned.
         """
+
+        if event_dict["type"] == EventTypes.Member:
+            raise SynapseError(
+                500, "Tried to send member event through non-member codepath"
+            )
+
         if not ignore_shadow_ban and requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
             await self.clock.sleep(random.randint(1, 10))
@@ -757,20 +714,27 @@ class EventCreationHandler:
                 requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
             )
 
+            assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+                event.sender,
+            )
+
             spam_error = self.spam_checker.check_event_for_spam(event)
             if spam_error:
                 if not isinstance(spam_error, str):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
-            stream_id = await self.send_nonmember_event(
-                requester,
-                event,
-                context,
+            ev = await self.handle_new_client_event(
+                requester=requester,
+                event=event,
+                context=context,
                 ratelimit=ratelimit,
                 ignore_shadow_ban=ignore_shadow_ban,
             )
-        return event, stream_id
+
+        # we know it was persisted, so must have a stream ordering
+        assert ev.internal_metadata.stream_ordering
+        return ev, ev.internal_metadata.stream_ordering
 
     @measure_func("create_new_client_event")
     async def create_new_client_event(
@@ -844,8 +808,11 @@ class EventCreationHandler:
         context: EventContext,
         ratelimit: bool = True,
         extra_users: List[UserID] = [],
-    ) -> int:
-        """Processes a new event. This includes checking auth, persisting it,
+        ignore_shadow_ban: bool = False,
+    ) -> EventBase:
+        """Processes a new event.
+
+        This includes deduplicating, checking auth, persisting,
         notifying users, sending to remote servers, etc.
 
         If called from a worker will hit out to the master process for final
@@ -858,10 +825,39 @@ class EventCreationHandler:
             ratelimit
             extra_users: Any extra users to notify about event
 
+            ignore_shadow_ban: True if shadow-banned users should be allowed to
+                send this event.
+
         Return:
-            The stream_id of the persisted event.
+            If the event was deduplicated, the previous, duplicate, event. Otherwise,
+            `event`.
+
+        Raises:
+            ShadowBanError if the requester has been shadow-banned.
         """
 
+        # we don't apply shadow-banning to membership events here. Invites are blocked
+        # higher up the stack, and we allow shadow-banned users to send join and leave
+        # events as normal.
+        if (
+            event.type != EventTypes.Member
+            and not ignore_shadow_ban
+            and requester.shadow_banned
+        ):
+            # We randomly sleep a bit just to annoy the requester.
+            await self.clock.sleep(random.randint(1, 10))
+            raise ShadowBanError()
+
+        if event.is_state():
+            prev_event = await self.deduplicate_state_event(event, context)
+            if prev_event is not None:
+                logger.info(
+                    "Not bothering to persist state event %s duplicated by %s",
+                    event.event_id,
+                    prev_event.event_id,
+                )
+                return prev_event
+
         if event.is_state() and (event.type, event.state_key) == (
             EventTypes.Create,
             "",
@@ -916,13 +912,13 @@ class EventCreationHandler:
                 )
                 stream_id = result["stream_id"]
                 event.internal_metadata.stream_ordering = stream_id
-                return stream_id
+                return event
 
             stream_id = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
-            return stream_id
+            return event
         except Exception:
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
@@ -1139,7 +1135,7 @@ class EventCreationHandler:
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-        event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
+        event_pos, max_stream_token = await self.storage.persistence.persist_event(
             event, context=context
         )
 
@@ -1150,7 +1146,7 @@ class EventCreationHandler:
         def _notify():
             try:
                 self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id, extra_users=extra_users
+                    event, event_pos, max_stream_token, extra_users=extra_users
                 )
             except Exception:
                 logger.exception("Error notifying about new room event")
@@ -1162,7 +1158,7 @@ class EventCreationHandler:
             # matters as sometimes presence code can take a while.
             run_in_background(self._bump_active_time, requester.user)
 
-        return event_stream_id
+        return event_pos.stream
 
     async def _bump_active_time(self, user: UserID) -> None:
         try:
@@ -1183,54 +1179,7 @@ class EventCreationHandler:
         )
 
         for room_id in room_ids:
-            # For each room we need to find a joined member we can use to send
-            # the dummy event with.
-
-            latest_event_ids = await self.store.get_prev_events_for_room(room_id)
-
-            members = await self.state.get_current_users_in_room(
-                room_id, latest_event_ids=latest_event_ids
-            )
-            dummy_event_sent = False
-            for user_id in members:
-                if not self.hs.is_mine_id(user_id):
-                    continue
-                requester = create_requester(user_id)
-                try:
-                    event, context = await self.create_event(
-                        requester,
-                        {
-                            "type": "org.matrix.dummy_event",
-                            "content": {},
-                            "room_id": room_id,
-                            "sender": user_id,
-                        },
-                        prev_event_ids=latest_event_ids,
-                    )
-
-                    event.internal_metadata.proactively_send = False
-
-                    # Since this is a dummy-event it is OK if it is sent by a
-                    # shadow-banned user.
-                    await self.send_nonmember_event(
-                        requester,
-                        event,
-                        context,
-                        ratelimit=False,
-                        ignore_shadow_ban=True,
-                    )
-                    dummy_event_sent = True
-                    break
-                except ConsentNotGivenError:
-                    logger.info(
-                        "Failed to send dummy event into room %s for user %s due to "
-                        "lack of consent. Will try another user" % (room_id, user_id)
-                    )
-                except AuthError:
-                    logger.info(
-                        "Failed to send dummy event into room %s for user %s due to "
-                        "lack of power. Will try another user" % (room_id, user_id)
-                    )
+            dummy_event_sent = await self._send_dummy_event_for_room(room_id)
 
             if not dummy_event_sent:
                 # Did not find a valid user in the room, so remove from future attempts
@@ -1243,6 +1192,63 @@ class EventCreationHandler:
                 now = self.clock.time_msec()
                 self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
 
+    async def _send_dummy_event_for_room(self, room_id: str) -> bool:
+        """Attempt to send a dummy event for the given room.
+
+        Args:
+            room_id: room to try to send an event from
+
+        Returns:
+            True if a dummy event was successfully sent. False if no user was able
+            to send an event.
+        """
+
+        # For each room we need to find a joined member we can use to send
+        # the dummy event with.
+        latest_event_ids = await self.store.get_prev_events_for_room(room_id)
+        members = await self.state.get_current_users_in_room(
+            room_id, latest_event_ids=latest_event_ids
+        )
+        for user_id in members:
+            if not self.hs.is_mine_id(user_id):
+                continue
+            requester = create_requester(user_id)
+            try:
+                event, context = await self.create_event(
+                    requester,
+                    {
+                        "type": "org.matrix.dummy_event",
+                        "content": {},
+                        "room_id": room_id,
+                        "sender": user_id,
+                    },
+                    prev_event_ids=latest_event_ids,
+                )
+
+                event.internal_metadata.proactively_send = False
+
+                # Since this is a dummy-event it is OK if it is sent by a
+                # shadow-banned user.
+                await self.handle_new_client_event(
+                    requester=requester,
+                    event=event,
+                    context=context,
+                    ratelimit=False,
+                    ignore_shadow_ban=True,
+                )
+                return True
+            except ConsentNotGivenError:
+                logger.info(
+                    "Failed to send dummy event into room %s for user %s due to "
+                    "lack of consent. Will try another user" % (room_id, user_id)
+                )
+            except AuthError:
+                logger.info(
+                    "Failed to send dummy event into room %s for user %s due to "
+                    "lack of power. Will try another user" % (room_id, user_id)
+                )
+        return False
+
     def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
         expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
         to_expire = set()