summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-18 11:54:41 +0100
committerErik Johnston <erik@matrix.org>2016-08-18 11:54:41 +0100
commit9da84a9a1ef0b88d2e6170d706425ab36431abda (patch)
treecf6dc7f8655a619490795377eee2d8e5a32861aa /synapse/handlers
parentMissed a s/federation reader/media repository/ in a log message (diff)
downloadsynapse-9da84a9a1ef0b88d2e6170d706425ab36431abda.tar.xz
Make AppserviceHandler stream events from database
This is for two reasons:

1. Suppresses duplicates correctly, as the notifier doesn't do any
   duplicate suppression.
2. Makes it easier to connect the AppserviceHandler to the replication
   stream.
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py65
1 files changed, 44 insertions, 21 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 79805cdc2e..84341b0d20 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
+from synapse.util.logcontext import preserve_fn
 
 import logging
 
@@ -45,35 +46,57 @@ class ApplicationServicesHandler(object):
         self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
-    def notify_interested_services(self, event):
+    def notify_interested_services(self, current_id):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
         prolonged length of time.
 
         Args:
-            event(Event): The event to push out to interested services.
+            current_id(int): The current maximum ID.
         """
+        services = yield self.store.get_app_services()
+        if not services:
+            return
+
         with Measure(self.clock, "notify_interested_services"):
-            # Gather interested services
-            services = yield self._get_services_for_event(event)
-            if len(services) == 0:
-                return  # no services need notifying
-
-            # Do we know this user exists? If not, poke the user query API for
-            # all services which match that user regex. This needs to block as these
-            # user queries need to be made BEFORE pushing the event.
-            yield self._check_user_exists(event.sender)
-            if event.type == EventTypes.Member:
-                yield self._check_user_exists(event.state_key)
-
-            if not self.started_scheduler:
-                self.scheduler.start().addErrback(log_failure)
-                self.started_scheduler = True
-
-            # Fork off pushes to these services
-            for service in services:
-                self.scheduler.submit_event_for_as(service, event)
+            upper_bound = current_id
+            limit = 100
+            while True:
+                upper_bound, events = yield self.store.get_new_events_for_appservice(
+                    upper_bound, limit
+                )
+
+                logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound)
+
+                if not events:
+                    break
+
+                for event in events:
+                    # Gather interested services
+                    services = yield self._get_services_for_event(event)
+                    if len(services) == 0:
+                        continue  # no services need notifying
+
+                    # Do we know this user exists? If not, poke the user query API for
+                    # all services which match that user regex. This needs to block as
+                    # these user queries need to be made BEFORE pushing the event.
+                    yield self._check_user_exists(event.sender)
+                    if event.type == EventTypes.Member:
+                        yield self._check_user_exists(event.state_key)
+
+                    if not self.started_scheduler:
+                        self.scheduler.start().addErrback(log_failure)
+                        self.started_scheduler = True
+
+                    # Fork off pushes to these services
+                    for service in services:
+                        preserve_fn(self.scheduler.submit_event_for_as)(service, event)
+
+                yield self.store.set_appservice_last_pos(upper_bound)
+
+                if len(events) < limit:
+                    break
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):