summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
committerErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
commit45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3 (patch)
tree07bb21377c6611db89f64f948a2e27645662ff0e /synapse/handlers/message.py
parentAdd descriptions and remove redundant set(..) (diff)
parentRun Black. (#5482) (diff)
downloadsynapse-45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/histogram_extremities
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py303
1 files changed, 159 insertions, 144 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 11650dc80c..683da6bf32 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,9 +34,10 @@ from synapse.api.errors import (
 from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events.validator import EventValidator
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.storage.state import StateFilter
-from synapse.types import RoomAlias, UserID
+from synapse.types import RoomAlias, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
@@ -60,8 +61,9 @@ class MessageHandler(object):
         self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
-    def get_room_data(self, user_id=None, room_id=None,
-                      event_type=None, state_key="", is_guest=False):
+    def get_room_data(
+        self, user_id=None, room_id=None, event_type=None, state_key="", is_guest=False
+    ):
         """ Get data from a room.
 
         Args:
@@ -76,9 +78,7 @@ class MessageHandler(object):
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state.get_current_state(
-                room_id, event_type, state_key
-            )
+            data = yield self.state.get_current_state(room_id, event_type, state_key)
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
             room_state = yield self.store.get_state_for_events(
@@ -90,8 +90,12 @@ class MessageHandler(object):
 
     @defer.inlineCallbacks
     def get_state_events(
-        self, user_id, room_id, state_filter=StateFilter.all(),
-        at_token=None, is_guest=False,
+        self,
+        user_id,
+        room_id,
+        state_filter=StateFilter.all(),
+        at_token=None,
+        is_guest=False,
     ):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
@@ -123,50 +127,48 @@ class MessageHandler(object):
             # does not reliably give you the state at the given stream position.
             # (https://github.com/matrix-org/synapse/issues/3305)
             last_events, _ = yield self.store.get_recent_events_for_room(
-                room_id, end_token=at_token.room_key, limit=1,
+                room_id, end_token=at_token.room_key, limit=1
             )
 
             if not last_events:
-                raise NotFoundError("Can't find event for token %s" % (at_token, ))
+                raise NotFoundError("Can't find event for token %s" % (at_token,))
 
             visible_events = yield filter_events_for_client(
-                self.store, user_id, last_events,
+                self.store, user_id, last_events
             )
 
             event = last_events[0]
             if visible_events:
                 room_state = yield self.store.get_state_for_events(
-                    [event.event_id], state_filter=state_filter,
+                    [event.event_id], state_filter=state_filter
                 )
                 room_state = room_state[event.event_id]
             else:
                 raise AuthError(
                     403,
-                    "User %s not allowed to view events in room %s at token %s" % (
-                        user_id, room_id, at_token,
-                    )
+                    "User %s not allowed to view events in room %s at token %s"
+                    % (user_id, room_id, at_token),
                 )
         else:
             membership, membership_event_id = (
-                yield self.auth.check_in_room_or_world_readable(
-                    room_id, user_id,
-                )
+                yield self.auth.check_in_room_or_world_readable(room_id, user_id)
             )
 
             if membership == Membership.JOIN:
                 state_ids = yield self.store.get_filtered_current_state_ids(
-                    room_id, state_filter=state_filter,
+                    room_id, state_filter=state_filter
                 )
                 room_state = yield self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
                 room_state = yield self.store.get_state_for_events(
-                    [membership_event_id], state_filter=state_filter,
+                    [membership_event_id], state_filter=state_filter
                 )
                 room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
         events = yield self._event_serializer.serialize_events(
-            room_state.values(), now,
+            room_state.values(),
+            now,
             # We don't bother bundling aggregations in when asked for state
             # events, as clients won't use them.
             bundle_aggregations=False,
@@ -210,13 +212,15 @@ class MessageHandler(object):
                 # Loop fell through, AS has no interested users in room
                 raise AuthError(403, "Appservice not in room")
 
-        defer.returnValue({
-            user_id: {
-                "avatar_url": profile.avatar_url,
-                "display_name": profile.display_name,
+        defer.returnValue(
+            {
+                user_id: {
+                    "avatar_url": profile.avatar_url,
+                    "display_name": profile.display_name,
+                }
+                for user_id, profile in iteritems(users_with_profile)
             }
-            for user_id, profile in iteritems(users_with_profile)
-        })
+        )
 
 
 class EventCreationHandler(object):
@@ -261,9 +265,28 @@ class EventCreationHandler(object):
         if self._block_events_without_consent_error:
             self._consent_uri_builder = ConsentURIBuilder(self.config)
 
+        if (
+            not self.config.worker_app
+            and self.config.cleanup_extremities_with_dummy_events
+        ):
+            self.clock.looping_call(
+                lambda: run_as_background_process(
+                    "send_dummy_events_to_fill_extremities",
+                    self._send_dummy_events_to_fill_extremities,
+                ),
+                5 * 60 * 1000,
+            )
+
     @defer.inlineCallbacks
-    def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_events_and_hashes=None, require_consent=True):
+    def create_event(
+        self,
+        requester,
+        event_dict,
+        token_id=None,
+        txn_id=None,
+        prev_events_and_hashes=None,
+        require_consent=True,
+    ):
         """
         Given a dict from a client, create a new event.
 
@@ -323,8 +346,7 @@ class EventCreationHandler(object):
                         content["avatar_url"] = yield profile.get_avatar_url(target)
                 except Exception as e:
                     logger.info(
-                        "Failed to get profile information for %r: %s",
-                        target, e
+                        "Failed to get profile information for %r: %s", target, e
                     )
 
         is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
@@ -360,16 +382,17 @@ class EventCreationHandler(object):
             prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
             if not prev_event or prev_event.membership != Membership.JOIN:
                 logger.warning(
-                    ("Attempt to send `m.room.aliases` in room %s by user %s but"
-                     " membership is %s"),
+                    (
+                        "Attempt to send `m.room.aliases` in room %s by user %s but"
+                        " membership is %s"
+                    ),
                     event.room_id,
                     event.sender,
                     prev_event.membership if prev_event else None,
                 )
 
                 raise AuthError(
-                    403,
-                    "You must be in the room to create an alias for it",
+                    403, "You must be in the room to create an alias for it"
                 )
 
         self.validator.validate_new(event)
@@ -436,8 +459,8 @@ class EventCreationHandler(object):
 
         # exempt the system notices user
         if (
-            self.config.server_notices_mxid is not None and
-            user_id == self.config.server_notices_mxid
+            self.config.server_notices_mxid is not None
+            and user_id == self.config.server_notices_mxid
         ):
             return
 
@@ -450,15 +473,10 @@ class EventCreationHandler(object):
             return
 
         consent_uri = self._consent_uri_builder.build_user_consent_uri(
-            requester.user.localpart,
-        )
-        msg = self._block_events_without_consent_error % {
-            'consent_uri': consent_uri,
-        }
-        raise ConsentNotGivenError(
-            msg=msg,
-            consent_uri=consent_uri,
+            requester.user.localpart
         )
+        msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
+        raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
 
     @defer.inlineCallbacks
     def send_nonmember_event(self, requester, event, context, ratelimit=True):
@@ -473,8 +491,7 @@ class EventCreationHandler(object):
         """
         if event.type == EventTypes.Member:
             raise SynapseError(
-                500,
-                "Tried to send member event through non-member codepath"
+                500, "Tried to send member event through non-member codepath"
             )
 
         user = UserID.from_string(event.sender)
@@ -486,15 +503,13 @@ class EventCreationHandler(object):
             if prev_state is not None:
                 logger.info(
                     "Not bothering to persist state event %s duplicated by %s",
-                    event.event_id, prev_state.event_id,
+                    event.event_id,
+                    prev_state.event_id,
                 )
                 defer.returnValue(prev_state)
 
         yield self.handle_new_client_event(
-            requester=requester,
-            event=event,
-            context=context,
-            ratelimit=ratelimit,
+            requester=requester, event=event, context=context, ratelimit=ratelimit
         )
 
     @defer.inlineCallbacks
@@ -520,11 +535,7 @@ class EventCreationHandler(object):
 
     @defer.inlineCallbacks
     def create_and_send_nonmember_event(
-        self,
-        requester,
-        event_dict,
-        ratelimit=True,
-        txn_id=None
+        self, requester, event_dict, ratelimit=True, txn_id=None
     ):
         """
         Creates an event, then sends it.
@@ -539,32 +550,25 @@ class EventCreationHandler(object):
         # taking longer.
         with (yield self.limiter.queue(event_dict["room_id"])):
             event, context = yield self.create_event(
-                requester,
-                event_dict,
-                token_id=requester.access_token_id,
-                txn_id=txn_id
+                requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
             )
 
             spam_error = self.spam_checker.check_event_for_spam(event)
             if spam_error:
                 if not isinstance(spam_error, string_types):
                     spam_error = "Spam is not permitted here"
-                raise SynapseError(
-                    403, spam_error, Codes.FORBIDDEN
-                )
+                raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
             yield self.send_nonmember_event(
-                requester,
-                event,
-                context,
-                ratelimit=ratelimit,
+                requester, event, context, ratelimit=ratelimit
             )
         defer.returnValue(event)
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def create_new_client_event(self, builder, requester=None,
-                                prev_events_and_hashes=None):
+    def create_new_client_event(
+        self, builder, requester=None, prev_events_and_hashes=None
+    ):
         """Create a new event for a local client
 
         Args:
@@ -584,22 +588,21 @@ class EventCreationHandler(object):
         """
 
         if prev_events_and_hashes is not None:
-            assert len(prev_events_and_hashes) <= 10, \
-                "Attempting to create an event with %i prev_events" % (
-                    len(prev_events_and_hashes),
+            assert len(prev_events_and_hashes) <= 10, (
+                "Attempting to create an event with %i prev_events"
+                % (len(prev_events_and_hashes),)
             )
         else:
-            prev_events_and_hashes = \
-                yield self.store.get_prev_events_for_room(builder.room_id)
+            prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+                builder.room_id
+            )
 
         prev_events = [
             (event_id, prev_hashes)
             for event_id, prev_hashes, _ in prev_events_and_hashes
         ]
 
-        event = yield builder.build(
-            prev_event_ids=[p for p, _ in prev_events],
-        )
+        event = yield builder.build(prev_event_ids=[p for p, _ in prev_events])
         context = yield self.state.compute_event_context(event)
         if requester:
             context.app_service = requester.app_service
@@ -615,29 +618,19 @@ class EventCreationHandler(object):
             aggregation_key = relation["key"]
 
             already_exists = yield self.store.has_user_annotated_event(
-                relates_to, event.type, aggregation_key, event.sender,
+                relates_to, event.type, aggregation_key, event.sender
             )
             if already_exists:
                 raise SynapseError(400, "Can't send same reaction twice")
 
-        logger.debug(
-            "Created event %s",
-            event.event_id,
-        )
+        logger.debug("Created event %s", event.event_id)
 
-        defer.returnValue(
-            (event, context,)
-        )
+        defer.returnValue((event, context))
 
     @measure_func("handle_new_client_event")
     @defer.inlineCallbacks
     def handle_new_client_event(
-        self,
-        requester,
-        event,
-        context,
-        ratelimit=True,
-        extra_users=[],
+        self, requester, event, context, ratelimit=True, extra_users=[]
     ):
         """Processes a new event. This includes checking auth, persisting it,
         notifying users, sending to remote servers, etc.
@@ -653,19 +646,20 @@ class EventCreationHandler(object):
             extra_users (list(UserID)): Any extra users to notify about event
         """
 
-        if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
-            room_version = event.content.get(
-                "room_version", RoomVersions.V1.identifier
-            )
+        if event.is_state() and (event.type, event.state_key) == (
+            EventTypes.Create,
+            "",
+        ):
+            room_version = event.content.get("room_version", RoomVersions.V1.identifier)
         else:
             room_version = yield self.store.get_room_version(event.room_id)
 
         event_allowed = yield self.third_party_event_rules.check_event_allowed(
-            event, context,
+            event, context
         )
         if not event_allowed:
             raise SynapseError(
-                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+                403, "This event is not allowed in this context", Codes.FORBIDDEN
             )
 
         try:
@@ -682,9 +676,7 @@ class EventCreationHandler(object):
             logger.exception("Failed to encode content: %r", event.content)
             raise
 
-        yield self.action_generator.handle_push_actions_for_event(
-            event, context
-        )
+        yield self.action_generator.handle_push_actions_for_event(event, context)
 
         # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
         # hack around with a try/finally instead.
@@ -705,11 +697,7 @@ class EventCreationHandler(object):
                 return
 
             yield self.persist_and_notify_client_event(
-                requester,
-                event,
-                context,
-                ratelimit=ratelimit,
-                extra_users=extra_users,
+                requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
             success = True
@@ -718,18 +706,12 @@ class EventCreationHandler(object):
                 # Ensure that we actually remove the entries in the push actions
                 # staging area, if we calculated them.
                 run_in_background(
-                    self.store.remove_push_actions_from_staging,
-                    event.event_id,
+                    self.store.remove_push_actions_from_staging, event.event_id
                 )
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
-        self,
-        requester,
-        event,
-        context,
-        ratelimit=True,
-        extra_users=[],
+        self, requester, event, context, ratelimit=True, extra_users=[]
     ):
         """Called when we have fully built the event, have already
         calculated the push actions for the event, and checked auth.
@@ -754,20 +736,16 @@ class EventCreationHandler(object):
                 if mapping["room_id"] != event.room_id:
                     raise SynapseError(
                         400,
-                        "Room alias %s does not point to the room" % (
-                            room_alias_str,
-                        )
+                        "Room alias %s does not point to the room" % (room_alias_str,),
                     )
 
         federation_handler = self.hs.get_handlers().federation_handler
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.INVITE:
+
                 def is_inviter_member_event(e):
-                    return (
-                        e.type == EventTypes.Member and
-                        e.sender == event.sender
-                    )
+                    return e.type == EventTypes.Member and e.sender == event.sender
 
                 current_state_ids = yield context.get_current_state_ids(self.store)
 
@@ -797,26 +775,21 @@ class EventCreationHandler(object):
                     # to get them to sign the event.
 
                     returned_invite = yield federation_handler.send_invite(
-                        invitee.domain,
-                        event,
+                        invitee.domain, event
                     )
 
                     event.unsigned.pop("room_state", None)
 
                     # TODO: Make sure the signatures actually are correct.
-                    event.signatures.update(
-                        returned_invite.signatures
-                    )
+                    event.signatures.update(returned_invite.signatures)
 
         if event.type == EventTypes.Redaction:
             prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True
             )
             auth_events = yield self.store.get_events(auth_events_ids)
-            auth_events = {
-                (e.type, e.state_key): e for e in auth_events.values()
-            }
+            auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
             room_version = yield self.store.get_room_version(event.room_id)
             if self.auth.check_redaction(room_version, event, auth_events=auth_events):
                 original_event = yield self.store.get_event(
@@ -824,13 +797,10 @@ class EventCreationHandler(object):
                     check_redacted=False,
                     get_prev_content=False,
                     allow_rejected=False,
-                    allow_none=False
+                    allow_none=False,
                 )
                 if event.user_id != original_event.user_id:
-                    raise AuthError(
-                        403,
-                        "You don't have permission to redact events"
-                    )
+                    raise AuthError(403, "You don't have permission to redact events")
 
                 # We've already checked.
                 event.internal_metadata.recheck_redaction = False
@@ -838,24 +808,18 @@ class EventCreationHandler(object):
         if event.type == EventTypes.Create:
             prev_state_ids = yield context.get_prev_state_ids(self.store)
             if prev_state_ids:
-                raise AuthError(
-                    403,
-                    "Changing the room create event is forbidden",
-                )
+                raise AuthError(403, "Changing the room create event is forbidden")
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
         )
 
-        yield self.pusher_pool.on_new_notifications(
-            event_stream_id, max_stream_id,
-        )
+        yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
 
         def _notify():
             try:
                 self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id,
-                    extra_users=extra_users
+                    event, event_stream_id, max_stream_id, extra_users=extra_users
                 )
             except Exception:
                 logger.exception("Error notifying about new room event")
@@ -874,3 +838,54 @@ class EventCreationHandler(object):
             yield presence.bump_presence_active_time(user)
         except Exception:
             logger.exception("Error bumping presence active time")
+
+    @defer.inlineCallbacks
+    def _send_dummy_events_to_fill_extremities(self):
+        """Background task to send dummy events into rooms that have a large
+        number of extremities
+        """
+
+        room_ids = yield self.store.get_rooms_with_many_extremities(
+            min_count=10, limit=5
+        )
+
+        for room_id in room_ids:
+            # For each room we need to find a joined member we can use to send
+            # the dummy event with.
+
+            prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
+
+            latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
+
+            members = yield self.state.get_current_users_in_room(
+                room_id, latest_event_ids=latest_event_ids
+            )
+
+            user_id = None
+            for member in members:
+                if self.hs.is_mine_id(member):
+                    user_id = member
+                    break
+
+            if not user_id:
+                # We don't have a joined user.
+                # TODO: We should do something here to stop the room from
+                # appearing next time.
+                continue
+
+            requester = create_requester(user_id)
+
+            event, context = yield self.create_event(
+                requester,
+                {
+                    "type": "org.matrix.dummy_event",
+                    "content": {},
+                    "room_id": room_id,
+                    "sender": user_id,
+                },
+                prev_events_and_hashes=prev_events_and_hashes,
+            )
+
+            event.internal_metadata.proactively_send = False
+
+            yield self.send_nonmember_event(requester, event, context, ratelimit=False)