summary refs log tree commit diff
path: root/synapse/push/pusherpool.py
diff options
context:
space:
mode:
authorDavid Baker <dbkr@users.noreply.github.com>2016-04-11 12:58:55 +0100
committerDavid Baker <dbkr@users.noreply.github.com>2016-04-11 12:58:55 +0100
commit2547dffccc2f2ead7e0d35dafd6da5d468e6d1d8 (patch)
tree76c9b510bdb25556553c0cbfa7072e36d11f7e75 /synapse/push/pusherpool.py
parentPEP8 (diff)
parentRun unsafe proces in a loop until we've caught up (diff)
downloadsynapse-2547dffccc2f2ead7e0d35dafd6da5d468e6d1d8.tar.xz
Merge pull request #705 from matrix-org/dbkr/pushers_use_event_actions
Change pushers to use the event_actions table
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r--synapse/push/pusherpool.py105
1 files changed, 72 insertions, 33 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0b463c6fdb..ba513601e7 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -16,9 +16,10 @@
 
 from twisted.internet import defer
 
-from .httppusher import HttpPusher
+import pusher
 from synapse.push import PusherConfigException
 from synapse.util.logcontext import preserve_fn
+from synapse.util.async import run_on_reactor
 
 import logging
 
@@ -48,7 +49,7 @@ class PusherPool:
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
         # code path adding pushers.
-        self._create_pusher({
+        pusher.create_pusher(self.hs, {
             "user_name": user_id,
             "kind": kind,
             "app_id": app_id,
@@ -58,10 +59,18 @@ class PusherPool:
             "ts": time_now_msec,
             "lang": lang,
             "data": data,
-            "last_token": None,
+            "last_stream_ordering": None,
             "last_success": None,
             "failing_since": None
         })
+
+        # create the pusher setting last_stream_ordering to the current maximum
+        # stream ordering in event_push_actions, so it will process
+        # pushes from this point onwards.
+        last_stream_ordering = (
+            yield self.store.get_latest_push_action_stream_ordering()
+        )
+
         yield self.store.add_pusher(
             user_id=user_id,
             access_token=access_token,
@@ -73,6 +82,7 @@ class PusherPool:
             pushkey_ts=time_now_msec,
             lang=lang,
             data=data,
+            last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
         )
         yield self._refresh_pusher(app_id, pushkey, user_id)
@@ -106,26 +116,51 @@ class PusherPool:
                 )
                 yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
-    def _create_pusher(self, pusherdict):
-        if pusherdict['kind'] == 'http':
-            return HttpPusher(
-                self.hs,
-                user_id=pusherdict['user_name'],
-                app_id=pusherdict['app_id'],
-                app_display_name=pusherdict['app_display_name'],
-                device_display_name=pusherdict['device_display_name'],
-                pushkey=pusherdict['pushkey'],
-                pushkey_ts=pusherdict['ts'],
-                data=pusherdict['data'],
-                last_token=pusherdict['last_token'],
-                last_success=pusherdict['last_success'],
-                failing_since=pusherdict['failing_since']
+    @defer.inlineCallbacks
+    def on_new_notifications(self, min_stream_id, max_stream_id):
+        yield run_on_reactor()
+        try:
+            users_affected = yield self.store.get_push_action_users_in_range(
+                min_stream_id, max_stream_id
             )
-        else:
-            raise PusherConfigException(
-                "Unknown pusher type '%s' for user %s" %
-                (pusherdict['kind'], pusherdict['user_name'])
+
+            deferreds = []
+
+            for u in users_affected:
+                if u in self.pushers:
+                    for p in self.pushers[u].values():
+                        deferreds.append(
+                            p.on_new_notifications(min_stream_id, max_stream_id)
+                        )
+
+            yield defer.gatherResults(deferreds)
+        except:
+            logger.exception("Exception in pusher on_new_notifications")
+
+    @defer.inlineCallbacks
+    def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+        yield run_on_reactor()
+        try:
+            # Need to subtract 1 from the minimum because the lower bound here
+            # is not inclusive
+            updated_receipts = yield self.store.get_all_updated_receipts(
+                min_stream_id - 1, max_stream_id
             )
+            # This returns a tuple, user_id is at index 3
+            users_affected = set([r[3] for r in updated_receipts])
+
+            deferreds = []
+
+            for u in users_affected:
+                if u in self.pushers:
+                    for p in self.pushers[u].values():
+                        deferreds.append(
+                            p.on_new_receipts(min_stream_id, max_stream_id)
+                        )
+
+            yield defer.gatherResults(deferreds)
+        except:
+            logger.exception("Exception in pusher on_new_receipts")
 
     @defer.inlineCallbacks
     def _refresh_pusher(self, app_id, pushkey, user_id):
@@ -146,30 +181,34 @@ class PusherPool:
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
             try:
-                p = self._create_pusher(pusherdict)
+                p = pusher.create_pusher(self.hs, pusherdict)
             except PusherConfigException:
                 logger.exception("Couldn't start a pusher: caught PusherConfigException")
                 continue
             if p:
-                fullid = "%s:%s:%s" % (
+                appid_pushkey = "%s:%s" % (
                     pusherdict['app_id'],
                     pusherdict['pushkey'],
-                    pusherdict['user_name']
                 )
-                if fullid in self.pushers:
-                    self.pushers[fullid].stop()
-                self.pushers[fullid] = p
-                preserve_fn(p.start)()
+                byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+
+                if appid_pushkey in byuser:
+                    byuser[appid_pushkey].on_stop()
+                byuser[appid_pushkey] = p
+                preserve_fn(p.on_started)()
 
         logger.info("Started pushers")
 
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):
-        fullid = "%s:%s:%s" % (app_id, pushkey, user_id)
-        if fullid in self.pushers:
-            logger.info("Stopping pusher %s", fullid)
-            self.pushers[fullid].stop()
-            del self.pushers[fullid]
+        appid_pushkey = "%s:%s" % (app_id, pushkey)
+
+        byuser = self.pushers.get(user_id, {})
+
+        if appid_pushkey in byuser:
+            logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
+            byuser[appid_pushkey].on_stop()
+            del byuser[appid_pushkey]
         yield self.store.delete_pusher_by_app_id_pushkey_user_id(
             app_id, pushkey, user_id
         )