summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/receipts.py21
-rw-r--r--synapse/push/httppusher.py45
-rw-r--r--synapse/push/pusherpool.py20
-rw-r--r--synapse/storage/receipts.py9
4 files changed, 71 insertions, 24 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 935c339707..26b0368080 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler):
     def _handle_new_receipts(self, receipts):
         """Takes a list of receipts, stores them and informs the notifier.
         """
+        min_batch_id = None
+        max_batch_id = None
+
         for receipt in receipts:
             room_id = receipt["room_id"]
             receipt_type = receipt["receipt_type"]
@@ -97,10 +100,20 @@ class ReceiptsHandler(BaseHandler):
 
             stream_id, max_persisted_id = res
 
-            with PreserveLoggingContext():
-                self.notifier.on_new_event(
-                    "receipt_key", max_persisted_id, rooms=[room_id]
-                )
+            if min_batch_id is None or stream_id < min_batch_id:
+                min_batch_id = stream_id
+            if max_batch_id is None or max_persisted_id > max_batch_id:
+                max_batch_id = max_persisted_id
+
+        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+
+        with PreserveLoggingContext():
+            self.notifier.on_new_event(
+                "receipt_key", max_batch_id, rooms=affected_room_ids
+            )
+            self.hs.get_pusherpool().on_new_receipts(
+                min_batch_id, max_batch_id, affected_room_ids
+            )
 
             defer.returnValue(True)
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index d695885649..0d5450bc01 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -76,15 +76,25 @@ class HttpPusher(object):
         self.data_minus_url.update(self.data)
         del self.data_minus_url['url']
 
+    @defer.inlineCallbacks
     def on_started(self):
-        self._process()
+        yield self._process()
 
+    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max_stream_ordering
-        self._process()
+        yield self._process()
+
+    @defer.inlineCallbacks
+    def on_new_receipts(self, min_stream_id, max_stream_id):
+        # We could check the receipts are actually m.read receipts here,
+        # but currently that's the only type of receipt anyway...
+        badge = yield push_tools.get_badge_count(self.hs, self.user_id)
+        yield self.send_badge(badge)
 
+    @defer.inlineCallbacks
     def on_timer(self):
-        self._process()
+        yield self._process()
 
     def on_stop(self):
         if self.timed_call:
@@ -106,22 +116,24 @@ class HttpPusher(object):
                     self.last_stream_ordering,
                     self.clock.time_msec()
                 )
-                self.failing_since = None
-                yield self.store.update_pusher_failing_since(
-                    self.app_id, self.pushkey, self.user_id,
-                    self.failing_since
-                )
+                if self.failing_since:
+                    self.failing_since = None
+                    yield self.store.update_pusher_failing_since(
+                        self.app_id, self.pushkey, self.user_id,
+                        self.failing_since
+                    )
             else:
-                self.failing_since = self.clock.time_msec()
-                yield self.store.update_pusher_failing_since(
-                    self.app_id, self.pushkey, self.user_id,
-                    self.failing_since
-                )
+                if not self.failing_since:
+                    self.failing_since = self.clock.time_msec()
+                    yield self.store.update_pusher_failing_since(
+                        self.app_id, self.pushkey, self.user_id,
+                        self.failing_since
+                    )
 
                 if (
                     self.failing_since and
                     self.failing_since <
-                    self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER
+                    self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
                 ):
                     # we really only give up so that if the URL gets
                     # fixed, we don't suddenly deliver a load
@@ -148,7 +160,7 @@ class HttpPusher(object):
                 else:
                     logger.info("Push failed: delaying for %ds", self.backoff_delay)
                     self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
-                    self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC)
+                    self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
                     break
 
     @defer.inlineCallbacks
@@ -191,7 +203,8 @@ class HttpPusher(object):
 
         d = {
             'notification': {
-                'id': event.event_id,
+                'id': event.event_id,  # deprecated: remove soon
+                'event_id': event.event_id,
                 'room_id': event.room_id,
                 'type': event.type,
                 'sender': event.user_id,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index b67ad455ea..7b1ce81e9a 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -126,11 +126,29 @@ class PusherPool:
             for u in users_affected:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        p.on_new_notifications(min_stream_id, max_stream_id)
+                        yield p.on_new_notifications(min_stream_id, max_stream_id)
         except:
             logger.exception("Exception in pusher on_new_notifications")
 
     @defer.inlineCallbacks
+    def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+        yield run_on_reactor()
+        try:
+            # Need to subtract 1 from the minimum because the lower bound here
+            # is not inclusive
+            updated_receipts = yield self.store.get_all_updated_receipts(
+                min_stream_id - 1, max_stream_id
+            )
+            # This returns a tuple, user_id is at index 3
+            users_affected = set([r[3] for r in updated_receipts])
+            for u in users_affected:
+                if u in self.pushers:
+                    for p in self.pushers[u].values():
+                        yield p.on_new_receipts(min_stream_id, max_stream_id)
+        except:
+            logger.exception("Exception in pusher on_new_receipts")
+
+    @defer.inlineCallbacks
     def _refresh_pusher(self, app_id, pushkey, user_id):
         resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
             app_id, pushkey
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 4befebc8e2..59d1ac0314 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -390,16 +390,19 @@ class ReceiptsStore(SQLBaseStore):
             }
         )
 
-    def get_all_updated_receipts(self, last_id, current_id, limit):
+    def get_all_updated_receipts(self, last_id, current_id, limit=None):
         def get_all_updated_receipts_txn(txn):
             sql = (
                 "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
                 " FROM receipts_linearized"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC"
-                " LIMIT ?"
             )
-            txn.execute(sql, (last_id, current_id, limit))
+            args = [last_id, current_id]
+            if limit is not None:
+                sql += " LIMIT ?"
+                args.append(limit)
+            txn.execute(sql, args)
 
             return txn.fetchall()
         return self.runInteraction(