diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a8b0c95636..80f7ee3f12 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -566,7 +566,9 @@ class RoomEventSource(object):
to_key = yield self.get_current_key()
- app_service = self.store.get_app_service_by_user_id(user.to_string())
+ app_service = yield self.store.get_app_service_by_user_id(
+ user.to_string()
+ )
if app_service:
events, end_key = yield self.store.get_appservice_room_stream(
service=app_service,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3c8f3320f1..6946e9fe70 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -128,25 +128,73 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
class StreamStore(SQLBaseStore):
+ @defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
# NB this lives here instead of appservice.py so we can reuse the
# 'private' StreamToken class in this file.
- logger.info("get_appservice_room_stream -> %s", service)
-
if limit:
limit = max(limit, MAX_STREAM_SIZE)
else:
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
- # from_id = _StreamToken.parse_stream_token(from_key)
- # to_id = _StreamToken.parse_stream_token(to_key)
+ from_id = _StreamToken.parse_stream_token(from_key)
+ to_id = _StreamToken.parse_stream_token(to_key)
if from_key == to_key:
- return defer.succeed(([], to_key))
+ defer.returnValue(([], to_key))
+ return
+
+ # Logic:
+ # - We want ALL events which match the AS room_id regex
+ # - We want ALL events which match the rooms represented by the AS
+ # room_alias regex
+ # - We want ALL events for rooms that AS users have joined.
+ # This is currently supported via get_app_service_rooms (which is used
+ # for the Notifier listener rooms). We can't reasonably make a SQL
+ # query for these room IDs, so we'll pull all the events between from/to
+ # and filter in python.
+ rooms_for_as = yield self.get_app_service_rooms(service)
+ room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+ # select all the events between from/to with a sensible limit
+ sql = (
+ "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e "
+ "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+ "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+ ) % {
+ "limit": limit
+ }
+
+ def f(txn):
+ txn.execute(sql, (from_id.stream, to_id.stream,))
+
+ rows = self.cursor_to_dict(txn)
+
+ ret = self._get_events_txn(
+ txn,
+ # apply the filter on the room id list
+ [
+ r["event_id"] for r in rows
+ if r["room_id"] in room_ids_for_as
+ ],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(ret, rows)
+
+ if rows:
+ key = "s%d" % max([r["stream_ordering"] for r in rows])
+
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = to_key
+
+ return ret, key
- # TODO stub
- return defer.succeed(([], to_key))
+ results = yield self.runInteraction("get_appservice_room_stream", f)
+ defer.returnValue(results)
@log_function
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
|