summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py354
1 files changed, 241 insertions, 113 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4f97c8db79..81cff0870e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,11 +13,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import simplejson
+import sys
+
+from canonicaljson import encode_canonical_json
+import six
+from six import string_types, itervalues, iteritems
 from twisted.internet import defer, reactor
+from twisted.internet.defer import succeed
 from twisted.python.failure import Failure
 
-from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
+from synapse.api.errors import (
+    AuthError, Codes, SynapseError,
+    ConsentNotGivenError,
+)
+from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
@@ -25,21 +37,15 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn, run_in_background
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import unfreeze
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
-from canonicaljson import encode_canonical_json
-
-import logging
-import random
-import ujson
-
 logger = logging.getLogger(__name__)
 
 
@@ -86,14 +92,14 @@ class MessageHandler(BaseHandler):
         # map from purge id to PurgeStatus
         self._purges_by_id = {}
 
-    def start_purge_history(self, room_id, topological_ordering,
+    def start_purge_history(self, room_id, token,
                             delete_local_events=False):
         """Start off a history purge on a room.
 
         Args:
             room_id (str): The room to purge from
 
-            topological_ordering (int): minimum topo ordering to preserve
+            token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
 
@@ -115,19 +121,19 @@ class MessageHandler(BaseHandler):
         self._purges_by_id[purge_id] = PurgeStatus()
         run_in_background(
             self._purge_history,
-            purge_id, room_id, topological_ordering, delete_local_events,
+            purge_id, room_id, token, delete_local_events,
         )
         return purge_id
 
     @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, topological_ordering,
+    def _purge_history(self, purge_id, room_id, token,
                        delete_local_events):
         """Carry out a history purge on a room.
 
         Args:
             purge_id (str): The id for this purge
             room_id (str): The room to purge from
-            topological_ordering (int): minimum topo ordering to preserve
+            token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
 
@@ -138,7 +144,7 @@ class MessageHandler(BaseHandler):
         try:
             with (yield self.pagination_lock.write(room_id)):
                 yield self.store.purge_history(
-                    room_id, topological_ordering, delete_local_events,
+                    room_id, token, delete_local_events,
                 )
             logger.info("[purge] complete")
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
@@ -397,7 +403,7 @@ class MessageHandler(BaseHandler):
                 "avatar_url": profile.avatar_url,
                 "display_name": profile.display_name,
             }
-            for user_id, profile in users_with_profile.iteritems()
+            for user_id, profile in iteritems(users_with_profile)
         })
 
 
@@ -431,9 +437,12 @@ class EventCreationHandler(object):
 
         self.spam_checker = hs.get_spam_checker()
 
+        if self.config.block_events_without_consent_error is not None:
+            self._consent_uri_builder = ConsentURIBuilder(self.config)
+
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_event_ids=None):
+                     prev_events_and_hashes=None):
         """
         Given a dict from a client, create a new event.
 
@@ -447,50 +456,136 @@ class EventCreationHandler(object):
             event_dict (dict): An entire event
             token_id (str)
             txn_id (str)
-            prev_event_ids (list): The prev event ids to use when creating the event
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
 
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        with (yield self.limiter.queue(builder.room_id)):
-            self.validator.validate_new(builder)
-
-            if builder.type == EventTypes.Member:
-                membership = builder.content.get("membership", None)
-                target = UserID.from_string(builder.state_key)
-
-                if membership in {Membership.JOIN, Membership.INVITE}:
-                    # If event doesn't include a display name, add one.
-                    profile = self.profile_handler
-                    content = builder.content
-
-                    try:
-                        if "displayname" not in content:
-                            content["displayname"] = yield profile.get_displayname(target)
-                        if "avatar_url" not in content:
-                            content["avatar_url"] = yield profile.get_avatar_url(target)
-                    except Exception as e:
-                        logger.info(
-                            "Failed to get profile information for %r: %s",
-                            target, e
-                        )
+        self.validator.validate_new(builder)
+
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            target = UserID.from_string(builder.state_key)
+
+            if membership in {Membership.JOIN, Membership.INVITE}:
+                # If event doesn't include a display name, add one.
+                profile = self.profile_handler
+                content = builder.content
+
+                try:
+                    if "displayname" not in content:
+                        content["displayname"] = yield profile.get_displayname(target)
+                    if "avatar_url" not in content:
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                except Exception as e:
+                    logger.info(
+                        "Failed to get profile information for %r: %s",
+                        target, e
+                    )
 
-            if token_id is not None:
-                builder.internal_metadata.token_id = token_id
+        is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+        if not is_exempt:
+            yield self.assert_accepted_privacy_policy(requester)
 
-            if txn_id is not None:
-                builder.internal_metadata.txn_id = txn_id
+        if token_id is not None:
+            builder.internal_metadata.token_id = token_id
 
-            event, context = yield self.create_new_client_event(
-                builder=builder,
-                requester=requester,
-                prev_event_ids=prev_event_ids,
-            )
+        if txn_id is not None:
+            builder.internal_metadata.txn_id = txn_id
+
+        event, context = yield self.create_new_client_event(
+            builder=builder,
+            requester=requester,
+            prev_events_and_hashes=prev_events_and_hashes,
+        )
 
         defer.returnValue((event, context))
 
+    def _is_exempt_from_privacy_policy(self, builder):
+        """"Determine if an event to be sent is exempt from having to consent
+        to the privacy policy
+
+        Args:
+            builder (synapse.events.builder.EventBuilder): event being created
+
+        Returns:
+            Deferred[bool]: true if the event can be sent without the user
+                consenting
+        """
+        # the only thing the user can do is join the server notices room.
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            if membership == Membership.JOIN:
+                return self._is_server_notices_room(builder.room_id)
+        return succeed(False)
+
+    @defer.inlineCallbacks
+    def _is_server_notices_room(self, room_id):
+        if self.config.server_notices_mxid is None:
+            defer.returnValue(False)
+        user_ids = yield self.store.get_users_in_room(room_id)
+        defer.returnValue(self.config.server_notices_mxid in user_ids)
+
+    @defer.inlineCallbacks
+    def assert_accepted_privacy_policy(self, requester):
+        """Check if a user has accepted the privacy policy
+
+        Called when the given user is about to do something that requires
+        privacy consent. We see if the user is exempt and otherwise check that
+        they have given consent. If they have not, a ConsentNotGiven error is
+        raised.
+
+        Args:
+            requester (synapse.types.Requester):
+                The user making the request
+
+        Returns:
+            Deferred[None]: returns normally if the user has consented or is
+                exempt
+
+        Raises:
+            ConsentNotGivenError: if the user has not given consent yet
+        """
+        if self.config.block_events_without_consent_error is None:
+            return
+
+        # exempt AS users from needing consent
+        if requester.app_service is not None:
+            return
+
+        user_id = requester.user.to_string()
+
+        # exempt the system notices user
+        if (
+            self.config.server_notices_mxid is not None and
+            user_id == self.config.server_notices_mxid
+        ):
+            return
+
+        u = yield self.store.get_user_by_id(user_id)
+        assert u is not None
+        if u["consent_version"] == self.config.user_consent_version:
+            return
+
+        consent_uri = self._consent_uri_builder.build_user_consent_uri(
+            requester.user.localpart,
+        )
+        msg = self.config.block_events_without_consent_error % {
+            'consent_uri': consent_uri,
+        }
+        raise ConsentNotGivenError(
+            msg=msg,
+            consent_uri=consent_uri,
+        )
+
     @defer.inlineCallbacks
     def send_nonmember_event(self, requester, event, context, ratelimit=True):
         """
@@ -557,64 +652,80 @@ class EventCreationHandler(object):
 
         See self.create_event and self.send_nonmember_event.
         """
-        event, context = yield self.create_event(
-            requester,
-            event_dict,
-            token_id=requester.access_token_id,
-            txn_id=txn_id
-        )
 
-        spam_error = self.spam_checker.check_event_for_spam(event)
-        if spam_error:
-            if not isinstance(spam_error, basestring):
-                spam_error = "Spam is not permitted here"
-            raise SynapseError(
-                403, spam_error, Codes.FORBIDDEN
+        # We limit the number of concurrent event sends in a room so that we
+        # don't fork the DAG too much. If we don't limit then we can end up in
+        # a situation where event persistence can't keep up, causing
+        # extremities to pile up, which in turn leads to state resolution
+        # taking longer.
+        with (yield self.limiter.queue(event_dict["room_id"])):
+            event, context = yield self.create_event(
+                requester,
+                event_dict,
+                token_id=requester.access_token_id,
+                txn_id=txn_id
             )
 
-        yield self.send_nonmember_event(
-            requester,
-            event,
-            context,
-            ratelimit=ratelimit,
-        )
+            spam_error = self.spam_checker.check_event_for_spam(event)
+            if spam_error:
+                if not isinstance(spam_error, string_types):
+                    spam_error = "Spam is not permitted here"
+                raise SynapseError(
+                    403, spam_error, Codes.FORBIDDEN
+                )
+
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+            )
         defer.returnValue(event)
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
-            )
+    def create_new_client_event(self, builder, requester=None,
+                                prev_events_and_hashes=None):
+        """Create a new event for a local client
 
-            # We want to limit the max number of prev events we point to in our
-            # new event
-            if len(latest_ret) > 10:
-                # Sort by reverse depth, so we point to the most recent.
-                latest_ret.sort(key=lambda a: -a[2])
-                new_latest_ret = latest_ret[:5]
-
-                # We also randomly point to some of the older events, to make
-                # sure that we don't completely ignore the older events.
-                if latest_ret[5:]:
-                    sample_size = min(5, len(latest_ret[5:]))
-                    new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
-                latest_ret = new_latest_ret
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
+        Args:
+            builder (EventBuilder):
+
+            requester (synapse.types.Requester|None):
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
 
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
+                If None, they will be requested from the database.
+
+        Returns:
+            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+        """
+
+        if prev_events_and_hashes is not None:
+            assert len(prev_events_and_hashes) <= 10, \
+                "Attempting to create an event with %i prev_events" % (
+                    len(prev_events_and_hashes),
+            )
+        else:
+            prev_events_and_hashes = \
+                yield self.store.get_prev_events_for_room(builder.room_id)
+
+        if prev_events_and_hashes:
+            depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+            # we cap depth of generated events, to ensure that they are not
+            # rejected by other servers (and so that they can be persisted in
+            # the db)
+            depth = min(depth, MAX_DEPTH)
+        else:
+            depth = 1
+
+        prev_events = [
+            (event_id, prev_hashes)
+            for event_id, prev_hashes, _ in prev_events_and_hashes
+        ]
 
         builder.prev_events = prev_events
         builder.depth = depth
@@ -678,8 +789,8 @@ class EventCreationHandler(object):
 
         # Ensure that we can round trip before trying to persist in db
         try:
-            dump = ujson.dumps(unfreeze(event.content))
-            ujson.loads(dump)
+            dump = frozendict_json_encoder.encode(event.content)
+            simplejson.loads(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
             raise
@@ -713,8 +824,14 @@ class EventCreationHandler(object):
         except:  # noqa: E722, as we reraise the exception this is fine.
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
-            preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
@@ -765,7 +882,7 @@ class EventCreationHandler(object):
 
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in context.current_state_ids.iteritems()
+                    for k, e_id in iteritems(context.current_state_ids)
                     if k[0] in self.hs.config.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -779,7 +896,7 @@ class EventCreationHandler(object):
                         "content": e.content,
                         "sender": e.sender,
                     }
-                    for e in state_to_include.itervalues()
+                    for e in itervalues(state_to_include)
                 ]
 
                 invitee = UserID.from_string(event.state_key)
@@ -834,22 +951,33 @@ class EventCreationHandler(object):
 
         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
-        preserve_fn(self.pusher_pool.on_new_notifications)(
+        run_in_background(
+            self.pusher_pool.on_new_notifications,
             event_stream_id, max_stream_id
         )
 
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+            try:
+                self.notifier.on_new_room_event(
+                    event, event_stream_id, max_stream_id,
+                    extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room event")
 
-        preserve_fn(_notify)()
+        run_in_background(_notify)
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
             # We don't want to block sending messages on any presence code. This
             # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(requester.user)
+            run_in_background(self._bump_active_time, requester.user)
+
+    @defer.inlineCallbacks
+    def _bump_active_time(self, user):
+        try:
+            presence = self.hs.get_presence_handler()
+            yield presence.bump_presence_active_time(user)
+        except Exception:
+            logger.exception("Error bumping presence active time")