summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-08-18 12:59:56 +0100
committerGitHub <noreply@github.com>2016-08-18 12:59:56 +0100
commit6762a7268c049d254a2543bcbf20236735a31880 (patch)
treecf6dc7f8655a619490795377eee2d8e5a32861aa
parentMissed a s/federation reader/media repository/ in a log message (diff)
parentMake AppserviceHandler stream events from database (diff)
downloadsynapse-6762a7268c049d254a2543bcbf20236735a31880.tar.xz
Merge pull request #1025 from matrix-org/erikj/appservice_stream
Make AppserviceHandler stream events from database
-rw-r--r--synapse/handlers/appservice.py65
-rw-r--r--synapse/notifier.py2
-rw-r--r--synapse/storage/appservice.py39
-rw-r--r--synapse/storage/schema/delta/34/appservice_stream.sql23
-rw-r--r--tests/handlers/test_appservice.py9
5 files changed, 113 insertions, 25 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 79805cdc2e..84341b0d20 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
+from synapse.util.logcontext import preserve_fn
 
 import logging
 
@@ -45,35 +46,57 @@ class ApplicationServicesHandler(object):
         self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
-    def notify_interested_services(self, event):
+    def notify_interested_services(self, current_id):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
         prolonged length of time.
 
         Args:
-            event(Event): The event to push out to interested services.
+            current_id(int): The current maximum ID.
         """
+        services = yield self.store.get_app_services()
+        if not services:
+            return
+
         with Measure(self.clock, "notify_interested_services"):
-            # Gather interested services
-            services = yield self._get_services_for_event(event)
-            if len(services) == 0:
-                return  # no services need notifying
-
-            # Do we know this user exists? If not, poke the user query API for
-            # all services which match that user regex. This needs to block as these
-            # user queries need to be made BEFORE pushing the event.
-            yield self._check_user_exists(event.sender)
-            if event.type == EventTypes.Member:
-                yield self._check_user_exists(event.state_key)
-
-            if not self.started_scheduler:
-                self.scheduler.start().addErrback(log_failure)
-                self.started_scheduler = True
-
-            # Fork off pushes to these services
-            for service in services:
-                self.scheduler.submit_event_for_as(service, event)
+            upper_bound = current_id
+            limit = 100
+            while True:
+                upper_bound, events = yield self.store.get_new_events_for_appservice(
+                    upper_bound, limit
+                )
+
+                logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound)
+
+                if not events:
+                    break
+
+                for event in events:
+                    # Gather interested services
+                    services = yield self._get_services_for_event(event)
+                    if len(services) == 0:
+                        continue  # no services need notifying
+
+                    # Do we know this user exists? If not, poke the user query API for
+                    # all services which match that user regex. This needs to block as
+                    # these user queries need to be made BEFORE pushing the event.
+                    yield self._check_user_exists(event.sender)
+                    if event.type == EventTypes.Member:
+                        yield self._check_user_exists(event.state_key)
+
+                    if not self.started_scheduler:
+                        self.scheduler.start().addErrback(log_failure)
+                        self.started_scheduler = True
+
+                    # Fork off pushes to these services
+                    for service in services:
+                        preserve_fn(self.scheduler.submit_event_for_as)(service, event)
+
+                yield self.store.set_appservice_last_pos(upper_bound)
+
+                if len(events) < limit:
+                    break
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index e4a25f2411..40a148994f 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -214,7 +214,7 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        self.appservice_handler.notify_interested_services(event)
+        self.appservice_handler.notify_interested_services(room_stream_id)
 
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
             self._user_joined_room(event.state_key, event.room_id)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index d1ee533fac..f0c88e05cd 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             return 0
         else:
             return int(last_txn_id[0])  # select 'last_txn' col
+
+    def set_appservice_last_pos(self, pos):
+        def set_appservice_last_pos_txn(txn):
+            txn.execute(
+                "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
+            )
+        return self.runInteraction(
+            "set_appservice_last_pos", set_appservice_last_pos_txn
+        )
+
+    @defer.inlineCallbacks
+    def get_new_events_for_appservice(self, current_id, limit):
+        """Get all new evnets"""
+
+        def get_new_events_for_appservice_txn(txn):
+            sql = (
+                "SELECT e.stream_ordering, e.event_id"
+                " FROM events AS e, appservice_stream_position AS a"
+                " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (current_id, limit))
+            rows = txn.fetchall()
+
+            upper_bound = current_id
+            if len(rows) == limit:
+                upper_bound = rows[-1][0]
+
+            return upper_bound, [row[1] for row in rows]
+
+        upper_bound, event_ids = yield self.runInteraction(
+            "get_new_events_for_appservice", get_new_events_for_appservice_txn,
+        )
+
+        events = yield self._get_events(event_ids)
+
+        defer.returnValue((upper_bound, events))
diff --git a/synapse/storage/schema/delta/34/appservice_stream.sql b/synapse/storage/schema/delta/34/appservice_stream.sql
new file mode 100644
index 0000000000..69e16eda0f
--- /dev/null
+++ b/synapse/storage/schema/delta/34/appservice_stream.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS appservice_stream_position(
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_ordering BIGINT,
+    CHECK (Lock='X')
+);
+
+INSERT INTO appservice_stream_position (stream_ordering)
+    SELECT COALESCE(MAX(stream_ordering), 0) FROM events;
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 9c1e5cc67c..7fe88172c0 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -53,8 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             type="m.room.message",
             room_id="!foo:bar"
         )
+        self.mock_store.get_new_events_for_appservice.return_value = (0, [event])
         self.mock_as_api.push = Mock()
-        yield self.handler.notify_interested_services(event)
+        yield self.handler.notify_interested_services(0)
         self.mock_scheduler.submit_event_for_as.assert_called_once_with(
             interested_service, event
         )
@@ -74,7 +75,8 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         )
         self.mock_as_api.push = Mock()
         self.mock_as_api.query_user = Mock()
-        yield self.handler.notify_interested_services(event)
+        self.mock_store.get_new_events_for_appservice.return_value = (0, [event])
+        yield self.handler.notify_interested_services(0)
         self.mock_as_api.query_user.assert_called_once_with(
             services[0], user_id
         )
@@ -96,7 +98,8 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         )
         self.mock_as_api.push = Mock()
         self.mock_as_api.query_user = Mock()
-        yield self.handler.notify_interested_services(event)
+        self.mock_store.get_new_events_for_appservice.return_value = (0, [event])
+        yield self.handler.notify_interested_services(0)
         self.assertFalse(
             self.mock_as_api.query_user.called,
             "query_user called when it shouldn't have been."