diff options
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r-- | synapse/handlers/message.py | 304 |
1 files changed, 32 insertions, 272 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852ceb..39d7724778 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed -from twisted.python.failure import Failure from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError @@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import send_event_to_master -from synapse.types import RoomAlias, RoomStreamToken, UserID -from synapse.util.async import Limiter, ReadWriteLock +from synapse.types import RoomAlias, UserID +from synapse.util.async import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func -from synapse.util.stringutils import random_string -from synapse.visibility import filter_events_for_client from ._base import BaseHandler logger = logging.getLogger(__name__) -class PurgeStatus(object): - """Object tracking the status of a purge request - - This class contains information on the progress of a purge request, for - return by get_purge_status. - - Attributes: - status (int): Tracks whether this request has completed. One of - STATUS_{ACTIVE,COMPLETE,FAILED} +class MessageHandler(object): + """Contains some read only APIs to get state about a room """ - STATUS_ACTIVE = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - - STATUS_TEXT = { - STATUS_ACTIVE: "active", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - } - - def __init__(self): - self.status = PurgeStatus.STATUS_ACTIVE - - def asdict(self): - return { - "status": PurgeStatus.STATUS_TEXT[self.status] - } - - -class MessageHandler(BaseHandler): - def __init__(self, hs): - super(MessageHandler, self).__init__(hs) - self.hs = hs - self.state = hs.get_state_handler() + self.auth = hs.get_auth() self.clock = hs.get_clock() - - self.pagination_lock = ReadWriteLock() - self._purges_in_progress_by_room = set() - # map from purge id to PurgeStatus - self._purges_by_id = {} - - def start_purge_history(self, room_id, token, - delete_local_events=False): - """Start off a history purge on a room. - - Args: - room_id (str): The room to purge from - - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones - - Returns: - str: unique ID for this purge transaction. - """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, - "History purge already in progress for %s" % (room_id, ), - ) - - purge_id = random_string(16) - - # we log the purge_id here so that it can be tied back to the - # request id in the log lines. - logger.info("[purge] starting purge_id %s", purge_id) - - self._purges_by_id[purge_id] = PurgeStatus() - run_in_background( - self._purge_history, - purge_id, room_id, token, delete_local_events, - ) - return purge_id - - @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, token, - delete_local_events): - """Carry out a history purge on a room. - - Args: - purge_id (str): The id for this purge - room_id (str): The room to purge from - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones - - Returns: - Deferred - """ - self._purges_in_progress_by_room.add(room_id) - try: - with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history( - room_id, token, delete_local_events, - ) - logger.info("[purge] complete") - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE - except Exception: - logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED - finally: - self._purges_in_progress_by_room.discard(room_id) - - # remove the purge from the list 24 hours after it completes - def clear_purge(): - del self._purges_by_id[purge_id] - self.hs.get_reactor().callLater(24 * 3600, clear_purge) - - def get_purge_status(self, purge_id): - """Get the current status of an active purge - - Args: - purge_id (str): purge_id returned by start_purge_history - - Returns: - PurgeStatus|None - """ - return self._purges_by_id.get(purge_id) - - @defer.inlineCallbacks - def get_messages(self, requester, room_id=None, pagin_config=None, - as_client_event=True, event_filter=None): - """Get messages in a room. - - Args: - requester (Requester): The user requesting messages. - room_id (str): The room they want messages from. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - as_client_event (bool): True to get events in client-server format. - event_filter (Filter): Filter to apply to results or None - Returns: - dict: Pagination API results - """ - user_id = requester.user.to_string() - - if pagin_config.from_token: - room_token = pagin_config.from_token.room_key - else: - pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token_for_room( - room_id=room_id - ) - ) - room_token = pagin_config.from_token.room_key - - room_token = RoomStreamToken.parse(room_token) - - pagin_config.from_token = pagin_config.from_token.copy_and_replace( - "room_key", str(room_token) - ) - - source_config = pagin_config.get_source_config("room") - - with (yield self.pagination_lock.read(room_id)): - membership, member_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token( - room_id, room_token.stream - ) - - if membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - leave_token = yield self.store.get_topological_token_for_event( - member_event_id - ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo - ) - - events, next_key = yield self.store.paginate_room_events( - room_id=room_id, - from_key=source_config.from_key, - to_key=source_config.to_key, - direction=source_config.direction, - limit=source_config.limit, - event_filter=event_filter, - ) - - next_token = pagin_config.from_token.copy_and_replace( - "room_key", next_key - ) - - if not events: - defer.returnValue({ - "chunk": [], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - }) - - if event_filter: - events = event_filter.filter(events) - - events = yield filter_events_for_client( - self.store, - user_id, - events, - is_peeking=(member_event_id is None), - ) - - time_now = self.clock.time_msec() - - chunk = { - "chunk": [ - serialize_event(e, time_now, as_client_event) - for e in events - ], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - } - - defer.returnValue(chunk) + self.state = hs.get_state_handler() + self.store = hs.get_datastore() @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, @@ -286,12 +64,12 @@ class MessageHandler(BaseHandler): Raises: SynapseError if something went wrong. """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( + membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) if membership == Membership.JOIN: - data = yield self.state_handler.get_current_state( + data = yield self.state.get_current_state( room_id, event_type, state_key ) elif membership == Membership.LEAVE: @@ -304,31 +82,6 @@ class MessageHandler(BaseHandler): defer.returnValue(data) @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - - @defer.inlineCallbacks def get_state_events(self, user_id, room_id, is_guest=False): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has @@ -340,12 +93,12 @@ class MessageHandler(BaseHandler): Returns: A list of dicts representing state events. [{}, {}, {}] """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( + membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) + room_state = yield self.state.get_current_state(room_id) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( [membership_event_id], None @@ -373,7 +126,7 @@ class MessageHandler(BaseHandler): if not requester.app_service: # We check AS auth after fetching the room membership, as it # requires us to pull out all joined members anyway. - membership, _ = yield self._check_in_room_or_world_readable( + membership, _ = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) if membership != Membership.JOIN: @@ -427,7 +180,7 @@ class EventCreationHandler(object): # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) + self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") self.action_generator = hs.get_action_generator() @@ -630,7 +383,8 @@ class EventCreationHandler(object): If so, returns the version of the event in context. Otherwise, returns None. """ - prev_event_id = context.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_event_id = prev_state_ids.get((event.type, event.state_key)) prev_event = yield self.store.get_event(prev_event_id, allow_none=True) if not prev_event: return @@ -752,8 +506,8 @@ class EventCreationHandler(object): event = builder.build() logger.debug( - "Created event %s with state: %s", - event.event_id, context.prev_state_ids, + "Created event %s", + event.event_id, ) defer.returnValue( @@ -806,8 +560,9 @@ class EventCreationHandler(object): # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( - self.hs.get_clock(), - self.http_client, + clock=self.hs.get_clock(), + store=self.store, + client=self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, requester=requester, @@ -884,9 +639,11 @@ class EventCreationHandler(object): e.sender == event.sender ) + current_state_ids = yield context.get_current_state_ids(self.store) + state_to_include_ids = [ e_id - for k, e_id in iteritems(context.current_state_ids) + for k, e_id in iteritems(current_state_ids) if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -922,8 +679,9 @@ class EventCreationHandler(object): ) if event.type == EventTypes.Redaction: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -943,11 +701,13 @@ class EventCreationHandler(object): "You don't have permission to redact events" ) - if event.type == EventTypes.Create and context.prev_state_ids: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) + if event.type == EventTypes.Create: + prev_state_ids = yield context.get_prev_state_ids(self.store) + if prev_state_ids: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) (event_stream_id, max_stream_id) = yield self.store.persist_event( event, context=context |