diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index d1ee533fac..f0c88e05cd 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
return 0
else:
return int(last_txn_id[0]) # select 'last_txn' col
+
+ def set_appservice_last_pos(self, pos):
+ def set_appservice_last_pos_txn(txn):
+ txn.execute(
+ "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
+ )
+ return self.runInteraction(
+ "set_appservice_last_pos", set_appservice_last_pos_txn
+ )
+
+ @defer.inlineCallbacks
+ def get_new_events_for_appservice(self, current_id, limit):
+ """Get all new evnets"""
+
+ def get_new_events_for_appservice_txn(txn):
+ sql = (
+ "SELECT e.stream_ordering, e.event_id"
+ " FROM events AS e, appservice_stream_position AS a"
+ " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (current_id, limit))
+ rows = txn.fetchall()
+
+ upper_bound = current_id
+ if len(rows) == limit:
+ upper_bound = rows[-1][0]
+
+ return upper_bound, [row[1] for row in rows]
+
+ upper_bound, event_ids = yield self.runInteraction(
+ "get_new_events_for_appservice", get_new_events_for_appservice_txn,
+ )
+
+ events = yield self._get_events(event_ids)
+
+ defer.returnValue((upper_bound, events))
diff --git a/synapse/storage/schema/delta/34/appservice_stream.sql b/synapse/storage/schema/delta/34/appservice_stream.sql
new file mode 100644
index 0000000000..69e16eda0f
--- /dev/null
+++ b/synapse/storage/schema/delta/34/appservice_stream.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS appservice_stream_position(
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_ordering BIGINT,
+ CHECK (Lock='X')
+);
+
+INSERT INTO appservice_stream_position (stream_ordering)
+ SELECT COALESCE(MAX(stream_ordering), 0) FROM events;
|