diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-17 15:46:44 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-17 15:48:04 +0000 |
commit | f8ee66250a16cb9dd3af01fb1150ff18cfebbc39 (patch) | |
tree | 9920bd4e8164f705b4e27c714d6c053082dcf7a5 /synapse/storage/stream.py | |
parent | Hook up the send queue and create a federation sender worker (diff) | |
download | synapse-f8ee66250a16cb9dd3af01fb1150ff18cfebbc39.tar.xz |
Handle sending events and device messages over federation
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 888b1cb35d..f34cb78f9a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -765,3 +765,34 @@ class StreamStore(SQLBaseStore): "token": end_token, }, } + + @defer.inlineCallbacks + def get_all_new_events_stream(self, from_id, current_id, limit): + """Get all new events""" + + def get_all_new_events_stream_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e" + " WHERE" + " ? < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (from_id, current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_all_new_events_stream", get_all_new_events_stream_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) |