summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-03-02 13:39:22 +0000
committerErik Johnston <erik@matrix.org>2015-03-02 13:39:22 +0000
commit4195e55ccc0f5751b13136b8c5f60686a902b79d (patch)
treed9e1596e1ecf6cfd6a3e7152a20748496a9e0890
parentUse contextlib.contextmanager instead of a custom class (diff)
parentMerge pull request #95 from matrix-org/serialize_transaction_processing (diff)
downloadsynapse-4195e55ccc0f5751b13136b8c5f60686a902b79d.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into federation_rate_limit
-rw-r--r--CHANGES.rst2
-rw-r--r--synapse/federation/federation_server.py32
-rw-r--r--synapse/handlers/events.py3
-rw-r--r--synapse/handlers/room.py34
-rw-r--r--synapse/notifier.py30
-rw-r--r--synapse/storage/_base.py22
-rw-r--r--synapse/storage/appservice.py98
-rw-r--r--synapse/storage/roommember.py36
-rw-r--r--synapse/storage/stream.py83
-rw-r--r--tests/rest/client/v1/test_presence.py3
10 files changed, 284 insertions, 59 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index 9e36f6232d..ecc30b4672 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -4,6 +4,8 @@ Changes in synapse vx.x.x (x-x-x)
 * Add support for registration fallback. This is a page hosted on the server
   which allows a user to register for an account, regardless of what client
   they are using (e.g. mobile devices).
+* Application services can now poll on the CS API ``/events`` for their events,
+  by providing their application service ``access_token``.
 
 Changes in synapse v0.7.1 (2015-02-19)
 ======================================
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 22b9663831..bc9bac809a 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -112,17 +112,19 @@ class FederationServer(FederationBase):
         logger.debug("[%s] Transaction is new", transaction.transaction_id)
 
         with PreserveLoggingContext():
-            dl = []
+            results = []
+
             for pdu in pdu_list:
                 d = self._handle_new_pdu(transaction.origin, pdu)
 
-                def handle_failure(failure):
-                    failure.trap(FederationError)
-                    self.send_failure(failure.value, transaction.origin)
-
-                d.addErrback(handle_failure)
-
-                dl.append(d)
+                try:
+                    yield d
+                    results.append({})
+                except FederationError as e:
+                    self.send_failure(e, transaction.origin)
+                    results.append({"error": str(e)})
+                except Exception as e:
+                    results.append({"error": str(e)})
 
             if hasattr(transaction, "edus"):
                 for edu in [Edu(**x) for x in transaction.edus]:
@@ -135,21 +137,11 @@ class FederationServer(FederationBase):
             for failure in getattr(transaction, "pdu_failures", []):
                 logger.info("Got failure %r", failure)
 
-            results = yield defer.DeferredList(dl, consumeErrors=True)
-
-        ret = []
-        for r in results:
-            if r[0]:
-                ret.append({})
-            else:
-                logger.exception(r[1])
-                ret.append({"error": str(r[1].value)})
-
-        logger.debug("Returning: %s", str(ret))
+        logger.debug("Returning: %s", str(results))
 
         response = {
             "pdus": dict(zip(
-                (p.event_id for p in pdu_list), ret
+                (p.event_id for p in pdu_list), results
             )),
         }
 
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 025e7e7e62..8d5f5c8499 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -69,9 +69,6 @@ class EventStreamHandler(BaseHandler):
                         )
                 self._streams_per_user[auth_user] += 1
 
-            if pagin_config.from_token is None:
-                pagin_config.from_token = None
-
             rm_handler = self.hs.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(auth_user)
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 914742d913..80f7ee3f12 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -510,9 +510,16 @@ class RoomMemberHandler(BaseHandler):
     def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
         """Returns a list of roomids that the user has any of the given
         membership states in."""
-        rooms = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=user.to_string(), membership_list=membership_list
+
+        app_service = yield self.store.get_app_service_by_user_id(
+            user.to_string()
         )
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+        else:
+            rooms = yield self.store.get_rooms_for_user_where_membership_is(
+                user_id=user.to_string(), membership_list=membership_list
+            )
 
         # For some reason the list of events contains duplicates
         # TODO(paul): work out why because I really don't think it should
@@ -559,13 +566,24 @@ class RoomEventSource(object):
 
         to_key = yield self.get_current_key()
 
-        events, end_key = yield self.store.get_room_events_stream(
-            user_id=user.to_string(),
-            from_key=from_key,
-            to_key=to_key,
-            room_id=None,
-            limit=limit,
+        app_service = yield self.store.get_app_service_by_user_id(
+            user.to_string()
         )
+        if app_service:
+            events, end_key = yield self.store.get_appservice_room_stream(
+                service=app_service,
+                from_key=from_key,
+                to_key=to_key,
+                limit=limit,
+            )
+        else:
+            events, end_key = yield self.store.get_room_events_stream(
+                user_id=user.to_string(),
+                from_key=from_key,
+                to_key=to_key,
+                room_id=None,
+                limit=limit,
+            )
 
         defer.returnValue((events, end_key))
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 2475f3ffbe..09d23e79b8 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -36,8 +36,10 @@ class _NotificationListener(object):
     so that it can remove itself from the indexes in the Notifier class.
     """
 
-    def __init__(self, user, rooms, from_token, limit, timeout, deferred):
+    def __init__(self, user, rooms, from_token, limit, timeout, deferred,
+                 appservice=None):
         self.user = user
+        self.appservice = appservice
         self.from_token = from_token
         self.limit = limit
         self.timeout = timeout
@@ -65,6 +67,10 @@ class _NotificationListener(object):
             lst.discard(self)
 
         notifier.user_to_listeners.get(self.user, set()).discard(self)
+        if self.appservice:
+            notifier.appservice_to_listeners.get(
+                self.appservice, set()
+            ).discard(self)
 
 
 class Notifier(object):
@@ -79,6 +85,7 @@ class Notifier(object):
 
         self.rooms_to_listeners = {}
         self.user_to_listeners = {}
+        self.appservice_to_listeners = {}
 
         self.event_sources = hs.get_event_sources()
 
@@ -114,6 +121,17 @@ class Notifier(object):
         for user in extra_users:
             listeners |= self.user_to_listeners.get(user, set()).copy()
 
+        for appservice in self.appservice_to_listeners:
+            # TODO (kegan): Redundant appservice listener checks?
+            # App services will already be in the rooms_to_listeners set, but
+            # that isn't enough. They need to be checked here in order to
+            # receive *invites* for users they are interested in. Does this
+            # make the rooms_to_listeners check somewhat obselete?
+            if appservice.is_interested(event):
+                listeners |= self.appservice_to_listeners.get(
+                    appservice, set()
+                ).copy()
+
         logger.debug("on_new_room_event listeners %s", listeners)
 
         # TODO (erikj): Can we make this more efficient by hitting the
@@ -280,6 +298,10 @@ class Notifier(object):
         if not from_token:
             from_token = yield self.event_sources.get_current_token()
 
+        appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
+            user.to_string()
+        )
+
         listener = _NotificationListener(
             user,
             rooms,
@@ -287,6 +309,7 @@ class Notifier(object):
             limit,
             timeout,
             deferred,
+            appservice=appservice
         )
 
         def _timeout_listener():
@@ -319,6 +342,11 @@ class Notifier(object):
 
         self.user_to_listeners.setdefault(listener.user, set()).add(listener)
 
+        if listener.appservice:
+            self.appservice_to_listeners.setdefault(
+                listener.appservice, set()
+            ).add(listener)
+
     @defer.inlineCallbacks
     @log_function
     def _check_for_updates(self, listener):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c98dd36aed..3725c9795d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -450,7 +450,8 @@ class SQLBaseStore(object):
 
         Args:
             table : string giving the table name
-            keyvalues : dict of column names and values to select the rows with
+            keyvalues : dict of column names and values to select the rows with,
+            or None to not apply a WHERE clause.
             retcols : list of strings giving the names of the columns to return
         """
         return self.runInteraction(
@@ -469,13 +470,20 @@ class SQLBaseStore(object):
             keyvalues : dict of column names and values to select the rows with
             retcols : list of strings giving the names of the columns to return
         """
-        sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
-            ", ".join(retcols),
-            table,
-            " AND ".join("%s = ?" % (k, ) for k in keyvalues)
-        )
+        if keyvalues:
+            sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+                ", ".join(retcols),
+                table,
+                " AND ".join("%s = ?" % (k, ) for k in keyvalues)
+            )
+            txn.execute(sql, keyvalues.values())
+        else:
+            sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+                ", ".join(retcols),
+                table
+            )
+            txn.execute(sql)
 
-        txn.execute(sql, keyvalues.values())
         return self.cursor_to_dict(txn)
 
     def _simple_update_one(self, table, keyvalues, updatevalues,
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index dc3666efd4..97481d113b 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -15,8 +15,10 @@
 import logging
 from twisted.internet import defer
 
+from synapse.api.constants import Membership
 from synapse.api.errors import StoreError
 from synapse.appservice import ApplicationService
+from synapse.storage.roommember import RoomsForUser
 from ._base import SQLBaseStore
 
 
@@ -151,8 +153,31 @@ class ApplicationServiceStore(SQLBaseStore):
         defer.returnValue(self.services_cache)
 
     @defer.inlineCallbacks
+    def get_app_service_by_user_id(self, user_id):
+        """Retrieve an application service from their user ID.
+
+        All application services have associated with them a particular user ID.
+        There is no distinguishing feature on the user ID which indicates it
+        represents an application service. This function allows you to map from
+        a user ID to an application service.
+
+        Args:
+            user_id(str): The user ID to see if it is an application service.
+        Returns:
+            synapse.appservice.ApplicationService or None.
+        """
+
+        yield self.cache_defer  # make sure the cache is ready
+
+        for service in self.services_cache:
+            if service.sender == user_id:
+                defer.returnValue(service)
+                return
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
     def get_app_service_by_token(self, token, from_cache=True):
-        """Get the application service with the given token.
+        """Get the application service with the given appservice token.
 
         Args:
             token (str): The application service token.
@@ -173,6 +198,77 @@ class ApplicationServiceStore(SQLBaseStore):
         # TODO: The from_cache=False impl
         # TODO: This should be JOINed with the application_services_regex table.
 
+    def get_app_service_rooms(self, service):
+        """Get a list of RoomsForUser for this application service.
+
+        Application services may be "interested" in lots of rooms depending on
+        the room ID, the room aliases, or the members in the room. This function
+        takes all of these into account and returns a list of RoomsForUser which
+        represent the entire list of room IDs that this application service
+        wants to know about.
+
+        Args:
+            service: The application service to get a room list for.
+        Returns:
+            A list of RoomsForUser.
+        """
+        return self.runInteraction(
+            "get_app_service_rooms",
+            self._get_app_service_rooms_txn,
+            service,
+        )
+
+    def _get_app_service_rooms_txn(self, txn, service):
+        # get all rooms matching the room ID regex.
+        room_entries = self._simple_select_list_txn(
+            txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
+        )
+        matching_room_list = set([
+            r["room_id"] for r in room_entries if
+            service.is_interested_in_room(r["room_id"])
+        ])
+
+        # resolve room IDs for matching room alias regex.
+        room_alias_mappings = self._simple_select_list_txn(
+            txn=txn, table="room_aliases", keyvalues=None,
+            retcols=["room_id", "room_alias"]
+        )
+        matching_room_list |= set([
+            r["room_id"] for r in room_alias_mappings if
+            service.is_interested_in_alias(r["room_alias"])
+        ])
+
+        # get all rooms for every user for this AS. This is scoped to users on
+        # this HS only.
+        user_list = self._simple_select_list_txn(
+            txn=txn, table="users", keyvalues=None, retcols=["name"]
+        )
+        user_list = [
+            u["name"] for u in user_list if
+            service.is_interested_in_user(u["name"])
+        ]
+        rooms_for_user_matching_user_id = set()  # RoomsForUser list
+        for user_id in user_list:
+            # FIXME: This assumes this store is linked with RoomMemberStore :(
+            rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
+                txn=txn,
+                user_id=user_id,
+                membership_list=[Membership.JOIN]
+            )
+            rooms_for_user_matching_user_id |= set(rooms_for_user)
+
+        # make RoomsForUser tuples for room ids and aliases which are not in the
+        # main rooms_for_user_list - e.g. they are rooms which do not have AS
+        # registered users in it.
+        known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
+        missing_rooms_for_user = [
+            RoomsForUser(r, service.sender, "join") for r in
+            matching_room_list if r not in known_room_ids
+        ]
+        rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
+
+        return rooms_for_user_matching_user_id
+
     @defer.inlineCallbacks
     def _populate_cache(self):
         """Populates the ApplicationServiceCache from the database."""
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 58aa376c20..65ffb4627f 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -180,6 +180,14 @@ class RoomMemberStore(SQLBaseStore):
         if not membership_list:
             return defer.succeed(None)
 
+        return self.runInteraction(
+            "get_rooms_for_user_where_membership_is",
+            self._get_rooms_for_user_where_membership_is_txn,
+            user_id, membership_list
+        )
+
+    def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
+                                                    membership_list):
         where_clause = "user_id = ? AND (%s)" % (
             " OR ".join(["membership = ?" for _ in membership_list]),
         )
@@ -187,24 +195,18 @@ class RoomMemberStore(SQLBaseStore):
         args = [user_id]
         args.extend(membership_list)
 
-        def f(txn):
-            sql = (
-                "SELECT m.room_id, m.sender, m.membership"
-                " FROM room_memberships as m"
-                " INNER JOIN current_state_events as c"
-                " ON m.event_id = c.event_id"
-                " WHERE %s"
-            ) % (where_clause,)
-
-            txn.execute(sql, args)
-            return [
-                RoomsForUser(**r) for r in self.cursor_to_dict(txn)
-            ]
+        sql = (
+            "SELECT m.room_id, m.sender, m.membership"
+            " FROM room_memberships as m"
+            " INNER JOIN current_state_events as c"
+            " ON m.event_id = c.event_id"
+            " WHERE %s"
+        ) % (where_clause,)
 
-        return self.runInteraction(
-            "get_rooms_for_user_where_membership_is",
-            f
-        )
+        txn.execute(sql, args)
+        return [
+            RoomsForUser(**r) for r in self.cursor_to_dict(txn)
+        ]
 
     def get_joined_hosts_for_room(self, room_id):
         return self._simple_select_onecol(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3ccb6f8a61..09bc522210 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -36,6 +36,7 @@ what sort order was used:
 from twisted.internet import defer
 
 from ._base import SQLBaseStore
+from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.logutils import log_function
 
@@ -127,6 +128,85 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
 
 
 class StreamStore(SQLBaseStore):
+
+    @defer.inlineCallbacks
+    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
+        # NB this lives here instead of appservice.py so we can reuse the
+        # 'private' StreamToken class in this file.
+        if limit:
+            limit = max(limit, MAX_STREAM_SIZE)
+        else:
+            limit = MAX_STREAM_SIZE
+
+        # From and to keys should be integers from ordering.
+        from_id = _StreamToken.parse_stream_token(from_key)
+        to_id = _StreamToken.parse_stream_token(to_key)
+
+        if from_key == to_key:
+            defer.returnValue(([], to_key))
+            return
+
+        # select all the events between from/to with a sensible limit
+        sql = (
+            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
+            "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON "
+            "e.event_id = s.event_id "
+            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+        ) % {
+            "limit": limit
+        }
+
+        def f(txn):
+            # pull out all the events between the tokens
+            txn.execute(sql, (from_id.stream, to_id.stream,))
+            rows = self.cursor_to_dict(txn)
+
+            # Logic:
+            #  - We want ALL events which match the AS room_id regex
+            #  - We want ALL events which match the rooms represented by the AS
+            #    room_alias regex
+            #  - We want ALL events for rooms that AS users have joined.
+            # This is currently supported via get_app_service_rooms (which is
+            # used for the Notifier listener rooms). We can't reasonably make a
+            # SQL query for these room IDs, so we'll pull all the events between
+            # from/to and filter in python.
+            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
+            room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+            def app_service_interested(row):
+                if row["room_id"] in room_ids_for_as:
+                    return True
+
+                if row["type"] == EventTypes.Member:
+                    if service.is_interested_in_user(row.get("state_key")):
+                        return True
+                return False
+
+            ret = self._get_events_txn(
+                txn,
+                # apply the filter on the room id list
+                [
+                    r["event_id"] for r in rows
+                    if app_service_interested(r)
+                ],
+                get_prev_content=True
+            )
+
+            self._set_before_and_after(ret, rows)
+
+            if rows:
+                key = "s%d" % max(r["stream_ordering"] for r in rows)
+            else:
+                # Assume we didn't get anything because there was nothing to
+                # get.
+                key = to_key
+
+            return ret, key
+
+        results = yield self.runInteraction("get_appservice_room_stream", f)
+        defer.returnValue(results)
+
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
                                limit=0, with_feedback=False):
@@ -184,8 +264,7 @@ class StreamStore(SQLBaseStore):
             self._set_before_and_after(ret, rows)
 
             if rows:
-                key = "s%d" % max([r["stream_ordering"] for r in rows])
-
+                key = "s%d" % max(r["stream_ordering"] for r in rows)
             else:
                 # Assume we didn't get anything because there was nothing to
                 # get.
diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py
index 7c5df5c116..5f2ef64efc 100644
--- a/tests/rest/client/v1/test_presence.py
+++ b/tests/rest/client/v1/test_presence.py
@@ -295,6 +295,9 @@ class PresenceEventStreamTestCase(unittest.TestCase):
 
         self.mock_datastore = hs.get_datastore()
         self.mock_datastore.get_app_service_by_token = Mock(return_value=None)
+        self.mock_datastore.get_app_service_by_user_id = Mock(
+            return_value=defer.succeed(None)
+        )
 
         def get_profile_displayname(user_id):
             return defer.succeed("Frank")