diff options
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r-- | synapse/handlers/message.py | 127 |
1 files changed, 66 insertions, 61 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8a7b4916cd..ee271e85e5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -376,9 +376,8 @@ class EventCreationHandler: self.notifier = hs.get_notifier() self.config = hs.config self.require_membership_for_aliases = hs.config.require_membership_for_aliases - self._is_event_writer = ( - self.config.worker.writers.events == hs.get_instance_name() - ) + self._events_shard_config = self.config.worker.events_shard_config + self._instance_name = hs.get_instance_name() self.room_invite_state_types = self.hs.config.room_invite_state_types @@ -387,8 +386,6 @@ class EventCreationHandler: # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) - self.pusher_pool = hs.get_pusherpool() - # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") @@ -904,9 +901,10 @@ class EventCreationHandler: try: # If we're a worker we need to hit out to the master. - if not self._is_event_writer: + writer_instance = self._events_shard_config.get_instance(event.room_id) + if writer_instance != self._instance_name: result = await self.send_event( - instance_name=self.config.worker.writers.events, + instance_name=writer_instance, event_id=event.event_id, store=self.store, requester=requester, @@ -974,7 +972,10 @@ class EventCreationHandler: This should only be run on the instance in charge of persisting events. """ - assert self._is_event_writer + assert self.storage.persistence is not None + assert self._events_shard_config.should_handle( + self._instance_name, event.room_id + ) if ratelimit: # We check if this is a room admin redacting an event so that we @@ -1137,7 +1138,7 @@ class EventCreationHandler: if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") - event_stream_id, max_stream_id = await self.storage.persistence.persist_event( + event_pos, max_stream_token = await self.storage.persistence.persist_event( event, context=context ) @@ -1145,12 +1146,10 @@ class EventCreationHandler: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) - def _notify(): try: self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users + event, event_pos, max_stream_token, extra_users=extra_users ) except Exception: logger.exception("Error notifying about new room event") @@ -1162,7 +1161,7 @@ class EventCreationHandler: # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) - return event_stream_id + return event_pos.stream async def _bump_active_time(self, user: UserID) -> None: try: @@ -1183,54 +1182,7 @@ class EventCreationHandler: ) for room_id in room_ids: - # For each room we need to find a joined member we can use to send - # the dummy event with. - - latest_event_ids = await self.store.get_prev_events_for_room(room_id) - - members = await self.state.get_current_users_in_room( - room_id, latest_event_ids=latest_event_ids - ) - dummy_event_sent = False - for user_id in members: - if not self.hs.is_mine_id(user_id): - continue - requester = create_requester(user_id) - try: - event, context = await self.create_event( - requester, - { - "type": "org.matrix.dummy_event", - "content": {}, - "room_id": room_id, - "sender": user_id, - }, - prev_event_ids=latest_event_ids, - ) - - event.internal_metadata.proactively_send = False - - # Since this is a dummy-event it is OK if it is sent by a - # shadow-banned user. - await self.send_nonmember_event( - requester, - event, - context, - ratelimit=False, - ignore_shadow_ban=True, - ) - dummy_event_sent = True - break - except ConsentNotGivenError: - logger.info( - "Failed to send dummy event into room %s for user %s due to " - "lack of consent. Will try another user" % (room_id, user_id) - ) - except AuthError: - logger.info( - "Failed to send dummy event into room %s for user %s due to " - "lack of power. Will try another user" % (room_id, user_id) - ) + dummy_event_sent = await self._send_dummy_event_for_room(room_id) if not dummy_event_sent: # Did not find a valid user in the room, so remove from future attempts @@ -1243,6 +1195,59 @@ class EventCreationHandler: now = self.clock.time_msec() self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now + async def _send_dummy_event_for_room(self, room_id: str) -> bool: + """Attempt to send a dummy event for the given room. + + Args: + room_id: room to try to send an event from + + Returns: + True if a dummy event was successfully sent. False if no user was able + to send an event. + """ + + # For each room we need to find a joined member we can use to send + # the dummy event with. + latest_event_ids = await self.store.get_prev_events_for_room(room_id) + members = await self.state.get_current_users_in_room( + room_id, latest_event_ids=latest_event_ids + ) + for user_id in members: + if not self.hs.is_mine_id(user_id): + continue + requester = create_requester(user_id) + try: + event, context = await self.create_event( + requester, + { + "type": "org.matrix.dummy_event", + "content": {}, + "room_id": room_id, + "sender": user_id, + }, + prev_event_ids=latest_event_ids, + ) + + event.internal_metadata.proactively_send = False + + # Since this is a dummy-event it is OK if it is sent by a + # shadow-banned user. + await self.send_nonmember_event( + requester, event, context, ratelimit=False, ignore_shadow_ban=True, + ) + return True + except ConsentNotGivenError: + logger.info( + "Failed to send dummy event into room %s for user %s due to " + "lack of consent. Will try another user" % (room_id, user_id) + ) + except AuthError: + logger.info( + "Failed to send dummy event into room %s for user %s due to " + "lack of power. Will try another user" % (room_id, user_id) + ) + return False + def _expire_rooms_to_exclude_from_dummy_event_insertion(self): expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY to_expire = set() |