summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py62
1 files changed, 55 insertions, 7 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3c8f3320f1..6946e9fe70 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -128,25 +128,73 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
 
 class StreamStore(SQLBaseStore):
 
+    @defer.inlineCallbacks
     def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
         # NB this lives here instead of appservice.py so we can reuse the
         # 'private' StreamToken class in this file.
-        logger.info("get_appservice_room_stream -> %s", service)
-
         if limit:
             limit = max(limit, MAX_STREAM_SIZE)
         else:
             limit = MAX_STREAM_SIZE
 
         # From and to keys should be integers from ordering.
-        # from_id = _StreamToken.parse_stream_token(from_key)
-        # to_id = _StreamToken.parse_stream_token(to_key)
+        from_id = _StreamToken.parse_stream_token(from_key)
+        to_id = _StreamToken.parse_stream_token(to_key)
 
         if from_key == to_key:
-            return defer.succeed(([], to_key))
+            defer.returnValue(([], to_key))
+            return
+
+        # Logic:
+        #  - We want ALL events which match the AS room_id regex
+        #  - We want ALL events which match the rooms represented by the AS
+        #    room_alias regex
+        #  - We want ALL events for rooms that AS users have joined.
+        # This is currently supported via get_app_service_rooms (which is used
+        # for the Notifier listener rooms). We can't reasonably make a SQL
+        # query for these room IDs, so we'll pull all the events between from/to
+        # and filter in python.
+        rooms_for_as = yield self.get_app_service_rooms(service)
+        room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+        # select all the events between from/to with a sensible limit
+        sql = (
+            "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e "
+            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+        ) % {
+            "limit": limit
+        }
+
+        def f(txn):
+            txn.execute(sql, (from_id.stream, to_id.stream,))
+
+            rows = self.cursor_to_dict(txn)
+
+            ret = self._get_events_txn(
+                txn,
+                # apply the filter on the room id list
+                [
+                    r["event_id"] for r in rows
+                    if r["room_id"] in room_ids_for_as
+                ],
+                get_prev_content=True
+            )
+
+            self._set_before_and_after(ret, rows)
+
+            if rows:
+                key = "s%d" % max([r["stream_ordering"] for r in rows])
+
+            else:
+                # Assume we didn't get anything because there was nothing to
+                # get.
+                key = to_key
+
+            return ret, key
 
-        # TODO stub
-        return defer.succeed(([], to_key))
+        results = yield self.runInteraction("get_appservice_room_stream", f)
+        defer.returnValue(results)
 
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,