summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py264
1 files changed, 161 insertions, 103 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py

index a5e8dc1323..8949343801 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -15,9 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Optional, Tuple +import random +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from twisted.internet.interfaces import IDelayedCall @@ -34,6 +35,7 @@ from synapse.api.errors import ( Codes, ConsentNotGivenError, NotFoundError, + ShadowBanError, SynapseError, ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions @@ -45,16 +47,10 @@ from synapse.events.validator import EventValidator from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet -from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour +from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter -from synapse.types import ( - Collection, - Requester, - RoomAlias, - StreamToken, - UserID, - create_requester, -) +from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester +from synapse.util import json_decoder from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.metrics import measure_func @@ -69,7 +65,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class MessageHandler(object): +class MessageHandler: """Contains some read only APIs to get state about a room """ @@ -93,12 +89,7 @@ class MessageHandler(object): ) async def get_room_data( - self, - user_id: str = None, - room_id: str = None, - event_type: Optional[str] = None, - state_key: str = "", - is_guest: bool = False, + self, user_id: str, room_id: str, event_type: str, state_key: str, ) -> dict: """ Get data from a room. @@ -107,11 +98,10 @@ class MessageHandler(object): room_id event_type state_key - is_guest Returns: The path data content. Raises: - SynapseError if something went wrong. + SynapseError or AuthError if the user is not in the room """ ( membership, @@ -128,6 +118,16 @@ class MessageHandler(object): [membership_event_id], StateFilter.from_types([key]) ) data = room_state[membership_event_id].get(key) + else: + # check_user_in_room_or_world_readable, if it doesn't raise an AuthError, should + # only ever return a Membership.JOIN/LEAVE object + # + # Safeguard in case it returned something else + logger.error( + "Attempted to retrieve data from a room for a user that has never been in it. " + "This should not have happened." + ) + raise SynapseError(403, "User not in room", errcode=Codes.FORBIDDEN) return data @@ -362,7 +362,7 @@ class MessageHandler(object): _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000 -class EventCreationHandler(object): +class EventCreationHandler: def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() @@ -377,9 +377,8 @@ class EventCreationHandler(object): self.notifier = hs.get_notifier() self.config = hs.config self.require_membership_for_aliases = hs.config.require_membership_for_aliases - self._is_event_writer = ( - self.config.worker.writers.events == hs.get_instance_name() - ) + self._events_shard_config = self.config.worker.events_shard_config + self._instance_name = hs.get_instance_name() self.room_invite_state_types = self.hs.config.room_invite_state_types @@ -388,8 +387,6 @@ class EventCreationHandler(object): # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) - self.pusher_pool = hs.get_pusherpool() - # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") @@ -410,7 +407,7 @@ class EventCreationHandler(object): # # map from room id to time-of-last-attempt. # - self._rooms_to_exclude_from_dummy_event_insertion = {} # type: dict[str, int] + self._rooms_to_exclude_from_dummy_event_insertion = {} # type: Dict[str, int] # we need to construct a ConsentURIBuilder here, as it checks that the necessary # config options, but *only* if we have a configuration for which we are @@ -442,7 +439,7 @@ class EventCreationHandler(object): event_dict: dict, token_id: Optional[str] = None, txn_id: Optional[str] = None, - prev_event_ids: Optional[Collection[str]] = None, + prev_event_ids: Optional[List[str]] = None, require_consent: bool = True, ) -> Tuple[EventBase, EventContext]: """ @@ -647,37 +644,48 @@ class EventCreationHandler(object): event: EventBase, context: EventContext, ratelimit: bool = True, + ignore_shadow_ban: bool = False, ) -> int: """ Persists and notifies local clients and federation of an event. Args: - requester - event the event to send. - context: the context of the event. + requester: The requester sending the event. + event: The event to send. + context: The context of the event. ratelimit: Whether to rate limit this send. + ignore_shadow_ban: True if shadow-banned users should be allowed to + send this event. Return: The stream_id of the persisted event. + + Raises: + ShadowBanError if the requester has been shadow-banned. """ if event.type == EventTypes.Member: raise SynapseError( 500, "Tried to send member event through non-member codepath" ) + if not ignore_shadow_ban and requester.shadow_banned: + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + raise ShadowBanError() + user = UserID.from_string(event.sender) assert self.hs.is_mine(user), "User must be our own: %s" % (user,) if event.is_state(): - prev_state = await self.deduplicate_state_event(event, context) - if prev_state is not None: + prev_event = await self.deduplicate_state_event(event, context) + if prev_event is not None: logger.info( "Not bothering to persist state event %s duplicated by %s", event.event_id, - prev_state.event_id, + prev_event.event_id, ) - return prev_state + return await self.store.get_stream_id_for_event(prev_event.event_id) return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit @@ -685,40 +693,61 @@ class EventCreationHandler(object): async def deduplicate_state_event( self, event: EventBase, context: EventContext - ) -> None: + ) -> Optional[EventBase]: """ Checks whether event is in the latest resolved state in context. - If so, returns the version of the event in context. - Otherwise, returns None. + Args: + event: The event to check for duplication. + context: The event context. + + Returns: + The previous verion of the event is returned, if it is found in the + event context. Otherwise, None is returned. """ prev_state_ids = await context.get_prev_state_ids() prev_event_id = prev_state_ids.get((event.type, event.state_key)) if not prev_event_id: - return + return None prev_event = await self.store.get_event(prev_event_id, allow_none=True) if not prev_event: - return + return None if prev_event and event.user_id == prev_event.user_id: prev_content = encode_canonical_json(prev_event.content) next_content = encode_canonical_json(event.content) if prev_content == next_content: return prev_event - return + return None async def create_and_send_nonmember_event( self, requester: Requester, - event_dict: EventBase, + event_dict: dict, ratelimit: bool = True, txn_id: Optional[str] = None, + ignore_shadow_ban: bool = False, ) -> Tuple[EventBase, int]: """ Creates an event, then sends it. See self.create_event and self.send_nonmember_event. + + Args: + requester: The requester sending the event. + event_dict: An entire event. + ratelimit: Whether to rate limit this send. + txn_id: The transaction ID. + ignore_shadow_ban: True if shadow-banned users should be allowed to + send this event. + + Raises: + ShadowBanError if the requester has been shadow-banned. """ + if not ignore_shadow_ban and requester.shadow_banned: + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + raise ShadowBanError() # We limit the number of concurrent event sends in a room so that we # don't fork the DAG too much. If we don't limit then we can end up in @@ -737,7 +766,11 @@ class EventCreationHandler(object): raise SynapseError(403, spam_error, Codes.FORBIDDEN) stream_id = await self.send_nonmember_event( - requester, event, context, ratelimit=ratelimit + requester, + event, + context, + ratelimit=ratelimit, + ignore_shadow_ban=ignore_shadow_ban, ) return event, stream_id @@ -746,7 +779,7 @@ class EventCreationHandler(object): self, builder: EventBuilder, requester: Optional[Requester] = None, - prev_event_ids: Optional[Collection[str]] = None, + prev_event_ids: Optional[List[str]] = None, ) -> Tuple[EventBase, EventContext]: """Create a new event for a local client @@ -771,6 +804,15 @@ class EventCreationHandler(object): else: prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id) + # we now ought to have some prev_events (unless it's a create event). + # + # do a quick sanity check here, rather than waiting until we've created the + # event and then try to auth it (which fails with a somewhat confusing "No + # create event in auth events") + assert ( + builder.type == EventTypes.Create or len(prev_event_ids) > 0 + ), "Attempting to create an event with no prev_events" + event = await builder.build(prev_event_ids=prev_event_ids) context = await self.state.compute_event_context(event) if requester: @@ -853,7 +895,7 @@ class EventCreationHandler(object): # Ensure that we can round trip before trying to persist in db try: dump = frozendict_json_encoder.encode(event.content) - json.loads(dump) + json_decoder.decode(dump) except Exception: logger.exception("Failed to encode content: %r", event.content) raise @@ -862,9 +904,10 @@ class EventCreationHandler(object): try: # If we're a worker we need to hit out to the master. - if not self._is_event_writer: + writer_instance = self._events_shard_config.get_instance(event.room_id) + if writer_instance != self._instance_name: result = await self.send_event( - instance_name=self.config.worker.writers.events, + instance_name=writer_instance, event_id=event.event_id, store=self.store, requester=requester, @@ -885,9 +928,7 @@ class EventCreationHandler(object): except Exception: # Ensure that we actually remove the entries in the push actions # staging area, if we calculated them. - run_in_background( - self.store.remove_push_actions_from_staging, event.event_id - ) + await self.store.remove_push_actions_from_staging(event.event_id) raise async def _validate_canonical_alias( @@ -934,7 +975,10 @@ class EventCreationHandler(object): This should only be run on the instance in charge of persisting events. """ - assert self._is_event_writer + assert self.storage.persistence is not None + assert self._events_shard_config.should_handle( + self._instance_name, event.room_id + ) if ratelimit: # We check if this is a room admin redacting an event so that we @@ -951,7 +995,7 @@ class EventCreationHandler(object): allow_none=True, ) - is_admin_redaction = ( + is_admin_redaction = bool( original_event and event.sender != original_event.sender ) @@ -965,7 +1009,7 @@ class EventCreationHandler(object): # Validate a newly added alias or newly added alt_aliases. original_alias = None - original_alt_aliases = set() + original_alt_aliases = [] # type: List[str] original_event_id = event.unsigned.get("replaces_state") if original_event_id: @@ -1013,6 +1057,10 @@ class EventCreationHandler(object): current_state_ids = await context.get_current_state_ids() + # We know this event is not an outlier, so this must be + # non-None. + assert current_state_ids is not None + state_to_include_ids = [ e_id for k, e_id in current_state_ids.items() @@ -1064,11 +1112,11 @@ class EventCreationHandler(object): raise SynapseError(400, "Cannot redact event from a different room") prev_state_ids = await context.get_prev_state_ids() - auth_events_ids = await self.auth.compute_auth_events( + auth_events_ids = self.auth.compute_auth_events( event, prev_state_ids, for_verification=True ) - auth_events = await self.store.get_events(auth_events_ids) - auth_events = {(e.type, e.state_key): e for e in auth_events.values()} + auth_events_map = await self.store.get_events(auth_events_ids) + auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()} room_version = await self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] @@ -1093,7 +1141,7 @@ class EventCreationHandler(object): if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") - event_stream_id, max_stream_id = await self.storage.persistence.persist_event( + event_pos, max_stream_token = await self.storage.persistence.persist_event( event, context=context ) @@ -1101,12 +1149,10 @@ class EventCreationHandler(object): # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) - def _notify(): try: self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users + event, event_pos, max_stream_token, extra_users=extra_users ) except Exception: logger.exception("Error notifying about new room event") @@ -1118,7 +1164,7 @@ class EventCreationHandler(object): # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) - return event_stream_id + return event_pos.stream async def _bump_active_time(self, user: UserID) -> None: try: @@ -1139,48 +1185,7 @@ class EventCreationHandler(object): ) for room_id in room_ids: - # For each room we need to find a joined member we can use to send - # the dummy event with. - - latest_event_ids = await self.store.get_prev_events_for_room(room_id) - - members = await self.state.get_current_users_in_room( - room_id, latest_event_ids=latest_event_ids - ) - dummy_event_sent = False - for user_id in members: - if not self.hs.is_mine_id(user_id): - continue - requester = create_requester(user_id) - try: - event, context = await self.create_event( - requester, - { - "type": "org.matrix.dummy_event", - "content": {}, - "room_id": room_id, - "sender": user_id, - }, - prev_event_ids=latest_event_ids, - ) - - event.internal_metadata.proactively_send = False - - await self.send_nonmember_event( - requester, event, context, ratelimit=False - ) - dummy_event_sent = True - break - except ConsentNotGivenError: - logger.info( - "Failed to send dummy event into room %s for user %s due to " - "lack of consent. Will try another user" % (room_id, user_id) - ) - except AuthError: - logger.info( - "Failed to send dummy event into room %s for user %s due to " - "lack of power. Will try another user" % (room_id, user_id) - ) + dummy_event_sent = await self._send_dummy_event_for_room(room_id) if not dummy_event_sent: # Did not find a valid user in the room, so remove from future attempts @@ -1193,6 +1198,59 @@ class EventCreationHandler(object): now = self.clock.time_msec() self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now + async def _send_dummy_event_for_room(self, room_id: str) -> bool: + """Attempt to send a dummy event for the given room. + + Args: + room_id: room to try to send an event from + + Returns: + True if a dummy event was successfully sent. False if no user was able + to send an event. + """ + + # For each room we need to find a joined member we can use to send + # the dummy event with. + latest_event_ids = await self.store.get_prev_events_for_room(room_id) + members = await self.state.get_current_users_in_room( + room_id, latest_event_ids=latest_event_ids + ) + for user_id in members: + if not self.hs.is_mine_id(user_id): + continue + requester = create_requester(user_id) + try: + event, context = await self.create_event( + requester, + { + "type": "org.matrix.dummy_event", + "content": {}, + "room_id": room_id, + "sender": user_id, + }, + prev_event_ids=latest_event_ids, + ) + + event.internal_metadata.proactively_send = False + + # Since this is a dummy-event it is OK if it is sent by a + # shadow-banned user. + await self.send_nonmember_event( + requester, event, context, ratelimit=False, ignore_shadow_ban=True, + ) + return True + except ConsentNotGivenError: + logger.info( + "Failed to send dummy event into room %s for user %s due to " + "lack of consent. Will try another user" % (room_id, user_id) + ) + except AuthError: + logger.info( + "Failed to send dummy event into room %s for user %s due to " + "lack of power. Will try another user" % (room_id, user_id) + ) + return False + def _expire_rooms_to_exclude_from_dummy_event_insertion(self): expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY to_expire = set()