summary refs log tree commit diff
path: root/synapse/push/pusherpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r--synapse/push/pusherpool.py116
1 files changed, 79 insertions, 37 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0b463c6fdb..5853ec36a9 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -16,9 +16,9 @@
 
 from twisted.internet import defer
 
-from .httppusher import HttpPusher
-from synapse.push import PusherConfigException
+import pusher
 from synapse.util.logcontext import preserve_fn
+from synapse.util.async import run_on_reactor
 
 import logging
 
@@ -28,10 +28,10 @@ logger = logging.getLogger(__name__)
 class PusherPool:
     def __init__(self, _hs):
         self.hs = _hs
+        self.start_pushers = _hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
         self.pushers = {}
-        self.last_pusher_started = -1
 
     @defer.inlineCallbacks
     def start(self):
@@ -48,7 +48,8 @@ class PusherPool:
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
         # code path adding pushers.
-        self._create_pusher({
+        pusher.create_pusher(self.hs, {
+            "id": None,
             "user_name": user_id,
             "kind": kind,
             "app_id": app_id,
@@ -58,10 +59,18 @@ class PusherPool:
             "ts": time_now_msec,
             "lang": lang,
             "data": data,
-            "last_token": None,
+            "last_stream_ordering": None,
             "last_success": None,
             "failing_since": None
         })
+
+        # create the pusher setting last_stream_ordering to the current maximum
+        # stream ordering in event_push_actions, so it will process
+        # pushes from this point onwards.
+        last_stream_ordering = (
+            yield self.store.get_latest_push_action_stream_ordering()
+        )
+
         yield self.store.add_pusher(
             user_id=user_id,
             access_token=access_token,
@@ -73,6 +82,7 @@ class PusherPool:
             pushkey_ts=time_now_msec,
             lang=lang,
             data=data,
+            last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
         )
         yield self._refresh_pusher(app_id, pushkey, user_id)
@@ -106,26 +116,51 @@ class PusherPool:
                 )
                 yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
-    def _create_pusher(self, pusherdict):
-        if pusherdict['kind'] == 'http':
-            return HttpPusher(
-                self.hs,
-                user_id=pusherdict['user_name'],
-                app_id=pusherdict['app_id'],
-                app_display_name=pusherdict['app_display_name'],
-                device_display_name=pusherdict['device_display_name'],
-                pushkey=pusherdict['pushkey'],
-                pushkey_ts=pusherdict['ts'],
-                data=pusherdict['data'],
-                last_token=pusherdict['last_token'],
-                last_success=pusherdict['last_success'],
-                failing_since=pusherdict['failing_since']
+    @defer.inlineCallbacks
+    def on_new_notifications(self, min_stream_id, max_stream_id):
+        yield run_on_reactor()
+        try:
+            users_affected = yield self.store.get_push_action_users_in_range(
+                min_stream_id, max_stream_id
             )
-        else:
-            raise PusherConfigException(
-                "Unknown pusher type '%s' for user %s" %
-                (pusherdict['kind'], pusherdict['user_name'])
+
+            deferreds = []
+
+            for u in users_affected:
+                if u in self.pushers:
+                    for p in self.pushers[u].values():
+                        deferreds.append(
+                            p.on_new_notifications(min_stream_id, max_stream_id)
+                        )
+
+            yield defer.gatherResults(deferreds)
+        except:
+            logger.exception("Exception in pusher on_new_notifications")
+
+    @defer.inlineCallbacks
+    def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+        yield run_on_reactor()
+        try:
+            # Need to subtract 1 from the minimum because the lower bound here
+            # is not inclusive
+            updated_receipts = yield self.store.get_all_updated_receipts(
+                min_stream_id - 1, max_stream_id
             )
+            # This returns a tuple, user_id is at index 3
+            users_affected = set([r[3] for r in updated_receipts])
+
+            deferreds = []
+
+            for u in users_affected:
+                if u in self.pushers:
+                    for p in self.pushers[u].values():
+                        deferreds.append(
+                            p.on_new_receipts(min_stream_id, max_stream_id)
+                        )
+
+            yield defer.gatherResults(deferreds)
+        except:
+            logger.exception("Exception in pusher on_new_receipts")
 
     @defer.inlineCallbacks
     def _refresh_pusher(self, app_id, pushkey, user_id):
@@ -143,33 +178,40 @@ class PusherPool:
             self._start_pushers([p])
 
     def _start_pushers(self, pushers):
+        if not self.start_pushers:
+            logger.info("Not starting pushers because they are disabled in the config")
+            return
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
             try:
-                p = self._create_pusher(pusherdict)
-            except PusherConfigException:
-                logger.exception("Couldn't start a pusher: caught PusherConfigException")
+                p = pusher.create_pusher(self.hs, pusherdict)
+            except:
+                logger.exception("Couldn't start a pusher: caught Exception")
                 continue
             if p:
-                fullid = "%s:%s:%s" % (
+                appid_pushkey = "%s:%s" % (
                     pusherdict['app_id'],
                     pusherdict['pushkey'],
-                    pusherdict['user_name']
                 )
-                if fullid in self.pushers:
-                    self.pushers[fullid].stop()
-                self.pushers[fullid] = p
-                preserve_fn(p.start)()
+                byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+
+                if appid_pushkey in byuser:
+                    byuser[appid_pushkey].on_stop()
+                byuser[appid_pushkey] = p
+                preserve_fn(p.on_started)()
 
         logger.info("Started pushers")
 
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):
-        fullid = "%s:%s:%s" % (app_id, pushkey, user_id)
-        if fullid in self.pushers:
-            logger.info("Stopping pusher %s", fullid)
-            self.pushers[fullid].stop()
-            del self.pushers[fullid]
+        appid_pushkey = "%s:%s" % (app_id, pushkey)
+
+        byuser = self.pushers.get(user_id, {})
+
+        if appid_pushkey in byuser:
+            logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
+            byuser[appid_pushkey].on_stop()
+            del byuser[appid_pushkey]
         yield self.store.delete_pusher_by_app_id_pushkey_user_id(
             app_id, pushkey, user_id
         )