diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
new file mode 100644
index 0000000000..5688b68e49
--- /dev/null
+++ b/synapse/handlers/__init__.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from .register import RegistrationHandler
+from .room import (
+ MessageHandler, RoomCreationHandler, RoomMemberHandler, RoomListHandler
+)
+from .events import EventStreamHandler
+from .federation import FederationHandler
+from .login import LoginHandler
+from .profile import ProfileHandler
+from .presence import PresenceHandler
+from .directory import DirectoryHandler
+
+
+class Handlers(object):
+
+ """ A collection of all the event handlers.
+
+ There's no need to lazily create these; we'll just make them all eagerly
+ at construction time.
+ """
+
+ def __init__(self, hs):
+ self.registration_handler = RegistrationHandler(hs)
+ self.message_handler = MessageHandler(hs)
+ self.room_creation_handler = RoomCreationHandler(hs)
+ self.room_member_handler = RoomMemberHandler(hs)
+ self.event_stream_handler = EventStreamHandler(hs)
+ self.federation_handler = FederationHandler(hs)
+ self.profile_handler = ProfileHandler(hs)
+ self.presence_handler = PresenceHandler(hs)
+ self.room_list_handler = RoomListHandler(hs)
+ self.login_handler = LoginHandler(hs)
+ self.directory_handler = DirectoryHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
new file mode 100644
index 0000000000..87a392dd77
--- /dev/null
+++ b/synapse/handlers/_base.py
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class BaseHandler(object):
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.event_factory = hs.get_event_factory()
+ self.auth = hs.get_auth()
+ self.notifier = hs.get_notifier()
+ self.room_lock = hs.get_room_lock_manager()
+ self.state_handler = hs.get_state_handler()
+ self.hs = hs
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
new file mode 100644
index 0000000000..456007c71d
--- /dev/null
+++ b/synapse/handlers/directory.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from ._base import BaseHandler
+
+from synapse.api.errors import SynapseError
+
+import logging
+import json
+import urllib
+
+
+logger = logging.getLogger(__name__)
+
+
+# TODO(erikj): This needs to be factored out somewere
+PREFIX = "/matrix/client/api/v1"
+
+
+class DirectoryHandler(BaseHandler):
+
+ def __init__(self, hs):
+ super(DirectoryHandler, self).__init__(hs)
+ self.hs = hs
+ self.http_client = hs.get_http_client()
+ self.clock = hs.get_clock()
+
+ @defer.inlineCallbacks
+ def create_association(self, room_alias, room_id, servers):
+ # TODO(erikj): Do auth.
+
+ if not room_alias.is_mine:
+ raise SynapseError(400, "Room alias must be local")
+ # TODO(erikj): Change this.
+
+ # TODO(erikj): Add transactions.
+
+ # TODO(erikj): Check if there is a current association.
+
+ yield self.store.create_room_alias_association(
+ room_alias,
+ room_id,
+ servers
+ )
+
+ @defer.inlineCallbacks
+ def get_association(self, room_alias, local_only=False):
+ # TODO(erikj): Do auth
+
+ room_id = None
+ if room_alias.is_mine:
+ result = yield self.store.get_association_from_room_alias(
+ room_alias
+ )
+
+ if result:
+ room_id = result.room_id
+ servers = result.servers
+ elif not local_only:
+ path = "%s/ds/room/%s?local_only=1" % (
+ PREFIX,
+ urllib.quote(room_alias.to_string())
+ )
+
+ result = None
+ try:
+ result = yield self.http_client.get_json(
+ destination=room_alias.domain,
+ path=path,
+ )
+ except:
+ # TODO(erikj): Handle this better?
+ logger.exception("Failed to get remote room alias")
+
+ if result and "room_id" in result and "servers" in result:
+ room_id = result["room_id"]
+ servers = result["servers"]
+
+ if not room_id:
+ defer.returnValue({})
+ return
+
+ defer.returnValue({
+ "room_id": room_id,
+ "servers": servers,
+ })
+ return
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
new file mode 100644
index 0000000000..79742a4e1c
--- /dev/null
+++ b/synapse/handlers/events.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+from synapse.api.streams.event import (
+ EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
+ RoomDataStreamData
+)
+from synapse.handlers.presence import PresenceStreamData
+
+
+class EventStreamHandler(BaseHandler):
+
+ stream_data_classes = [
+ MessagesStreamData,
+ RoomMemberStreamData,
+ FeedbackStreamData,
+ RoomDataStreamData,
+ PresenceStreamData,
+ ]
+
+ def __init__(self, hs):
+ super(EventStreamHandler, self).__init__(hs)
+
+ # Count of active streams per user
+ self._streams_per_user = {}
+ # Grace timers per user to delay the "stopped" signal
+ self._stop_timer_per_user = {}
+
+ self.distributor = hs.get_distributor()
+ self.distributor.declare("started_user_eventstream")
+ self.distributor.declare("stopped_user_eventstream")
+
+ self.clock = hs.get_clock()
+
+ def get_event_stream_token(self, stream_type, store_id, start_token):
+ """Return the next token after this event.
+
+ Args:
+ stream_type (str): The StreamData.EVENT_TYPE
+ store_id (int): The new storage ID assigned from the data store.
+ start_token (str): The token the user started with.
+ Returns:
+ str: The end token.
+ """
+ for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
+ if stream_cls.EVENT_TYPE == stream_type:
+ # this is the stream for this event, so replace this part of
+ # the token
+ store_ids = start_token.split(EventStream.SEPARATOR)
+ store_ids[i] = str(store_id)
+ return EventStream.SEPARATOR.join(store_ids)
+ raise RuntimeError("Didn't find a stream type %s" % stream_type)
+
+ @defer.inlineCallbacks
+ def get_stream(self, auth_user_id, pagin_config, timeout=0):
+ """Gets events as an event stream for this user.
+
+ This function looks for interesting *events* for this user. This is
+ different from the notifier, which looks for interested *users* who may
+ want to know about a single event.
+
+ Args:
+ auth_user_id (str): The user requesting their event stream.
+ pagin_config (synapse.api.streams.PaginationConfig): The config to
+ use when obtaining the stream.
+ timeout (int): The max time to wait for an incoming event in ms.
+ Returns:
+ A pagination stream API dict
+ """
+ auth_user = self.hs.parse_userid(auth_user_id)
+
+ stream_id = object()
+
+ try:
+ if auth_user not in self._streams_per_user:
+ self._streams_per_user[auth_user] = 0
+ if auth_user in self._stop_timer_per_user:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(auth_user))
+ else:
+ self.distributor.fire(
+ "started_user_eventstream", auth_user
+ )
+ self._streams_per_user[auth_user] += 1
+
+ # construct an event stream with the correct data ordering
+ stream_data_list = []
+ for stream_class in EventStreamHandler.stream_data_classes:
+ stream_data_list.append(stream_class(self.hs))
+ event_stream = EventStream(auth_user_id, stream_data_list)
+
+ # fix unknown tokens to known tokens
+ pagin_config = yield event_stream.fix_tokens(pagin_config)
+
+ # register interest in receiving new events
+ self.notifier.store_events_for(user_id=auth_user_id,
+ stream_id=stream_id,
+ from_tok=pagin_config.from_tok)
+
+ # see if we can grab a chunk now
+ data_chunk = yield event_stream.get_chunk(config=pagin_config)
+
+ # if there are previous events, return those. If not, wait on the
+ # new events for 'timeout' seconds.
+ if len(data_chunk["chunk"]) == 0 and timeout != 0:
+ results = yield defer.maybeDeferred(
+ self.notifier.get_events_for,
+ user_id=auth_user_id,
+ stream_id=stream_id,
+ timeout=timeout
+ )
+ if results:
+ defer.returnValue(results)
+
+ defer.returnValue(data_chunk)
+ finally:
+ # cleanup
+ self.notifier.purge_events_for(user_id=auth_user_id,
+ stream_id=stream_id)
+
+ self._streams_per_user[auth_user] -= 1
+ if not self._streams_per_user[auth_user]:
+ del self._streams_per_user[auth_user]
+
+ # 10 seconds of grace to allow the client to reconnect again
+ # before we think they're gone
+ def _later():
+ self.distributor.fire(
+ "stopped_user_eventstream", auth_user
+ )
+ del self._stop_timer_per_user[auth_user]
+
+ self._stop_timer_per_user[auth_user] = (
+ self.clock.call_later(5, _later)
+ )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
new file mode 100644
index 0000000000..12e7afca4c
--- /dev/null
+++ b/synapse/handlers/federation.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains handlers for federation events."""
+
+from ._base import BaseHandler
+
+from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent
+from synapse.api.constants import Membership
+from synapse.util.logutils import log_function
+
+from twisted.internet import defer
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FederationHandler(BaseHandler):
+
+ """Handles events that originated from federation."""
+
+ @log_function
+ @defer.inlineCallbacks
+ def on_receive(self, event, is_new_state):
+ if hasattr(event, "state_key") and not is_new_state:
+ logger.debug("Ignoring old state.")
+ return
+
+ target_is_mine = False
+ if hasattr(event, "target_host"):
+ target_is_mine = event.target_host == self.hs.hostname
+
+ if event.type == InviteJoinEvent.TYPE:
+ if not target_is_mine:
+ logger.debug("Ignoring invite/join event %s", event)
+ return
+
+ # If we receive an invite/join event then we need to join the
+ # sender to the given room.
+ # TODO: We should probably auth this or some such
+ content = event.content
+ content.update({"membership": Membership.JOIN})
+ new_event = self.event_factory.create_event(
+ etype=RoomMemberEvent.TYPE,
+ target_user_id=event.user_id,
+ room_id=event.room_id,
+ user_id=event.user_id,
+ membership=Membership.JOIN,
+ content=content
+ )
+
+ yield self.hs.get_handlers().room_member_handler.change_membership(
+ new_event,
+ True
+ )
+
+ else:
+ with (yield self.room_lock.lock(event.room_id)):
+ store_id = yield self.store.persist_event(event)
+
+ yield self.notifier.on_new_room_event(event, store_id)
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
new file mode 100644
index 0000000000..5a1acd7102
--- /dev/null
+++ b/synapse/handlers/login.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+from synapse.api.errors import LoginError
+
+import bcrypt
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class LoginHandler(BaseHandler):
+
+ def __init__(self, hs):
+ super(LoginHandler, self).__init__(hs)
+ self.hs = hs
+
+ @defer.inlineCallbacks
+ def login(self, user, password):
+ """Login as the specified user with the specified password.
+
+ Args:
+ user (str): The user ID.
+ password (str): The password.
+ Returns:
+ The newly allocated access token.
+ Raises:
+ StoreError if there was a problem storing the token.
+ LoginError if there was an authentication problem.
+ """
+ # TODO do this better, it can't go in __init__ else it cyclic loops
+ if not hasattr(self, "reg_handler"):
+ self.reg_handler = self.hs.get_handlers().registration_handler
+
+ # pull out the hash for this user if they exist
+ user_info = yield self.store.get_user_by_id(user_id=user)
+ if not user_info:
+ logger.warn("Attempted to login as %s but they do not exist.", user)
+ raise LoginError(403, "")
+
+ stored_hash = user_info[0]["password_hash"]
+ if bcrypt.checkpw(password, stored_hash):
+ # generate an access token and store it.
+ token = self.reg_handler._generate_token(user)
+ logger.info("Adding token %s for user %s", token, user)
+ yield self.store.add_access_token_to_user(user, token)
+ defer.returnValue(token)
+ else:
+ logger.warn("Failed password login for user %s", user)
+ raise LoginError(403, "")
\ No newline at end of file
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
new file mode 100644
index 0000000000..38db4b1d67
--- /dev/null
+++ b/synapse/handlers/presence.py
@@ -0,0 +1,697 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError, AuthError
+from synapse.api.constants import PresenceState
+from synapse.api.streams import StreamData
+
+from ._base import BaseHandler
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+# TODO(paul): Maybe there's one of these I can steal from somewhere
+def partition(l, func):
+ """Partition the list by the result of func applied to each element."""
+ ret = {}
+
+ for x in l:
+ key = func(x)
+ if key not in ret:
+ ret[key] = []
+ ret[key].append(x)
+
+ return ret
+
+
+def partitionbool(l, func):
+ def boolfunc(x):
+ return bool(func(x))
+
+ ret = partition(l, boolfunc)
+ return ret.get(True, []), ret.get(False, [])
+
+
+class PresenceHandler(BaseHandler):
+
+ def __init__(self, hs):
+ super(PresenceHandler, self).__init__(hs)
+
+ self.homeserver = hs
+
+ distributor = hs.get_distributor()
+ distributor.observe("registered_user", self.registered_user)
+
+ distributor.observe(
+ "started_user_eventstream", self.started_user_eventstream
+ )
+ distributor.observe(
+ "stopped_user_eventstream", self.stopped_user_eventstream
+ )
+
+ distributor.observe("user_joined_room",
+ self.user_joined_room
+ )
+
+ distributor.declare("collect_presencelike_data")
+
+ distributor.declare("changed_presencelike_data")
+ distributor.observe(
+ "changed_presencelike_data", self.changed_presencelike_data
+ )
+
+ self.distributor = distributor
+
+ self.federation = hs.get_replication_layer()
+
+ self.federation.register_edu_handler(
+ "m.presence", self.incoming_presence
+ )
+ self.federation.register_edu_handler(
+ "m.presence_invite",
+ lambda origin, content: self.invite_presence(
+ observed_user=hs.parse_userid(content["observed_user"]),
+ observer_user=hs.parse_userid(content["observer_user"]),
+ )
+ )
+ self.federation.register_edu_handler(
+ "m.presence_accept",
+ lambda origin, content: self.accept_presence(
+ observed_user=hs.parse_userid(content["observed_user"]),
+ observer_user=hs.parse_userid(content["observer_user"]),
+ )
+ )
+ self.federation.register_edu_handler(
+ "m.presence_deny",
+ lambda origin, content: self.deny_presence(
+ observed_user=hs.parse_userid(content["observed_user"]),
+ observer_user=hs.parse_userid(content["observer_user"]),
+ )
+ )
+
+ # IN-MEMORY store, mapping local userparts to sets of local users to
+ # be informed of state changes.
+ self._local_pushmap = {}
+ # map local users to sets of remote /domain names/ who are interested
+ # in them
+ self._remote_sendmap = {}
+ # map remote users to sets of local users who're interested in them
+ self._remote_recvmap = {}
+
+ # map any user to a UserPresenceCache
+ self._user_cachemap = {}
+ self._user_cachemap_latest_serial = 0
+
+ def _get_or_make_usercache(self, user):
+ """If the cache entry doesn't exist, initialise a new one."""
+ if user not in self._user_cachemap:
+ self._user_cachemap[user] = UserPresenceCache()
+ return self._user_cachemap[user]
+
+ def _get_or_offline_usercache(self, user):
+ """If the cache entry doesn't exist, return an OFFLINE one but do not
+ store it into the cache."""
+ if user in self._user_cachemap:
+ return self._user_cachemap[user]
+ else:
+ statuscache = UserPresenceCache()
+ statuscache.update({"state": PresenceState.OFFLINE}, user)
+ return statuscache
+
+ def registered_user(self, user):
+ self.store.create_presence(user.localpart)
+
+ @defer.inlineCallbacks
+ def is_presence_visible(self, observer_user, observed_user):
+ assert(observed_user.is_mine)
+
+ if observer_user == observed_user:
+ defer.returnValue(True)
+
+ allowed_by_subscription = yield self.store.is_presence_visible(
+ observed_localpart=observed_user.localpart,
+ observer_userid=observer_user.to_string(),
+ )
+
+ if allowed_by_subscription:
+ defer.returnValue(True)
+
+ # TODO(paul): Check same channel
+
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
+ def get_state(self, target_user, auth_user):
+ if target_user.is_mine:
+ visible = yield self.is_presence_visible(observer_user=auth_user,
+ observed_user=target_user
+ )
+
+ if visible:
+ state = yield self.store.get_presence_state(
+ target_user.localpart
+ )
+ defer.returnValue(state)
+ else:
+ raise SynapseError(404, "Presence information not visible")
+ else:
+ # TODO(paul): Have remote server send us permissions set
+ defer.returnValue(
+ self._get_or_offline_usercache(target_user).get_state()
+ )
+
+ @defer.inlineCallbacks
+ def set_state(self, target_user, auth_user, state):
+ if not target_user.is_mine:
+ raise SynapseError(400, "User is not hosted on this Home Server")
+
+ if target_user != auth_user:
+ raise AuthError(400, "Cannot set another user's displayname")
+
+ # TODO(paul): Sanity-check 'state'
+ if "status_msg" not in state:
+ state["status_msg"] = None
+
+ for k in state.keys():
+ if k not in ("state", "status_msg"):
+ raise SynapseError(
+ 400, "Unexpected presence state key '%s'" % (k,)
+ )
+
+ logger.debug("Updating presence state of %s to %s",
+ target_user.localpart, state["state"])
+
+ state_to_store = dict(state)
+
+ yield defer.DeferredList([
+ self.store.set_presence_state(
+ target_user.localpart, state_to_store
+ ),
+ self.distributor.fire(
+ "collect_presencelike_data", target_user, state
+ ),
+ ])
+
+ now_online = state["state"] != PresenceState.OFFLINE
+ was_polling = target_user in self._user_cachemap
+
+ if now_online and not was_polling:
+ self.start_polling_presence(target_user, state=state)
+ elif not now_online and was_polling:
+ self.stop_polling_presence(target_user)
+
+ # TODO(paul): perform a presence push as part of start/stop poll so
+ # we don't have to do this all the time
+ self.changed_presencelike_data(target_user, state)
+
+ if not now_online:
+ del self._user_cachemap[target_user]
+
+ def changed_presencelike_data(self, user, state):
+ statuscache = self._get_or_make_usercache(user)
+
+ self._user_cachemap_latest_serial += 1
+ statuscache.update(state, serial=self._user_cachemap_latest_serial)
+
+ self.push_presence(user, statuscache=statuscache)
+
+ def started_user_eventstream(self, user):
+ # TODO(paul): Use "last online" state
+ self.set_state(user, user, {"state": PresenceState.ONLINE})
+
+ def stopped_user_eventstream(self, user):
+ # TODO(paul): Save current state as "last online" state
+ self.set_state(user, user, {"state": PresenceState.OFFLINE})
+
+ @defer.inlineCallbacks
+ def user_joined_room(self, user, room_id):
+ localusers = set()
+ remotedomains = set()
+
+ rm_handler = self.homeserver.get_handlers().room_member_handler
+ yield rm_handler.fetch_room_distributions_into(room_id,
+ localusers=localusers, remotedomains=remotedomains,
+ ignore_user=user)
+
+ if user.is_mine:
+ yield self._send_presence_to_distribution(srcuser=user,
+ localusers=localusers, remotedomains=remotedomains,
+ statuscache=self._get_or_offline_usercache(user),
+ )
+
+ for srcuser in localusers:
+ yield self._send_presence(srcuser=srcuser, destuser=user,
+ statuscache=self._get_or_offline_usercache(srcuser),
+ )
+
+ @defer.inlineCallbacks
+ def send_invite(self, observer_user, observed_user):
+ if not observer_user.is_mine:
+ raise SynapseError(400, "User is not hosted on this Home Server")
+
+ yield self.store.add_presence_list_pending(
+ observer_user.localpart, observed_user.to_string()
+ )
+
+ if observed_user.is_mine:
+ yield self.invite_presence(observed_user, observer_user)
+ else:
+ yield self.federation.send_edu(
+ destination=observed_user.domain,
+ edu_type="m.presence_invite",
+ content={
+ "observed_user": observed_user.to_string(),
+ "observer_user": observer_user.to_string(),
+ }
+ )
+
+ @defer.inlineCallbacks
+ def _should_accept_invite(self, observed_user, observer_user):
+ if not observed_user.is_mine:
+ defer.returnValue(False)
+
+ row = yield self.store.has_presence_state(observed_user.localpart)
+ if not row:
+ defer.returnValue(False)
+
+ # TODO(paul): Eventually we'll ask the user's permission for this
+ # before accepting. For now just accept any invite request
+ defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def invite_presence(self, observed_user, observer_user):
+ accept = yield self._should_accept_invite(observed_user, observer_user)
+
+ if accept:
+ yield self.store.allow_presence_visible(
+ observed_user.localpart, observer_user.to_string()
+ )
+
+ if observer_user.is_mine:
+ if accept:
+ yield self.accept_presence(observed_user, observer_user)
+ else:
+ yield self.deny_presence(observed_user, observer_user)
+ else:
+ edu_type = "m.presence_accept" if accept else "m.presence_deny"
+
+ yield self.federation.send_edu(
+ destination=observer_user.domain,
+ edu_type=edu_type,
+ content={
+ "observed_user": observed_user.to_string(),
+ "observer_user": observer_user.to_string(),
+ }
+ )
+
+ @defer.inlineCallbacks
+ def accept_presence(self, observed_user, observer_user):
+ yield self.store.set_presence_list_accepted(
+ observer_user.localpart, observed_user.to_string()
+ )
+
+ self.start_polling_presence(observer_user, target_user=observed_user)
+
+ @defer.inlineCallbacks
+ def deny_presence(self, observed_user, observer_user):
+ yield self.store.del_presence_list(
+ observer_user.localpart, observed_user.to_string()
+ )
+
+ # TODO(paul): Inform the user somehow?
+
+ @defer.inlineCallbacks
+ def drop(self, observed_user, observer_user):
+ if not observer_user.is_mine:
+ raise SynapseError(400, "User is not hosted on this Home Server")
+
+ yield self.store.del_presence_list(
+ observer_user.localpart, observed_user.to_string()
+ )
+
+ self.stop_polling_presence(observer_user, target_user=observed_user)
+
+ @defer.inlineCallbacks
+ def get_presence_list(self, observer_user, accepted=None):
+ if not observer_user.is_mine:
+ raise SynapseError(400, "User is not hosted on this Home Server")
+
+ presence = yield self.store.get_presence_list(
+ observer_user.localpart, accepted=accepted
+ )
+
+ for p in presence:
+ observed_user = self.hs.parse_userid(p.pop("observed_user_id"))
+ p["observed_user"] = observed_user
+ p.update(self._get_or_offline_usercache(observed_user).get_state())
+
+ defer.returnValue(presence)
+
+ @defer.inlineCallbacks
+ def start_polling_presence(self, user, target_user=None, state=None):
+ logger.debug("Start polling for presence from %s", user)
+
+ if target_user:
+ target_users = [target_user]
+ else:
+ presence = yield self.store.get_presence_list(
+ user.localpart, accepted=True
+ )
+ target_users = [
+ self.hs.parse_userid(x["observed_user_id"]) for x in presence
+ ]
+
+ if state is None:
+ state = yield self.store.get_presence_state(user.localpart)
+
+ localusers, remoteusers = partitionbool(
+ target_users,
+ lambda u: u.is_mine
+ )
+
+ for target_user in localusers:
+ self._start_polling_local(user, target_user)
+
+ deferreds = []
+ remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+ for domain in remoteusers_by_domain:
+ remoteusers = remoteusers_by_domain[domain]
+
+ deferreds.append(self._start_polling_remote(
+ user, domain, remoteusers
+ ))
+
+ yield defer.DeferredList(deferreds)
+
+ def _start_polling_local(self, user, target_user):
+ target_localpart = target_user.localpart
+
+ if not self.is_presence_visible(observer_user=user,
+ observed_user=target_user):
+ return
+
+ if target_localpart not in self._local_pushmap:
+ self._local_pushmap[target_localpart] = set()
+
+ self._local_pushmap[target_localpart].add(user)
+
+ self.push_update_to_clients(
+ observer_user=user,
+ observed_user=target_user,
+ statuscache=self._get_or_offline_usercache(target_user),
+ )
+
+ def _start_polling_remote(self, user, domain, remoteusers):
+ for u in remoteusers:
+ if u not in self._remote_recvmap:
+ self._remote_recvmap[u] = set()
+
+ self._remote_recvmap[u].add(user)
+
+ return self.federation.send_edu(
+ destination=domain,
+ edu_type="m.presence",
+ content={"poll": [u.to_string() for u in remoteusers]}
+ )
+
+ def stop_polling_presence(self, user, target_user=None):
+ logger.debug("Stop polling for presence from %s", user)
+
+ if not target_user or target_user.is_mine:
+ self._stop_polling_local(user, target_user=target_user)
+
+ deferreds = []
+
+ if target_user:
+ raise NotImplementedError("TODO: remove one user")
+
+ remoteusers = [u for u in self._remote_recvmap
+ if user in self._remote_recvmap[u]]
+ remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+
+ for domain in remoteusers_by_domain:
+ remoteusers = remoteusers_by_domain[domain]
+
+ deferreds.append(
+ self._stop_polling_remote(user, domain, remoteusers)
+ )
+
+ return defer.DeferredList(deferreds)
+
+ def _stop_polling_local(self, user, target_user):
+ for localpart in self._local_pushmap.keys():
+ if target_user and localpart != target_user.localpart:
+ continue
+
+ if user in self._local_pushmap[localpart]:
+ self._local_pushmap[localpart].remove(user)
+
+ if not self._local_pushmap[localpart]:
+ del self._local_pushmap[localpart]
+
+ def _stop_polling_remote(self, user, domain, remoteusers):
+ for u in remoteusers:
+ self._remote_recvmap[u].remove(user)
+
+ if not self._remote_recvmap[u]:
+ del self._remote_recvmap[u]
+
+ return self.federation.send_edu(
+ destination=domain,
+ edu_type="m.presence",
+ content={"unpoll": [u.to_string() for u in remoteusers]}
+ )
+
+ @defer.inlineCallbacks
+ def push_presence(self, user, statuscache):
+ assert(user.is_mine)
+
+ logger.debug("Pushing presence update from %s", user)
+
+ localusers = set(self._local_pushmap.get(user.localpart, set()))
+ remotedomains = set(self._remote_sendmap.get(user.localpart, set()))
+
+ # Reflect users' status changes back to themselves, so UIs look nice
+ # and also user is informed of server-forced pushes
+ localusers.add(user)
+
+ rm_handler = self.homeserver.get_handlers().room_member_handler
+ room_ids = yield rm_handler.get_rooms_for_user(user)
+
+ for room_id in room_ids:
+ yield rm_handler.fetch_room_distributions_into(
+ room_id, localusers=localusers, remotedomains=remotedomains,
+ ignore_user=user,
+ )
+
+ if not localusers and not remotedomains:
+ defer.returnValue(None)
+
+ yield self._send_presence_to_distribution(user,
+ localusers=localusers, remotedomains=remotedomains,
+ statuscache=statuscache
+ )
+
+ def _send_presence(self, srcuser, destuser, statuscache):
+ if destuser.is_mine:
+ self.push_update_to_clients(
+ observer_user=destuser,
+ observed_user=srcuser,
+ statuscache=statuscache)
+ return defer.succeed(None)
+ else:
+ return self._push_presence_remote(srcuser, destuser.domain,
+ state=statuscache.get_state()
+ )
+
+ @defer.inlineCallbacks
+ def _send_presence_to_distribution(self, srcuser, localusers=set(),
+ remotedomains=set(), statuscache=None):
+
+ for u in localusers:
+ logger.debug(" | push to local user %s", u)
+ self.push_update_to_clients(
+ observer_user=u,
+ observed_user=srcuser,
+ statuscache=statuscache,
+ )
+
+ deferreds = []
+ for domain in remotedomains:
+ logger.debug(" | push to remote domain %s", domain)
+ deferreds.append(self._push_presence_remote(srcuser, domain,
+ state=statuscache.get_state())
+ )
+
+ yield defer.DeferredList(deferreds)
+
+ @defer.inlineCallbacks
+ def _push_presence_remote(self, user, destination, state=None):
+ if state is None:
+ state = yield self.store.get_presence_state(user.localpart)
+ yield self.distributor.fire(
+ "collect_presencelike_data", user, state
+ )
+
+ yield self.federation.send_edu(
+ destination=destination,
+ edu_type="m.presence",
+ content={
+ "push": [
+ dict(user_id=user.to_string(), **state),
+ ],
+ }
+ )
+
+ @defer.inlineCallbacks
+ def incoming_presence(self, origin, content):
+ deferreds = []
+
+ for push in content.get("push", []):
+ user = self.hs.parse_userid(push["user_id"])
+
+ logger.debug("Incoming presence update from %s", user)
+
+ observers = set(self._remote_recvmap.get(user, set()))
+
+ rm_handler = self.homeserver.get_handlers().room_member_handler
+ room_ids = yield rm_handler.get_rooms_for_user(user)
+
+ for room_id in room_ids:
+ yield rm_handler.fetch_room_distributions_into(
+ room_id, localusers=observers, ignore_user=user
+ )
+
+ if not observers:
+ break
+
+ state = dict(push)
+ del state["user_id"]
+
+ statuscache = self._get_or_make_usercache(user)
+
+ self._user_cachemap_latest_serial += 1
+ statuscache.update(state, serial=self._user_cachemap_latest_serial)
+
+ for observer_user in observers:
+ self.push_update_to_clients(
+ observer_user=observer_user,
+ observed_user=user,
+ statuscache=statuscache,
+ )
+
+ if state["state"] == PresenceState.OFFLINE:
+ del self._user_cachemap[user]
+
+ for poll in content.get("poll", []):
+ user = self.hs.parse_userid(poll)
+
+ if not user.is_mine:
+ continue
+
+ # TODO(paul) permissions checks
+
+ if not user in self._remote_sendmap:
+ self._remote_sendmap[user] = set()
+
+ self._remote_sendmap[user].add(origin)
+
+ deferreds.append(self._push_presence_remote(user, origin))
+
+ for unpoll in content.get("unpoll", []):
+ user = self.hs.parse_userid(unpoll)
+
+ if not user.is_mine:
+ continue
+
+ if user in self._remote_sendmap:
+ self._remote_sendmap[user].remove(origin)
+
+ if not self._remote_sendmap[user]:
+ del self._remote_sendmap[user]
+
+ yield defer.DeferredList(deferreds)
+
+ def push_update_to_clients(self, observer_user, observed_user,
+ statuscache):
+ self.notifier.on_new_user_event(
+ observer_user.to_string(),
+ event_data=statuscache.make_event(user=observed_user),
+ stream_type=PresenceStreamData,
+ store_id=statuscache.serial
+ )
+
+
+class PresenceStreamData(StreamData):
+ def __init__(self, hs):
+ super(PresenceStreamData, self).__init__(hs)
+ self.presence = hs.get_handlers().presence_handler
+
+ def get_rows(self, user_id, from_key, to_key, limit):
+ cachemap = self.presence._user_cachemap
+
+ # TODO(paul): limit, and filter by visibility
+ updates = [(k, cachemap[k]) for k in cachemap
+ if from_key < cachemap[k].serial <= to_key]
+
+ if updates:
+ latest_serial = max([x[1].serial for x in updates])
+ data = [x[1].make_event(user=x[0]) for x in updates]
+ return ((data, latest_serial))
+ else:
+ return (([], self.presence._user_cachemap_latest_serial))
+
+ def max_token(self):
+ return self.presence._user_cachemap_latest_serial
+
+PresenceStreamData.EVENT_TYPE = PresenceStreamData
+
+
+class UserPresenceCache(object):
+ """Store an observed user's state and status message.
+
+ Includes the update timestamp.
+ """
+ def __init__(self):
+ self.state = {}
+ self.serial = None
+
+ def update(self, state, serial):
+ self.state.update(state)
+ # Delete keys that are now 'None'
+ for k in self.state.keys():
+ if self.state[k] is None:
+ del self.state[k]
+
+ self.serial = serial
+
+ if "status_msg" in state:
+ self.status_msg = state["status_msg"]
+ else:
+ self.status_msg = None
+
+ def get_state(self):
+ # clone it so caller can't break our cache
+ return dict(self.state)
+
+ def make_event(self, user):
+ content = self.get_state()
+ content["user_id"] = user.to_string()
+
+ return {"type": "m.presence", "content": content}
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
new file mode 100644
index 0000000000..a27206b002
--- /dev/null
+++ b/synapse/handlers/profile.py
@@ -0,0 +1,169 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError, AuthError
+
+from synapse.api.errors import CodeMessageException
+
+from ._base import BaseHandler
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+PREFIX = "/matrix/client/api/v1"
+
+
+class ProfileHandler(BaseHandler):
+
+ def __init__(self, hs):
+ super(ProfileHandler, self).__init__(hs)
+
+ self.client = hs.get_http_client()
+
+ distributor = hs.get_distributor()
+ self.distributor = distributor
+
+ distributor.observe("registered_user", self.registered_user)
+
+ distributor.observe(
+ "collect_presencelike_data", self.collect_presencelike_data
+ )
+
+ def registered_user(self, user):
+ self.store.create_profile(user.localpart)
+
+ @defer.inlineCallbacks
+ def get_displayname(self, target_user, local_only=False):
+ if target_user.is_mine:
+ displayname = yield self.store.get_profile_displayname(
+ target_user.localpart
+ )
+
+ defer.returnValue(displayname)
+ elif not local_only:
+ # TODO(paul): This should use the server-server API to ask another
+ # HS. For now we'll just have it use the http client to talk to the
+ # other HS's REST client API
+ path = PREFIX + "/profile/%s/displayname?local_only=1" % (
+ target_user.to_string()
+ )
+
+ try:
+ result = yield self.client.get_json(
+ destination=target_user.domain,
+ path=path
+ )
+ except CodeMessageException as e:
+ if e.code != 404:
+ logger.exception("Failed to get displayname")
+
+ raise
+ except:
+ logger.exception("Failed to get displayname")
+
+ defer.returnValue(result["displayname"])
+ else:
+ raise SynapseError(400, "User is not hosted on this Home Server")
+
+ @defer.inlineCallbacks
+ def set_displayname(self, target_user, auth_user, new_displayname):
+ """target_user is the user whose displayname is to be changed;
+ auth_user is the user attempting to make this change."""
+ if not target_user.is_mine:
+ raise SynapseError(400, "User is not hosted on this Home Server")
+
+ if target_user != auth_user:
+ raise AuthError(400, "Cannot set another user's displayname")
+
+ yield self.store.set_profile_displayname(
+ target_user.localpart, new_displayname
+ )
+
+ yield self.distributor.fire(
+ "changed_presencelike_data", target_user, {
+ "displayname": new_displayname,
+ }
+ )
+
+ @defer.inlineCallbacks
+ def get_avatar_url(self, target_user, local_only=False):
+ if target_user.is_mine:
+ avatar_url = yield self.store.get_profile_avatar_url(
+ target_user.localpart
+ )
+
+ defer.returnValue(avatar_url)
+ elif not local_only:
+ # TODO(paul): This should use the server-server API to ask another
+ # HS. For now we'll just have it use the http client to talk to the
+ # other HS's REST client API
+ destination = target_user.domain
+ path = PREFIX + "/profile/%s/avatar_url?local_only=1" % (
+ target_user.to_string(),
+ )
+
+ try:
+ result = yield self.client.get_json(
+ destination=destination,
+ path=path
+ )
+ except CodeMessageException as e:
+ if e.code != 404:
+ logger.exception("Failed to get avatar_url")
+ raise
+ except:
+ logger.exception("Failed to get avatar_url")
+
+ defer.returnValue(result["avatar_url"])
+ else:
+ raise SynapseError(400, "User is not hosted on this Home Server")
+
+ @defer.inlineCallbacks
+ def set_avatar_url(self, target_user, auth_user, new_avatar_url):
+ """target_user is the user whose avatar_url is to be changed;
+ auth_user is the user attempting to make this change."""
+ if not target_user.is_mine:
+ raise SynapseError(400, "User is not hosted on this Home Server")
+
+ if target_user != auth_user:
+ raise AuthError(400, "Cannot set another user's avatar_url")
+
+ yield self.store.set_profile_avatar_url(
+ target_user.localpart, new_avatar_url
+ )
+
+ yield self.distributor.fire(
+ "changed_presencelike_data", target_user, {
+ "avatar_url": new_avatar_url,
+ }
+ )
+
+ @defer.inlineCallbacks
+ def collect_presencelike_data(self, user, state):
+ if not user.is_mine:
+ defer.returnValue(None)
+
+ (displayname, avatar_url) = yield defer.gatherResults([
+ self.store.get_profile_displayname(user.localpart),
+ self.store.get_profile_avatar_url(user.localpart),
+ ])
+
+ state["displayname"] = displayname
+ state["avatar_url"] = avatar_url
+
+ defer.returnValue(None)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
new file mode 100644
index 0000000000..246c1f6530
--- /dev/null
+++ b/synapse/handlers/register.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains functions for registering clients."""
+from twisted.internet import defer
+
+from synapse.types import UserID
+from synapse.api.errors import SynapseError, RegistrationError
+from ._base import BaseHandler
+import synapse.util.stringutils as stringutils
+
+import base64
+import bcrypt
+
+
+class RegistrationHandler(BaseHandler):
+
+ def __init__(self, hs):
+ super(RegistrationHandler, self).__init__(hs)
+
+ self.distributor = hs.get_distributor()
+ self.distributor.declare("registered_user")
+
+ @defer.inlineCallbacks
+ def register(self, localpart=None, password=None):
+ """Registers a new client on the server.
+
+ Args:
+ localpart : The local part of the user ID to register. If None,
+ one will be randomly generated.
+ password (str) : The password to assign to this user so they can
+ login again.
+ Returns:
+ A tuple of (user_id, access_token).
+ Raises:
+ RegistrationError if there was a problem registering.
+ """
+ password_hash = None
+ if password:
+ password_hash = bcrypt.hashpw(password, bcrypt.gensalt())
+
+ if localpart:
+ user = UserID(localpart, self.hs.hostname, True)
+ user_id = user.to_string()
+
+ token = self._generate_token(user_id)
+ yield self.store.register(user_id=user_id,
+ token=token,
+ password_hash=password_hash)
+
+ self.distributor.fire("registered_user", user)
+ defer.returnValue((user_id, token))
+ else:
+ # autogen a random user ID
+ attempts = 0
+ user_id = None
+ token = None
+ while not user_id and not token:
+ try:
+ localpart = self._generate_user_id()
+ user = UserID(localpart, self.hs.hostname, True)
+ user_id = user.to_string()
+
+ token = self._generate_token(user_id)
+ yield self.store.register(
+ user_id=user_id,
+ token=token,
+ password_hash=password_hash)
+
+ self.distributor.fire("registered_user", user)
+ defer.returnValue((user_id, token))
+ except SynapseError:
+ # if user id is taken, just generate another
+ user_id = None
+ token = None
+ attempts += 1
+ if attempts > 5:
+ raise RegistrationError(
+ 500, "Cannot generate user ID.")
+
+ def _generate_token(self, user_id):
+ # urlsafe variant uses _ and - so use . as the separator and replace
+ # all =s with .s so http clients don't quote =s when it is used as
+ # query params.
+ return (base64.urlsafe_b64encode(user_id).replace('=', '.') + '.' +
+ stringutils.random_string(18))
+
+ def _generate_user_id(self):
+ return "-" + stringutils.random_string(18)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
new file mode 100644
index 0000000000..4d82b33993
--- /dev/null
+++ b/synapse/handlers/room.py
@@ -0,0 +1,808 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains functions for performing events on rooms."""
+from twisted.internet import defer
+
+from synapse.types import UserID, RoomAlias, RoomID
+from synapse.api.constants import Membership
+from synapse.api.errors import RoomError, StoreError, SynapseError
+from synapse.api.events.room import (
+ RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
+ RoomConfigEvent
+)
+from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.util import stringutils
+from ._base import BaseHandler
+
+import logging
+import json
+
+logger = logging.getLogger(__name__)
+
+
+class MessageHandler(BaseHandler):
+
+ def __init__(self, hs):
+ super(MessageHandler, self).__init__(hs)
+ self.hs = hs
+ self.clock = hs.get_clock()
+ self.event_factory = hs.get_event_factory()
+
+ @defer.inlineCallbacks
+ def get_message(self, msg_id=None, room_id=None, sender_id=None,
+ user_id=None):
+ """ Retrieve a message.
+
+ Args:
+ msg_id (str): The message ID to obtain.
+ room_id (str): The room where the message resides.
+ sender_id (str): The user ID of the user who sent the message.
+ user_id (str): The user ID of the user making this request.
+ Returns:
+ The message, or None if no message exists.
+ Raises:
+ SynapseError if something went wrong.
+ """
+ yield self.auth.check_joined_room(room_id, user_id)
+
+ # Pull out the message from the db
+ msg = yield self.store.get_message(room_id=room_id,
+ msg_id=msg_id,
+ user_id=sender_id)
+
+ if msg:
+ defer.returnValue(msg)
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def send_message(self, event=None, suppress_auth=False, stamp_event=True):
+ """ Send a message.
+
+ Args:
+ event : The message event to store.
+ suppress_auth (bool) : True to suppress auth for this message. This
+ is primarily so the home server can inject messages into rooms at
+ will.
+ stamp_event (bool) : True to stamp event content with server keys.
+ Raises:
+ SynapseError if something went wrong.
+ """
+ if stamp_event:
+ event.content["hsob_ts"] = int(self.clock.time_msec())
+
+ with (yield self.room_lock.lock(event.room_id)):
+ if not suppress_auth:
+ yield self.auth.check(event, raises=True)
+
+ # store message in db
+ store_id = yield self.store.persist_event(event)
+
+ event.destinations = yield self.store.get_joined_hosts_for_room(
+ event.room_id
+ )
+
+ yield self.hs.get_federation().handle_new_event(event)
+
+ self.notifier.on_new_room_event(event, store_id)
+
+ @defer.inlineCallbacks
+ def get_messages(self, user_id=None, room_id=None, pagin_config=None,
+ feedback=False):
+ """Get messages in a room.
+
+ Args:
+ user_id (str): The user requesting messages.
+ room_id (str): The room they want messages from.
+ pagin_config (synapse.api.streams.PaginationConfig): The pagination
+ config rules to apply, if any.
+ feedback (bool): True to get compressed feedback with the messages
+ Returns:
+ dict: Pagination API results
+ """
+ yield self.auth.check_joined_room(room_id, user_id)
+
+ data_source = [MessagesStreamData(self.hs, room_id=room_id,
+ feedback=feedback)]
+ event_stream = EventStream(user_id, data_source)
+ pagin_config = yield event_stream.fix_tokens(pagin_config)
+ data_chunk = yield event_stream.get_chunk(config=pagin_config)
+ defer.returnValue(data_chunk)
+
+ @defer.inlineCallbacks
+ def store_room_data(self, event=None, stamp_event=True):
+ """ Stores data for a room.
+
+ Args:
+ event : The room path event
+ stamp_event (bool) : True to stamp event content with server keys.
+ Raises:
+ SynapseError if something went wrong.
+ """
+
+ with (yield self.room_lock.lock(event.room_id)):
+ yield self.auth.check(event, raises=True)
+
+ if stamp_event:
+ event.content["hsob_ts"] = int(self.clock.time_msec())
+
+ yield self.state_handler.handle_new_event(event)
+
+ # store in db
+ store_id = yield self.store.store_room_data(
+ room_id=event.room_id,
+ etype=event.type,
+ state_key=event.state_key,
+ content=json.dumps(event.content)
+ )
+
+ event.destinations = yield self.store.get_joined_hosts_for_room(
+ event.room_id
+ )
+ self.notifier.on_new_room_event(event, store_id)
+
+ yield self.hs.get_federation().handle_new_event(event)
+
+ @defer.inlineCallbacks
+ def get_room_data(self, user_id=None, room_id=None,
+ event_type=None, state_key="",
+ public_room_rules=[],
+ private_room_rules=["join"]):
+ """ Get data from a room.
+
+ Args:
+ event : The room path event
+ public_room_rules : A list of membership states the user can be in,
+ in order to read this data IN A PUBLIC ROOM. An empty list means
+ 'any state'.
+ private_room_rules : A list of membership states the user can be
+ in, in order to read this data IN A PRIVATE ROOM. An empty list
+ means 'any state'.
+ Returns:
+ The path data content.
+ Raises:
+ SynapseError if something went wrong.
+ """
+ if event_type == RoomTopicEvent.TYPE:
+ # anyone invited/joined can read the topic
+ private_room_rules = ["invite", "join"]
+
+ # does this room exist
+ room = yield self.store.get_room(room_id)
+ if not room:
+ raise RoomError(403, "Room does not exist.")
+
+ # does this user exist in this room
+ member = yield self.store.get_room_member(
+ room_id=room_id,
+ user_id="" if not user_id else user_id)
+
+ member_state = member.membership if member else None
+
+ if room.is_public and public_room_rules:
+ # make sure the user meets public room rules
+ if member_state not in public_room_rules:
+ raise RoomError(403, "Member does not meet public room rules.")
+ elif not room.is_public and private_room_rules:
+ # make sure the user meets private room rules
+ if member_state not in private_room_rules:
+ raise RoomError(
+ 403, "Member does not meet private room rules.")
+
+ data = yield self.store.get_room_data(room_id, event_type, state_key)
+ defer.returnValue(data)
+
+ @defer.inlineCallbacks
+ def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
+ user_id=None, fb_sender_id=None, fb_type=None):
+ yield self.auth.check_joined_room(room_id, user_id)
+
+ # Pull out the feedback from the db
+ fb = yield self.store.get_feedback(
+ room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
+ fb_sender_id=fb_sender_id, fb_type=fb_type
+ )
+
+ if fb:
+ defer.returnValue(fb)
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def send_feedback(self, event, stamp_event=True):
+ if stamp_event:
+ event.content["hsob_ts"] = int(self.clock.time_msec())
+
+ with (yield self.room_lock.lock(event.room_id)):
+ yield self.auth.check(event, raises=True)
+
+ # store message in db
+ store_id = yield self.store.persist_event(event)
+
+ event.destinations = yield self.store.get_joined_hosts_for_room(
+ event.room_id
+ )
+ yield self.hs.get_federation().handle_new_event(event)
+
+ self.notifier.on_new_room_event(event, store_id)
+
+ @defer.inlineCallbacks
+ def snapshot_all_rooms(self, user_id=None, pagin_config=None,
+ feedback=False):
+ """Retrieve a snapshot of all rooms the user is invited or has joined.
+
+ This snapshot may include messages for all rooms where the user is
+ joined, depending on the pagination config.
+
+ Args:
+ user_id (str): The ID of the user making the request.
+ pagin_config (synapse.api.streams.PaginationConfig): The pagination
+ config used to determine how many messages *PER ROOM* to return.
+ feedback (bool): True to get feedback along with these messages.
+ Returns:
+ A list of dicts with "room_id" and "membership" keys for all rooms
+ the user is currently invited or joined in on. Rooms where the user
+ is joined on, may return a "messages" key with messages, depending
+ on the specified PaginationConfig.
+ """
+ room_list = yield self.store.get_rooms_for_user_where_membership_is(
+ user_id=user_id,
+ membership_list=[Membership.INVITE, Membership.JOIN]
+ )
+ for room_info in room_list:
+ if room_info["membership"] != Membership.JOIN:
+ continue
+ try:
+ event_chunk = yield self.get_messages(
+ user_id=user_id,
+ pagin_config=pagin_config,
+ feedback=feedback,
+ room_id=room_info["room_id"]
+ )
+ room_info["messages"] = event_chunk
+ except:
+ pass
+ defer.returnValue(room_list)
+
+
+class RoomCreationHandler(BaseHandler):
+
+ @defer.inlineCallbacks
+ def create_room(self, user_id, room_id, config):
+ """ Creates a new room.
+
+ Args:
+ user_id (str): The ID of the user creating the new room.
+ room_id (str): The proposed ID for the new room. Can be None, in
+ which case one will be created for you.
+ config (dict) : A dict of configuration options.
+ Returns:
+ The new room ID.
+ Raises:
+ SynapseError if the room ID was taken, couldn't be stored, or
+ something went horribly wrong.
+ """
+
+ if "room_alias_name" in config:
+ room_alias = RoomAlias.create_local(
+ config["room_alias_name"],
+ self.hs
+ )
+ mapping = yield self.store.get_association_from_room_alias(
+ room_alias
+ )
+
+ if mapping:
+ raise SynapseError(400, "Room alias already taken")
+ else:
+ room_alias = None
+
+ if room_id:
+ # Ensure room_id is the correct type
+ room_id_obj = RoomID.from_string(room_id, self.hs)
+ if not room_id_obj.is_mine:
+ raise SynapseError(400, "Room id must be local")
+
+ yield self.store.store_room(
+ room_id=room_id,
+ room_creator_user_id=user_id,
+ is_public=config["visibility"] == "public"
+ )
+ else:
+ # autogen room IDs and try to create it. We may clash, so just
+ # try a few times till one goes through, giving up eventually.
+ attempts = 0
+ room_id = None
+ while attempts < 5:
+ try:
+ random_string = stringutils.random_string(18)
+ gen_room_id = RoomID.create_local(random_string, self.hs)
+ yield self.store.store_room(
+ room_id=gen_room_id.to_string(),
+ room_creator_user_id=user_id,
+ is_public=config["visibility"] == "public"
+ )
+ room_id = gen_room_id.to_string()
+ break
+ except StoreError:
+ attempts += 1
+ if not room_id:
+ raise StoreError(500, "Couldn't generate a room ID.")
+
+ config_event = self.event_factory.create_event(
+ etype=RoomConfigEvent.TYPE,
+ room_id=room_id,
+ user_id=user_id,
+ content=config,
+ )
+
+ if room_alias:
+ yield self.store.create_room_alias_association(
+ room_id=room_id,
+ room_alias=room_alias,
+ servers=[self.hs.hostname],
+ )
+
+ yield self.state_handler.handle_new_event(config_event)
+ # store_id = persist...
+
+ yield self.hs.get_federation().handle_new_event(config_event)
+ # self.notifier.on_new_room_event(event, store_id)
+
+ content = {"membership": Membership.JOIN}
+ join_event = self.event_factory.create_event(
+ etype=RoomMemberEvent.TYPE,
+ target_user_id=user_id,
+ room_id=room_id,
+ user_id=user_id,
+ membership=Membership.JOIN,
+ content=content
+ )
+
+ yield self.hs.get_handlers().room_member_handler.change_membership(
+ join_event,
+ broadcast_msg=True,
+ do_auth=False
+ )
+
+ result = {"room_id": room_id}
+ if room_alias:
+ result["room_alias"] = room_alias.to_string()
+
+ defer.returnValue(result)
+
+
+class RoomMemberHandler(BaseHandler):
+ # TODO(paul): This handler currently contains a messy conflation of
+ # low-level API that works on UserID objects and so on, and REST-level
+ # API that takes ID strings and returns pagination chunks. These concerns
+ # ought to be separated out a lot better.
+
+ def __init__(self, hs):
+ super(RoomMemberHandler, self).__init__(hs)
+
+ self.clock = hs.get_clock()
+
+ self.distributor = hs.get_distributor()
+ self.distributor.declare("user_joined_room")
+
+ @defer.inlineCallbacks
+ def get_room_members(self, room_id, membership=Membership.JOIN):
+ hs = self.hs
+
+ memberships = yield self.store.get_room_members(
+ room_id=room_id, membership=membership
+ )
+
+ defer.returnValue([hs.parse_userid(m.user_id) for m in memberships])
+
+ @defer.inlineCallbacks
+ def fetch_room_distributions_into(self, room_id, localusers=None,
+ remotedomains=None, ignore_user=None):
+ """Fetch the distribution of a room, adding elements to either
+ 'localusers' or 'remotedomains', which should be a set() if supplied.
+ If ignore_user is set, ignore that user.
+
+ This function returns nothing; its result is performed by the
+ side-effect on the two passed sets. This allows easy accumulation of
+ member lists of multiple rooms at once if required.
+ """
+ members = yield self.get_room_members(room_id)
+ for member in members:
+ if ignore_user is not None and member == ignore_user:
+ continue
+
+ if member.is_mine:
+ if localusers is not None:
+ localusers.add(member)
+ else:
+ if remotedomains is not None:
+ remotedomains.add(member.domain)
+
+ @defer.inlineCallbacks
+ def get_room_members_as_pagination_chunk(self, room_id=None, user_id=None,
+ limit=0, start_tok=None,
+ end_tok=None):
+ """Retrieve a list of room members in the room.
+
+ Args:
+ room_id (str): The room to get the member list for.
+ user_id (str): The ID of the user making the request.
+ limit (int): The max number of members to return.
+ start_tok (str): Optional. The start token if known.
+ end_tok (str): Optional. The end token if known.
+ Returns:
+ dict: A Pagination streamable dict.
+ Raises:
+ SynapseError if something goes wrong.
+ """
+ yield self.auth.check_joined_room(room_id, user_id)
+
+ member_list = yield self.store.get_room_members(room_id=room_id)
+ event_list = [
+ entry.as_event(self.event_factory).get_dict()
+ for entry in member_list
+ ]
+ chunk_data = {
+ "start": "START",
+ "end": "END",
+ "chunk": event_list
+ }
+ # TODO honor Pagination stream params
+ # TODO snapshot this list to return on subsequent requests when
+ # paginating
+ defer.returnValue(chunk_data)
+
+ @defer.inlineCallbacks
+ def get_room_member(self, room_id, member_user_id, auth_user_id):
+ """Retrieve a room member from a room.
+
+ Args:
+ room_id : The room the member is in.
+ member_user_id : The member's user ID
+ auth_user_id : The user ID of the user making this request.
+ Returns:
+ The room member, or None if this member does not exist.
+ Raises:
+ SynapseError if something goes wrong.
+ """
+ yield self.auth.check_joined_room(room_id, auth_user_id)
+
+ member = yield self.store.get_room_member(user_id=member_user_id,
+ room_id=room_id)
+ defer.returnValue(member)
+
+ @defer.inlineCallbacks
+ def change_membership(self, event=None, broadcast_msg=False, do_auth=True):
+ """ Change the membership status of a user in a room.
+
+ Args:
+ event (SynapseEvent): The membership event
+ broadcast_msg (bool): True to inject a membership message into this
+ room on success.
+ Raises:
+ SynapseError if there was a problem changing the membership.
+ """
+
+ #broadcast_msg = False
+
+ prev_state = yield self.store.get_room_member(
+ event.target_user_id, event.room_id
+ )
+
+ if prev_state and prev_state.membership == event.membership:
+ # treat this event as a NOOP.
+ if do_auth: # This is mainly to fix a unit test.
+ yield self.auth.check(event, raises=True)
+ defer.returnValue({})
+ return
+
+ room_id = event.room_id
+
+ # If we're trying to join a room then we have to do this differently
+ # if this HS is not currently in the room, i.e. we have to do the
+ # invite/join dance.
+ if event.membership == Membership.JOIN:
+ yield self._do_join(
+ event, do_auth=do_auth, broadcast_msg=broadcast_msg
+ )
+ else:
+ # This is not a JOIN, so we can handle it normally.
+ if do_auth:
+ yield self.auth.check(event, raises=True)
+
+ prev_state = yield self.store.get_room_member(
+ event.target_user_id, event.room_id
+ )
+ if prev_state and prev_state.membership == event.membership:
+ # double same action, treat this event as a NOOP.
+ defer.returnValue({})
+ return
+
+ yield self.state_handler.handle_new_event(event)
+ yield self._do_local_membership_update(
+ event,
+ membership=event.content["membership"],
+ broadcast_msg=broadcast_msg,
+ )
+
+ defer.returnValue({"room_id": room_id})
+
+ @defer.inlineCallbacks
+ def join_room_alias(self, joinee, room_alias, do_auth=True, content={}):
+ directory_handler = self.hs.get_handlers().directory_handler
+ mapping = yield directory_handler.get_association(room_alias)
+
+ if not mapping:
+ raise SynapseError(404, "No such room alias")
+
+ room_id = mapping["room_id"]
+ hosts = mapping["servers"]
+ if not hosts:
+ raise SynapseError(404, "No known servers")
+
+ host = hosts[0]
+
+ content.update({"membership": Membership.JOIN})
+ new_event = self.event_factory.create_event(
+ etype=RoomMemberEvent.TYPE,
+ target_user_id=joinee.to_string(),
+ room_id=room_id,
+ user_id=joinee.to_string(),
+ membership=Membership.JOIN,
+ content=content,
+ )
+
+ yield self._do_join(new_event, room_host=host, do_auth=True)
+
+ defer.returnValue({"room_id": room_id})
+
+ @defer.inlineCallbacks
+ def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+ joinee = self.hs.parse_userid(event.target_user_id)
+ # room_id = RoomID.from_string(event.room_id, self.hs)
+ room_id = event.room_id
+
+ # If event doesn't include a display name, add one.
+ yield self._fill_out_join_content(
+ joinee, event.content
+ )
+
+ # XXX: We don't do an auth check if we are doing an invite
+ # join dance for now, since we're kinda implicitly checking
+ # that we are allowed to join when we decide whether or not we
+ # need to do the invite/join dance.
+
+ room = yield self.store.get_room(room_id)
+
+ if room:
+ should_do_dance = False
+ elif room_host:
+ should_do_dance = True
+ else:
+ prev_state = yield self.store.get_room_member(
+ joinee.to_string(), room_id
+ )
+
+ if prev_state and prev_state.membership == Membership.INVITE:
+ room = yield self.store.get_room(room_id)
+ inviter = UserID.from_string(
+ prev_state.sender, self.hs
+ )
+
+ should_do_dance = not inviter.is_mine and not room
+ room_host = inviter.domain
+ else:
+ should_do_dance = False
+
+ # We want to do the _do_update inside the room lock.
+ if not should_do_dance:
+ logger.debug("Doing normal join")
+
+ if do_auth:
+ yield self.auth.check(event, raises=True)
+
+ yield self.state_handler.handle_new_event(event)
+ yield self._do_local_membership_update(
+ event,
+ membership=event.content["membership"],
+ broadcast_msg=broadcast_msg,
+ )
+
+
+ if should_do_dance:
+ yield self._do_invite_join_dance(
+ room_id=room_id,
+ joinee=event.user_id,
+ target_host=room_host,
+ content=event.content,
+ )
+
+ user = self.hs.parse_userid(event.user_id)
+ self.distributor.fire(
+ "user_joined_room", user=user, room_id=room_id
+ )
+
+ @defer.inlineCallbacks
+ def _fill_out_join_content(self, user_id, content):
+ # If event doesn't include a display name, add one.
+ profile_handler = self.hs.get_handlers().profile_handler
+ if "displayname" not in content:
+ try:
+ display_name = yield profile_handler.get_displayname(
+ user_id
+ )
+
+ if display_name:
+ content["displayname"] = display_name
+ except:
+ logger.exception("Failed to set display_name")
+
+ if "avatar_url" not in content:
+ try:
+ avatar_url = yield profile_handler.get_avatar_url(
+ user_id
+ )
+
+ if avatar_url:
+ content["avatar_url"] = avatar_url
+ except:
+ logger.exception("Failed to set display_name")
+
+ @defer.inlineCallbacks
+ def _should_invite_join(self, room_id, prev_state, do_auth):
+ logger.debug("_should_invite_join: room_id: %s", room_id)
+
+ # XXX: We don't do an auth check if we are doing an invite
+ # join dance for now, since we're kinda implicitly checking
+ # that we are allowed to join when we decide whether or not we
+ # need to do the invite/join dance.
+
+ # Only do an invite join dance if a) we were invited,
+ # b) the person inviting was from a differnt HS and c) we are
+ # not currently in the room
+ room_host = None
+ if prev_state and prev_state.membership == Membership.INVITE:
+ room = yield self.store.get_room(room_id)
+ inviter = UserID.from_string(
+ prev_state.sender, self.hs
+ )
+
+ is_remote_invite_join = not inviter.is_mine and not room
+ room_host = inviter.domain
+ else:
+ is_remote_invite_join = False
+
+ defer.returnValue((is_remote_invite_join, room_host))
+
+ @defer.inlineCallbacks
+ def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
+ """Returns a list of roomids that the user has any of the given
+ membership states in."""
+ rooms = yield self.store.get_rooms_for_user_where_membership_is(
+ user_id=user.to_string(), membership_list=membership_list
+ )
+
+ defer.returnValue([r["room_id"] for r in rooms])
+
+ @defer.inlineCallbacks
+ def _do_local_membership_update(self, event, membership, broadcast_msg):
+ # store membership
+ store_id = yield self.store.store_room_member(
+ user_id=event.target_user_id,
+ sender=event.user_id,
+ room_id=event.room_id,
+ content=event.content,
+ membership=membership
+ )
+
+ # Send a PDU to all hosts who have joined the room.
+ destinations = yield self.store.get_joined_hosts_for_room(
+ event.room_id
+ )
+
+ # If we're inviting someone, then we should also send it to that
+ # HS.
+ if membership == Membership.INVITE:
+ host = UserID.from_string(
+ event.target_user_id, self.hs
+ ).domain
+ destinations.append(host)
+
+ # If we are joining a remote HS, include that.
+ if membership == Membership.JOIN:
+ host = UserID.from_string(
+ event.target_user_id, self.hs
+ ).domain
+ destinations.append(host)
+
+ event.destinations = list(set(destinations))
+
+ yield self.hs.get_federation().handle_new_event(event)
+ self.notifier.on_new_room_event(event, store_id)
+
+ if broadcast_msg:
+ yield self._inject_membership_msg(
+ source=event.user_id,
+ target=event.target_user_id,
+ room_id=event.room_id,
+ membership=event.content["membership"]
+ )
+
+ @defer.inlineCallbacks
+ def _do_invite_join_dance(self, room_id, joinee, target_host, content):
+ logger.debug("Doing remote join dance")
+
+ # do invite join dance
+ federation = self.hs.get_federation()
+ new_event = self.event_factory.create_event(
+ etype=InviteJoinEvent.TYPE,
+ target_host=target_host,
+ room_id=room_id,
+ user_id=joinee,
+ content=content
+ )
+
+ new_event.destinations = [target_host]
+
+ yield self.store.store_room(
+ room_id, "", is_public=False
+ )
+
+ #yield self.state_handler.handle_new_event(event)
+ yield federation.handle_new_event(new_event)
+ yield federation.get_state_for_room(
+ target_host, room_id
+ )
+
+ @defer.inlineCallbacks
+ def _inject_membership_msg(self, room_id=None, source=None, target=None,
+ membership=None):
+ # TODO this should be a different type of message, not m.text
+ if membership == Membership.INVITE:
+ body = "%s invited %s to the room." % (source, target)
+ elif membership == Membership.JOIN:
+ body = "%s joined the room." % (target)
+ elif membership == Membership.LEAVE:
+ body = "%s left the room." % (target)
+ else:
+ raise RoomError(500, "Unknown membership value %s" % membership)
+
+ membership_json = {
+ "msgtype": u"m.text",
+ "body": body,
+ "membership_source": source,
+ "membership_target": target,
+ "membership": membership,
+ }
+
+ msg_id = "m%s" % int(self.clock.time_msec())
+
+ event = self.event_factory.create_event(
+ etype=MessageEvent.TYPE,
+ room_id=room_id,
+ user_id="_homeserver_",
+ msg_id=msg_id,
+ content=membership_json
+ )
+
+ handler = self.hs.get_handlers().message_handler
+ yield handler.send_message(event, suppress_auth=True)
+
+
+class RoomListHandler(BaseHandler):
+
+ @defer.inlineCallbacks
+ def get_public_room_list(self):
+ chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+ defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
|