summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py1
-rw-r--r--synapse/handlers/events.py8
-rw-r--r--synapse/handlers/federation.py124
-rw-r--r--synapse/handlers/presence.py5
-rw-r--r--synapse/handlers/room.py271
5 files changed, 228 insertions, 181 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c2f4685c92..3f07b5aa4a 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -24,4 +24,5 @@ class BaseHandler(object):
         self.notifier = hs.get_notifier()
         self.room_lock = hs.get_room_lock_manager()
         self.state_handler = hs.get_state_handler()
+        self.distributor = hs.get_distributor()
         self.hs = hs
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.api.streams.event import (
-    EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
-    RoomDataStreamData
+    EventStream, EventsStreamData
 )
 from synapse.handlers.presence import PresenceStreamData
 
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
 class EventStreamHandler(BaseHandler):
 
     stream_data_classes = [
-        MessagesStreamData,
-        RoomMemberStreamData,
-        FeedbackStreamData,
-        RoomDataStreamData,
+        EventsStreamData,
         PresenceStreamData,
     ]
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7026df90a2..9cff444779 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,10 +32,19 @@ logger = logging.getLogger(__name__)
 class FederationHandler(BaseHandler):
 
     """Handles events that originated from federation."""
+    def __init__(self, hs):
+        super(FederationHandler, self).__init__(hs)
+
+        self.distributor.observe(
+            "user_joined_room",
+            self._on_user_joined
+        )
+
+        self.waiting_for_join_list = {}
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive(self, event, is_new_state):
+    def on_receive(self, event, is_new_state, backfilled):
         if hasattr(event, "state_key") and not is_new_state:
             logger.debug("Ignoring old state.")
             return
@@ -70,6 +79,115 @@ class FederationHandler(BaseHandler):
 
         else:
             with (yield self.room_lock.lock(event.room_id)):
-                store_id = yield self.store.persist_event(event)
+                store_id = yield self.store.persist_event(event, backfilled)
+
+            room = yield self.store.get_room(event.room_id)
+
+            if not room:
+                # Huh, let's try and get the current state
+                try:
+                    federation = self.hs.get_federation()
+                    yield federation.get_state_for_room(
+                        event.origin, event.room_id
+                    )
+
+                    hosts = yield self.store.get_joined_hosts_for_room(
+                        event.room_id
+                    )
+                    if self.hs.hostname in hosts:
+                        try:
+                            yield self.store.store_room(
+                                event.room_id,
+                                "",
+                                is_public=False
+                            )
+                        except:
+                            pass
+                except:
+                    logger.exception(
+                        "Failed to get current state for room %s",
+                        event.room_id
+                    )
+
+            if not backfilled:
+                yield self.notifier.on_new_room_event(event, store_id)
+
+        if event.type == RoomMemberEvent.TYPE:
+            if event.membership == Membership.JOIN:
+                user = self.hs.parse_userid(event.target_user_id)
+                self.distributor.fire(
+                    "user_joined_room", user=user, room_id=event.room_id
+                )
+
+
+    @log_function
+    @defer.inlineCallbacks
+    def backfill(self, dest, room_id, limit):
+        events = yield self.hs.get_federation().backfill(dest, room_id, limit)
+
+        for event in events:
+            try:
+                yield self.store.persist_event(event, backfilled=True)
+            except:
+                logger.exception("Failed to persist event: %s", event)
+
+        defer.returnValue(events)
 
-            yield self.notifier.on_new_room_event(event, store_id)
+    @log_function
+    @defer.inlineCallbacks
+    def do_invite_join(self, target_host, room_id, joinee, content):
+        federation = self.hs.get_federation()
+
+        hosts = yield self.store.get_joined_hosts_for_room(room_id)
+        if self.hs.hostname in hosts:
+            # We are already in the room.
+            logger.debug("We're already in the room apparently")
+            defer.returnValue(False)
+
+        # First get current state to see if we are already joined.
+        try:
+            yield federation.get_state_for_room(target_host, room_id)
+
+            hosts = yield self.store.get_joined_hosts_for_room(room_id)
+            if self.hs.hostname in hosts:
+                # Oh, we were actually in the room already.
+                logger.debug("We're already in the room apparently")
+                defer.returnValue(False)
+        except Exception:
+            logger.exception("Failed to get current state")
+
+        new_event = self.event_factory.create_event(
+            etype=InviteJoinEvent.TYPE,
+            target_host=target_host,
+            room_id=room_id,
+            user_id=joinee,
+            content=content
+        )
+
+        new_event.destinations = [target_host]
+
+        yield federation.handle_new_event(new_event)
+
+        # TODO (erikj): Time out here.
+        d = defer.Deferred()
+        self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
+        yield d
+
+        try:
+            yield self.store.store_room(
+                event.room_id,
+                "",
+                is_public=False
+            )
+        except:
+            pass
+
+
+        defer.returnValue(True)
+
+
+    @log_function
+    def _on_user_joined(self, user, room_id):
+        waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
+        while waiters:
+            waiters.pop().callback(None)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a2152c99cf..540e114b82 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -685,7 +685,10 @@ class PresenceStreamData(StreamData):
         super(PresenceStreamData, self).__init__(hs)
         self.presence = hs.get_handlers().presence_handler
 
-    def get_rows(self, user_id, from_key, to_key, limit):
+    def get_rows(self, user_id, from_key, to_key, limit, direction):
+        from_key = int(from_key)
+        to_key = int(to_key)
+
         cachemap = self.presence._user_cachemap
 
         # TODO(paul): limit, and filter by visibility
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5d0379254b..899b653fb7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,8 @@ from synapse.api.events.room import (
     RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
     RoomConfigEvent
 )
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
+from synapse.handlers.presence import PresenceStreamData
 from synapse.util import stringutils
 from ._base import BaseHandler
 
@@ -59,12 +60,14 @@ class MessageHandler(BaseHandler):
         yield self.auth.check_joined_room(room_id, user_id)
 
         # Pull out the message from the db
-        msg = yield self.store.get_message(room_id=room_id,
-                                           msg_id=msg_id,
-                                           user_id=sender_id)
+#        msg = yield self.store.get_message(
+#            room_id=room_id,
+#            msg_id=msg_id,
+#            user_id=sender_id
+#        )
+
+        # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this.
 
-        if msg:
-            defer.returnValue(msg)
         defer.returnValue(None)
 
     @defer.inlineCallbacks
@@ -114,8 +117,9 @@ class MessageHandler(BaseHandler):
         """
         yield self.auth.check_joined_room(room_id, user_id)
 
-        data_source = [MessagesStreamData(self.hs, room_id=room_id,
-                                          feedback=feedback)]
+        data_source = [
+            EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
+        ]
         event_stream = EventStream(user_id, data_source)
         pagin_config = yield event_stream.fix_tokens(pagin_config)
         data_chunk = yield event_stream.get_chunk(config=pagin_config)
@@ -141,12 +145,7 @@ class MessageHandler(BaseHandler):
             yield self.state_handler.handle_new_event(event)
 
             # store in db
-            store_id = yield self.store.store_room_data(
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=event.state_key,
-                content=json.dumps(event.content)
-            )
+            store_id = yield self.store.persist_event(event)
 
             event.destinations = yield self.store.get_joined_hosts_for_room(
                 event.room_id
@@ -201,19 +200,17 @@ class MessageHandler(BaseHandler):
                 raise RoomError(
                     403, "Member does not meet private room rules.")
 
-        data = yield self.store.get_room_data(room_id, event_type, state_key)
+        data = yield self.store.get_current_state(
+            room_id, event_type, state_key
+        )
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
-                     user_id=None, fb_sender_id=None, fb_type=None):
-        yield self.auth.check_joined_room(room_id, user_id)
+    def get_feedback(self, event_id):
+        # yield self.auth.check_joined_room(room_id, user_id)
 
         # Pull out the feedback from the db
-        fb = yield self.store.get_feedback(
-            room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
-            fb_sender_id=fb_sender_id, fb_type=fb_type
-        )
+        fb = yield self.store.get_feedback(event_id)
 
         if fb:
             defer.returnValue(fb)
@@ -260,20 +257,59 @@ class MessageHandler(BaseHandler):
             user_id=user_id,
             membership_list=[Membership.INVITE, Membership.JOIN]
         )
-        for room_info in room_list:
-            if room_info["membership"] != Membership.JOIN:
+
+        rooms_ret = []
+
+        now_rooms_token = yield self.store.get_room_events_max_id()
+
+        # FIXME (erikj): Fix this.
+        presence_stream = PresenceStreamData(self.hs)
+        now_presence_token = yield presence_stream.max_token()
+        presence = yield presence_stream.get_rows(
+            user_id, 0, now_presence_token, None, None
+        )
+
+        # FIXME (erikj): We need to not generate this token,
+        now_token = "%s_%s" % (now_rooms_token, now_presence_token)
+
+        for event in room_list:
+            d = {
+                "room_id": event.room_id,
+                "membership": event.membership,
+            }
+
+            if event.membership == Membership.INVITE:
+                d["inviter"] = event.user_id
+
+            rooms_ret.append(d)
+
+            if event.membership != Membership.JOIN:
                 continue
             try:
-                event_chunk = yield self.get_messages(
-                    user_id=user_id,
-                    pagin_config=pagin_config,
-                    feedback=feedback,
-                    room_id=room_info["room_id"]
+                messages, token = yield self.store.get_recent_events_for_room(
+                    event.room_id,
+                    limit=10,
+                    end_token=now_rooms_token,
                 )
-                room_info["messages"] = event_chunk
+
+                d["messages"] = {
+                    "chunk": [m.get_dict() for m in messages],
+                    "start": token[0],
+                    "end": token[1],
+                }
+
+                current_state = yield self.store.get_current_state(event.room_id)
+                d["state"] = [c.get_dict() for c in current_state]
             except:
-                pass
-        defer.returnValue(room_list)
+                logger.exception("Failed to get snapshot")
+
+        user = self.hs.parse_userid(user_id)
+
+        ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
+
+        logger.debug("snapshot_all_rooms returning: %s", ret)
+
+        defer.returnValue(ret)
 
 
 class RoomCreationHandler(BaseHandler):
@@ -372,7 +408,6 @@ class RoomCreationHandler(BaseHandler):
 
         yield self.hs.get_handlers().room_member_handler.change_membership(
             join_event,
-            broadcast_msg=True,
             do_auth=False
         )
 
@@ -451,11 +486,11 @@ class RoomMemberHandler(BaseHandler):
 
         member_list = yield self.store.get_room_members(room_id=room_id)
         event_list = [
-            entry.as_event(self.event_factory).get_dict()
+            entry.get_dict()
             for entry in member_list
         ]
         chunk_data = {
-            "start": "START",
+            "start": "START",  # FIXME (erikj): START is no longer a valid value
             "end": "END",
             "chunk": event_list
         }
@@ -484,29 +519,28 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue(member)
 
     @defer.inlineCallbacks
-    def change_membership(self, event=None, broadcast_msg=False, do_auth=True):
+    def change_membership(self, event=None, do_auth=True):
         """ Change the membership status of a user in a room.
 
         Args:
             event (SynapseEvent): The membership event
-            broadcast_msg (bool): True to inject a membership message into this
-                room on success.
         Raises:
             SynapseError if there was a problem changing the membership.
         """
 
-        #broadcast_msg = False
-
         prev_state = yield self.store.get_room_member(
             event.target_user_id, event.room_id
         )
 
-        if prev_state and prev_state.membership == event.membership:
-            # treat this event as a NOOP.
-            if do_auth:  # This is mainly to fix a unit test.
-                yield self.auth.check(event, raises=True)
-            defer.returnValue({})
-            return
+        if prev_state:
+            event.content["prev"] = prev_state.membership
+
+#        if prev_state and prev_state.membership == event.membership:
+#            # treat this event as a NOOP.
+#            if do_auth:  # This is mainly to fix a unit test.
+#                yield self.auth.check(event, raises=True)
+#            defer.returnValue({})
+#            return
 
         room_id = event.room_id
 
@@ -514,9 +548,7 @@ class RoomMemberHandler(BaseHandler):
         # if this HS is not currently in the room, i.e. we have to do the
         # invite/join dance.
         if event.membership == Membership.JOIN:
-            yield self._do_join(
-                event, do_auth=do_auth, broadcast_msg=broadcast_msg
-            )
+            yield self._do_join(event, do_auth=do_auth)
         else:
             # This is not a JOIN, so we can handle it normally.
             if do_auth:
@@ -534,7 +566,6 @@ class RoomMemberHandler(BaseHandler):
             yield self._do_local_membership_update(
                 event,
                 membership=event.content["membership"],
-                broadcast_msg=broadcast_msg,
             )
 
         defer.returnValue({"room_id": room_id})
@@ -569,14 +600,14 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+    def _do_join(self, event, room_host=None, do_auth=True):
         joinee = self.hs.parse_userid(event.target_user_id)
         # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
 
         # If event doesn't include a display name, add one.
-        yield self._fill_out_join_content(
-            joinee, event.content
+        yield self.distributor.fire(
+            "collect_presencelike_data", joinee, event.content
         )
 
         # XXX: We don't do an auth check if we are doing an invite
@@ -584,9 +615,9 @@ class RoomMemberHandler(BaseHandler):
         # that we are allowed to join when we decide whether or not we
         # need to do the invite/join dance.
 
-        room = yield self.store.get_room(room_id)
+        hosts = yield self.store.get_joined_hosts_for_room(room_id)
 
-        if room:
+        if self.hs.hostname in hosts:
             should_do_dance = False
         elif room_host:
             should_do_dance = True
@@ -598,7 +629,7 @@ class RoomMemberHandler(BaseHandler):
             if prev_state and prev_state.membership == Membership.INVITE:
                 room = yield self.store.get_room(room_id)
                 inviter = UserID.from_string(
-                    prev_state.sender, self.hs
+                    prev_state.user_id, self.hs
                 )
 
                 should_do_dance = not inviter.is_mine and not room
@@ -606,8 +637,15 @@ class RoomMemberHandler(BaseHandler):
             else:
                 should_do_dance = False
 
+        have_joined = False
+        if should_do_dance:
+            handler = self.hs.get_handlers().federation_handler
+            have_joined = yield handler.do_invite_join(
+                room_host, room_id, event.user_id, event.content
+            )
+
         # We want to do the _do_update inside the room lock.
-        if not should_do_dance:
+        if not have_joined:
             logger.debug("Doing normal join")
 
             if do_auth:
@@ -617,16 +655,6 @@ class RoomMemberHandler(BaseHandler):
             yield self._do_local_membership_update(
                 event,
                 membership=event.content["membership"],
-                broadcast_msg=broadcast_msg,
-            )
-
-
-        if should_do_dance:
-            yield self._do_invite_join_dance(
-                room_id=room_id,
-                joinee=event.user_id,
-                target_host=room_host,
-                content=event.content,
             )
 
         user = self.hs.parse_userid(event.user_id)
@@ -635,32 +663,6 @@ class RoomMemberHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def _fill_out_join_content(self, user_id, content):
-        # If event doesn't include a display name, add one.
-        profile_handler = self.hs.get_handlers().profile_handler
-        if "displayname" not in content:
-            try:
-                display_name = yield profile_handler.get_displayname(
-                    user_id
-                )
-
-                if display_name:
-                    content["displayname"] = display_name
-            except:
-                logger.exception("Failed to set display_name")
-
-        if "avatar_url" not in content:
-            try:
-                avatar_url = yield profile_handler.get_avatar_url(
-                    user_id
-                )
-
-                if avatar_url:
-                    content["avatar_url"] = avatar_url
-            except:
-                logger.exception("Failed to set display_name")
-
-    @defer.inlineCallbacks
     def _should_invite_join(self, room_id, prev_state, do_auth):
         logger.debug("_should_invite_join: room_id: %s", room_id)
 
@@ -694,18 +696,12 @@ class RoomMemberHandler(BaseHandler):
             user_id=user.to_string(), membership_list=membership_list
         )
 
-        defer.returnValue([r["room_id"] for r in rooms])
+        defer.returnValue([r.room_id for r in rooms])
 
     @defer.inlineCallbacks
-    def _do_local_membership_update(self, event, membership, broadcast_msg):
+    def _do_local_membership_update(self, event, membership):
         # store membership
-        store_id = yield self.store.store_room_member(
-            user_id=event.target_user_id,
-            sender=event.user_id,
-            room_id=event.room_id,
-            content=event.content,
-            membership=membership
-        )
+        store_id = yield self.store.persist_event(event)
 
         # Send a PDU to all hosts who have joined the room.
         destinations = yield self.store.get_joined_hosts_for_room(
@@ -732,78 +728,11 @@ class RoomMemberHandler(BaseHandler):
         yield self.hs.get_federation().handle_new_event(event)
         self.notifier.on_new_room_event(event, store_id)
 
-        if broadcast_msg:
-            yield self._inject_membership_msg(
-                source=event.user_id,
-                target=event.target_user_id,
-                room_id=event.room_id,
-                membership=event.content["membership"]
-            )
-
-    @defer.inlineCallbacks
-    def _do_invite_join_dance(self, room_id, joinee, target_host, content):
-        logger.debug("Doing remote join dance")
-
-        # do invite join dance
-        federation = self.hs.get_federation()
-        new_event = self.event_factory.create_event(
-            etype=InviteJoinEvent.TYPE,
-            target_host=target_host,
-            room_id=room_id,
-            user_id=joinee,
-            content=content
-        )
-
-        new_event.destinations = [target_host]
-
-        yield self.store.store_room(
-            room_id, "", is_public=False
-        )
-
-        #yield self.state_handler.handle_new_event(event)
-        yield federation.handle_new_event(new_event)
-        yield federation.get_state_for_room(
-            target_host, room_id
-        )
-
-    @defer.inlineCallbacks
-    def _inject_membership_msg(self, room_id=None, source=None, target=None,
-                               membership=None):
-        # TODO this should be a different type of message, not m.text
-        if membership == Membership.INVITE:
-            body = "%s invited %s to the room." % (source, target)
-        elif membership == Membership.JOIN:
-            body = "%s joined the room." % (target)
-        elif membership == Membership.LEAVE:
-            body = "%s left the room." % (target)
-        else:
-            raise RoomError(500, "Unknown membership value %s" % membership)
-
-        membership_json = {
-            "msgtype": u"m.text",
-            "body": body,
-            "membership_source": source,
-            "membership_target": target,
-            "membership": membership,
-        }
-
-        msg_id = "m%s" % int(self.clock.time_msec())
-
-        event = self.event_factory.create_event(
-            etype=MessageEvent.TYPE,
-            room_id=room_id,
-            user_id="_homeserver_",
-            msg_id=msg_id,
-            content=membership_json
-        )
-
-        handler = self.hs.get_handlers().message_handler
-        yield handler.send_message(event, suppress_auth=True)
-
 
 class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_public_room_list(self):
-        chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+        chunk = yield self.store.get_rooms(is_public=True)
+        # FIXME (erikj): START is no longer a valid value
         defer.returnValue({"start": "START", "end": "END", "chunk": chunk})