diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/message.py | 14 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 227 | ||||
-rw-r--r-- | synapse/handlers/room.py | 189 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 14 |
4 files changed, 366 insertions, 78 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3d7f97bcff..4aeb2089f5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -76,6 +76,10 @@ class MessageHandler(BaseRoomHandler): Raises: SynapseError if something went wrong. """ + # TODO(paul): Why does 'event' not have a 'user' object? + user = self.hs.parse_userid(event.user_id) + assert user.is_mine, "User must be our own: %s" % (user,) + if stamp_event: event.content["hsob_ts"] = int(self.clock.time_msec()) @@ -86,6 +90,10 @@ class MessageHandler(BaseRoomHandler): yield self._on_new_room_event(event, snapshot) + self.hs.get_handlers().presence_handler.bump_presence_active_time( + user + ) + @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, feedback=False): @@ -274,11 +282,11 @@ class MessageHandler(BaseRoomHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_token.events_key, + end_token=now_token.room_key, ) - start_token = now_token.copy_and_replace("events_key", token[0]) - end_token = now_token.copy_and_replace("events_key", token[1]) + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) d["messages"] = { "chunk": [m.get_dict() for m in messages], diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7731de85c0..9bfceda88a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,6 +52,13 @@ def partitionbool(l, func): class PresenceHandler(BaseHandler): + STATE_LEVELS = { + PresenceState.OFFLINE: 0, + PresenceState.UNAVAILABLE: 1, + PresenceState.ONLINE: 2, + PresenceState.FREE_FOR_CHAT: 3, + } + def __init__(self, hs): super(PresenceHandler, self).__init__(hs) @@ -135,7 +142,7 @@ class PresenceHandler(BaseHandler): return self._user_cachemap[user] else: statuscache = UserPresenceCache() - statuscache.update({"state": PresenceState.OFFLINE}, user) + statuscache.update({"presence": PresenceState.OFFLINE}, user) return statuscache def registered_user(self, user): @@ -143,10 +150,6 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): - defer.returnValue(True) - # return - # FIXME (erikj): This code path absolutely kills the database. - assert(observed_user.is_mine) if observer_user == observed_user: @@ -173,19 +176,24 @@ class PresenceHandler(BaseHandler): observed_user=target_user ) - if visible: - state = yield self.store.get_presence_state( - target_user.localpart - ) - else: + if not visible: raise SynapseError(404, "Presence information not visible") + state = yield self.store.get_presence_state(target_user.localpart) + if "mtime" in state: + del state["mtime"] + state["presence"] = state["state"] + + if target_user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[target_user].get_state()["last_active"] + ) else: # TODO(paul): Have remote server send us permissions set state = self._get_or_offline_usercache(target_user).get_state() - if "mtime" in state and (state["mtime"] is not None): - state["mtime_age"] = int( - self.clock.time_msec() - state.pop("mtime") + if "last_active" in state: + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") ) defer.returnValue(state) @@ -202,20 +210,33 @@ class PresenceHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's displayname") - # TODO(paul): Sanity-check 'state' if "status_msg" not in state: state["status_msg"] = None for k in state.keys(): - if k not in ("state", "status_msg"): + if k not in ("presence", "state", "status_msg"): raise SynapseError( 400, "Unexpected presence state key '%s'" % (k,) ) + # Handle legacy "state" key for now + if "state" in state: + state["presence"] = state.pop("state") + + if state["presence"] not in self.STATE_LEVELS: + raise SynapseError(400, "'%s' is not a valid presence state" % + state["presence"] + ) + logger.debug("Updating presence state of %s to %s", - target_user.localpart, state["state"]) + target_user.localpart, state["presence"]) state_to_store = dict(state) + state_to_store["state"] = state_to_store.pop("presence") + + statuscache=self._get_or_offline_usercache(target_user) + was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]] + now_level = self.STATE_LEVELS[state["presence"]] yield defer.DeferredList([ self.store.set_presence_state( @@ -226,9 +247,10 @@ class PresenceHandler(BaseHandler): ), ]) - state["mtime"] = self.clock.time_msec() + if now_level > was_level: + state["last_active"] = self.clock.time_msec() - now_online = state["state"] != PresenceState.OFFLINE + now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap if now_online and not was_polling: @@ -240,6 +262,12 @@ class PresenceHandler(BaseHandler): # we don't have to do this all the time self.changed_presencelike_data(target_user, state) + def bump_presence_active_time(self, user, now=None): + if now is None: + now = self.clock.time_msec() + + self.changed_presencelike_data(user, {"last_active": now}) + def changed_presencelike_data(self, user, state): statuscache = self._get_or_make_usercache(user) @@ -251,28 +279,27 @@ class PresenceHandler(BaseHandler): @log_function def started_user_eventstream(self, user): # TODO(paul): Use "last online" state - self.set_state(user, user, {"state": PresenceState.ONLINE}) + self.set_state(user, user, {"presence": PresenceState.ONLINE}) @log_function def stopped_user_eventstream(self, user): # TODO(paul): Save current state as "last online" state - self.set_state(user, user, {"state": PresenceState.OFFLINE}) + self.set_state(user, user, {"presence": PresenceState.OFFLINE}) @defer.inlineCallbacks def user_joined_room(self, user, room_id): - if user.is_mine: - self.push_update_to_local_and_remote( - observed_user=user, - room_ids=[room_id], - statuscache=self._get_or_offline_usercache(user), - ) + statuscache = self._get_or_make_usercache(user) - else: - self.push_update_to_clients( + # No actual update but we need to bump the serial anyway for the + # event source + self._user_cachemap_latest_serial += 1 + statuscache.update({}, serial=self._user_cachemap_latest_serial) + + self.push_update_to_local_and_remote( observed_user=user, room_ids=[room_id], - statuscache=self._get_or_offline_usercache(user), + statuscache=statuscache, ) # We also want to tell them about current presence of people. @@ -386,9 +413,9 @@ class PresenceHandler(BaseHandler): observed_user = self.hs.parse_userid(p.pop("observed_user_id")) p["observed_user"] = observed_user p.update(self._get_or_offline_usercache(observed_user).get_state()) - if "mtime" in p: - p["mtime_age"] = int( - self.clock.time_msec() - p.pop("mtime") + if "last_active" in p: + p["last_active_ago"] = int( + self.clock.time_msec() - p.pop("last_active") ) defer.returnValue(presence) @@ -457,10 +484,6 @@ class PresenceHandler(BaseHandler): def _start_polling_local(self, user, target_user): target_localpart = target_user.localpart - if not self.is_presence_visible(observer_user=user, - observed_user=target_user): - return - if target_localpart not in self._local_pushmap: self._local_pushmap[target_localpart] = set() @@ -577,21 +600,30 @@ class PresenceHandler(BaseHandler): def _push_presence_remote(self, user, destination, state=None): if state is None: state = yield self.store.get_presence_state(user.localpart) + del state["mtime"] + state["presence"] = state["state"] + + if user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[user].get_state()["last_active"] + ) yield self.distributor.fire( "collect_presencelike_data", user, state ) - if "mtime" in state: + if "last_active" in state: state = dict(state) - state["mtime_age"] = int( - self.clock.time_msec() - state.pop("mtime") + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") ) user_state = { "user_id": user.to_string(), } user_state.update(**state) + if "state" in user_state and "presence" not in user_state: + user_state["presence"] = user_state["state"] yield self.federation.send_edu( destination=destination, @@ -618,14 +650,29 @@ class PresenceHandler(BaseHandler): room_ids = yield rm_handler.get_rooms_for_user(user) if not observers and not room_ids: - break + continue state = dict(push) del state["user_id"] - if "mtime_age" in state: - state["mtime"] = int( - self.clock.time_msec() - state.pop("mtime_age") + if "presence" in state: + # all is OK + pass + elif "state" in state: + # Legacy handling + state["presence"] = state["state"] + else: + logger.warning("Received a presence 'push' EDU from %s without" + + " either a 'presence' or 'state' key", origin + ) + continue + + if "state" in state: + del state["state"] + + if "last_active_ago" in state: + state["last_active"] = int( + self.clock.time_msec() - state.pop("last_active_ago") ) statuscache = self._get_or_make_usercache(user) @@ -640,7 +687,7 @@ class PresenceHandler(BaseHandler): statuscache=statuscache, ) - if state["state"] == PresenceState.OFFLINE: + if state["presence"] == PresenceState.OFFLINE: del self._user_cachemap[user] for poll in content.get("poll", []): @@ -673,10 +720,9 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds) @defer.inlineCallbacks - def push_update_to_local_and_remote(self, observed_user, + def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], - remote_domains=[], - statuscache=None): + remote_domains=[]): localusers, remoteusers = partitionbool( users_to_push, @@ -722,6 +768,78 @@ class PresenceHandler(BaseHandler): ) +class PresenceEventSource(object): + def __init__(self, hs): + self.hs = hs + self.clock = hs.get_clock() + + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if from_key < cachemap[k].serial] + + if updates: + clock = self.clock + + latest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + return ((data, latest_serial)) + else: + return (([], presence._user_cachemap_latest_serial)) + + def get_current_key(self): + presence = self.hs.get_handlers().presence_handler + return presence._user_cachemap_latest_serial + + def get_pagination_rows(self, user, pagination_config, key): + # TODO (erikj): Does this make sense? Ordering? + + from_token = pagination_config.from_token + to_token = pagination_config.to_token + + from_key = int(from_token.presence_key) + + if to_token: + to_key = int(to_token.presence_key) + else: + to_key = -1 + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if to_key < cachemap[k].serial < from_key] + + if updates: + clock = self.clock + + earliest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + if to_token: + next_token = to_token + else: + next_token = from_token + + next_token = next_token.copy_and_replace( + "presence_key", earliest_serial + ) + return ((data, next_token)) + else: + if not to_token: + to_token = from_token.copy_and_replace( + "presence_key", 0 + ) + return (([], to_token)) + + class UserPresenceCache(object): """Store an observed user's state and status message. @@ -733,6 +851,7 @@ class UserPresenceCache(object): def update(self, state, serial): assert("mtime_age" not in state) + assert("state" not in state) self.state.update(state) # Delete keys that are now 'None' @@ -749,15 +868,21 @@ class UserPresenceCache(object): def get_state(self): # clone it so caller can't break our cache - return dict(self.state) + state = dict(self.state) + + # Legacy handling + if "presence" in state: + state["state"] = state["presence"] + + return state def make_event(self, user, clock): content = self.get_state() content["user_id"] = user.to_string() - if "mtime" in content: - content["mtime_age"] = int( - clock.time_msec() - content.pop("mtime") + if "last_active" in content: + content["last_active_ago"] = int( + clock.time_msec() - content.pop("last_active") ) return {"type": "m.presence", "content": content} diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3e41d7a46b..53aa77405c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -17,10 +17,12 @@ from twisted.internet import defer from synapse.types import UserID, RoomAlias, RoomID -from synapse.api.constants import Membership +from synapse.api.constants import Membership, JoinRules from synapse.api.errors import StoreError, SynapseError from synapse.api.events.room import ( - RoomMemberEvent, RoomConfigEvent + RoomMemberEvent, RoomCreateEvent, RoomPowerLevelsEvent, + RoomJoinRulesEvent, RoomAddStateLevelEvent, RoomTopicEvent, + RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomNameEvent, ) from synapse.util import stringutils from ._base import BaseRoomHandler @@ -62,6 +64,8 @@ class RoomCreationHandler(BaseRoomHandler): else: room_alias = None + is_public = config.get("visibility", None) == "public" + if room_id: # Ensure room_id is the correct type room_id_obj = RoomID.from_string(room_id, self.hs) @@ -71,7 +75,7 @@ class RoomCreationHandler(BaseRoomHandler): yield self.store.store_room( room_id=room_id, room_creator_user_id=user_id, - is_public=config["visibility"] == "public" + is_public=is_public ) else: # autogen room IDs and try to create it. We may clash, so just @@ -85,7 +89,7 @@ class RoomCreationHandler(BaseRoomHandler): yield self.store.store_room( room_id=gen_room_id.to_string(), room_creator_user_id=user_id, - is_public=config["visibility"] == "public" + is_public=is_public ) room_id = gen_room_id.to_string() break @@ -94,18 +98,9 @@ class RoomCreationHandler(BaseRoomHandler): if not room_id: raise StoreError(500, "Couldn't generate a room ID.") - config_event = self.event_factory.create_event( - etype=RoomConfigEvent.TYPE, - room_id=room_id, - user_id=user_id, - content=config, - ) - - snapshot = yield self.store.snapshot_room( - room_id=room_id, - user_id=user_id, - state_type=RoomConfigEvent.TYPE, - state_key="", + user = self.hs.parse_userid(user_id) + creation_events = self._create_events_for_new_room( + user, room_id, is_public=is_public ) if room_alias: @@ -115,11 +110,46 @@ class RoomCreationHandler(BaseRoomHandler): servers=[self.hs.hostname], ) - yield self.state_handler.handle_new_event(config_event, snapshot) - # store_id = persist... - federation_handler = self.hs.get_handlers().federation_handler - yield federation_handler.handle_new_event(config_event, snapshot) + + @defer.inlineCallbacks + def handle_event(event): + snapshot = yield self.store.snapshot_room( + room_id=room_id, + user_id=user_id, + ) + + logger.debug("Event: %s", event) + + yield self.state_handler.handle_new_event(event, snapshot) + yield self._on_new_room_event(event, snapshot, extra_users=[user]) + + for event in creation_events: + yield handle_event(event) + + if "name" in config: + name = config["name"] + name_event = self.event_factory.create_event( + etype=RoomNameEvent.TYPE, + room_id=room_id, + user_id=user_id, + required_power_level=5, + content={"name": name}, + ) + + yield handle_event(name_event) + + if "topic" in config: + topic = config["topic"] + topic_event = self.event_factory.create_event( + etype=RoomTopicEvent.TYPE, + room_id=room_id, + user_id=user_id, + required_power_level=5, + content={"topic": topic}, + ) + + yield handle_event(topic_event) content = {"membership": Membership.JOIN} join_event = self.event_factory.create_event( @@ -142,6 +172,62 @@ class RoomCreationHandler(BaseRoomHandler): defer.returnValue(result) + def _create_events_for_new_room(self, creator, room_id, is_public=False): + event_keys = { + "room_id": room_id, + "user_id": creator.to_string(), + "required_power_level": 10, + } + + def create(etype, **content): + return self.event_factory.create_event( + etype=etype, + content=content, + **event_keys + ) + + creation_event = create( + etype=RoomCreateEvent.TYPE, + creator=creator.to_string(), + ) + + power_levels_event = self.event_factory.create_event( + etype=RoomPowerLevelsEvent.TYPE, + content={creator.to_string(): 10, "default": 0}, + **event_keys + ) + + join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE + join_rules_event = create( + etype=RoomJoinRulesEvent.TYPE, + join_rule=join_rule, + ) + + add_state_event = create( + etype=RoomAddStateLevelEvent.TYPE, + level=10, + ) + + send_event = create( + etype=RoomSendEventLevelEvent.TYPE, + level=0, + ) + + ops = create( + etype=RoomOpsPowerLevelsEvent.TYPE, + ban_level=5, + kick_level=5, + ) + + return [ + creation_event, + power_levels_event, + join_rules_event, + add_state_event, + send_event, + ops, + ] + class RoomMemberHandler(BaseRoomHandler): # TODO(paul): This handler currently contains a messy conflation of @@ -285,6 +371,16 @@ class RoomMemberHandler(BaseRoomHandler): if do_auth: yield self.auth.check(event, snapshot, raises=True) + # If we're banning someone, set a req power level + if event.membership == Membership.BAN: + if not hasattr(event, "required_power_level") or event.required_power_level is None: + # Add some default required_power_level + user_level = yield self.store.get_power_level( + event.room_id, + event.user_id, + ) + event.required_power_level = user_level + if prev_state and prev_state.membership == event.membership: # double same action, treat this event as a NOOP. defer.returnValue({}) @@ -445,10 +541,9 @@ class RoomMemberHandler(BaseRoomHandler): host = target_user.domain destinations.append(host) - # If we are joining a remote HS, include that. - if membership == Membership.JOIN: - host = target_user.domain - destinations.append(host) + # Always include target domain + host = target_user.domain + destinations.append(host) return self._on_new_room_event( event, snapshot, extra_destinations=destinations, @@ -462,3 +557,49 @@ class RoomListHandler(BaseRoomHandler): chunk = yield self.store.get_rooms(is_public=True) # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) + + +class RoomEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + # We just ignore the key for now. + + to_key = yield self.get_current_key() + + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_key, + to_key=to_key, + room_id=None, + limit=limit, + ) + + defer.returnValue((events, end_key)) + + def get_current_key(self): + return self.store.get_room_events_max_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, pagination_config, key): + from_token = pagination_config.from_token + to_token = pagination_config.to_token + limit = pagination_config.limit + direction = pagination_config.direction + + to_key = to_token.room_key if to_token else None + + events, next_key = yield self.store.paginate_room_events( + room_id=key, + from_key=from_token.room_key, + to_key=to_key, + direction=direction, + limit=limit, + with_feedback=True + ) + + next_token = from_token.copy_and_replace("room_key", next_key) + + defer.returnValue((events, next_token)) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 9fab0ff37c..3268427ecd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -145,3 +145,17 @@ class TypingNotificationHandler(BaseHandler): typing): # TODO(paul) steal this from presence.py pass + + +class TypingNotificationEventSource(object): + def __init__(self, hs): + self.hs = hs + + def get_new_events_for_user(self, user, from_key, limit): + return ([], from_key) + + def get_current_key(self): + return 0 + + def get_pagination_rows(self, user, pagination_config, key): + return ([], pagination_config.from_token) |