summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/push/emailpusher.py19
-rw-r--r--synapse/push/pusherpool.py30
-rw-r--r--tests/push/test_email.py29
3 files changed, 70 insertions, 8 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index e8ee67401f..c89a8438a9 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -114,6 +114,21 @@ class EmailPusher(object):
 
         run_as_background_process("emailpush.process", self._process)
 
+    def _pause_processing(self):
+        """Used by tests to temporarily pause processing of events.
+
+        Asserts that its not currently processing.
+        """
+        assert not self._is_processing
+        self._is_processing = True
+
+    def _resume_processing(self):
+        """Used by tests to resume processing of events after pausing.
+        """
+        assert self._is_processing
+        self._is_processing = False
+        self._start_processing()
+
     @defer.inlineCallbacks
     def _process(self):
         # we should never get here if we are already processing
@@ -215,6 +230,10 @@ class EmailPusher(object):
 
     @defer.inlineCallbacks
     def save_last_stream_ordering_and_success(self, last_stream_ordering):
+        if last_stream_ordering is None:
+            # This happens if we haven't yet processed anything
+            return
+
         self.last_stream_ordering = last_stream_ordering
         yield self.store.update_pusher_last_stream_ordering_and_success(
             self.app_id, self.email, self.user_id,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 40a7709c09..63c583565f 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -60,6 +60,11 @@ class PusherPool:
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name, pushkey, lang, data,
                    profile_tag=""):
+        """Creates a new pusher and adds it to the pool
+
+        Returns:
+            Deferred[EmailPusher|HttpPusher]
+        """
         time_now_msec = self.clock.time_msec()
 
         # we try to create the pusher just to validate the config: it
@@ -103,7 +108,9 @@ class PusherPool:
             last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
         )
-        yield self.start_pusher_by_id(app_id, pushkey, user_id)
+        pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id)
+
+        defer.returnValue(pusher)
 
     @defer.inlineCallbacks
     def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@@ -184,7 +191,11 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def start_pusher_by_id(self, app_id, pushkey, user_id):
-        """Look up the details for the given pusher, and start it"""
+        """Look up the details for the given pusher, and start it
+
+        Returns:
+            Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any
+        """
         if not self._should_start_pushers:
             return
 
@@ -192,13 +203,16 @@ class PusherPool:
             app_id, pushkey
         )
 
-        p = None
+        pusher_dict = None
         for r in resultlist:
             if r['user_name'] == user_id:
-                p = r
+                pusher_dict = r
 
-        if p:
-            yield self._start_pusher(p)
+        pusher = None
+        if pusher_dict:
+            pusher = yield self._start_pusher(pusher_dict)
+
+        defer.returnValue(pusher)
 
     @defer.inlineCallbacks
     def _start_pushers(self):
@@ -224,7 +238,7 @@ class PusherPool:
             pusherdict (dict):
 
         Returns:
-            None
+            Deferred[EmailPusher|HttpPusher]
         """
         try:
             p = self.pusher_factory.create_pusher(pusherdict)
@@ -270,6 +284,8 @@ class PusherPool:
 
         p.on_started(have_notifs)
 
+        defer.returnValue(p)
+
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):
         appid_pushkey = "%s:%s" % (app_id, pushkey)
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index 62b3c2a99d..c10b65d4b8 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -108,7 +108,7 @@ class EmailPusherTests(HomeserverTestCase):
         )
         token_id = user_tuple["token_id"]
 
-        self.get_success(
+        self.pusher = self.get_success(
             self.hs.get_pusherpool().add_pusher(
                 user_id=self.user_id,
                 access_token=token_id,
@@ -137,6 +137,33 @@ class EmailPusherTests(HomeserverTestCase):
         # We should get emailed about that message
         self._check_for_mail()
 
+    def test_multiple_members_email(self):
+        # We want to test multiple notifications, so we pause processing of push
+        # while we send messages.
+        self.pusher._pause_processing()
+
+        # Create a simple room with multiple other users
+        room = self.helper.create_room_as(self.user_id, tok=self.access_token)
+
+        for other in self.others:
+            self.helper.invite(
+                room=room, src=self.user_id, tok=self.access_token, targ=other.id,
+            )
+            self.helper.join(room=room, user=other.id, tok=other.token)
+
+        # The other users send some messages
+        self.helper.send(room, body="Hi!", tok=self.others[0].token)
+        self.helper.send(room, body="There!", tok=self.others[1].token)
+        self.helper.send(room, body="There!", tok=self.others[1].token)
+
+        # Nothing should have happened yet, as we're paused.
+        assert not self.email_attempts
+
+        self.pusher._resume_processing()
+
+        # We should get emailed about those messages
+        self._check_for_mail()
+
     def _check_for_mail(self):
         "Check that the user receives an email notification"