summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/storage/stream.py62
2 files changed, 58 insertions, 8 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a8b0c95636..80f7ee3f12 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -566,7 +566,9 @@ class RoomEventSource(object):
 
         to_key = yield self.get_current_key()
 
-        app_service = self.store.get_app_service_by_user_id(user.to_string())
+        app_service = yield self.store.get_app_service_by_user_id(
+            user.to_string()
+        )
         if app_service:
             events, end_key = yield self.store.get_appservice_room_stream(
                 service=app_service,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3c8f3320f1..6946e9fe70 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -128,25 +128,73 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
 
 class StreamStore(SQLBaseStore):
 
+    @defer.inlineCallbacks
     def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
         # NB this lives here instead of appservice.py so we can reuse the
         # 'private' StreamToken class in this file.
-        logger.info("get_appservice_room_stream -> %s", service)
-
         if limit:
             limit = max(limit, MAX_STREAM_SIZE)
         else:
             limit = MAX_STREAM_SIZE
 
         # From and to keys should be integers from ordering.
-        # from_id = _StreamToken.parse_stream_token(from_key)
-        # to_id = _StreamToken.parse_stream_token(to_key)
+        from_id = _StreamToken.parse_stream_token(from_key)
+        to_id = _StreamToken.parse_stream_token(to_key)
 
         if from_key == to_key:
-            return defer.succeed(([], to_key))
+            defer.returnValue(([], to_key))
+            return
+
+        # Logic:
+        #  - We want ALL events which match the AS room_id regex
+        #  - We want ALL events which match the rooms represented by the AS
+        #    room_alias regex
+        #  - We want ALL events for rooms that AS users have joined.
+        # This is currently supported via get_app_service_rooms (which is used
+        # for the Notifier listener rooms). We can't reasonably make a SQL
+        # query for these room IDs, so we'll pull all the events between from/to
+        # and filter in python.
+        rooms_for_as = yield self.get_app_service_rooms(service)
+        room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+        # select all the events between from/to with a sensible limit
+        sql = (
+            "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e "
+            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+        ) % {
+            "limit": limit
+        }
+
+        def f(txn):
+            txn.execute(sql, (from_id.stream, to_id.stream,))
+
+            rows = self.cursor_to_dict(txn)
+
+            ret = self._get_events_txn(
+                txn,
+                # apply the filter on the room id list
+                [
+                    r["event_id"] for r in rows
+                    if r["room_id"] in room_ids_for_as
+                ],
+                get_prev_content=True
+            )
+
+            self._set_before_and_after(ret, rows)
+
+            if rows:
+                key = "s%d" % max([r["stream_ordering"] for r in rows])
+
+            else:
+                # Assume we didn't get anything because there was nothing to
+                # get.
+                key = to_key
+
+            return ret, key
 
-        # TODO stub
-        return defer.succeed(([], to_key))
+        results = yield self.runInteraction("get_appservice_room_stream", f)
+        defer.returnValue(results)
 
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,