summary refs log tree commit diff
path: root/synapse/storage/appservice.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-18 11:54:41 +0100
committerErik Johnston <erik@matrix.org>2016-08-18 11:54:41 +0100
commit9da84a9a1ef0b88d2e6170d706425ab36431abda (patch)
treecf6dc7f8655a619490795377eee2d8e5a32861aa /synapse/storage/appservice.py
parentMissed a s/federation reader/media repository/ in a log message (diff)
downloadsynapse-9da84a9a1ef0b88d2e6170d706425ab36431abda.tar.xz
Make AppserviceHandler stream events from database
This is for two reasons:

1. Suppresses duplicates correctly, as the notifier doesn't do any
   duplicate suppression.
2. Makes it easier to connect the AppserviceHandler to the replication
   stream.
Diffstat (limited to 'synapse/storage/appservice.py')
-rw-r--r--synapse/storage/appservice.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index d1ee533fac..f0c88e05cd 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             return 0
         else:
             return int(last_txn_id[0])  # select 'last_txn' col
+
+    def set_appservice_last_pos(self, pos):
+        def set_appservice_last_pos_txn(txn):
+            txn.execute(
+                "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
+            )
+        return self.runInteraction(
+            "set_appservice_last_pos", set_appservice_last_pos_txn
+        )
+
+    @defer.inlineCallbacks
+    def get_new_events_for_appservice(self, current_id, limit):
+        """Get all new evnets"""
+
+        def get_new_events_for_appservice_txn(txn):
+            sql = (
+                "SELECT e.stream_ordering, e.event_id"
+                " FROM events AS e, appservice_stream_position AS a"
+                " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (current_id, limit))
+            rows = txn.fetchall()
+
+            upper_bound = current_id
+            if len(rows) == limit:
+                upper_bound = rows[-1][0]
+
+            return upper_bound, [row[1] for row in rows]
+
+        upper_bound, event_ids = yield self.runInteraction(
+            "get_new_events_for_appservice", get_new_events_for_appservice_txn,
+        )
+
+        events = yield self._get_events(event_ids)
+
+        defer.returnValue((upper_bound, events))