summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py431
1 files changed, 126 insertions, 305 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..e484061cc0 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,269 +14,50 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson
 import sys
 
-from canonicaljson import encode_canonical_json
 import six
-from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from six import iteritems, itervalues, string_types
+
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
 from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
 
-from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import (
-    AuthError, Codes, SynapseError,
+    AuthError,
+    Codes,
     ConsentNotGivenError,
+    NotFoundError,
+    SynapseError,
 )
 from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken,
-)
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.types import RoomAlias, UserID
+from synapse.util.async_helpers import Linearizer
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
-from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
 
-class PurgeStatus(object):
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-
-    Attributes:
-        status (int): Tracks whether this request has completed. One of
-            STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+    """Contains some read only APIs to get state about a room
     """
 
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    def __init__(self):
-        self.status = PurgeStatus.STATUS_ACTIVE
-
-    def asdict(self):
-        return {
-            "status": PurgeStatus.STATUS_TEXT[self.status]
-        }
-
-
-class MessageHandler(BaseHandler):
-
     def __init__(self, hs):
-        super(MessageHandler, self).__init__(hs)
-        self.hs = hs
-        self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-
-        self.pagination_lock = ReadWriteLock()
-        self._purges_in_progress_by_room = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id = {}
-
-    def start_purge_history(self, room_id, token,
-                            delete_local_events=False):
-        """Start off a history purge on a room.
-
-        Args:
-            room_id (str): The room to purge from
-
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            str: unique ID for this purge transaction.
-        """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400,
-                "History purge already in progress for %s" % (room_id, ),
-            )
-
-        purge_id = random_string(16)
-
-        # we log the purge_id here so that it can be tied back to the
-        # request id in the log lines.
-        logger.info("[purge] starting purge_id %s", purge_id)
-
-        self._purges_by_id[purge_id] = PurgeStatus()
-        run_in_background(
-            self._purge_history,
-            purge_id, room_id, token, delete_local_events,
-        )
-        return purge_id
-
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token,
-                       delete_local_events):
-        """Carry out a history purge on a room.
-
-        Args:
-            purge_id (str): The id for this purge
-            room_id (str): The room to purge from
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            Deferred
-        """
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(
-                    room_id, token, delete_local_events,
-                )
-            logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
-        except Exception:
-            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge():
-                del self._purges_by_id[purge_id]
-            reactor.callLater(24 * 3600, clear_purge)
-
-    def get_purge_status(self, purge_id):
-        """Get the current status of an active purge
-
-        Args:
-            purge_id (str): purge_id returned by start_purge_history
-
-        Returns:
-            PurgeStatus|None
-        """
-        return self._purges_by_id.get(purge_id)
-
-    @defer.inlineCallbacks
-    def get_messages(self, requester, room_id=None, pagin_config=None,
-                     as_client_event=True, event_filter=None):
-        """Get messages in a room.
-
-        Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
-        Returns:
-            dict: Pagination API results
-        """
-        user_id = requester.user.to_string()
-
-        if pagin_config.from_token:
-            room_token = pagin_config.from_token.room_key
-        else:
-            pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token_for_room(
-                    room_id=room_id
-                )
-            )
-            room_token = pagin_config.from_token.room_key
-
-        room_token = RoomStreamToken.parse(room_token)
-
-        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
-            "room_key", str(room_token)
-        )
-
-        source_config = pagin_config.get_source_config("room")
-
-        with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self._check_in_room_or_world_readable(
-                room_id, user_id
-            )
-
-            if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
-                if membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-                    leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
-                    )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
-
-            events, next_key = yield self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=source_config.from_key,
-                to_key=source_config.to_key,
-                direction=source_config.direction,
-                limit=source_config.limit,
-                event_filter=event_filter,
-            )
-
-            next_token = pagin_config.from_token.copy_and_replace(
-                "room_key", next_key
-            )
-
-        if not events:
-            defer.returnValue({
-                "chunk": [],
-                "start": pagin_config.from_token.to_string(),
-                "end": next_token.to_string(),
-            })
-
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
-
-        time_now = self.clock.time_msec()
-
-        chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
-            "start": pagin_config.from_token.to_string(),
-            "end": next_token.to_string(),
-        }
-
-        defer.returnValue(chunk)
+        self.state = hs.get_state_handler()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -290,12 +71,12 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
+            data = yield self.state.get_current_state(
                 room_id, event_type, state_key
             )
         elif membership == Membership.LEAVE:
@@ -308,53 +89,85 @@ class MessageHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
-            )
-
-    @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id, is_guest=False):
+    def get_state_events(
+        self, user_id, room_id, types=None, filtered_types=None,
+        at_token=None, is_guest=False,
+    ):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
-        left the room return the state events from when they left.
+        left the room return the state events from when they left. If an explicit
+        'at' parameter is passed, return the state events as of that event, if
+        visible.
 
         Args:
             user_id(str): The user requesting state events.
             room_id(str): The room ID to get all state events from.
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
+            at_token(StreamToken|None): the stream token of the at which we are requesting
+                the stats. If the user is not allowed to view the state as of that
+                stream token, we raise a 403 SynapseError. If None, returns the current
+                state based on the current_state_events table.
+            is_guest(bool): whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
+        Raises:
+            NotFoundError (404) if the at token does not yield an event
+
+            AuthError (403) if the user doesn't have permission to view
+            members of this room.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
-            room_id, user_id
-        )
+        if at_token:
+            # FIXME this claims to get the state at a stream position, but
+            # get_recent_events_for_room operates by topo ordering. This therefore
+            # does not reliably give you the state at the given stream position.
+            # (https://github.com/matrix-org/synapse/issues/3305)
+            last_events, _ = yield self.store.get_recent_events_for_room(
+                room_id, end_token=at_token.room_key, limit=1,
+            )
 
-        if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
-        elif membership == Membership.LEAVE:
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], None
+            if not last_events:
+                raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+            visible_events = yield filter_events_for_client(
+                self.store, user_id, last_events,
             )
-            room_state = room_state[membership_event_id]
+
+            event = last_events[0]
+            if visible_events:
+                room_state = yield self.store.get_state_for_events(
+                    [event.event_id], types, filtered_types=filtered_types,
+                )
+                room_state = room_state[event.event_id]
+            else:
+                raise AuthError(
+                    403,
+                    "User %s not allowed to view events in room %s at token %s" % (
+                        user_id, room_id, at_token,
+                    )
+                )
+        else:
+            membership, membership_event_id = (
+                yield self.auth.check_in_room_or_world_readable(
+                    room_id, user_id,
+                )
+            )
+
+            if membership == Membership.JOIN:
+                state_ids = yield self.store.get_filtered_current_state_ids(
+                    room_id, types, filtered_types=filtered_types,
+                )
+                room_state = yield self.store.get_events(state_ids.values())
+            elif membership == Membership.LEAVE:
+                room_state = yield self.store.get_state_for_events(
+                    [membership_event_id], types, filtered_types=filtered_types,
+                )
+                room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
         defer.returnValue(
@@ -377,7 +190,7 @@ class MessageHandler(BaseHandler):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self._check_in_room_or_world_readable(
+            membership, _ = yield self.auth.check_in_room_or_world_readable(
                 room_id, user_id
             )
             if membership != Membership.JOIN:
@@ -388,7 +201,7 @@ class MessageHandler(BaseHandler):
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
-        # This can either be because the AS user is in the room or becuase there
+        # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
         if requester.app_service and user_id not in users_with_profile:
             for uid in users_with_profile:
@@ -422,7 +235,7 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
 
-        self.http_client = hs.get_simple_http_client()
+        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -431,7 +244,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
@@ -463,10 +276,14 @@ class EventCreationHandler(object):
                 where *hashes* is a map from algorithm to hash.
 
                 If None, they will be requested from the database.
-
+        Raises:
+            ResourceLimitError if server is blocked to some resource being
+            exceeded
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
+        yield self.auth.check_auth_blocking(requester.user.to_string())
+
         builder = self.event_builder_factory.new(event_dict)
 
         self.validator.validate_new(builder)
@@ -491,7 +308,7 @@ class EventCreationHandler(object):
                         target, e
                     )
 
-        is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+        is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
         if not is_exempt:
             yield self.assert_accepted_privacy_policy(requester)
 
@@ -509,12 +326,13 @@ class EventCreationHandler(object):
 
         defer.returnValue((event, context))
 
-    def _is_exempt_from_privacy_policy(self, builder):
+    def _is_exempt_from_privacy_policy(self, builder, requester):
         """"Determine if an event to be sent is exempt from having to consent
         to the privacy policy
 
         Args:
             builder (synapse.events.builder.EventBuilder): event being created
+            requester (Requster): user requesting this event
 
         Returns:
             Deferred[bool]: true if the event can be sent without the user
@@ -525,6 +343,9 @@ class EventCreationHandler(object):
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
                 return self._is_server_notices_room(builder.room_id)
+            elif membership == Membership.LEAVE:
+                # the user is always allowed to leave (but not kick people)
+                return builder.state_key == requester.user.to_string()
         return succeed(False)
 
     @defer.inlineCallbacks
@@ -630,7 +451,8 @@ class EventCreationHandler(object):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_event_id = prev_state_ids.get((event.type, event.state_key))
         prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
@@ -752,8 +574,8 @@ class EventCreationHandler(object):
         event = builder.build()
 
         logger.debug(
-            "Created event %s with state: %s",
-            event.event_id, context.prev_state_ids,
+            "Created event %s",
+            event.event_id,
         )
 
         defer.returnValue(
@@ -793,7 +615,7 @@ class EventCreationHandler(object):
         # Ensure that we can round trip before trying to persist in db
         try:
             dump = frozendict_json_encoder.encode(event.content)
-            simplejson.loads(dump)
+            json.loads(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
             raise
@@ -805,10 +627,9 @@ class EventCreationHandler(object):
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                yield send_event_to_master(
-                    self.http_client,
-                    host=self.config.worker_replication_host,
-                    port=self.config.worker_replication_http_port,
+                yield self.send_event_to_master(
+                    event_id=event.event_id,
+                    store=self.store,
                     requester=requester,
                     event=event,
                     context=context,
@@ -883,9 +704,11 @@ class EventCreationHandler(object):
                         e.sender == event.sender
                     )
 
+                current_state_ids = yield context.get_current_state_ids(self.store)
+
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(context.current_state_ids)
+                    for k, e_id in iteritems(current_state_ids)
                     if k[0] in self.hs.config.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -921,8 +744,9 @@ class EventCreationHandler(object):
                     )
 
         if event.type == EventTypes.Redaction:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -942,26 +766,23 @@ class EventCreationHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        if event.type == EventTypes.Create and context.prev_state_ids:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
+        if event.type == EventTypes.Create:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            if prev_state_ids:
+                raise AuthError(
+                    403,
+                    "Changing the room create event is forbidden",
+                )
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
         )
 
-        # this intentionally does not yield: we don't care about the result
-        # and don't need to wait for it.
-        run_in_background(
-            self.pusher_pool.on_new_notifications,
-            event_stream_id, max_stream_id
+        self.pusher_pool.on_new_notifications(
+            event_stream_id, max_stream_id,
         )
 
-        @defer.inlineCallbacks
         def _notify():
-            yield run_on_reactor()
             try:
                 self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,