summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py277
1 files changed, 219 insertions, 58 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5c50c611ba..15caf1950a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,12 +17,19 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.push.action_generator import ActionGenerator
+from synapse.streams.config import PaginationConfig
+from synapse.types import (
+    UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
+)
 from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute, run_on_reactor
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.util.logcontext import preserve_fn
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -33,10 +40,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def collect_presencelike_data(distributor, user, content):
-    return distributor.fire("collect_presencelike_data", user, content)
-
-
 class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -48,35 +51,6 @@ class MessageHandler(BaseHandler):
         self.snapshot_cache = SnapshotCache()
 
     @defer.inlineCallbacks
-    def get_message(self, msg_id=None, room_id=None, sender_id=None,
-                    user_id=None):
-        """ Retrieve a message.
-
-        Args:
-            msg_id (str): The message ID to obtain.
-            room_id (str): The room where the message resides.
-            sender_id (str): The user ID of the user who sent the message.
-            user_id (str): The user ID of the user making this request.
-        Returns:
-            The message, or None if no message exists.
-        Raises:
-            SynapseError if something went wrong.
-        """
-        yield self.auth.check_joined_room(room_id, user_id)
-
-        # Pull out the message from the db
-#        msg = yield self.store.get_message(
-#            room_id=room_id,
-#            msg_id=msg_id,
-#            user_id=sender_id
-#        )
-
-        # TODO (erikj): Once we work out the correct c-s api we need to think
-        # on how to do this.
-
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
                      as_client_event=True):
         """Get messages in a room.
@@ -155,7 +129,8 @@ class MessageHandler(BaseHandler):
                 "end": next_token.to_string(),
             })
 
-        events = yield self._filter_events_for_client(
+        events = yield filter_events_for_client(
+            self.store,
             user_id,
             events,
             is_peeking=(member_event_id is None),
@@ -175,7 +150,7 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
-    def create_event(self, event_dict, token_id=None, txn_id=None):
+    def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
         """
         Given a dict from a client, create a new event.
 
@@ -186,6 +161,9 @@ class MessageHandler(BaseHandler):
 
         Args:
             event_dict (dict): An entire event
+            token_id (str)
+            txn_id (str)
+            prev_event_ids (list): The prev event ids to use when creating the event
 
         Returns:
             Tuple of created event (FrozenEvent), Context
@@ -198,12 +176,8 @@ class MessageHandler(BaseHandler):
             membership = builder.content.get("membership", None)
             target = UserID.from_string(builder.state_key)
 
-            if membership == Membership.JOIN:
+            if membership in {Membership.JOIN, Membership.INVITE}:
                 # If event doesn't include a display name, add one.
-                yield collect_presencelike_data(
-                    self.distributor, target, builder.content
-                )
-            elif membership == Membership.INVITE:
                 profile = self.hs.get_handlers().profile_handler
                 content = builder.content
 
@@ -224,6 +198,7 @@ class MessageHandler(BaseHandler):
 
         event, context = yield self._create_new_client_event(
             builder=builder,
+            prev_event_ids=prev_event_ids,
         )
         defer.returnValue((event, context))
 
@@ -261,7 +236,7 @@ class MessageHandler(BaseHandler):
         )
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_handlers().presence_handler
+            presence = self.hs.get_presence_handler()
             yield presence.bump_presence_active_time(user)
 
     def deduplicate_state_event(self, event, context):
@@ -515,8 +490,8 @@ class MessageHandler(BaseHandler):
                     ]
                 ).addErrback(unwrapFirstError)
 
-                messages = yield self._filter_events_for_client(
-                    user_id, messages
+                messages = yield filter_events_for_client(
+                    self.store, user_id, messages
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -556,14 +531,7 @@ class MessageHandler(BaseHandler):
             except:
                 logger.exception("Failed to get snapshot")
 
-        # Only do N rooms at once
-        n = 5
-        d_list = [handle_room(e) for e in room_list]
-        for i in range(0, len(d_list), n):
-            yield defer.gatherResults(
-                d_list[i:i + n],
-                consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        yield concurrently_execute(handle_room, room_list, 10)
 
         account_data_events = []
         for account_data_type, content in account_data.items():
@@ -658,8 +626,8 @@ class MessageHandler(BaseHandler):
             end_token=stream_token
         )
 
-        messages = yield self._filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = StreamToken.START.copy_and_replace("room_key", token[0])
@@ -706,7 +674,7 @@ class MessageHandler(BaseHandler):
             and m.content["membership"] == Membership.JOIN
         ]
 
-        presence_handler = self.hs.get_handlers().presence_handler
+        presence_handler = self.hs.get_presence_handler()
 
         @defer.inlineCallbacks
         def get_presence():
@@ -739,8 +707,8 @@ class MessageHandler(BaseHandler):
             consumeErrors=True,
         ).addErrback(unwrapFirstError)
 
-        messages = yield self._filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking,
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking,
         )
 
         start_token = now_token.copy_and_replace("room_key", token[0])
@@ -763,3 +731,196 @@ class MessageHandler(BaseHandler):
             ret["membership"] = membership
 
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _create_new_client_event(self, builder, prev_event_ids=None):
+        if prev_event_ids:
+            prev_events = yield self.store.add_event_hashes(prev_event_ids)
+            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+            depth = prev_max_depth + 1
+        else:
+            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+                builder.room_id,
+            )
+
+            if latest_ret:
+                depth = max([d for _, _, d in latest_ret]) + 1
+            else:
+                depth = 1
+
+            prev_events = [
+                (event_id, prev_hashes)
+                for event_id, prev_hashes, _ in latest_ret
+            ]
+
+        builder.prev_events = prev_events
+        builder.depth = depth
+
+        state_handler = self.state_handler
+
+        context = yield state_handler.compute_event_context(builder)
+
+        if builder.is_state():
+            builder.prev_state = yield self.store.add_event_hashes(
+                context.prev_state_events
+            )
+
+        yield self.auth.add_auth_events(builder, context)
+
+        signing_key = self.hs.config.signing_key[0]
+        add_hashes_and_signatures(
+            builder, self.server_name, signing_key
+        )
+
+        event = builder.build()
+
+        logger.debug(
+            "Created event %s with current state: %s",
+            event.event_id, context.current_state,
+        )
+
+        defer.returnValue(
+            (event, context,)
+        )
+
+    @defer.inlineCallbacks
+    def handle_new_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[]
+    ):
+        # We now need to go and hit out to wherever we need to hit out to.
+
+        if ratelimit:
+            self.ratelimit(requester)
+
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as err:
+            logger.warn("Denying new event %r because %s", event, err)
+            raise err
+
+        yield self.maybe_kick_guest_users(event, context.current_state.values())
+
+        if event.type == EventTypes.CanonicalAlias:
+            # Check the alias is acually valid (at this time at least)
+            room_alias_str = event.content.get("alias", None)
+            if room_alias_str:
+                room_alias = RoomAlias.from_string(room_alias_str)
+                directory_handler = self.hs.get_handlers().directory_handler
+                mapping = yield directory_handler.get_association(room_alias)
+
+                if mapping["room_id"] != event.room_id:
+                    raise SynapseError(
+                        400,
+                        "Room alias %s does not point to the room" % (
+                            room_alias_str,
+                        )
+                    )
+
+        federation_handler = self.hs.get_handlers().federation_handler
+
+        if event.type == EventTypes.Member:
+            if event.content["membership"] == Membership.INVITE:
+                def is_inviter_member_event(e):
+                    return (
+                        e.type == EventTypes.Member and
+                        e.sender == event.sender
+                    )
+
+                event.unsigned["invite_room_state"] = [
+                    {
+                        "type": e.type,
+                        "state_key": e.state_key,
+                        "content": e.content,
+                        "sender": e.sender,
+                    }
+                    for k, e in context.current_state.items()
+                    if e.type in self.hs.config.room_invite_state_types
+                    or is_inviter_member_event(e)
+                ]
+
+                invitee = UserID.from_string(event.state_key)
+                if not self.hs.is_mine(invitee):
+                    # TODO: Can we add signature from remote server in a nicer
+                    # way? If we have been invited by a remote server, we need
+                    # to get them to sign the event.
+
+                    returned_invite = yield federation_handler.send_invite(
+                        invitee.domain,
+                        event,
+                    )
+
+                    event.unsigned.pop("room_state", None)
+
+                    # TODO: Make sure the signatures actually are correct.
+                    event.signatures.update(
+                        returned_invite.signatures
+                    )
+
+        if event.type == EventTypes.Redaction:
+            if self.auth.check_redaction(event, auth_events=context.current_state):
+                original_event = yield self.store.get_event(
+                    event.redacts,
+                    check_redacted=False,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=False
+                )
+                if event.user_id != original_event.user_id:
+                    raise AuthError(
+                        403,
+                        "You don't have permission to redact events"
+                    )
+
+        if event.type == EventTypes.Create and context.current_state:
+            raise AuthError(
+                403,
+                "Changing the room create event is forbidden",
+            )
+
+        action_generator = ActionGenerator(self.hs)
+        yield action_generator.handle_push_actions_for_event(
+            event, context
+        )
+
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
+        )
+
+        # this intentionally does not yield: we don't care about the result
+        # and don't need to wait for it.
+        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+            event_stream_id, max_stream_id
+        )
+
+        destinations = set()
+        for k, s in context.current_state.items():
+            try:
+                if k[0] == EventTypes.Member:
+                    if s.content["membership"] == Membership.JOIN:
+                        destinations.add(get_domain_from_id(s.state_key))
+            except SynapseError:
+                logger.warn(
+                    "Failed to get destination from event %s", s.event_id
+                )
+
+        @defer.inlineCallbacks
+        def _notify():
+            yield run_on_reactor()
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=extra_users
+            )
+
+        preserve_fn(_notify)()
+
+        # If invite, remove room_state from unsigned before sending.
+        event.unsigned.pop("invite_room_state", None)
+
+        federation_handler.handle_new_event(
+            event, destinations=destinations,
+        )