diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 888b1cb35d..7fa63b58a7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -765,3 +765,50 @@ class StreamStore(SQLBaseStore):
"token": end_token,
},
}
+
+ @defer.inlineCallbacks
+ def get_all_new_events_stream(self, from_id, current_id, limit):
+ """Get all new events"""
+
+ def get_all_new_events_stream_txn(txn):
+ sql = (
+ "SELECT e.stream_ordering, e.event_id"
+ " FROM events AS e"
+ " WHERE"
+ " ? < e.stream_ordering AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (from_id, current_id, limit))
+ rows = txn.fetchall()
+
+ upper_bound = current_id
+ if len(rows) == limit:
+ upper_bound = rows[-1][0]
+
+ return upper_bound, [row[1] for row in rows]
+
+ upper_bound, event_ids = yield self.runInteraction(
+ "get_all_new_events_stream", get_all_new_events_stream_txn,
+ )
+
+ events = yield self._get_events(event_ids)
+
+ defer.returnValue((upper_bound, events))
+
+ def get_federation_out_pos(self, typ):
+ return self._simple_select_one_onecol(
+ table="federation_stream_position",
+ retcol="stream_id",
+ keyvalues={"type": typ},
+ desc="get_federation_out_pos"
+ )
+
+ def update_federation_out_pos(self, typ, stream_id):
+ return self._simple_update_one(
+ table="federation_stream_position",
+ keyvalues={"type": typ},
+ updatevalues={"stream_id": stream_id},
+ desc="update_federation_out_pos",
+ )
|