diff options
author | matrix.org <matrix@matrix.org> | 2014-08-12 15:10:52 +0100 |
---|---|---|
committer | matrix.org <matrix@matrix.org> | 2014-08-12 15:10:52 +0100 |
commit | 4f475c7697722e946e39e42f38f3dd03a95d8765 (patch) | |
tree | 076d96d3809fb836c7245fd9f7960e7b75888a77 /synapse/handlers | |
download | synapse-4f475c7697722e946e39e42f38f3dd03a95d8765.tar.xz |
Reference Matrix Home Server
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 46 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 26 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 100 | ||||
-rw-r--r-- | synapse/handlers/events.py | 149 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 74 | ||||
-rw-r--r-- | synapse/handlers/login.py | 64 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 697 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 169 | ||||
-rw-r--r-- | synapse/handlers/register.py | 100 | ||||
-rw-r--r-- | synapse/handlers/room.py | 808 |
10 files changed, 2233 insertions, 0 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py new file mode 100644 index 0000000000..5688b68e49 --- /dev/null +++ b/synapse/handlers/__init__.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from .register import RegistrationHandler +from .room import ( + MessageHandler, RoomCreationHandler, RoomMemberHandler, RoomListHandler +) +from .events import EventStreamHandler +from .federation import FederationHandler +from .login import LoginHandler +from .profile import ProfileHandler +from .presence import PresenceHandler +from .directory import DirectoryHandler + + +class Handlers(object): + + """ A collection of all the event handlers. + + There's no need to lazily create these; we'll just make them all eagerly + at construction time. + """ + + def __init__(self, hs): + self.registration_handler = RegistrationHandler(hs) + self.message_handler = MessageHandler(hs) + self.room_creation_handler = RoomCreationHandler(hs) + self.room_member_handler = RoomMemberHandler(hs) + self.event_stream_handler = EventStreamHandler(hs) + self.federation_handler = FederationHandler(hs) + self.profile_handler = ProfileHandler(hs) + self.presence_handler = PresenceHandler(hs) + self.room_list_handler = RoomListHandler(hs) + self.login_handler = LoginHandler(hs) + self.directory_handler = DirectoryHandler(hs) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py new file mode 100644 index 0000000000..87a392dd77 --- /dev/null +++ b/synapse/handlers/_base.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class BaseHandler(object): + + def __init__(self, hs): + self.store = hs.get_datastore() + self.event_factory = hs.get_event_factory() + self.auth = hs.get_auth() + self.notifier = hs.get_notifier() + self.room_lock = hs.get_room_lock_manager() + self.state_handler = hs.get_state_handler() + self.hs = hs diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py new file mode 100644 index 0000000000..456007c71d --- /dev/null +++ b/synapse/handlers/directory.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from ._base import BaseHandler + +from synapse.api.errors import SynapseError + +import logging +import json +import urllib + + +logger = logging.getLogger(__name__) + + +# TODO(erikj): This needs to be factored out somewere +PREFIX = "/matrix/client/api/v1" + + +class DirectoryHandler(BaseHandler): + + def __init__(self, hs): + super(DirectoryHandler, self).__init__(hs) + self.hs = hs + self.http_client = hs.get_http_client() + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def create_association(self, room_alias, room_id, servers): + # TODO(erikj): Do auth. + + if not room_alias.is_mine: + raise SynapseError(400, "Room alias must be local") + # TODO(erikj): Change this. + + # TODO(erikj): Add transactions. + + # TODO(erikj): Check if there is a current association. + + yield self.store.create_room_alias_association( + room_alias, + room_id, + servers + ) + + @defer.inlineCallbacks + def get_association(self, room_alias, local_only=False): + # TODO(erikj): Do auth + + room_id = None + if room_alias.is_mine: + result = yield self.store.get_association_from_room_alias( + room_alias + ) + + if result: + room_id = result.room_id + servers = result.servers + elif not local_only: + path = "%s/ds/room/%s?local_only=1" % ( + PREFIX, + urllib.quote(room_alias.to_string()) + ) + + result = None + try: + result = yield self.http_client.get_json( + destination=room_alias.domain, + path=path, + ) + except: + # TODO(erikj): Handle this better? + logger.exception("Failed to get remote room alias") + + if result and "room_id" in result and "servers" in result: + room_id = result["room_id"] + servers = result["servers"] + + if not room_id: + defer.returnValue({}) + return + + defer.returnValue({ + "room_id": room_id, + "servers": servers, + }) + return diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py new file mode 100644 index 0000000000..79742a4e1c --- /dev/null +++ b/synapse/handlers/events.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet import defer + +from ._base import BaseHandler +from synapse.api.streams.event import ( + EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData, + RoomDataStreamData +) +from synapse.handlers.presence import PresenceStreamData + + +class EventStreamHandler(BaseHandler): + + stream_data_classes = [ + MessagesStreamData, + RoomMemberStreamData, + FeedbackStreamData, + RoomDataStreamData, + PresenceStreamData, + ] + + def __init__(self, hs): + super(EventStreamHandler, self).__init__(hs) + + # Count of active streams per user + self._streams_per_user = {} + # Grace timers per user to delay the "stopped" signal + self._stop_timer_per_user = {} + + self.distributor = hs.get_distributor() + self.distributor.declare("started_user_eventstream") + self.distributor.declare("stopped_user_eventstream") + + self.clock = hs.get_clock() + + def get_event_stream_token(self, stream_type, store_id, start_token): + """Return the next token after this event. + + Args: + stream_type (str): The StreamData.EVENT_TYPE + store_id (int): The new storage ID assigned from the data store. + start_token (str): The token the user started with. + Returns: + str: The end token. + """ + for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes): + if stream_cls.EVENT_TYPE == stream_type: + # this is the stream for this event, so replace this part of + # the token + store_ids = start_token.split(EventStream.SEPARATOR) + store_ids[i] = str(store_id) + return EventStream.SEPARATOR.join(store_ids) + raise RuntimeError("Didn't find a stream type %s" % stream_type) + + @defer.inlineCallbacks + def get_stream(self, auth_user_id, pagin_config, timeout=0): + """Gets events as an event stream for this user. + + This function looks for interesting *events* for this user. This is + different from the notifier, which looks for interested *users* who may + want to know about a single event. + + Args: + auth_user_id (str): The user requesting their event stream. + pagin_config (synapse.api.streams.PaginationConfig): The config to + use when obtaining the stream. + timeout (int): The max time to wait for an incoming event in ms. + Returns: + A pagination stream API dict + """ + auth_user = self.hs.parse_userid(auth_user_id) + + stream_id = object() + + try: + if auth_user not in self._streams_per_user: + self._streams_per_user[auth_user] = 0 + if auth_user in self._stop_timer_per_user: + self.clock.cancel_call_later( + self._stop_timer_per_user.pop(auth_user)) + else: + self.distributor.fire( + "started_user_eventstream", auth_user + ) + self._streams_per_user[auth_user] += 1 + + # construct an event stream with the correct data ordering + stream_data_list = [] + for stream_class in EventStreamHandler.stream_data_classes: + stream_data_list.append(stream_class(self.hs)) + event_stream = EventStream(auth_user_id, stream_data_list) + + # fix unknown tokens to known tokens + pagin_config = yield event_stream.fix_tokens(pagin_config) + + # register interest in receiving new events + self.notifier.store_events_for(user_id=auth_user_id, + stream_id=stream_id, + from_tok=pagin_config.from_tok) + + # see if we can grab a chunk now + data_chunk = yield event_stream.get_chunk(config=pagin_config) + + # if there are previous events, return those. If not, wait on the + # new events for 'timeout' seconds. + if len(data_chunk["chunk"]) == 0 and timeout != 0: + results = yield defer.maybeDeferred( + self.notifier.get_events_for, + user_id=auth_user_id, + stream_id=stream_id, + timeout=timeout + ) + if results: + defer.returnValue(results) + + defer.returnValue(data_chunk) + finally: + # cleanup + self.notifier.purge_events_for(user_id=auth_user_id, + stream_id=stream_id) + + self._streams_per_user[auth_user] -= 1 + if not self._streams_per_user[auth_user]: + del self._streams_per_user[auth_user] + + # 10 seconds of grace to allow the client to reconnect again + # before we think they're gone + def _later(): + self.distributor.fire( + "stopped_user_eventstream", auth_user + ) + del self._stop_timer_per_user[auth_user] + + self._stop_timer_per_user[auth_user] = ( + self.clock.call_later(5, _later) + ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py new file mode 100644 index 0000000000..12e7afca4c --- /dev/null +++ b/synapse/handlers/federation.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Contains handlers for federation events.""" + +from ._base import BaseHandler + +from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent +from synapse.api.constants import Membership +from synapse.util.logutils import log_function + +from twisted.internet import defer + +import logging + + +logger = logging.getLogger(__name__) + + +class FederationHandler(BaseHandler): + + """Handles events that originated from federation.""" + + @log_function + @defer.inlineCallbacks + def on_receive(self, event, is_new_state): + if hasattr(event, "state_key") and not is_new_state: + logger.debug("Ignoring old state.") + return + + target_is_mine = False + if hasattr(event, "target_host"): + target_is_mine = event.target_host == self.hs.hostname + + if event.type == InviteJoinEvent.TYPE: + if not target_is_mine: + logger.debug("Ignoring invite/join event %s", event) + return + + # If we receive an invite/join event then we need to join the + # sender to the given room. + # TODO: We should probably auth this or some such + content = event.content + content.update({"membership": Membership.JOIN}) + new_event = self.event_factory.create_event( + etype=RoomMemberEvent.TYPE, + target_user_id=event.user_id, + room_id=event.room_id, + user_id=event.user_id, + membership=Membership.JOIN, + content=content + ) + + yield self.hs.get_handlers().room_member_handler.change_membership( + new_event, + True + ) + + else: + with (yield self.room_lock.lock(event.room_id)): + store_id = yield self.store.persist_event(event) + + yield self.notifier.on_new_room_event(event, store_id) diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py new file mode 100644 index 0000000000..5a1acd7102 --- /dev/null +++ b/synapse/handlers/login.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet import defer + +from ._base import BaseHandler +from synapse.api.errors import LoginError + +import bcrypt +import logging + +logger = logging.getLogger(__name__) + + +class LoginHandler(BaseHandler): + + def __init__(self, hs): + super(LoginHandler, self).__init__(hs) + self.hs = hs + + @defer.inlineCallbacks + def login(self, user, password): + """Login as the specified user with the specified password. + + Args: + user (str): The user ID. + password (str): The password. + Returns: + The newly allocated access token. + Raises: + StoreError if there was a problem storing the token. + LoginError if there was an authentication problem. + """ + # TODO do this better, it can't go in __init__ else it cyclic loops + if not hasattr(self, "reg_handler"): + self.reg_handler = self.hs.get_handlers().registration_handler + + # pull out the hash for this user if they exist + user_info = yield self.store.get_user_by_id(user_id=user) + if not user_info: + logger.warn("Attempted to login as %s but they do not exist.", user) + raise LoginError(403, "") + + stored_hash = user_info[0]["password_hash"] + if bcrypt.checkpw(password, stored_hash): + # generate an access token and store it. + token = self.reg_handler._generate_token(user) + logger.info("Adding token %s for user %s", token, user) + yield self.store.add_access_token_to_user(user, token) + defer.returnValue(token) + else: + logger.warn("Failed password login for user %s", user) + raise LoginError(403, "") \ No newline at end of file diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py new file mode 100644 index 0000000000..38db4b1d67 --- /dev/null +++ b/synapse/handlers/presence.py @@ -0,0 +1,697 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet import defer + +from synapse.api.errors import SynapseError, AuthError +from synapse.api.constants import PresenceState +from synapse.api.streams import StreamData + +from ._base import BaseHandler + +import logging + + +logger = logging.getLogger(__name__) + + +# TODO(paul): Maybe there's one of these I can steal from somewhere +def partition(l, func): + """Partition the list by the result of func applied to each element.""" + ret = {} + + for x in l: + key = func(x) + if key not in ret: + ret[key] = [] + ret[key].append(x) + + return ret + + +def partitionbool(l, func): + def boolfunc(x): + return bool(func(x)) + + ret = partition(l, boolfunc) + return ret.get(True, []), ret.get(False, []) + + +class PresenceHandler(BaseHandler): + + def __init__(self, hs): + super(PresenceHandler, self).__init__(hs) + + self.homeserver = hs + + distributor = hs.get_distributor() + distributor.observe("registered_user", self.registered_user) + + distributor.observe( + "started_user_eventstream", self.started_user_eventstream + ) + distributor.observe( + "stopped_user_eventstream", self.stopped_user_eventstream + ) + + distributor.observe("user_joined_room", + self.user_joined_room + ) + + distributor.declare("collect_presencelike_data") + + distributor.declare("changed_presencelike_data") + distributor.observe( + "changed_presencelike_data", self.changed_presencelike_data + ) + + self.distributor = distributor + + self.federation = hs.get_replication_layer() + + self.federation.register_edu_handler( + "m.presence", self.incoming_presence + ) + self.federation.register_edu_handler( + "m.presence_invite", + lambda origin, content: self.invite_presence( + observed_user=hs.parse_userid(content["observed_user"]), + observer_user=hs.parse_userid(content["observer_user"]), + ) + ) + self.federation.register_edu_handler( + "m.presence_accept", + lambda origin, content: self.accept_presence( + observed_user=hs.parse_userid(content["observed_user"]), + observer_user=hs.parse_userid(content["observer_user"]), + ) + ) + self.federation.register_edu_handler( + "m.presence_deny", + lambda origin, content: self.deny_presence( + observed_user=hs.parse_userid(content["observed_user"]), + observer_user=hs.parse_userid(content["observer_user"]), + ) + ) + + # IN-MEMORY store, mapping local userparts to sets of local users to + # be informed of state changes. + self._local_pushmap = {} + # map local users to sets of remote /domain names/ who are interested + # in them + self._remote_sendmap = {} + # map remote users to sets of local users who're interested in them + self._remote_recvmap = {} + + # map any user to a UserPresenceCache + self._user_cachemap = {} + self._user_cachemap_latest_serial = 0 + + def _get_or_make_usercache(self, user): + """If the cache entry doesn't exist, initialise a new one.""" + if user not in self._user_cachemap: + self._user_cachemap[user] = UserPresenceCache() + return self._user_cachemap[user] + + def _get_or_offline_usercache(self, user): + """If the cache entry doesn't exist, return an OFFLINE one but do not + store it into the cache.""" + if user in self._user_cachemap: + return self._user_cachemap[user] + else: + statuscache = UserPresenceCache() + statuscache.update({"state": PresenceState.OFFLINE}, user) + return statuscache + + def registered_user(self, user): + self.store.create_presence(user.localpart) + + @defer.inlineCallbacks + def is_presence_visible(self, observer_user, observed_user): + assert(observed_user.is_mine) + + if observer_user == observed_user: + defer.returnValue(True) + + allowed_by_subscription = yield self.store.is_presence_visible( + observed_localpart=observed_user.localpart, + observer_userid=observer_user.to_string(), + ) + + if allowed_by_subscription: + defer.returnValue(True) + + # TODO(paul): Check same channel + + defer.returnValue(False) + + @defer.inlineCallbacks + def get_state(self, target_user, auth_user): + if target_user.is_mine: + visible = yield self.is_presence_visible(observer_user=auth_user, + observed_user=target_user + ) + + if visible: + state = yield self.store.get_presence_state( + target_user.localpart + ) + defer.returnValue(state) + else: + raise SynapseError(404, "Presence information not visible") + else: + # TODO(paul): Have remote server send us permissions set + defer.returnValue( + self._get_or_offline_usercache(target_user).get_state() + ) + + @defer.inlineCallbacks + def set_state(self, target_user, auth_user, state): + if not target_user.is_mine: + raise SynapseError(400, "User is not hosted on this Home Server") + + if target_user != auth_user: + raise AuthError(400, "Cannot set another user's displayname") + + # TODO(paul): Sanity-check 'state' + if "status_msg" not in state: + state["status_msg"] = None + + for k in state.keys(): + if k not in ("state", "status_msg"): + raise SynapseError( + 400, "Unexpected presence state key '%s'" % (k,) + ) + + logger.debug("Updating presence state of %s to %s", + target_user.localpart, state["state"]) + + state_to_store = dict(state) + + yield defer.DeferredList([ + self.store.set_presence_state( + target_user.localpart, state_to_store + ), + self.distributor.fire( + "collect_presencelike_data", target_user, state + ), + ]) + + now_online = state["state"] != PresenceState.OFFLINE + was_polling = target_user in self._user_cachemap + + if now_online and not was_polling: + self.start_polling_presence(target_user, state=state) + elif not now_online and was_polling: + self.stop_polling_presence(target_user) + + # TODO(paul): perform a presence push as part of start/stop poll so + # we don't have to do this all the time + self.changed_presencelike_data(target_user, state) + + if not now_online: + del self._user_cachemap[target_user] + + def changed_presencelike_data(self, user, state): + statuscache = self._get_or_make_usercache(user) + + self._user_cachemap_latest_serial += 1 + statuscache.update(state, serial=self._user_cachemap_latest_serial) + + self.push_presence(user, statuscache=statuscache) + + def started_user_eventstream(self, user): + # TODO(paul): Use "last online" state + self.set_state(user, user, {"state": PresenceState.ONLINE}) + + def stopped_user_eventstream(self, user): + # TODO(paul): Save current state as "last online" state + self.set_state(user, user, {"state": PresenceState.OFFLINE}) + + @defer.inlineCallbacks + def user_joined_room(self, user, room_id): + localusers = set() + remotedomains = set() + + rm_handler = self.homeserver.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into(room_id, + localusers=localusers, remotedomains=remotedomains, + ignore_user=user) + + if user.is_mine: + yield self._send_presence_to_distribution(srcuser=user, + localusers=localusers, remotedomains=remotedomains, + statuscache=self._get_or_offline_usercache(user), + ) + + for srcuser in localusers: + yield self._send_presence(srcuser=srcuser, destuser=user, + statuscache=self._get_or_offline_usercache(srcuser), + ) + + @defer.inlineCallbacks + def send_invite(self, observer_user, observed_user): + if not observer_user.is_mine: + raise SynapseError(400, "User is not hosted on this Home Server") + + yield self.store.add_presence_list_pending( + observer_user.localpart, observed_user.to_string() + ) + + if observed_user.is_mine: + yield self.invite_presence(observed_user, observer_user) + else: + yield self.federation.send_edu( + destination=observed_user.domain, + edu_type="m.presence_invite", + content={ + "observed_user": observed_user.to_string(), + "observer_user": observer_user.to_string(), + } + ) + + @defer.inlineCallbacks + def _should_accept_invite(self, observed_user, observer_user): + if not observed_user.is_mine: + defer.returnValue(False) + + row = yield self.store.has_presence_state(observed_user.localpart) + if not row: + defer.returnValue(False) + + # TODO(paul): Eventually we'll ask the user's permission for this + # before accepting. For now just accept any invite request + defer.returnValue(True) + + @defer.inlineCallbacks + def invite_presence(self, observed_user, observer_user): + accept = yield self._should_accept_invite(observed_user, observer_user) + + if accept: + yield self.store.allow_presence_visible( + observed_user.localpart, observer_user.to_string() + ) + + if observer_user.is_mine: + if accept: + yield self.accept_presence(observed_user, observer_user) + else: + yield self.deny_presence(observed_user, observer_user) + else: + edu_type = "m.presence_accept" if accept else "m.presence_deny" + + yield self.federation.send_edu( + destination=observer_user.domain, + edu_type=edu_type, + content={ + "observed_user": observed_user.to_string(), + "observer_user": observer_user.to_string(), + } + ) + + @defer.inlineCallbacks + def accept_presence(self, observed_user, observer_user): + yield self.store.set_presence_list_accepted( + observer_user.localpart, observed_user.to_string() + ) + + self.start_polling_presence(observer_user, target_user=observed_user) + + @defer.inlineCallbacks + def deny_presence(self, observed_user, observer_user): + yield self.store.del_presence_list( + observer_user.localpart, observed_user.to_string() + ) + + # TODO(paul): Inform the user somehow? + + @defer.inlineCallbacks + def drop(self, observed_user, observer_user): + if not observer_user.is_mine: + raise SynapseError(400, "User is not hosted on this Home Server") + + yield self.store.del_presence_list( + observer_user.localpart, observed_user.to_string() + ) + + self.stop_polling_presence(observer_user, target_user=observed_user) + + @defer.inlineCallbacks + def get_presence_list(self, observer_user, accepted=None): + if not observer_user.is_mine: + raise SynapseError(400, "User is not hosted on this Home Server") + + presence = yield self.store.get_presence_list( + observer_user.localpart, accepted=accepted + ) + + for p in presence: + observed_user = self.hs.parse_userid(p.pop("observed_user_id")) + p["observed_user"] = observed_user + p.update(self._get_or_offline_usercache(observed_user).get_state()) + + defer.returnValue(presence) + + @defer.inlineCallbacks + def start_polling_presence(self, user, target_user=None, state=None): + logger.debug("Start polling for presence from %s", user) + + if target_user: + target_users = [target_user] + else: + presence = yield self.store.get_presence_list( + user.localpart, accepted=True + ) + target_users = [ + self.hs.parse_userid(x["observed_user_id"]) for x in presence + ] + + if state is None: + state = yield self.store.get_presence_state(user.localpart) + + localusers, remoteusers = partitionbool( + target_users, + lambda u: u.is_mine + ) + + for target_user in localusers: + self._start_polling_local(user, target_user) + + deferreds = [] + remoteusers_by_domain = partition(remoteusers, lambda u: u.domain) + for domain in remoteusers_by_domain: + remoteusers = remoteusers_by_domain[domain] + + deferreds.append(self._start_polling_remote( + user, domain, remoteusers + )) + + yield defer.DeferredList(deferreds) + + def _start_polling_local(self, user, target_user): + target_localpart = target_user.localpart + + if not self.is_presence_visible(observer_user=user, + observed_user=target_user): + return + + if target_localpart not in self._local_pushmap: + self._local_pushmap[target_localpart] = set() + + self._local_pushmap[target_localpart].add(user) + + self.push_update_to_clients( + observer_user=user, + observed_user=target_user, + statuscache=self._get_or_offline_usercache(target_user), + ) + + def _start_polling_remote(self, user, domain, remoteusers): + for u in remoteusers: + if u not in self._remote_recvmap: + self._remote_recvmap[u] = set() + + self._remote_recvmap[u].add(user) + + return self.federation.send_edu( + destination=domain, + edu_type="m.presence", + content={"poll": [u.to_string() for u in remoteusers]} + ) + + def stop_polling_presence(self, user, target_user=None): + logger.debug("Stop polling for presence from %s", user) + + if not target_user or target_user.is_mine: + self._stop_polling_local(user, target_user=target_user) + + deferreds = [] + + if target_user: + raise NotImplementedError("TODO: remove one user") + + remoteusers = [u for u in self._remote_recvmap + if user in self._remote_recvmap[u]] + remoteusers_by_domain = partition(remoteusers, lambda u: u.domain) + + for domain in remoteusers_by_domain: + remoteusers = remoteusers_by_domain[domain] + + deferreds.append( + self._stop_polling_remote(user, domain, remoteusers) + ) + + return defer.DeferredList(deferreds) + + def _stop_polling_local(self, user, target_user): + for localpart in self._local_pushmap.keys(): + if target_user and localpart != target_user.localpart: + continue + + if user in self._local_pushmap[localpart]: + self._local_pushmap[localpart].remove(user) + + if not self._local_pushmap[localpart]: + del self._local_pushmap[localpart] + + def _stop_polling_remote(self, user, domain, remoteusers): + for u in remoteusers: + self._remote_recvmap[u].remove(user) + + if not self._remote_recvmap[u]: + del self._remote_recvmap[u] + + return self.federation.send_edu( + destination=domain, + edu_type="m.presence", + content={"unpoll": [u.to_string() for u in remoteusers]} + ) + + @defer.inlineCallbacks + def push_presence(self, user, statuscache): + assert(user.is_mine) + + logger.debug("Pushing presence update from %s", user) + + localusers = set(self._local_pushmap.get(user.localpart, set())) + remotedomains = set(self._remote_sendmap.get(user.localpart, set())) + + # Reflect users' status changes back to themselves, so UIs look nice + # and also user is informed of server-forced pushes + localusers.add(user) + + rm_handler = self.homeserver.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(user) + + for room_id in room_ids: + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=localusers, remotedomains=remotedomains, + ignore_user=user, + ) + + if not localusers and not remotedomains: + defer.returnValue(None) + + yield self._send_presence_to_distribution(user, + localusers=localusers, remotedomains=remotedomains, + statuscache=statuscache + ) + + def _send_presence(self, srcuser, destuser, statuscache): + if destuser.is_mine: + self.push_update_to_clients( + observer_user=destuser, + observed_user=srcuser, + statuscache=statuscache) + return defer.succeed(None) + else: + return self._push_presence_remote(srcuser, destuser.domain, + state=statuscache.get_state() + ) + + @defer.inlineCallbacks + def _send_presence_to_distribution(self, srcuser, localusers=set(), + remotedomains=set(), statuscache=None): + + for u in localusers: + logger.debug(" | push to local user %s", u) + self.push_update_to_clients( + observer_user=u, + observed_user=srcuser, + statuscache=statuscache, + ) + + deferreds = [] + for domain in remotedomains: + logger.debug(" | push to remote domain %s", domain) + deferreds.append(self._push_presence_remote(srcuser, domain, + state=statuscache.get_state()) + ) + + yield defer.DeferredList(deferreds) + + @defer.inlineCallbacks + def _push_presence_remote(self, user, destination, state=None): + if state is None: + state = yield self.store.get_presence_state(user.localpart) + yield self.distributor.fire( + "collect_presencelike_data", user, state + ) + + yield self.federation.send_edu( + destination=destination, + edu_type="m.presence", + content={ + "push": [ + dict(user_id=user.to_string(), **state), + ], + } + ) + + @defer.inlineCallbacks + def incoming_presence(self, origin, content): + deferreds = [] + + for push in content.get("push", []): + user = self.hs.parse_userid(push["user_id"]) + + logger.debug("Incoming presence update from %s", user) + + observers = set(self._remote_recvmap.get(user, set())) + + rm_handler = self.homeserver.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(user) + + for room_id in room_ids: + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=observers, ignore_user=user + ) + + if not observers: + break + + state = dict(push) + del state["user_id"] + + statuscache = self._get_or_make_usercache(user) + + self._user_cachemap_latest_serial += 1 + statuscache.update(state, serial=self._user_cachemap_latest_serial) + + for observer_user in observers: + self.push_update_to_clients( + observer_user=observer_user, + observed_user=user, + statuscache=statuscache, + ) + + if state["state"] == PresenceState.OFFLINE: + del self._user_cachemap[user] + + for poll in content.get("poll", []): + user = self.hs.parse_userid(poll) + + if not user.is_mine: + continue + + # TODO(paul) permissions checks + + if not user in self._remote_sendmap: + self._remote_sendmap[user] = set() + + self._remote_sendmap[user].add(origin) + + deferreds.append(self._push_presence_remote(user, origin)) + + for unpoll in content.get("unpoll", []): + user = self.hs.parse_userid(unpoll) + + if not user.is_mine: + continue + + if user in self._remote_sendmap: + self._remote_sendmap[user].remove(origin) + + if not self._remote_sendmap[user]: + del self._remote_sendmap[user] + + yield defer.DeferredList(deferreds) + + def push_update_to_clients(self, observer_user, observed_user, + statuscache): + self.notifier.on_new_user_event( + observer_user.to_string(), + event_data=statuscache.make_event(user=observed_user), + stream_type=PresenceStreamData, + store_id=statuscache.serial + ) + + +class PresenceStreamData(StreamData): + def __init__(self, hs): + super(PresenceStreamData, self).__init__(hs) + self.presence = hs.get_handlers().presence_handler + + def get_rows(self, user_id, from_key, to_key, limit): + cachemap = self.presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if from_key < cachemap[k].serial <= to_key] + + if updates: + latest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0]) for x in updates] + return ((data, latest_serial)) + else: + return (([], self.presence._user_cachemap_latest_serial)) + + def max_token(self): + return self.presence._user_cachemap_latest_serial + +PresenceStreamData.EVENT_TYPE = PresenceStreamData + + +class UserPresenceCache(object): + """Store an observed user's state and status message. + + Includes the update timestamp. + """ + def __init__(self): + self.state = {} + self.serial = None + + def update(self, state, serial): + self.state.update(state) + # Delete keys that are now 'None' + for k in self.state.keys(): + if self.state[k] is None: + del self.state[k] + + self.serial = serial + + if "status_msg" in state: + self.status_msg = state["status_msg"] + else: + self.status_msg = None + + def get_state(self): + # clone it so caller can't break our cache + return dict(self.state) + + def make_event(self, user): + content = self.get_state() + content["user_id"] = user.to_string() + + return {"type": "m.presence", "content": content} diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py new file mode 100644 index 0000000000..a27206b002 --- /dev/null +++ b/synapse/handlers/profile.py @@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet import defer + +from synapse.api.errors import SynapseError, AuthError + +from synapse.api.errors import CodeMessageException + +from ._base import BaseHandler + +import logging + + +logger = logging.getLogger(__name__) + +PREFIX = "/matrix/client/api/v1" + + +class ProfileHandler(BaseHandler): + + def __init__(self, hs): + super(ProfileHandler, self).__init__(hs) + + self.client = hs.get_http_client() + + distributor = hs.get_distributor() + self.distributor = distributor + + distributor.observe("registered_user", self.registered_user) + + distributor.observe( + "collect_presencelike_data", self.collect_presencelike_data + ) + + def registered_user(self, user): + self.store.create_profile(user.localpart) + + @defer.inlineCallbacks + def get_displayname(self, target_user, local_only=False): + if target_user.is_mine: + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + + defer.returnValue(displayname) + elif not local_only: + # TODO(paul): This should use the server-server API to ask another + # HS. For now we'll just have it use the http client to talk to the + # other HS's REST client API + path = PREFIX + "/profile/%s/displayname?local_only=1" % ( + target_user.to_string() + ) + + try: + result = yield self.client.get_json( + destination=target_user.domain, + path=path + ) + except CodeMessageException as e: + if e.code != 404: + logger.exception("Failed to get displayname") + + raise + except: + logger.exception("Failed to get displayname") + + defer.returnValue(result["displayname"]) + else: + raise SynapseError(400, "User is not hosted on this Home Server") + + @defer.inlineCallbacks + def set_displayname(self, target_user, auth_user, new_displayname): + """target_user is the user whose displayname is to be changed; + auth_user is the user attempting to make this change.""" + if not target_user.is_mine: + raise SynapseError(400, "User is not hosted on this Home Server") + + if target_user != auth_user: + raise AuthError(400, "Cannot set another user's displayname") + + yield self.store.set_profile_displayname( + target_user.localpart, new_displayname + ) + + yield self.distributor.fire( + "changed_presencelike_data", target_user, { + "displayname": new_displayname, + } + ) + + @defer.inlineCallbacks + def get_avatar_url(self, target_user, local_only=False): + if target_user.is_mine: + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + + defer.returnValue(avatar_url) + elif not local_only: + # TODO(paul): This should use the server-server API to ask another + # HS. For now we'll just have it use the http client to talk to the + # other HS's REST client API + destination = target_user.domain + path = PREFIX + "/profile/%s/avatar_url?local_only=1" % ( + target_user.to_string(), + ) + + try: + result = yield self.client.get_json( + destination=destination, + path=path + ) + except CodeMessageException as e: + if e.code != 404: + logger.exception("Failed to get avatar_url") + raise + except: + logger.exception("Failed to get avatar_url") + + defer.returnValue(result["avatar_url"]) + else: + raise SynapseError(400, "User is not hosted on this Home Server") + + @defer.inlineCallbacks + def set_avatar_url(self, target_user, auth_user, new_avatar_url): + """target_user is the user whose avatar_url is to be changed; + auth_user is the user attempting to make this change.""" + if not target_user.is_mine: + raise SynapseError(400, "User is not hosted on this Home Server") + + if target_user != auth_user: + raise AuthError(400, "Cannot set another user's avatar_url") + + yield self.store.set_profile_avatar_url( + target_user.localpart, new_avatar_url + ) + + yield self.distributor.fire( + "changed_presencelike_data", target_user, { + "avatar_url": new_avatar_url, + } + ) + + @defer.inlineCallbacks + def collect_presencelike_data(self, user, state): + if not user.is_mine: + defer.returnValue(None) + + (displayname, avatar_url) = yield defer.gatherResults([ + self.store.get_profile_displayname(user.localpart), + self.store.get_profile_avatar_url(user.localpart), + ]) + + state["displayname"] = displayname + state["avatar_url"] = avatar_url + + defer.returnValue(None) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py new file mode 100644 index 0000000000..246c1f6530 --- /dev/null +++ b/synapse/handlers/register.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Contains functions for registering clients.""" +from twisted.internet import defer + +from synapse.types import UserID +from synapse.api.errors import SynapseError, RegistrationError +from ._base import BaseHandler +import synapse.util.stringutils as stringutils + +import base64 +import bcrypt + + +class RegistrationHandler(BaseHandler): + + def __init__(self, hs): + super(RegistrationHandler, self).__init__(hs) + + self.distributor = hs.get_distributor() + self.distributor.declare("registered_user") + + @defer.inlineCallbacks + def register(self, localpart=None, password=None): + """Registers a new client on the server. + + Args: + localpart : The local part of the user ID to register. If None, + one will be randomly generated. + password (str) : The password to assign to this user so they can + login again. + Returns: + A tuple of (user_id, access_token). + Raises: + RegistrationError if there was a problem registering. + """ + password_hash = None + if password: + password_hash = bcrypt.hashpw(password, bcrypt.gensalt()) + + if localpart: + user = UserID(localpart, self.hs.hostname, True) + user_id = user.to_string() + + token = self._generate_token(user_id) + yield self.store.register(user_id=user_id, + token=token, + password_hash=password_hash) + + self.distributor.fire("registered_user", user) + defer.returnValue((user_id, token)) + else: + # autogen a random user ID + attempts = 0 + user_id = None + token = None + while not user_id and not token: + try: + localpart = self._generate_user_id() + user = UserID(localpart, self.hs.hostname, True) + user_id = user.to_string() + + token = self._generate_token(user_id) + yield self.store.register( + user_id=user_id, + token=token, + password_hash=password_hash) + + self.distributor.fire("registered_user", user) + defer.returnValue((user_id, token)) + except SynapseError: + # if user id is taken, just generate another + user_id = None + token = None + attempts += 1 + if attempts > 5: + raise RegistrationError( + 500, "Cannot generate user ID.") + + def _generate_token(self, user_id): + # urlsafe variant uses _ and - so use . as the separator and replace + # all =s with .s so http clients don't quote =s when it is used as + # query params. + return (base64.urlsafe_b64encode(user_id).replace('=', '.') + '.' + + stringutils.random_string(18)) + + def _generate_user_id(self): + return "-" + stringutils.random_string(18) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py new file mode 100644 index 0000000000..4d82b33993 --- /dev/null +++ b/synapse/handlers/room.py @@ -0,0 +1,808 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Contains functions for performing events on rooms.""" +from twisted.internet import defer + +from synapse.types import UserID, RoomAlias, RoomID +from synapse.api.constants import Membership +from synapse.api.errors import RoomError, StoreError, SynapseError +from synapse.api.events.room import ( + RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent, + RoomConfigEvent +) +from synapse.api.streams.event import EventStream, MessagesStreamData +from synapse.util import stringutils +from ._base import BaseHandler + +import logging +import json + +logger = logging.getLogger(__name__) + + +class MessageHandler(BaseHandler): + + def __init__(self, hs): + super(MessageHandler, self).__init__(hs) + self.hs = hs + self.clock = hs.get_clock() + self.event_factory = hs.get_event_factory() + + @defer.inlineCallbacks + def get_message(self, msg_id=None, room_id=None, sender_id=None, + user_id=None): + """ Retrieve a message. + + Args: + msg_id (str): The message ID to obtain. + room_id (str): The room where the message resides. + sender_id (str): The user ID of the user who sent the message. + user_id (str): The user ID of the user making this request. + Returns: + The message, or None if no message exists. + Raises: + SynapseError if something went wrong. + """ + yield self.auth.check_joined_room(room_id, user_id) + + # Pull out the message from the db + msg = yield self.store.get_message(room_id=room_id, + msg_id=msg_id, + user_id=sender_id) + + if msg: + defer.returnValue(msg) + defer.returnValue(None) + + @defer.inlineCallbacks + def send_message(self, event=None, suppress_auth=False, stamp_event=True): + """ Send a message. + + Args: + event : The message event to store. + suppress_auth (bool) : True to suppress auth for this message. This + is primarily so the home server can inject messages into rooms at + will. + stamp_event (bool) : True to stamp event content with server keys. + Raises: + SynapseError if something went wrong. + """ + if stamp_event: + event.content["hsob_ts"] = int(self.clock.time_msec()) + + with (yield self.room_lock.lock(event.room_id)): + if not suppress_auth: + yield self.auth.check(event, raises=True) + + # store message in db + store_id = yield self.store.persist_event(event) + + event.destinations = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + + yield self.hs.get_federation().handle_new_event(event) + + self.notifier.on_new_room_event(event, store_id) + + @defer.inlineCallbacks + def get_messages(self, user_id=None, room_id=None, pagin_config=None, + feedback=False): + """Get messages in a room. + + Args: + user_id (str): The user requesting messages. + room_id (str): The room they want messages from. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config rules to apply, if any. + feedback (bool): True to get compressed feedback with the messages + Returns: + dict: Pagination API results + """ + yield self.auth.check_joined_room(room_id, user_id) + + data_source = [MessagesStreamData(self.hs, room_id=room_id, + feedback=feedback)] + event_stream = EventStream(user_id, data_source) + pagin_config = yield event_stream.fix_tokens(pagin_config) + data_chunk = yield event_stream.get_chunk(config=pagin_config) + defer.returnValue(data_chunk) + + @defer.inlineCallbacks + def store_room_data(self, event=None, stamp_event=True): + """ Stores data for a room. + + Args: + event : The room path event + stamp_event (bool) : True to stamp event content with server keys. + Raises: + SynapseError if something went wrong. + """ + + with (yield self.room_lock.lock(event.room_id)): + yield self.auth.check(event, raises=True) + + if stamp_event: + event.content["hsob_ts"] = int(self.clock.time_msec()) + + yield self.state_handler.handle_new_event(event) + + # store in db + store_id = yield self.store.store_room_data( + room_id=event.room_id, + etype=event.type, + state_key=event.state_key, + content=json.dumps(event.content) + ) + + event.destinations = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + self.notifier.on_new_room_event(event, store_id) + + yield self.hs.get_federation().handle_new_event(event) + + @defer.inlineCallbacks + def get_room_data(self, user_id=None, room_id=None, + event_type=None, state_key="", + public_room_rules=[], + private_room_rules=["join"]): + """ Get data from a room. + + Args: + event : The room path event + public_room_rules : A list of membership states the user can be in, + in order to read this data IN A PUBLIC ROOM. An empty list means + 'any state'. + private_room_rules : A list of membership states the user can be + in, in order to read this data IN A PRIVATE ROOM. An empty list + means 'any state'. + Returns: + The path data content. + Raises: + SynapseError if something went wrong. + """ + if event_type == RoomTopicEvent.TYPE: + # anyone invited/joined can read the topic + private_room_rules = ["invite", "join"] + + # does this room exist + room = yield self.store.get_room(room_id) + if not room: + raise RoomError(403, "Room does not exist.") + + # does this user exist in this room + member = yield self.store.get_room_member( + room_id=room_id, + user_id="" if not user_id else user_id) + + member_state = member.membership if member else None + + if room.is_public and public_room_rules: + # make sure the user meets public room rules + if member_state not in public_room_rules: + raise RoomError(403, "Member does not meet public room rules.") + elif not room.is_public and private_room_rules: + # make sure the user meets private room rules + if member_state not in private_room_rules: + raise RoomError( + 403, "Member does not meet private room rules.") + + data = yield self.store.get_room_data(room_id, event_type, state_key) + defer.returnValue(data) + + @defer.inlineCallbacks + def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None, + user_id=None, fb_sender_id=None, fb_type=None): + yield self.auth.check_joined_room(room_id, user_id) + + # Pull out the feedback from the db + fb = yield self.store.get_feedback( + room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id, + fb_sender_id=fb_sender_id, fb_type=fb_type + ) + + if fb: + defer.returnValue(fb) + defer.returnValue(None) + + @defer.inlineCallbacks + def send_feedback(self, event, stamp_event=True): + if stamp_event: + event.content["hsob_ts"] = int(self.clock.time_msec()) + + with (yield self.room_lock.lock(event.room_id)): + yield self.auth.check(event, raises=True) + + # store message in db + store_id = yield self.store.persist_event(event) + + event.destinations = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + yield self.hs.get_federation().handle_new_event(event) + + self.notifier.on_new_room_event(event, store_id) + + @defer.inlineCallbacks + def snapshot_all_rooms(self, user_id=None, pagin_config=None, + feedback=False): + """Retrieve a snapshot of all rooms the user is invited or has joined. + + This snapshot may include messages for all rooms where the user is + joined, depending on the pagination config. + + Args: + user_id (str): The ID of the user making the request. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config used to determine how many messages *PER ROOM* to return. + feedback (bool): True to get feedback along with these messages. + Returns: + A list of dicts with "room_id" and "membership" keys for all rooms + the user is currently invited or joined in on. Rooms where the user + is joined on, may return a "messages" key with messages, depending + on the specified PaginationConfig. + """ + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user_id, + membership_list=[Membership.INVITE, Membership.JOIN] + ) + for room_info in room_list: + if room_info["membership"] != Membership.JOIN: + continue + try: + event_chunk = yield self.get_messages( + user_id=user_id, + pagin_config=pagin_config, + feedback=feedback, + room_id=room_info["room_id"] + ) + room_info["messages"] = event_chunk + except: + pass + defer.returnValue(room_list) + + +class RoomCreationHandler(BaseHandler): + + @defer.inlineCallbacks + def create_room(self, user_id, room_id, config): + """ Creates a new room. + + Args: + user_id (str): The ID of the user creating the new room. + room_id (str): The proposed ID for the new room. Can be None, in + which case one will be created for you. + config (dict) : A dict of configuration options. + Returns: + The new room ID. + Raises: + SynapseError if the room ID was taken, couldn't be stored, or + something went horribly wrong. + """ + + if "room_alias_name" in config: + room_alias = RoomAlias.create_local( + config["room_alias_name"], + self.hs + ) + mapping = yield self.store.get_association_from_room_alias( + room_alias + ) + + if mapping: + raise SynapseError(400, "Room alias already taken") + else: + room_alias = None + + if room_id: + # Ensure room_id is the correct type + room_id_obj = RoomID.from_string(room_id, self.hs) + if not room_id_obj.is_mine: + raise SynapseError(400, "Room id must be local") + + yield self.store.store_room( + room_id=room_id, + room_creator_user_id=user_id, + is_public=config["visibility"] == "public" + ) + else: + # autogen room IDs and try to create it. We may clash, so just + # try a few times till one goes through, giving up eventually. + attempts = 0 + room_id = None + while attempts < 5: + try: + random_string = stringutils.random_string(18) + gen_room_id = RoomID.create_local(random_string, self.hs) + yield self.store.store_room( + room_id=gen_room_id.to_string(), + room_creator_user_id=user_id, + is_public=config["visibility"] == "public" + ) + room_id = gen_room_id.to_string() + break + except StoreError: + attempts += 1 + if not room_id: + raise StoreError(500, "Couldn't generate a room ID.") + + config_event = self.event_factory.create_event( + etype=RoomConfigEvent.TYPE, + room_id=room_id, + user_id=user_id, + content=config, + ) + + if room_alias: + yield self.store.create_room_alias_association( + room_id=room_id, + room_alias=room_alias, + servers=[self.hs.hostname], + ) + + yield self.state_handler.handle_new_event(config_event) + # store_id = persist... + + yield self.hs.get_federation().handle_new_event(config_event) + # self.notifier.on_new_room_event(event, store_id) + + content = {"membership": Membership.JOIN} + join_event = self.event_factory.create_event( + etype=RoomMemberEvent.TYPE, + target_user_id=user_id, + room_id=room_id, + user_id=user_id, + membership=Membership.JOIN, + content=content + ) + + yield self.hs.get_handlers().room_member_handler.change_membership( + join_event, + broadcast_msg=True, + do_auth=False + ) + + result = {"room_id": room_id} + if room_alias: + result["room_alias"] = room_alias.to_string() + + defer.returnValue(result) + + +class RoomMemberHandler(BaseHandler): + # TODO(paul): This handler currently contains a messy conflation of + # low-level API that works on UserID objects and so on, and REST-level + # API that takes ID strings and returns pagination chunks. These concerns + # ought to be separated out a lot better. + + def __init__(self, hs): + super(RoomMemberHandler, self).__init__(hs) + + self.clock = hs.get_clock() + + self.distributor = hs.get_distributor() + self.distributor.declare("user_joined_room") + + @defer.inlineCallbacks + def get_room_members(self, room_id, membership=Membership.JOIN): + hs = self.hs + + memberships = yield self.store.get_room_members( + room_id=room_id, membership=membership + ) + + defer.returnValue([hs.parse_userid(m.user_id) for m in memberships]) + + @defer.inlineCallbacks + def fetch_room_distributions_into(self, room_id, localusers=None, + remotedomains=None, ignore_user=None): + """Fetch the distribution of a room, adding elements to either + 'localusers' or 'remotedomains', which should be a set() if supplied. + If ignore_user is set, ignore that user. + + This function returns nothing; its result is performed by the + side-effect on the two passed sets. This allows easy accumulation of + member lists of multiple rooms at once if required. + """ + members = yield self.get_room_members(room_id) + for member in members: + if ignore_user is not None and member == ignore_user: + continue + + if member.is_mine: + if localusers is not None: + localusers.add(member) + else: + if remotedomains is not None: + remotedomains.add(member.domain) + + @defer.inlineCallbacks + def get_room_members_as_pagination_chunk(self, room_id=None, user_id=None, + limit=0, start_tok=None, + end_tok=None): + """Retrieve a list of room members in the room. + + Args: + room_id (str): The room to get the member list for. + user_id (str): The ID of the user making the request. + limit (int): The max number of members to return. + start_tok (str): Optional. The start token if known. + end_tok (str): Optional. The end token if known. + Returns: + dict: A Pagination streamable dict. + Raises: + SynapseError if something goes wrong. + """ + yield self.auth.check_joined_room(room_id, user_id) + + member_list = yield self.store.get_room_members(room_id=room_id) + event_list = [ + entry.as_event(self.event_factory).get_dict() + for entry in member_list + ] + chunk_data = { + "start": "START", + "end": "END", + "chunk": event_list + } + # TODO honor Pagination stream params + # TODO snapshot this list to return on subsequent requests when + # paginating + defer.returnValue(chunk_data) + + @defer.inlineCallbacks + def get_room_member(self, room_id, member_user_id, auth_user_id): + """Retrieve a room member from a room. + + Args: + room_id : The room the member is in. + member_user_id : The member's user ID + auth_user_id : The user ID of the user making this request. + Returns: + The room member, or None if this member does not exist. + Raises: + SynapseError if something goes wrong. + """ + yield self.auth.check_joined_room(room_id, auth_user_id) + + member = yield self.store.get_room_member(user_id=member_user_id, + room_id=room_id) + defer.returnValue(member) + + @defer.inlineCallbacks + def change_membership(self, event=None, broadcast_msg=False, do_auth=True): + """ Change the membership status of a user in a room. + + Args: + event (SynapseEvent): The membership event + broadcast_msg (bool): True to inject a membership message into this + room on success. + Raises: + SynapseError if there was a problem changing the membership. + """ + + #broadcast_msg = False + + prev_state = yield self.store.get_room_member( + event.target_user_id, event.room_id + ) + + if prev_state and prev_state.membership == event.membership: + # treat this event as a NOOP. + if do_auth: # This is mainly to fix a unit test. + yield self.auth.check(event, raises=True) + defer.returnValue({}) + return + + room_id = event.room_id + + # If we're trying to join a room then we have to do this differently + # if this HS is not currently in the room, i.e. we have to do the + # invite/join dance. + if event.membership == Membership.JOIN: + yield self._do_join( + event, do_auth=do_auth, broadcast_msg=broadcast_msg + ) + else: + # This is not a JOIN, so we can handle it normally. + if do_auth: + yield self.auth.check(event, raises=True) + + prev_state = yield self.store.get_room_member( + event.target_user_id, event.room_id + ) + if prev_state and prev_state.membership == event.membership: + # double same action, treat this event as a NOOP. + defer.returnValue({}) + return + + yield self.state_handler.handle_new_event(event) + yield self._do_local_membership_update( + event, + membership=event.content["membership"], + broadcast_msg=broadcast_msg, + ) + + defer.returnValue({"room_id": room_id}) + + @defer.inlineCallbacks + def join_room_alias(self, joinee, room_alias, do_auth=True, content={}): + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if not mapping: + raise SynapseError(404, "No such room alias") + + room_id = mapping["room_id"] + hosts = mapping["servers"] + if not hosts: + raise SynapseError(404, "No known servers") + + host = hosts[0] + + content.update({"membership": Membership.JOIN}) + new_event = self.event_factory.create_event( + etype=RoomMemberEvent.TYPE, + target_user_id=joinee.to_string(), + room_id=room_id, + user_id=joinee.to_string(), + membership=Membership.JOIN, + content=content, + ) + + yield self._do_join(new_event, room_host=host, do_auth=True) + + defer.returnValue({"room_id": room_id}) + + @defer.inlineCallbacks + def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True): + joinee = self.hs.parse_userid(event.target_user_id) + # room_id = RoomID.from_string(event.room_id, self.hs) + room_id = event.room_id + + # If event doesn't include a display name, add one. + yield self._fill_out_join_content( + joinee, event.content + ) + + # XXX: We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + + room = yield self.store.get_room(room_id) + + if room: + should_do_dance = False + elif room_host: + should_do_dance = True + else: + prev_state = yield self.store.get_room_member( + joinee.to_string(), room_id + ) + + if prev_state and prev_state.membership == Membership.INVITE: + room = yield self.store.get_room(room_id) + inviter = UserID.from_string( + prev_state.sender, self.hs + ) + + should_do_dance = not inviter.is_mine and not room + room_host = inviter.domain + else: + should_do_dance = False + + # We want to do the _do_update inside the room lock. + if not should_do_dance: + logger.debug("Doing normal join") + + if do_auth: + yield self.auth.check(event, raises=True) + + yield self.state_handler.handle_new_event(event) + yield self._do_local_membership_update( + event, + membership=event.content["membership"], + broadcast_msg=broadcast_msg, + ) + + + if should_do_dance: + yield self._do_invite_join_dance( + room_id=room_id, + joinee=event.user_id, + target_host=room_host, + content=event.content, + ) + + user = self.hs.parse_userid(event.user_id) + self.distributor.fire( + "user_joined_room", user=user, room_id=room_id + ) + + @defer.inlineCallbacks + def _fill_out_join_content(self, user_id, content): + # If event doesn't include a display name, add one. + profile_handler = self.hs.get_handlers().profile_handler + if "displayname" not in content: + try: + display_name = yield profile_handler.get_displayname( + user_id + ) + + if display_name: + content["displayname"] = display_name + except: + logger.exception("Failed to set display_name") + + if "avatar_url" not in content: + try: + avatar_url = yield profile_handler.get_avatar_url( + user_id + ) + + if avatar_url: + content["avatar_url"] = avatar_url + except: + logger.exception("Failed to set display_name") + + @defer.inlineCallbacks + def _should_invite_join(self, room_id, prev_state, do_auth): + logger.debug("_should_invite_join: room_id: %s", room_id) + + # XXX: We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + + # Only do an invite join dance if a) we were invited, + # b) the person inviting was from a differnt HS and c) we are + # not currently in the room + room_host = None + if prev_state and prev_state.membership == Membership.INVITE: + room = yield self.store.get_room(room_id) + inviter = UserID.from_string( + prev_state.sender, self.hs + ) + + is_remote_invite_join = not inviter.is_mine and not room + room_host = inviter.domain + else: + is_remote_invite_join = False + + defer.returnValue((is_remote_invite_join, room_host)) + + @defer.inlineCallbacks + def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]): + """Returns a list of roomids that the user has any of the given + membership states in.""" + rooms = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user.to_string(), membership_list=membership_list + ) + + defer.returnValue([r["room_id"] for r in rooms]) + + @defer.inlineCallbacks + def _do_local_membership_update(self, event, membership, broadcast_msg): + # store membership + store_id = yield self.store.store_room_member( + user_id=event.target_user_id, + sender=event.user_id, + room_id=event.room_id, + content=event.content, + membership=membership + ) + + # Send a PDU to all hosts who have joined the room. + destinations = yield self.store.get_joined_hosts_for_room( + event.room_id + ) + + # If we're inviting someone, then we should also send it to that + # HS. + if membership == Membership.INVITE: + host = UserID.from_string( + event.target_user_id, self.hs + ).domain + destinations.append(host) + + # If we are joining a remote HS, include that. + if membership == Membership.JOIN: + host = UserID.from_string( + event.target_user_id, self.hs + ).domain + destinations.append(host) + + event.destinations = list(set(destinations)) + + yield self.hs.get_federation().handle_new_event(event) + self.notifier.on_new_room_event(event, store_id) + + if broadcast_msg: + yield self._inject_membership_msg( + source=event.user_id, + target=event.target_user_id, + room_id=event.room_id, + membership=event.content["membership"] + ) + + @defer.inlineCallbacks + def _do_invite_join_dance(self, room_id, joinee, target_host, content): + logger.debug("Doing remote join dance") + + # do invite join dance + federation = self.hs.get_federation() + new_event = self.event_factory.create_event( + etype=InviteJoinEvent.TYPE, + target_host=target_host, + room_id=room_id, + user_id=joinee, + content=content + ) + + new_event.destinations = [target_host] + + yield self.store.store_room( + room_id, "", is_public=False + ) + + #yield self.state_handler.handle_new_event(event) + yield federation.handle_new_event(new_event) + yield federation.get_state_for_room( + target_host, room_id + ) + + @defer.inlineCallbacks + def _inject_membership_msg(self, room_id=None, source=None, target=None, + membership=None): + # TODO this should be a different type of message, not m.text + if membership == Membership.INVITE: + body = "%s invited %s to the room." % (source, target) + elif membership == Membership.JOIN: + body = "%s joined the room." % (target) + elif membership == Membership.LEAVE: + body = "%s left the room." % (target) + else: + raise RoomError(500, "Unknown membership value %s" % membership) + + membership_json = { + "msgtype": u"m.text", + "body": body, + "membership_source": source, + "membership_target": target, + "membership": membership, + } + + msg_id = "m%s" % int(self.clock.time_msec()) + + event = self.event_factory.create_event( + etype=MessageEvent.TYPE, + room_id=room_id, + user_id="_homeserver_", + msg_id=msg_id, + content=membership_json + ) + + handler = self.hs.get_handlers().message_handler + yield handler.send_message(event, suppress_auth=True) + + +class RoomListHandler(BaseHandler): + + @defer.inlineCallbacks + def get_public_room_list(self): + chunk = yield self.store.get_rooms(is_public=True, with_topics=True) + defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) |