summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-08-27 14:13:06 +0100
committerErik Johnston <erik@matrix.org>2014-08-27 14:13:06 +0100
commit47519cd8c27c343405431c206660ba74fdea52f6 (patch)
tree77b7a55778c42e256b99577226a6172f719e8416 /synapse
parentImplement presence event source. Change the way the notifier indexes listeners (diff)
parentfix joining rooms on webclient (diff)
downloadsynapse-47519cd8c27c343405431c206660ba74fdea52f6.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor
Conflicts:
	synapse/handlers/events.py
	synapse/rest/events.py
	synapse/rest/room.py
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py2
-rwxr-xr-xsynapse/app/homeserver.py65
-rw-r--r--synapse/handlers/__init__.py5
-rw-r--r--synapse/handlers/events.py99
-rw-r--r--synapse/handlers/typing.py146
-rw-r--r--synapse/rest/events.py16
-rw-r--r--synapse/rest/presence.py2
-rw-r--r--synapse/rest/room.py94
-rw-r--r--synapse/server.py15
-rw-r--r--synapse/storage/__init__.py1
10 files changed, 369 insertions, 76 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 15407df14a..886e132e10 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -162,6 +162,8 @@ class Auth(object):
         """
         try:
             user_id = yield self.store.get_user_by_token(token=token)
+            if not user_id:
+                raise StoreError()
             defer.returnValue(self.hs.parse_userid(user_id))
         except StoreError:
             raise AuthError(403, "Unrecognised access token.",
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index f210d26629..a89770ed7b 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -43,6 +43,22 @@ import re
 logger = logging.getLogger(__name__)
 
 
+SCHEMAS = [
+    "transactions",
+    "pdu",
+    "users",
+    "profiles",
+    "presence",
+    "im",
+    "room_aliases",
+]
+
+
+# Remember to update this number every time an incompatible change is made to
+# database schema files, so the users will be informed on server restarts.
+SCHEMA_VERSION = 1
+
+
 class SynapseHomeServer(HomeServer):
 
     def build_http_client(self):
@@ -65,31 +81,39 @@ class SynapseHomeServer(HomeServer):
         don't have to worry about overwriting existing content.
         """
         logging.info("Preparing database: %s...", self.db_name)
-        pool = adbapi.ConnectionPool(
-            'sqlite3', self.db_name, check_same_thread=False,
-            cp_min=1, cp_max=1)
 
-        schemas = [
-            "transactions",
-            "pdu",
-            "users",
-            "profiles",
-            "presence",
-            "im",
-            "room_aliases",
-        ]
+        with sqlite3.connect(self.db_name) as db_conn:
+            c = db_conn.cursor()
+            c.execute("PRAGMA user_version")
+            row = c.fetchone()
 
-        for sql_loc in schemas:
-            sql_script = read_schema(sql_loc)
+            if row and row[0]:
+                user_version = row[0]
 
-            with sqlite3.connect(self.db_name) as db_conn:
-                c = db_conn.cursor()
-                c.executescript(sql_script)
-                c.close()
-                db_conn.commit()
+                if user_version < SCHEMA_VERSION:
+                    # TODO(paul): add some kind of intelligent fixup here
+                    raise ValueError("Cannot use this database as the " +
+                        "schema version (%d) does not match (%d)" %
+                        (user_version, SCHEMA_VERSION)
+                    )
+
+            else:
+                for sql_loc in SCHEMAS:
+                    sql_script = read_schema(sql_loc)
+
+                    c.executescript(sql_script)
+                    db_conn.commit()
+
+                c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
+
+            c.close()
 
         logging.info("Database prepared in %s.", self.db_name)
 
+        pool = adbapi.ConnectionPool(
+            'sqlite3', self.db_name, check_same_thread=False,
+            cp_min=1, cp_max=1)
+
         return pool
 
     def create_resource_tree(self, web_client, redirect_root_to_web_client):
@@ -184,6 +208,7 @@ class SynapseHomeServer(HomeServer):
 
     def start_listening(self, port):
         reactor.listenTCP(port, Site(self.root_resource))
+        logger.info("Synapse now listening on port %d", port)
 
 
 def setup_logging(verbosity=0, filename=None, config_path=None):
@@ -282,7 +307,7 @@ def setup():
         redirect_root_to_web_client=True)
     hs.start_listening(args.port)
 
-    hs.build_db_pool()
+    hs.get_db_pool()
 
     if args.manhole:
         f = twisted.manhole.telnet.ShellFactory()
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 8a4aa6e5d6..b645977767 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,12 +17,13 @@ from .register import RegistrationHandler
 from .room import (
     MessageHandler, RoomCreationHandler, RoomMemberHandler, RoomListHandler
 )
-from .events import EventStreamHandler
+from .events import EventStreamHandler, EventHandler
 from .federation import FederationHandler
 from .login import LoginHandler
 from .profile import ProfileHandler
 from .presence import PresenceHandler
 from .directory import DirectoryHandler
+from .typing import TypingNotificationHandler
 
 
 class Handlers(object):
@@ -39,9 +40,11 @@ class Handlers(object):
         self.room_creation_handler = RoomCreationHandler(hs)
         self.room_member_handler = RoomMemberHandler(hs)
         self.event_stream_handler = EventStreamHandler(hs)
+        self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
         self.presence_handler = PresenceHandler(hs)
         self.room_list_handler = RoomListHandler(hs)
         self.login_handler = LoginHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
+        self.typing_notification_handler = TypingNotificationHandler(hs)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index aabec37fc0..b336b292d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -47,26 +47,81 @@ class EventStreamHandler(BaseHandler):
     def get_stream(self, auth_user_id, pagin_config, timeout=0):
         auth_user = self.hs.parse_userid(auth_user_id)
 
-        if pagin_config.from_token is None:
-            pagin_config.from_token = None
-
-        rm_handler = self.hs.get_handlers().room_member_handler
-        room_ids = yield rm_handler.get_rooms_for_user(auth_user)
-
-        events, tokens = yield self.notifier.get_events_for(
-            auth_user, room_ids, pagin_config, timeout
-        )
-
-        chunks = [
-            e.get_dict() if isinstance(e, SynapseEvent) else e
-            for e in events
-        ]
-
-        chunk = {
-            "chunk": chunks,
-            "start": tokens[0].to_string(),
-            "end": tokens[1].to_string(),
-        }
-
-        defer.returnValue(chunk)
+        try:
+            if auth_user not in self._streams_per_user:
+                self._streams_per_user[auth_user] = 0
+                if auth_user in self._stop_timer_per_user:
+                    self.clock.cancel_call_later(
+                        self._stop_timer_per_user.pop(auth_user))
+                else:
+                    self.distributor.fire(
+                        "started_user_eventstream", auth_user
+                    )
+            self._streams_per_user[auth_user] += 1
+
+
+            if pagin_config.from_token is None:
+                pagin_config.from_token = None
+
+            rm_handler = self.hs.get_handlers().room_member_handler
+            room_ids = yield rm_handler.get_rooms_for_user(auth_user)
+
+            events, tokens = yield self.notifier.get_events_for(
+                auth_user, room_ids, pagin_config, timeout
+            )
+
+            chunks = [
+                e.get_dict() if isinstance(e, SynapseEvent) else e
+                for e in events
+            ]
+
+            chunk = {
+                "chunk": chunks,
+                "start": tokens[0].to_string(),
+                "end": tokens[1].to_string(),
+            }
+
+            defer.returnValue(chunk)
+
+        finally:
+            self._streams_per_user[auth_user] -= 1
+            if not self._streams_per_user[auth_user]:
+                del self._streams_per_user[auth_user]
+
+                # 10 seconds of grace to allow the client to reconnect again
+                #   before we think they're gone
+                def _later():
+                    self.distributor.fire(
+                        "stopped_user_eventstream", auth_user
+                    )
+                    del self._stop_timer_per_user[auth_user]
+
+                self._stop_timer_per_user[auth_user] = (
+                    self.clock.call_later(5, _later)
+                )
+
+
+class EventHandler(BaseHandler):
 
+    @defer.inlineCallbacks
+    def get_event(self, user, event_id):
+        """Retrieve a single specified event.
+
+        Args:
+            user (synapse.types.UserID): The user requesting the event
+            event_id (str): The event ID to obtain.
+        Returns:
+            dict: An event, or None if there is no event matching this ID.
+        Raises:
+            SynapseError if there was a problem retrieving this event, or
+            AuthError if the user does not have the rights to inspect this
+            event.
+        """
+        event = yield self.store.get_event(event_id)
+
+        if not event:
+            defer.returnValue(None)
+            return
+
+        yield self.auth.check(event, raises=True)
+        defer.returnValue(event)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
new file mode 100644
index 0000000000..9d38a7336e
--- /dev/null
+++ b/synapse/handlers/typing.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+import logging
+
+from collections import namedtuple
+
+
+logger = logging.getLogger(__name__)
+
+
+# A tiny object useful for storing a user's membership in a room, as a mapping
+# key
+RoomMember = namedtuple("RoomMember", ("room_id", "user"))
+
+
+class TypingNotificationHandler(BaseHandler):
+    def __init__(self, hs):
+        super(TypingNotificationHandler, self).__init__(hs)
+
+        self.homeserver = hs
+
+        self.clock = hs.get_clock()
+
+        self.federation = hs.get_replication_layer()
+
+        self.federation.register_edu_handler("m.typing", self._recv_edu)
+
+        self._member_typing_until = {}
+
+    @defer.inlineCallbacks
+    def started_typing(self, target_user, auth_user, room_id, timeout):
+        if not target_user.is_mine:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+        if target_user != auth_user:
+            raise AuthError(400, "Cannot set another user's typing state")
+
+        until = self.clock.time_msec() + timeout
+        member = RoomMember(room_id=room_id, user=target_user)
+
+        was_present = member in self._member_typing_until
+
+        self._member_typing_until[member] = until
+
+        if was_present:
+            # No point sending another notification
+            defer.returnValue(None)
+
+        yield self._push_update(
+            room_id=room_id,
+            user=target_user,
+            typing=True,
+        )
+
+    @defer.inlineCallbacks
+    def stopped_typing(self, target_user, auth_user, room_id):
+        if not target_user.is_mine:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+        if target_user != auth_user:
+            raise AuthError(400, "Cannot set another user's typing state")
+
+        member = RoomMember(room_id=room_id, user=target_user)
+
+        if member not in self._member_typing_until:
+            # No point
+            defer.returnValue(None)
+
+        yield self._push_update(
+            room_id=room_id,
+            user=target_user,
+            typing=False,
+        )
+
+    @defer.inlineCallbacks
+    def _push_update(self, room_id, user, typing):
+        localusers = set()
+        remotedomains = set()
+
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        yield rm_handler.fetch_room_distributions_into(room_id,
+                localusers=localusers, remotedomains=remotedomains,
+                ignore_user=user)
+
+        for u in localusers:
+            self.push_update_to_clients(
+                room_id=room_id,
+                observer_user=u,
+                observed_user=user,
+                typing=typing,
+            )
+
+        deferreds = []
+        for domain in remotedomains:
+            deferreds.append(self.federation.send_edu(
+                destination=domain,
+                edu_type="m.typing",
+                content={
+                    "room_id": room_id,
+                    "user_id": user.to_string(),
+                    "typing": typing,
+                },
+            ))
+
+        yield defer.DeferredList(deferreds, consumeErrors=False)
+
+    @defer.inlineCallbacks
+    def _recv_edu(self, origin, content):
+        room_id = content["room_id"]
+        user = self.homeserver.parse_userid(content["user_id"])
+
+        localusers = set()
+
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        yield rm_handler.fetch_room_distributions_into(room_id,
+                localusers=localusers)
+
+        for u in localusers:
+            self.push_update_to_clients(
+                room_id=room_id,
+                observer_user=u,
+                observed_user=user,
+                typing=content["typing"]
+            )
+
+    def push_update_to_clients(self, room_id, observer_user, observed_user,
+            typing):
+        # TODO(paul) steal this from presence.py
+        pass
diff --git a/synapse/rest/events.py b/synapse/rest/events.py
index 28da418498..2e7563d14b 100644
--- a/synapse/rest/events.py
+++ b/synapse/rest/events.py
@@ -48,6 +48,22 @@ class EventStreamRestServlet(RestServlet):
         return (200, {})
 
 
+# TODO: Unit test gets, with and without auth, with different kinds of events.
+class EventRestServlet(RestServlet):
+    PATTERN = client_path_pattern("/events/(?P<event_id>[^/]*)$")
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, event_id):
+        auth_user = yield self.auth.get_user_by_req(request)
+        handler = self.handlers.event_handler
+        event = yield handler.get_event(auth_user, event_id)
+
+        if event:
+            defer.returnValue((200, event.get_dict()))
+        else:
+            defer.returnValue((404, "Event not found."))
+
 
 def register_servlets(hs, http_server):
     EventStreamRestServlet(hs).register(http_server)
+    EventRestServlet(hs).register(http_server)
diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py
index 6043848595..e013b20853 100644
--- a/synapse/rest/presence.py
+++ b/synapse/rest/presence.py
@@ -68,7 +68,7 @@ class PresenceStatusRestServlet(RestServlet):
 
 
 class PresenceListRestServlet(RestServlet):
-    PATTERN = client_path_pattern("/presence_list/(?P<user_id>[^/]*)")
+    PATTERN = client_path_pattern("/presence/list/(?P<user_id>[^/]*)")
 
     @defer.inlineCallbacks
     def on_GET(self, request, user_id):
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index 9363acebed..f4c12191c8 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -18,11 +18,9 @@ from twisted.internet import defer
 
 from base import RestServlet, client_path_pattern
 from synapse.api.errors import SynapseError, Codes
-from synapse.api.events.room import (
-    MessageEvent, RoomMemberEvent, FeedbackEvent
-)
-from synapse.api.constants import Feedback
 from synapse.streams.config import PaginationConfig
+from synapse.api.events.room import RoomMemberEvent
+from synapse.api.constants import Membership
 
 import json
 import logging
@@ -36,31 +34,28 @@ class RoomCreateRestServlet(RestServlet):
     # No PATTERN; we have custom dispatch rules here
 
     def register(self, http_server):
-        # /rooms OR /rooms/<roomid>
-        http_server.register_path("POST",
-                                  client_path_pattern("/rooms$"),
-                                  self.on_POST)
-        http_server.register_path("PUT",
-                                  client_path_pattern(
-                                      "/rooms/(?P<room_id>[^/]*)$"),
-                                  self.on_PUT)
+        PATTERN = "/createRoom"
+        register_txn_path(self, PATTERN, http_server)
         # define CORS for all of /rooms in RoomCreateRestServlet for simplicity
         http_server.register_path("OPTIONS",
                                   client_path_pattern("/rooms(?:/.*)?$"),
                                   self.on_OPTIONS)
+        # define CORS for /createRoom[/txnid]
+        http_server.register_path("OPTIONS",
+                                  client_path_pattern("/createRoom(?:/.*)?$"),
+                                  self.on_OPTIONS)
 
     @defer.inlineCallbacks
-    def on_PUT(self, request, room_id):
-        room_id = urllib.unquote(room_id)
-        auth_user = yield self.auth.get_user_by_req(request)
+    def on_PUT(self, request, txn_id):
+        try:
+            defer.returnValue(self.txns.get_client_transaction(request, txn_id))
+        except KeyError:
+            pass
 
-        if not room_id:
-            raise SynapseError(400, "PUT must specify a room ID")
+        response = yield self.on_POST(request)
 
-        room_config = self.get_room_config(request)
-        info = yield self.make_room(room_config, auth_user, room_id)
-        room_config.update(info)
-        defer.returnValue((200, info))
+        self.txns.store_client_transaction(request, txn_id, response)
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -210,24 +205,63 @@ class RoomSendEventRestServlet(RestServlet):
         defer.returnValue(response)
 
 
+# TODO: Needs unit testing for room ID + alias joins
 class JoinRoomAliasServlet(RestServlet):
-    PATTERN = client_path_pattern("/join/(?P<room_alias>[^/]+)$")
+
+    def register(self, http_server):
+        # /join/$room_identifier[/$txn_id]
+        PATTERN = ("/join/(?P<room_identifier>[^/]*)")
+        register_txn_path(self, PATTERN, http_server)
 
     @defer.inlineCallbacks
-    def on_PUT(self, request, room_alias):
+    def on_POST(self, request, room_identifier):
         user = yield self.auth.get_user_by_req(request)
 
-        if not user:
-            defer.returnValue((403, "Unrecognized user"))
+        # the identifier could be a room alias or a room id. Try one then the
+        # other if it fails to parse, without swallowing other valid
+        # SynapseErrors.
 
-        logger.debug("room_alias: %s", room_alias)
+        identifier = None
+        is_room_alias = False
+        try:
+            identifier = self.hs.parse_roomalias(
+                urllib.unquote(room_identifier)
+            )
+            is_room_alias = True
+        except SynapseError:
+            identifier = self.hs.parse_roomid(
+                urllib.unquote(room_identifier)
+            )
 
-        room_alias = self.hs.parse_roomalias(urllib.unquote(room_alias))
+        # TODO: Support for specifying the home server to join with?
 
-        handler = self.handlers.room_member_handler
-        ret_dict = yield handler.join_room_alias(user, room_alias)
+        if is_room_alias:
+            handler = self.handlers.room_member_handler
+            ret_dict = yield handler.join_room_alias(user, identifier)
+            defer.returnValue((200, ret_dict))
+        else:  # room id
+            event = self.event_factory.create_event(
+                etype=RoomMemberEvent.TYPE,
+                content={"membership": Membership.JOIN},
+                room_id=urllib.unquote(identifier.to_string()),
+                user_id=user.to_string(),
+                state_key=user.to_string()
+            )
+            handler = self.handlers.room_member_handler
+            yield handler.change_membership(event)
+            defer.returnValue((200, ""))
 
-        defer.returnValue((200, ret_dict))
+    @defer.inlineCallbacks
+    def on_PUT(self, request, room_identifier, txn_id):
+        try:
+            defer.returnValue(self.txns.get_client_transaction(request, txn_id))
+        except KeyError:
+            pass
+
+        response = yield self.on_POST(request, room_identifier)
+
+        self.txns.store_client_transaction(request, txn_id, response)
+        defer.returnValue(response)
 
 
 # TODO: Needs unit testing
diff --git a/synapse/server.py b/synapse/server.py
index 5e9b76603a..c29c61220d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -28,7 +28,7 @@ from synapse.handlers import Handlers
 from synapse.rest import RestServletFactory
 from synapse.state import StateHandler
 from synapse.storage import DataStore
-from synapse.types import UserID, RoomAlias
+from synapse.types import UserID, RoomAlias, RoomID
 from synapse.util import Clock
 from synapse.util.distributor import Distributor
 from synapse.util.lockutils import LockManager
@@ -119,17 +119,30 @@ class BaseHomeServer(object):
 
         setattr(BaseHomeServer, "get_%s" % (depname), _get)
 
+    # TODO: Why are these parse_ methods so high up along with other globals?
+    # Surely these should be in a util package or in the api package?
+
     # Other utility methods
     def parse_userid(self, s):
         """Parse the string given by 's' as a User ID and return a UserID
         object."""
         return UserID.from_string(s, hs=self)
 
+    def parse_roomid(self, s):
+        """Parse the string given by 's' as a Room ID and return a RoomID
+        object."""
+        return RoomID.from_string(s, hs=self)
+
     def parse_roomalias(self, s):
         """Parse the string given by 's' as a Room Alias and return a RoomAlias
         object."""
         return RoomAlias.from_string(s, hs=self)
 
+    def parse_roomid(self, s):
+        """Parse the string given by 's' as a Room ID and return a RoomID
+        object."""
+        return RoomID.from_string(s, hs=self)
+
 # Build magic accessors for every dependency
 for depname in BaseHomeServer.DEPENDENCIES:
     BaseHomeServer._make_dependency_method(depname)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a97a42e1e3..38ab03c45c 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -80,7 +80,6 @@ class DataStore(RoomMemberStore, RoomStore,
             [
                 "event_id",
                 "type",
-                "sender",
                 "room_id",
                 "content",
                 "unrecognized_keys"