summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/presence.py227
-rw-r--r--synapse/handlers/room.py189
-rw-r--r--synapse/handlers/typing.py14
4 files changed, 366 insertions, 78 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3d7f97bcff..4aeb2089f5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -76,6 +76,10 @@ class MessageHandler(BaseRoomHandler):
         Raises:
             SynapseError if something went wrong.
         """
+        # TODO(paul): Why does 'event' not have a 'user' object?
+        user = self.hs.parse_userid(event.user_id)
+        assert user.is_mine, "User must be our own: %s" % (user,)
+
         if stamp_event:
             event.content["hsob_ts"] = int(self.clock.time_msec())
 
@@ -86,6 +90,10 @@ class MessageHandler(BaseRoomHandler):
 
         yield self._on_new_room_event(event, snapshot)
 
+        self.hs.get_handlers().presence_handler.bump_presence_active_time(
+            user
+        )
+
     @defer.inlineCallbacks
     def get_messages(self, user_id=None, room_id=None, pagin_config=None,
                      feedback=False):
@@ -274,11 +282,11 @@ class MessageHandler(BaseRoomHandler):
                 messages, token = yield self.store.get_recent_events_for_room(
                     event.room_id,
                     limit=limit,
-                    end_token=now_token.events_key,
+                    end_token=now_token.room_key,
                 )
 
-                start_token = now_token.copy_and_replace("events_key", token[0])
-                end_token = now_token.copy_and_replace("events_key", token[1])
+                start_token = now_token.copy_and_replace("room_key", token[0])
+                end_token = now_token.copy_and_replace("room_key", token[1])
 
                 d["messages"] = {
                     "chunk": [m.get_dict() for m in messages],
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7731de85c0..9bfceda88a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,13 @@ def partitionbool(l, func):
 
 class PresenceHandler(BaseHandler):
 
+    STATE_LEVELS = {
+        PresenceState.OFFLINE: 0,
+        PresenceState.UNAVAILABLE: 1,
+        PresenceState.ONLINE: 2,
+        PresenceState.FREE_FOR_CHAT: 3,
+    }
+
     def __init__(self, hs):
         super(PresenceHandler, self).__init__(hs)
 
@@ -135,7 +142,7 @@ class PresenceHandler(BaseHandler):
             return self._user_cachemap[user]
         else:
             statuscache = UserPresenceCache()
-            statuscache.update({"state": PresenceState.OFFLINE}, user)
+            statuscache.update({"presence": PresenceState.OFFLINE}, user)
             return statuscache
 
     def registered_user(self, user):
@@ -143,10 +150,6 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def is_presence_visible(self, observer_user, observed_user):
-        defer.returnValue(True)
-        # return
-        # FIXME (erikj): This code path absolutely kills the database.
-
         assert(observed_user.is_mine)
 
         if observer_user == observed_user:
@@ -173,19 +176,24 @@ class PresenceHandler(BaseHandler):
                 observed_user=target_user
             )
 
-            if visible:
-                state = yield self.store.get_presence_state(
-                    target_user.localpart
-                )
-            else:
+            if not visible:
                 raise SynapseError(404, "Presence information not visible")
+            state = yield self.store.get_presence_state(target_user.localpart)
+            if "mtime" in state:
+                del state["mtime"]
+            state["presence"] = state["state"]
+
+            if target_user in self._user_cachemap:
+                state["last_active"] = (
+                    self._user_cachemap[target_user].get_state()["last_active"]
+                )
         else:
             # TODO(paul): Have remote server send us permissions set
             state = self._get_or_offline_usercache(target_user).get_state()
 
-        if "mtime" in state and (state["mtime"] is not None):
-            state["mtime_age"] = int(
-                self.clock.time_msec() - state.pop("mtime")
+        if "last_active" in state:
+            state["last_active_ago"] = int(
+                self.clock.time_msec() - state.pop("last_active")
             )
         defer.returnValue(state)
 
@@ -202,20 +210,33 @@ class PresenceHandler(BaseHandler):
         if target_user != auth_user:
             raise AuthError(400, "Cannot set another user's displayname")
 
-        # TODO(paul): Sanity-check 'state'
         if "status_msg" not in state:
             state["status_msg"] = None
 
         for k in state.keys():
-            if k not in ("state", "status_msg"):
+            if k not in ("presence", "state", "status_msg"):
                 raise SynapseError(
                     400, "Unexpected presence state key '%s'" % (k,)
                 )
 
+        # Handle legacy "state" key for now
+        if "state" in state:
+            state["presence"] = state.pop("state")
+
+        if state["presence"] not in self.STATE_LEVELS:
+            raise SynapseError(400, "'%s' is not a valid presence state" %
+                state["presence"]
+            )
+
         logger.debug("Updating presence state of %s to %s",
-                     target_user.localpart, state["state"])
+                     target_user.localpart, state["presence"])
 
         state_to_store = dict(state)
+        state_to_store["state"] = state_to_store.pop("presence")
+
+        statuscache=self._get_or_offline_usercache(target_user)
+        was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
+        now_level = self.STATE_LEVELS[state["presence"]]
 
         yield defer.DeferredList([
             self.store.set_presence_state(
@@ -226,9 +247,10 @@ class PresenceHandler(BaseHandler):
             ),
         ])
 
-        state["mtime"] = self.clock.time_msec()
+        if now_level > was_level:
+            state["last_active"] = self.clock.time_msec()
 
-        now_online = state["state"] != PresenceState.OFFLINE
+        now_online = state["presence"] != PresenceState.OFFLINE
         was_polling = target_user in self._user_cachemap
 
         if now_online and not was_polling:
@@ -240,6 +262,12 @@ class PresenceHandler(BaseHandler):
         #   we don't have to do this all the time
         self.changed_presencelike_data(target_user, state)
 
+    def bump_presence_active_time(self, user, now=None):
+        if now is None:
+            now = self.clock.time_msec()
+
+        self.changed_presencelike_data(user, {"last_active": now})
+
     def changed_presencelike_data(self, user, state):
         statuscache = self._get_or_make_usercache(user)
 
@@ -251,28 +279,27 @@ class PresenceHandler(BaseHandler):
     @log_function
     def started_user_eventstream(self, user):
         # TODO(paul): Use "last online" state
-        self.set_state(user, user, {"state": PresenceState.ONLINE})
+        self.set_state(user, user, {"presence": PresenceState.ONLINE})
 
     @log_function
     def stopped_user_eventstream(self, user):
         # TODO(paul): Save current state as "last online" state
-        self.set_state(user, user, {"state": PresenceState.OFFLINE})
+        self.set_state(user, user, {"presence": PresenceState.OFFLINE})
 
     @defer.inlineCallbacks
     def user_joined_room(self, user, room_id):
-
         if user.is_mine:
-            self.push_update_to_local_and_remote(
-                observed_user=user,
-                room_ids=[room_id],
-                statuscache=self._get_or_offline_usercache(user),
-            )
+            statuscache = self._get_or_make_usercache(user)
 
-        else:
-            self.push_update_to_clients(
+            # No actual update but we need to bump the serial anyway for the
+            # event source
+            self._user_cachemap_latest_serial += 1
+            statuscache.update({}, serial=self._user_cachemap_latest_serial)
+
+            self.push_update_to_local_and_remote(
                 observed_user=user,
                 room_ids=[room_id],
-                statuscache=self._get_or_offline_usercache(user),
+                statuscache=statuscache,
             )
 
         # We also want to tell them about current presence of people.
@@ -386,9 +413,9 @@ class PresenceHandler(BaseHandler):
             observed_user = self.hs.parse_userid(p.pop("observed_user_id"))
             p["observed_user"] = observed_user
             p.update(self._get_or_offline_usercache(observed_user).get_state())
-            if "mtime" in p:
-                p["mtime_age"] = int(
-                    self.clock.time_msec() - p.pop("mtime")
+            if "last_active" in p:
+                p["last_active_ago"] = int(
+                    self.clock.time_msec() - p.pop("last_active")
                 )
 
         defer.returnValue(presence)
@@ -457,10 +484,6 @@ class PresenceHandler(BaseHandler):
     def _start_polling_local(self, user, target_user):
         target_localpart = target_user.localpart
 
-        if not self.is_presence_visible(observer_user=user,
-            observed_user=target_user):
-            return
-
         if target_localpart not in self._local_pushmap:
             self._local_pushmap[target_localpart] = set()
 
@@ -577,21 +600,30 @@ class PresenceHandler(BaseHandler):
     def _push_presence_remote(self, user, destination, state=None):
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
+            del state["mtime"]
+            state["presence"] = state["state"]
+
+            if user in self._user_cachemap:
+                state["last_active"] = (
+                    self._user_cachemap[user].get_state()["last_active"]
+                )
 
             yield self.distributor.fire(
                 "collect_presencelike_data", user, state
             )
 
-        if "mtime" in state:
+        if "last_active" in state:
             state = dict(state)
-            state["mtime_age"] = int(
-                self.clock.time_msec() - state.pop("mtime")
+            state["last_active_ago"] = int(
+                self.clock.time_msec() - state.pop("last_active")
             )
 
         user_state = {
             "user_id": user.to_string(),
         }
         user_state.update(**state)
+        if "state" in user_state and "presence" not in user_state:
+            user_state["presence"] = user_state["state"]
 
         yield self.federation.send_edu(
             destination=destination,
@@ -618,14 +650,29 @@ class PresenceHandler(BaseHandler):
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
             if not observers and not room_ids:
-                break
+                continue
 
             state = dict(push)
             del state["user_id"]
 
-            if "mtime_age" in state:
-                state["mtime"] = int(
-                    self.clock.time_msec() - state.pop("mtime_age")
+            if "presence" in state:
+                # all is OK
+                pass
+            elif "state" in state:
+                # Legacy handling
+                state["presence"] = state["state"]
+            else:
+                logger.warning("Received a presence 'push' EDU from %s without"
+                    + " either a 'presence' or 'state' key", origin
+                )
+                continue
+
+            if "state" in state:
+                del state["state"]
+
+            if "last_active_ago" in state:
+                state["last_active"] = int(
+                    self.clock.time_msec() - state.pop("last_active_ago")
                 )
 
             statuscache = self._get_or_make_usercache(user)
@@ -640,7 +687,7 @@ class PresenceHandler(BaseHandler):
                 statuscache=statuscache,
             )
 
-            if state["state"] == PresenceState.OFFLINE:
+            if state["presence"] == PresenceState.OFFLINE:
                 del self._user_cachemap[user]
 
         for poll in content.get("poll", []):
@@ -673,10 +720,9 @@ class PresenceHandler(BaseHandler):
         yield defer.DeferredList(deferreds)
 
     @defer.inlineCallbacks
-    def push_update_to_local_and_remote(self, observed_user,
+    def push_update_to_local_and_remote(self, observed_user, statuscache,
                                         users_to_push=[], room_ids=[],
-                                        remote_domains=[],
-                                        statuscache=None):
+                                        remote_domains=[]):
 
         localusers, remoteusers = partitionbool(
             users_to_push,
@@ -722,6 +768,78 @@ class PresenceHandler(BaseHandler):
         )
 
 
+class PresenceEventSource(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.clock = hs.get_clock()
+
+    def get_new_events_for_user(self, user, from_key, limit):
+        from_key = int(from_key)
+
+        presence = self.hs.get_handlers().presence_handler
+        cachemap = presence._user_cachemap
+
+        # TODO(paul): limit, and filter by visibility
+        updates = [(k, cachemap[k]) for k in cachemap
+                   if from_key < cachemap[k].serial]
+
+        if updates:
+            clock = self.clock
+
+            latest_serial = max([x[1].serial for x in updates])
+            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
+
+            return ((data, latest_serial))
+        else:
+            return (([], presence._user_cachemap_latest_serial))
+
+    def get_current_key(self):
+        presence = self.hs.get_handlers().presence_handler
+        return presence._user_cachemap_latest_serial
+
+    def get_pagination_rows(self, user, pagination_config, key):
+        # TODO (erikj): Does this make sense? Ordering?
+
+        from_token = pagination_config.from_token
+        to_token = pagination_config.to_token
+
+        from_key = int(from_token.presence_key)
+
+        if to_token:
+            to_key = int(to_token.presence_key)
+        else:
+            to_key = -1
+
+        presence = self.hs.get_handlers().presence_handler
+        cachemap = presence._user_cachemap
+
+        # TODO(paul): limit, and filter by visibility
+        updates = [(k, cachemap[k]) for k in cachemap
+                   if to_key < cachemap[k].serial < from_key]
+
+        if updates:
+            clock = self.clock
+
+            earliest_serial = max([x[1].serial for x in updates])
+            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
+
+            if to_token:
+                next_token = to_token
+            else:
+                next_token = from_token
+
+            next_token = next_token.copy_and_replace(
+                "presence_key", earliest_serial
+            )
+            return ((data, next_token))
+        else:
+            if not to_token:
+                to_token = from_token.copy_and_replace(
+                    "presence_key", 0
+                )
+            return (([], to_token))
+
+
 class UserPresenceCache(object):
     """Store an observed user's state and status message.
 
@@ -733,6 +851,7 @@ class UserPresenceCache(object):
 
     def update(self, state, serial):
         assert("mtime_age" not in state)
+        assert("state" not in state)
 
         self.state.update(state)
         # Delete keys that are now 'None'
@@ -749,15 +868,21 @@ class UserPresenceCache(object):
 
     def get_state(self):
         # clone it so caller can't break our cache
-        return dict(self.state)
+        state = dict(self.state)
+
+        # Legacy handling
+        if "presence" in state:
+            state["state"] = state["presence"]
+
+        return state
 
     def make_event(self, user, clock):
         content = self.get_state()
         content["user_id"] = user.to_string()
 
-        if "mtime" in content:
-            content["mtime_age"] = int(
-                clock.time_msec() - content.pop("mtime")
+        if "last_active" in content:
+            content["last_active_ago"] = int(
+                clock.time_msec() - content.pop("last_active")
             )
 
         return {"type": "m.presence", "content": content}
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3e41d7a46b..53aa77405c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -17,10 +17,12 @@
 from twisted.internet import defer
 
 from synapse.types import UserID, RoomAlias, RoomID
-from synapse.api.constants import Membership
+from synapse.api.constants import Membership, JoinRules
 from synapse.api.errors import StoreError, SynapseError
 from synapse.api.events.room import (
-    RoomMemberEvent, RoomConfigEvent
+    RoomMemberEvent, RoomCreateEvent, RoomPowerLevelsEvent,
+    RoomJoinRulesEvent, RoomAddStateLevelEvent, RoomTopicEvent,
+    RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomNameEvent,
 )
 from synapse.util import stringutils
 from ._base import BaseRoomHandler
@@ -62,6 +64,8 @@ class RoomCreationHandler(BaseRoomHandler):
         else:
             room_alias = None
 
+        is_public = config.get("visibility", None) == "public"
+
         if room_id:
             # Ensure room_id is the correct type
             room_id_obj = RoomID.from_string(room_id, self.hs)
@@ -71,7 +75,7 @@ class RoomCreationHandler(BaseRoomHandler):
             yield self.store.store_room(
                 room_id=room_id,
                 room_creator_user_id=user_id,
-                is_public=config["visibility"] == "public"
+                is_public=is_public
             )
         else:
             # autogen room IDs and try to create it. We may clash, so just
@@ -85,7 +89,7 @@ class RoomCreationHandler(BaseRoomHandler):
                     yield self.store.store_room(
                         room_id=gen_room_id.to_string(),
                         room_creator_user_id=user_id,
-                        is_public=config["visibility"] == "public"
+                        is_public=is_public
                     )
                     room_id = gen_room_id.to_string()
                     break
@@ -94,18 +98,9 @@ class RoomCreationHandler(BaseRoomHandler):
             if not room_id:
                 raise StoreError(500, "Couldn't generate a room ID.")
 
-        config_event = self.event_factory.create_event(
-            etype=RoomConfigEvent.TYPE,
-            room_id=room_id,
-            user_id=user_id,
-            content=config,
-        )
-
-        snapshot = yield self.store.snapshot_room(
-            room_id=room_id,
-            user_id=user_id,
-            state_type=RoomConfigEvent.TYPE,
-            state_key="",
+        user = self.hs.parse_userid(user_id)
+        creation_events = self._create_events_for_new_room(
+            user, room_id, is_public=is_public
         )
 
         if room_alias:
@@ -115,11 +110,46 @@ class RoomCreationHandler(BaseRoomHandler):
                 servers=[self.hs.hostname],
             )
 
-        yield self.state_handler.handle_new_event(config_event, snapshot)
-        # store_id = persist...
-
         federation_handler = self.hs.get_handlers().federation_handler
-        yield federation_handler.handle_new_event(config_event, snapshot)
+
+        @defer.inlineCallbacks
+        def handle_event(event):
+            snapshot = yield self.store.snapshot_room(
+                room_id=room_id,
+                user_id=user_id,
+            )
+
+            logger.debug("Event: %s", event)
+
+            yield self.state_handler.handle_new_event(event, snapshot)
+            yield self._on_new_room_event(event, snapshot, extra_users=[user])
+
+        for event in creation_events:
+            yield handle_event(event)
+
+        if "name" in config:
+            name = config["name"]
+            name_event = self.event_factory.create_event(
+                etype=RoomNameEvent.TYPE,
+                room_id=room_id,
+                user_id=user_id,
+                required_power_level=5,
+                content={"name": name},
+            )
+
+            yield handle_event(name_event)
+
+        if "topic" in config:
+            topic = config["topic"]
+            topic_event = self.event_factory.create_event(
+                etype=RoomTopicEvent.TYPE,
+                room_id=room_id,
+                user_id=user_id,
+                required_power_level=5,
+                content={"topic": topic},
+            )
+
+            yield handle_event(topic_event)
 
         content = {"membership": Membership.JOIN}
         join_event = self.event_factory.create_event(
@@ -142,6 +172,62 @@ class RoomCreationHandler(BaseRoomHandler):
 
         defer.returnValue(result)
 
+    def _create_events_for_new_room(self, creator, room_id, is_public=False):
+        event_keys = {
+            "room_id": room_id,
+            "user_id": creator.to_string(),
+            "required_power_level": 10,
+        }
+
+        def create(etype, **content):
+            return self.event_factory.create_event(
+                etype=etype,
+                content=content,
+                **event_keys
+            )
+
+        creation_event = create(
+            etype=RoomCreateEvent.TYPE,
+            creator=creator.to_string(),
+        )
+
+        power_levels_event = self.event_factory.create_event(
+            etype=RoomPowerLevelsEvent.TYPE,
+            content={creator.to_string(): 10, "default": 0},
+            **event_keys
+        )
+
+        join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE
+        join_rules_event = create(
+            etype=RoomJoinRulesEvent.TYPE,
+            join_rule=join_rule,
+        )
+
+        add_state_event = create(
+            etype=RoomAddStateLevelEvent.TYPE,
+            level=10,
+        )
+
+        send_event = create(
+            etype=RoomSendEventLevelEvent.TYPE,
+            level=0,
+        )
+
+        ops = create(
+            etype=RoomOpsPowerLevelsEvent.TYPE,
+            ban_level=5,
+            kick_level=5,
+        )
+
+        return [
+            creation_event,
+            power_levels_event,
+            join_rules_event,
+            add_state_event,
+            send_event,
+            ops,
+        ]
+
 
 class RoomMemberHandler(BaseRoomHandler):
     # TODO(paul): This handler currently contains a messy conflation of
@@ -285,6 +371,16 @@ class RoomMemberHandler(BaseRoomHandler):
             if do_auth:
                 yield self.auth.check(event, snapshot, raises=True)
 
+            # If we're banning someone, set a req power level
+            if event.membership == Membership.BAN:
+                if not hasattr(event, "required_power_level") or event.required_power_level is None:
+                    # Add some default required_power_level
+                    user_level = yield self.store.get_power_level(
+                        event.room_id,
+                        event.user_id,
+                    )
+                    event.required_power_level = user_level
+
             if prev_state and prev_state.membership == event.membership:
                 # double same action, treat this event as a NOOP.
                 defer.returnValue({})
@@ -445,10 +541,9 @@ class RoomMemberHandler(BaseRoomHandler):
             host = target_user.domain
             destinations.append(host)
 
-        # If we are joining a remote HS, include that.
-        if membership == Membership.JOIN:
-            host = target_user.domain
-            destinations.append(host)
+        # Always include target domain
+        host = target_user.domain
+        destinations.append(host)
 
         return self._on_new_room_event(
             event, snapshot, extra_destinations=destinations,
@@ -462,3 +557,49 @@ class RoomListHandler(BaseRoomHandler):
         chunk = yield self.store.get_rooms(is_public=True)
         # FIXME (erikj): START is no longer a valid value
         defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
+
+
+class RoomEventSource(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def get_new_events_for_user(self, user, from_key, limit):
+        # We just ignore the key for now.
+
+        to_key = yield self.get_current_key()
+
+        events, end_key = yield self.store.get_room_events_stream(
+            user_id=user.to_string(),
+            from_key=from_key,
+            to_key=to_key,
+            room_id=None,
+            limit=limit,
+        )
+
+        defer.returnValue((events, end_key))
+
+    def get_current_key(self):
+        return self.store.get_room_events_max_id()
+
+    @defer.inlineCallbacks
+    def get_pagination_rows(self, user, pagination_config, key):
+        from_token = pagination_config.from_token
+        to_token = pagination_config.to_token
+        limit = pagination_config.limit
+        direction = pagination_config.direction
+
+        to_key = to_token.room_key if to_token else None
+
+        events, next_key = yield self.store.paginate_room_events(
+            room_id=key,
+            from_key=from_token.room_key,
+            to_key=to_key,
+            direction=direction,
+            limit=limit,
+            with_feedback=True
+        )
+
+        next_token = from_token.copy_and_replace("room_key", next_key)
+
+        defer.returnValue((events, next_token))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 9fab0ff37c..3268427ecd 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -145,3 +145,17 @@ class TypingNotificationHandler(BaseHandler):
             typing):
         # TODO(paul) steal this from presence.py
         pass
+
+
+class TypingNotificationEventSource(object):
+    def __init__(self, hs):
+        self.hs = hs
+
+    def get_new_events_for_user(self, user, from_key, limit):
+        return ([], from_key)
+
+    def get_current_key(self):
+        return 0
+
+    def get_pagination_rows(self, user, pagination_config, key):
+        return ([], pagination_config.from_token)