diff options
author | Erik Johnston <erikj@jki.re> | 2016-08-18 12:59:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-08-18 12:59:56 +0100 |
commit | 6762a7268c049d254a2543bcbf20236735a31880 (patch) | |
tree | cf6dc7f8655a619490795377eee2d8e5a32861aa /synapse/storage/appservice.py | |
parent | Missed a s/federation reader/media repository/ in a log message (diff) | |
parent | Make AppserviceHandler stream events from database (diff) | |
download | synapse-6762a7268c049d254a2543bcbf20236735a31880.tar.xz |
Merge pull request #1025 from matrix-org/erikj/appservice_stream
Make AppserviceHandler stream events from database
Diffstat (limited to 'synapse/storage/appservice.py')
-rw-r--r-- | synapse/storage/appservice.py | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d1ee533fac..f0c88e05cd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return 0 else: return int(last_txn_id[0]) # select 'last_txn' col + + def set_appservice_last_pos(self, pos): + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) + ) + return self.runInteraction( + "set_appservice_last_pos", set_appservice_last_pos_txn + ) + + @defer.inlineCallbacks + def get_new_events_for_appservice(self, current_id, limit): + """Get all new evnets""" + + def get_new_events_for_appservice_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e, appservice_stream_position AS a" + " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_new_events_for_appservice", get_new_events_for_appservice_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) |