summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/action_generator.py4
-rw-r--r--synapse/push/baserules.py404
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py63
-rw-r--r--synapse/push/clientformat.py38
-rw-r--r--synapse/push/emailpusher.py82
-rw-r--r--synapse/push/httppusher.py226
-rw-r--r--synapse/push/mailer.py386
-rw-r--r--synapse/push/presentable_names.py56
-rw-r--r--synapse/push/push_rule_evaluator.py57
-rw-r--r--synapse/push/push_tools.py8
-rw-r--r--synapse/push/pusher.py14
-rw-r--r--synapse/push/pusherpool.py127
-rw-r--r--synapse/push/rulekinds.py10
13 files changed, 774 insertions, 701 deletions
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index a5de75c48a..1ffd5e2df3 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,6 +40,4 @@ class ActionGenerator(object):
     @defer.inlineCallbacks
     def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "action_for_event_by_user"):
-            yield self.bulk_evaluator.action_for_event_by_user(
-                event, context
-            )
+            yield self.bulk_evaluator.action_for_event_by_user(event, context)
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 3523a40108..96d087de22 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -31,48 +31,54 @@ def list_with_base_rules(rawrules):
 
     # Grab the base rules that the user has modified.
     # The modified base rules have a priority_class of -1.
-    modified_base_rules = {
-        r['rule_id']: r for r in rawrules if r['priority_class'] < 0
-    }
+    modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0}
 
     # Remove the modified base rules from the list, They'll be added back
     # in the default postions in the list.
-    rawrules = [r for r in rawrules if r['priority_class'] >= 0]
+    rawrules = [r for r in rawrules if r["priority_class"] >= 0]
 
     # shove the server default rules for each kind onto the end of each
     current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1]
 
-    ruleslist.extend(make_base_prepend_rules(
-        PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
-    ))
+    ruleslist.extend(
+        make_base_prepend_rules(
+            PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
+        )
+    )
 
     for r in rawrules:
-        if r['priority_class'] < current_prio_class:
-            while r['priority_class'] < current_prio_class:
-                ruleslist.extend(make_base_append_rules(
-                    PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
-                    modified_base_rules,
-                ))
-                current_prio_class -= 1
-                if current_prio_class > 0:
-                    ruleslist.extend(make_base_prepend_rules(
+        if r["priority_class"] < current_prio_class:
+            while r["priority_class"] < current_prio_class:
+                ruleslist.extend(
+                    make_base_append_rules(
                         PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
                         modified_base_rules,
-                    ))
+                    )
+                )
+                current_prio_class -= 1
+                if current_prio_class > 0:
+                    ruleslist.extend(
+                        make_base_prepend_rules(
+                            PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+                            modified_base_rules,
+                        )
+                    )
 
         ruleslist.append(r)
 
     while current_prio_class > 0:
-        ruleslist.extend(make_base_append_rules(
-            PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
-            modified_base_rules,
-        ))
+        ruleslist.extend(
+            make_base_append_rules(
+                PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
+            )
+        )
         current_prio_class -= 1
         if current_prio_class > 0:
-            ruleslist.extend(make_base_prepend_rules(
-                PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
-                modified_base_rules,
-            ))
+            ruleslist.extend(
+                make_base_prepend_rules(
+                    PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
+                )
+            )
 
     return ruleslist
 
@@ -80,20 +86,20 @@ def list_with_base_rules(rawrules):
 def make_base_append_rules(kind, modified_base_rules):
     rules = []
 
-    if kind == 'override':
+    if kind == "override":
         rules = BASE_APPEND_OVERRIDE_RULES
-    elif kind == 'underride':
+    elif kind == "underride":
         rules = BASE_APPEND_UNDERRIDE_RULES
-    elif kind == 'content':
+    elif kind == "content":
         rules = BASE_APPEND_CONTENT_RULES
 
     # Copy the rules before modifying them
     rules = copy.deepcopy(rules)
     for r in rules:
         # Only modify the actions, keep the conditions the same.
-        modified = modified_base_rules.get(r['rule_id'])
+        modified = modified_base_rules.get(r["rule_id"])
         if modified:
-            r['actions'] = modified['actions']
+            r["actions"] = modified["actions"]
 
     return rules
 
@@ -101,103 +107,86 @@ def make_base_append_rules(kind, modified_base_rules):
 def make_base_prepend_rules(kind, modified_base_rules):
     rules = []
 
-    if kind == 'override':
+    if kind == "override":
         rules = BASE_PREPEND_OVERRIDE_RULES
 
     # Copy the rules before modifying them
     rules = copy.deepcopy(rules)
     for r in rules:
         # Only modify the actions, keep the conditions the same.
-        modified = modified_base_rules.get(r['rule_id'])
+        modified = modified_base_rules.get(r["rule_id"])
         if modified:
-            r['actions'] = modified['actions']
+            r["actions"] = modified["actions"]
 
     return rules
 
 
 BASE_APPEND_CONTENT_RULES = [
     {
-        'rule_id': 'global/content/.m.rule.contains_user_name',
-        'conditions': [
+        "rule_id": "global/content/.m.rule.contains_user_name",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'content.body',
-                'pattern_type': 'user_localpart'
+                "kind": "event_match",
+                "key": "content.body",
+                "pattern_type": "user_localpart",
             }
         ],
-        'actions': [
-            'notify',
-            {
-                'set_tweak': 'sound',
-                'value': 'default',
-            }, {
-                'set_tweak': 'highlight'
-            }
-        ]
-    },
+        "actions": [
+            "notify",
+            {"set_tweak": "sound", "value": "default"},
+            {"set_tweak": "highlight"},
+        ],
+    }
 ]
 
 
 BASE_PREPEND_OVERRIDE_RULES = [
     {
-        'rule_id': 'global/override/.m.rule.master',
-        'enabled': False,
-        'conditions': [],
-        'actions': [
-            "dont_notify"
-        ]
+        "rule_id": "global/override/.m.rule.master",
+        "enabled": False,
+        "conditions": [],
+        "actions": ["dont_notify"],
     }
 ]
 
 
 BASE_APPEND_OVERRIDE_RULES = [
     {
-        'rule_id': 'global/override/.m.rule.suppress_notices',
-        'conditions': [
+        "rule_id": "global/override/.m.rule.suppress_notices",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'content.msgtype',
-                'pattern': 'm.notice',
-                '_id': '_suppress_notices',
+                "kind": "event_match",
+                "key": "content.msgtype",
+                "pattern": "m.notice",
+                "_id": "_suppress_notices",
             }
         ],
-        'actions': [
-            'dont_notify',
-        ]
+        "actions": ["dont_notify"],
     },
     # NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event
     # otherwise invites will be matched by .m.rule.member_event
     {
-        'rule_id': 'global/override/.m.rule.invite_for_me',
-        'conditions': [
+        "rule_id": "global/override/.m.rule.invite_for_me",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'type',
-                'pattern': 'm.room.member',
-                '_id': '_member',
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "m.room.member",
+                "_id": "_member",
             },
             {
-                'kind': 'event_match',
-                'key': 'content.membership',
-                'pattern': 'invite',
-                '_id': '_invite_member',
-            },
-            {
-                'kind': 'event_match',
-                'key': 'state_key',
-                'pattern_type': 'user_id'
+                "kind": "event_match",
+                "key": "content.membership",
+                "pattern": "invite",
+                "_id": "_invite_member",
             },
+            {"kind": "event_match", "key": "state_key", "pattern_type": "user_id"},
+        ],
+        "actions": [
+            "notify",
+            {"set_tweak": "sound", "value": "default"},
+            {"set_tweak": "highlight", "value": False},
         ],
-        'actions': [
-            'notify',
-            {
-                'set_tweak': 'sound',
-                'value': 'default'
-            }, {
-                'set_tweak': 'highlight',
-                'value': False
-            }
-        ]
     },
     # Will we sometimes want to know about people joining and leaving?
     # Perhaps: if so, this could be expanded upon. Seems the most usual case
@@ -206,217 +195,164 @@ BASE_APPEND_OVERRIDE_RULES = [
     # join/leave/avatar/displayname events.
     # See also: https://matrix.org/jira/browse/SYN-607
     {
-        'rule_id': 'global/override/.m.rule.member_event',
-        'conditions': [
+        "rule_id": "global/override/.m.rule.member_event",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'type',
-                'pattern': 'm.room.member',
-                '_id': '_member',
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "m.room.member",
+                "_id": "_member",
             }
         ],
-        'actions': [
-            'dont_notify'
-        ]
+        "actions": ["dont_notify"],
     },
     # This was changed from underride to override so it's closer in priority
     # to the content rules where the user name highlight rule lives. This
     # way a room rule is lower priority than both but a custom override rule
     # is higher priority than both.
     {
-        'rule_id': 'global/override/.m.rule.contains_display_name',
-        'conditions': [
-            {
-                'kind': 'contains_display_name'
-            }
+        "rule_id": "global/override/.m.rule.contains_display_name",
+        "conditions": [{"kind": "contains_display_name"}],
+        "actions": [
+            "notify",
+            {"set_tweak": "sound", "value": "default"},
+            {"set_tweak": "highlight"},
         ],
-        'actions': [
-            'notify',
-            {
-                'set_tweak': 'sound',
-                'value': 'default'
-            }, {
-                'set_tweak': 'highlight'
-            }
-        ]
     },
     {
-        'rule_id': 'global/override/.m.rule.roomnotif',
-        'conditions': [
+        "rule_id": "global/override/.m.rule.roomnotif",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'content.body',
-                'pattern': '@room',
-                '_id': '_roomnotif_content',
+                "kind": "event_match",
+                "key": "content.body",
+                "pattern": "@room",
+                "_id": "_roomnotif_content",
             },
             {
-                'kind': 'sender_notification_permission',
-                'key': 'room',
-                '_id': '_roomnotif_pl',
+                "kind": "sender_notification_permission",
+                "key": "room",
+                "_id": "_roomnotif_pl",
             },
         ],
-        'actions': [
-            'notify', {
-                'set_tweak': 'highlight',
-                'value': True,
-            }
-        ]
+        "actions": ["notify", {"set_tweak": "highlight", "value": True}],
     },
     {
-        'rule_id': 'global/override/.m.rule.tombstone',
-        'conditions': [
+        "rule_id": "global/override/.m.rule.tombstone",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'type',
-                'pattern': 'm.room.tombstone',
-                '_id': '_tombstone',
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "m.room.tombstone",
+                "_id": "_tombstone",
             }
         ],
-        'actions': [
-            'notify', {
-                'set_tweak': 'highlight',
-                'value': True,
-            }
-        ]
-    }
+        "actions": ["notify", {"set_tweak": "highlight", "value": True}],
+    },
 ]
 
 
 BASE_APPEND_UNDERRIDE_RULES = [
     {
-        'rule_id': 'global/underride/.m.rule.call',
-        'conditions': [
+        "rule_id": "global/underride/.m.rule.call",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'type',
-                'pattern': 'm.call.invite',
-                '_id': '_call',
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "m.call.invite",
+                "_id": "_call",
             }
         ],
-        'actions': [
-            'notify',
-            {
-                'set_tweak': 'sound',
-                'value': 'ring'
-            }, {
-                'set_tweak': 'highlight',
-                'value': False
-            }
-        ]
+        "actions": [
+            "notify",
+            {"set_tweak": "sound", "value": "ring"},
+            {"set_tweak": "highlight", "value": False},
+        ],
     },
     # XXX: once m.direct is standardised everywhere, we should use it to detect
     # a DM from the user's perspective rather than this heuristic.
     {
-        'rule_id': 'global/underride/.m.rule.room_one_to_one',
-        'conditions': [
+        "rule_id": "global/underride/.m.rule.room_one_to_one",
+        "conditions": [
+            {"kind": "room_member_count", "is": "2", "_id": "member_count"},
             {
-                'kind': 'room_member_count',
-                'is': '2',
-                '_id': 'member_count',
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "m.room.message",
+                "_id": "_message",
             },
-            {
-                'kind': 'event_match',
-                'key': 'type',
-                'pattern': 'm.room.message',
-                '_id': '_message',
-            }
         ],
-        'actions': [
-            'notify',
-            {
-                'set_tweak': 'sound',
-                'value': 'default'
-            }, {
-                'set_tweak': 'highlight',
-                'value': False
-            }
-        ]
+        "actions": [
+            "notify",
+            {"set_tweak": "sound", "value": "default"},
+            {"set_tweak": "highlight", "value": False},
+        ],
     },
     # XXX: this is going to fire for events which aren't m.room.messages
     # but are encrypted (e.g. m.call.*)...
     {
-        'rule_id': 'global/underride/.m.rule.encrypted_room_one_to_one',
-        'conditions': [
+        "rule_id": "global/underride/.m.rule.encrypted_room_one_to_one",
+        "conditions": [
+            {"kind": "room_member_count", "is": "2", "_id": "member_count"},
             {
-                'kind': 'room_member_count',
-                'is': '2',
-                '_id': 'member_count',
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "m.room.encrypted",
+                "_id": "_encrypted",
             },
-            {
-                'kind': 'event_match',
-                'key': 'type',
-                'pattern': 'm.room.encrypted',
-                '_id': '_encrypted',
-            }
         ],
-        'actions': [
-            'notify',
-            {
-                'set_tweak': 'sound',
-                'value': 'default'
-            }, {
-                'set_tweak': 'highlight',
-                'value': False
-            }
-        ]
+        "actions": [
+            "notify",
+            {"set_tweak": "sound", "value": "default"},
+            {"set_tweak": "highlight", "value": False},
+        ],
     },
     {
-        'rule_id': 'global/underride/.m.rule.message',
-        'conditions': [
+        "rule_id": "global/underride/.m.rule.message",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'type',
-                'pattern': 'm.room.message',
-                '_id': '_message',
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "m.room.message",
+                "_id": "_message",
             }
         ],
-        'actions': [
-            'notify', {
-                'set_tweak': 'highlight',
-                'value': False
-            }
-        ]
+        "actions": ["notify", {"set_tweak": "highlight", "value": False}],
     },
     # XXX: this is going to fire for events which aren't m.room.messages
     # but are encrypted (e.g. m.call.*)...
     {
-        'rule_id': 'global/underride/.m.rule.encrypted',
-        'conditions': [
+        "rule_id": "global/underride/.m.rule.encrypted",
+        "conditions": [
             {
-                'kind': 'event_match',
-                'key': 'type',
-                'pattern': 'm.room.encrypted',
-                '_id': '_encrypted',
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "m.room.encrypted",
+                "_id": "_encrypted",
             }
         ],
-        'actions': [
-            'notify', {
-                'set_tweak': 'highlight',
-                'value': False
-            }
-        ]
-    }
+        "actions": ["notify", {"set_tweak": "highlight", "value": False}],
+    },
 ]
 
 
 BASE_RULE_IDS = set()
 
 for r in BASE_APPEND_CONTENT_RULES:
-    r['priority_class'] = PRIORITY_CLASS_MAP['content']
-    r['default'] = True
-    BASE_RULE_IDS.add(r['rule_id'])
+    r["priority_class"] = PRIORITY_CLASS_MAP["content"]
+    r["default"] = True
+    BASE_RULE_IDS.add(r["rule_id"])
 
 for r in BASE_PREPEND_OVERRIDE_RULES:
-    r['priority_class'] = PRIORITY_CLASS_MAP['override']
-    r['default'] = True
-    BASE_RULE_IDS.add(r['rule_id'])
+    r["priority_class"] = PRIORITY_CLASS_MAP["override"]
+    r["default"] = True
+    BASE_RULE_IDS.add(r["rule_id"])
 
 for r in BASE_APPEND_OVERRIDE_RULES:
-    r['priority_class'] = PRIORITY_CLASS_MAP['override']
-    r['default'] = True
-    BASE_RULE_IDS.add(r['rule_id'])
+    r["priority_class"] = PRIORITY_CLASS_MAP["override"]
+    r["default"] = True
+    BASE_RULE_IDS.add(r["rule_id"])
 
 for r in BASE_APPEND_UNDERRIDE_RULES:
-    r['priority_class'] = PRIORITY_CLASS_MAP['underride']
-    r['default'] = True
-    BASE_RULE_IDS.add(r['rule_id'])
+    r["priority_class"] = PRIORITY_CLASS_MAP["underride"]
+    r["default"] = True
+    BASE_RULE_IDS.add(r["rule_id"])
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 8f9a76147f..c8a5b381da 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -39,9 +39,11 @@ rules_by_room = {}
 
 
 push_rules_invalidation_counter = Counter(
-    "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "")
+    "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", ""
+)
 push_rules_state_size_counter = Counter(
-    "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "")
+    "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", ""
+)
 
 # Measures whether we use the fast path of using state deltas, or if we have to
 # recalculate from scratch
@@ -83,7 +85,7 @@ class BulkPushRuleEvaluator(object):
 
         # if this event is an invite event, we may need to run rules for the user
         # who's been invited, otherwise they won't get told they've been invited
-        if event.type == 'm.room.member' and event.content['membership'] == 'invite':
+        if event.type == "m.room.member" and event.content["membership"] == "invite":
             invited = event.state_key
             if invited and self.hs.is_mine_id(invited):
                 has_pusher = yield self.store.user_has_pusher(invited)
@@ -106,7 +108,9 @@ class BulkPushRuleEvaluator(object):
         # before any lookup methods get called on it as otherwise there may be
         # a race if invalidate_all gets called (which assumes its in the cache)
         return RulesForRoom(
-            self.hs, room_id, self._get_rules_for_room.cache,
+            self.hs,
+            room_id,
+            self._get_rules_for_room.cache,
             self.room_push_rule_cache_metrics,
         )
 
@@ -121,12 +125,10 @@ class BulkPushRuleEvaluator(object):
             auth_events = {POWER_KEY: pl_event}
         else:
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, prev_state_ids, for_verification=False,
+                event, prev_state_ids, for_verification=False
             )
             auth_events = yield self.store.get_events(auth_events_ids)
-            auth_events = {
-                (e.type, e.state_key): e for e in itervalues(auth_events)
-            }
+            auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)}
 
         sender_level = get_user_power_level(event.sender, auth_events)
 
@@ -145,16 +147,14 @@ class BulkPushRuleEvaluator(object):
         rules_by_user = yield self._get_rules_for_event(event, context)
         actions_by_user = {}
 
-        room_members = yield self.store.get_joined_users_from_context(
-            event, context
-        )
+        room_members = yield self.store.get_joined_users_from_context(event, context)
 
         (power_levels, sender_power_level) = (
             yield self._get_power_levels_and_sender_level(event, context)
         )
 
         evaluator = PushRuleEvaluatorForEvent(
-            event, len(room_members), sender_power_level, power_levels,
+            event, len(room_members), sender_power_level, power_levels
         )
 
         condition_cache = {}
@@ -180,15 +180,15 @@ class BulkPushRuleEvaluator(object):
                     display_name = event.content.get("displayname", None)
 
             for rule in rules:
-                if 'enabled' in rule and not rule['enabled']:
+                if "enabled" in rule and not rule["enabled"]:
                     continue
 
                 matches = _condition_checker(
-                    evaluator, rule['conditions'], uid, display_name, condition_cache
+                    evaluator, rule["conditions"], uid, display_name, condition_cache
                 )
                 if matches:
-                    actions = [x for x in rule['actions'] if x != 'dont_notify']
-                    if actions and 'notify' in actions:
+                    actions = [x for x in rule["actions"] if x != "dont_notify"]
+                    if actions and "notify" in actions:
                         # Push rules say we should notify the user of this event
                         actions_by_user[uid] = actions
                     break
@@ -196,9 +196,7 @@ class BulkPushRuleEvaluator(object):
         # Mark in the DB staging area the push actions for users who should be
         # notified for this event. (This will then get handled when we persist
         # the event)
-        yield self.store.add_push_actions_to_staging(
-            event.event_id, actions_by_user,
-        )
+        yield self.store.add_push_actions_to_staging(event.event_id, actions_by_user)
 
 
 def _condition_checker(evaluator, conditions, uid, display_name, cache):
@@ -361,19 +359,19 @@ class RulesForRoom(object):
                     self.sequence,
                     members={},  # There were no membership changes
                     rules_by_user=ret_rules_by_user,
-                    state_group=state_group
+                    state_group=state_group,
                 )
 
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(
-                "Returning push rules for %r %r",
-                self.room_id, ret_rules_by_user.keys(),
+                "Returning push rules for %r %r", self.room_id, ret_rules_by_user.keys()
             )
         defer.returnValue(ret_rules_by_user)
 
     @defer.inlineCallbacks
-    def _update_rules_with_member_event_ids(self, ret_rules_by_user, member_event_ids,
-                                            state_group, event):
+    def _update_rules_with_member_event_ids(
+        self, ret_rules_by_user, member_event_ids, state_group, event
+    ):
         """Update the partially filled rules_by_user dict by fetching rules for
         any newly joined users in the `member_event_ids` list.
 
@@ -391,16 +389,13 @@ class RulesForRoom(object):
             table="room_memberships",
             column="event_id",
             iterable=member_event_ids.values(),
-            retcols=('user_id', 'membership', 'event_id'),
+            retcols=("user_id", "membership", "event_id"),
             keyvalues={},
             batch_size=500,
             desc="_get_rules_for_member_event_ids",
         )
 
-        members = {
-            row["event_id"]: (row["user_id"], row["membership"])
-            for row in rows
-        }
+        members = {row["event_id"]: (row["user_id"], row["membership"]) for row in rows}
 
         # If the event is a join event then it will be in current state evnts
         # map but not in the DB, so we have to explicitly insert it.
@@ -413,15 +408,15 @@ class RulesForRoom(object):
             logger.debug("Found members %r: %r", self.room_id, members.values())
 
         interested_in_user_ids = set(
-            user_id for user_id, membership in itervalues(members)
+            user_id
+            for user_id, membership in itervalues(members)
             if membership == Membership.JOIN
         )
 
         logger.debug("Joined: %r", interested_in_user_ids)
 
         if_users_with_pushers = yield self.store.get_if_users_have_pushers(
-            interested_in_user_ids,
-            on_invalidate=self.invalidate_all_cb,
+            interested_in_user_ids, on_invalidate=self.invalidate_all_cb
         )
 
         user_ids = set(
@@ -431,7 +426,7 @@ class RulesForRoom(object):
         logger.debug("With pushers: %r", user_ids)
 
         users_with_receipts = yield self.store.get_users_with_read_receipts_in_room(
-            self.room_id, on_invalidate=self.invalidate_all_cb,
+            self.room_id, on_invalidate=self.invalidate_all_cb
         )
 
         logger.debug("With receipts: %r", users_with_receipts)
@@ -442,7 +437,7 @@ class RulesForRoom(object):
                 user_ids.add(uid)
 
         rules_by_user = yield self.store.bulk_get_push_rules(
-            user_ids, on_invalidate=self.invalidate_all_cb,
+            user_ids, on_invalidate=self.invalidate_all_cb
         )
 
         ret_rules_by_user.update(
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index 8bd96b1178..a59b639f15 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -25,14 +25,14 @@ def format_push_rules_for_user(user, ruleslist):
     # We're going to be mutating this a lot, so do a deep copy
     ruleslist = copy.deepcopy(ruleslist)
 
-    rules = {'global': {}, 'device': {}}
+    rules = {"global": {}, "device": {}}
 
-    rules['global'] = _add_empty_priority_class_arrays(rules['global'])
+    rules["global"] = _add_empty_priority_class_arrays(rules["global"])
 
     for r in ruleslist:
         rulearray = None
 
-        template_name = _priority_class_to_template_name(r['priority_class'])
+        template_name = _priority_class_to_template_name(r["priority_class"])
 
         # Remove internal stuff.
         for c in r["conditions"]:
@@ -44,14 +44,14 @@ def format_push_rules_for_user(user, ruleslist):
             elif pattern_type == "user_localpart":
                 c["pattern"] = user.localpart
 
-        rulearray = rules['global'][template_name]
+        rulearray = rules["global"][template_name]
 
         template_rule = _rule_to_template(r)
         if template_rule:
-            if 'enabled' in r:
-                template_rule['enabled'] = r['enabled']
+            if "enabled" in r:
+                template_rule["enabled"] = r["enabled"]
             else:
-                template_rule['enabled'] = True
+                template_rule["enabled"] = True
             rulearray.append(template_rule)
 
     return rules
@@ -65,33 +65,33 @@ def _add_empty_priority_class_arrays(d):
 
 def _rule_to_template(rule):
     unscoped_rule_id = None
-    if 'rule_id' in rule:
-        unscoped_rule_id = _rule_id_from_namespaced(rule['rule_id'])
+    if "rule_id" in rule:
+        unscoped_rule_id = _rule_id_from_namespaced(rule["rule_id"])
 
-    template_name = _priority_class_to_template_name(rule['priority_class'])
-    if template_name in ['override', 'underride']:
+    template_name = _priority_class_to_template_name(rule["priority_class"])
+    if template_name in ["override", "underride"]:
         templaterule = {k: rule[k] for k in ["conditions", "actions"]}
     elif template_name in ["sender", "room"]:
-        templaterule = {'actions': rule['actions']}
-        unscoped_rule_id = rule['conditions'][0]['pattern']
-    elif template_name == 'content':
+        templaterule = {"actions": rule["actions"]}
+        unscoped_rule_id = rule["conditions"][0]["pattern"]
+    elif template_name == "content":
         if len(rule["conditions"]) != 1:
             return None
         thecond = rule["conditions"][0]
         if "pattern" not in thecond:
             return None
-        templaterule = {'actions': rule['actions']}
+        templaterule = {"actions": rule["actions"]}
         templaterule["pattern"] = thecond["pattern"]
 
     if unscoped_rule_id:
-        templaterule['rule_id'] = unscoped_rule_id
-    if 'default' in rule:
-        templaterule['default'] = rule['default']
+        templaterule["rule_id"] = unscoped_rule_id
+    if "default" in rule:
+        templaterule["default"] = rule["default"]
     return templaterule
 
 
 def _rule_id_from_namespaced(in_rule_id):
-    return in_rule_id.split('/')[-1]
+    return in_rule_id.split("/")[-1]
 
 
 def _priority_class_to_template_name(pc):
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index e8ee67401f..424ffa8b68 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -32,13 +32,13 @@ DELAY_BEFORE_MAIL_MS = 10 * 60 * 1000
 THROTTLE_START_MS = 10 * 60 * 1000
 THROTTLE_MAX_MS = 24 * 60 * 60 * 1000  # 24h
 # THROTTLE_MULTIPLIER = 6              # 10 mins, 1 hour, 6 hours, 24 hours
-THROTTLE_MULTIPLIER = 144              # 10 mins, 24 hours - i.e. jump straight to 1 day
+THROTTLE_MULTIPLIER = 144  # 10 mins, 24 hours - i.e. jump straight to 1 day
 
 # If no event triggers a notification for this long after the previous,
 # the throttle is released.
 # 12 hours - a gap of 12 hours in conversation is surely enough to merit a new
 # notification when things get going again...
-THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
+THROTTLE_RESET_AFTER_MS = 12 * 60 * 60 * 1000
 
 # does each email include all unread notifs, or just the ones which have happened
 # since the last mail?
@@ -53,17 +53,18 @@ class EmailPusher(object):
     This shares quite a bit of code with httpusher: it would be good to
     factor out the common parts
     """
+
     def __init__(self, hs, pusherdict, mailer):
         self.hs = hs
         self.mailer = mailer
 
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
-        self.pusher_id = pusherdict['id']
-        self.user_id = pusherdict['user_name']
-        self.app_id = pusherdict['app_id']
-        self.email = pusherdict['pushkey']
-        self.last_stream_ordering = pusherdict['last_stream_ordering']
+        self.pusher_id = pusherdict["id"]
+        self.user_id = pusherdict["user_name"]
+        self.app_id = pusherdict["app_id"]
+        self.email = pusherdict["pushkey"]
+        self.last_stream_ordering = pusherdict["last_stream_ordering"]
         self.timed_call = None
         self.throttle_params = None
 
@@ -93,7 +94,9 @@ class EmailPusher(object):
 
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         if self.max_stream_ordering:
-            self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+            self.max_stream_ordering = max(
+                max_stream_ordering, self.max_stream_ordering
+            )
         else:
             self.max_stream_ordering = max_stream_ordering
         self._start_processing()
@@ -114,6 +117,21 @@ class EmailPusher(object):
 
         run_as_background_process("emailpush.process", self._process)
 
+    def _pause_processing(self):
+        """Used by tests to temporarily pause processing of events.
+
+        Asserts that its not currently processing.
+        """
+        assert not self._is_processing
+        self._is_processing = True
+
+    def _resume_processing(self):
+        """Used by tests to resume processing of events after pausing.
+        """
+        assert self._is_processing
+        self._is_processing = False
+        self._start_processing()
+
     @defer.inlineCallbacks
     def _process(self):
         # we should never get here if we are already processing
@@ -159,14 +177,12 @@ class EmailPusher(object):
             return
 
         for push_action in unprocessed:
-            received_at = push_action['received_ts']
+            received_at = push_action["received_ts"]
             if received_at is None:
                 received_at = 0
             notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS
 
-            room_ready_at = self.room_ready_to_notify_at(
-                push_action['room_id']
-            )
+            room_ready_at = self.room_ready_to_notify_at(push_action["room_id"])
 
             should_notify_at = max(notif_ready_at, room_ready_at)
 
@@ -177,25 +193,23 @@ class EmailPusher(object):
                 # to be delivered.
 
                 reason = {
-                    'room_id': push_action['room_id'],
-                    'now': self.clock.time_msec(),
-                    'received_at': received_at,
-                    'delay_before_mail_ms': DELAY_BEFORE_MAIL_MS,
-                    'last_sent_ts': self.get_room_last_sent_ts(push_action['room_id']),
-                    'throttle_ms': self.get_room_throttle_ms(push_action['room_id']),
+                    "room_id": push_action["room_id"],
+                    "now": self.clock.time_msec(),
+                    "received_at": received_at,
+                    "delay_before_mail_ms": DELAY_BEFORE_MAIL_MS,
+                    "last_sent_ts": self.get_room_last_sent_ts(push_action["room_id"]),
+                    "throttle_ms": self.get_room_throttle_ms(push_action["room_id"]),
                 }
 
                 yield self.send_notification(unprocessed, reason)
 
-                yield self.save_last_stream_ordering_and_success(max([
-                    ea['stream_ordering'] for ea in unprocessed
-                ]))
+                yield self.save_last_stream_ordering_and_success(
+                    max([ea["stream_ordering"] for ea in unprocessed])
+                )
 
                 # we update the throttle on all the possible unprocessed push actions
                 for ea in unprocessed:
-                    yield self.sent_notif_update_throttle(
-                        ea['room_id'], ea
-                    )
+                    yield self.sent_notif_update_throttle(ea["room_id"], ea)
                 break
             else:
                 if soonest_due_at is None or should_notify_at < soonest_due_at:
@@ -215,10 +229,17 @@ class EmailPusher(object):
 
     @defer.inlineCallbacks
     def save_last_stream_ordering_and_success(self, last_stream_ordering):
+        if last_stream_ordering is None:
+            # This happens if we haven't yet processed anything
+            return
+
         self.last_stream_ordering = last_stream_ordering
         yield self.store.update_pusher_last_stream_ordering_and_success(
-            self.app_id, self.email, self.user_id,
-            last_stream_ordering, self.clock.time_msec()
+            self.app_id,
+            self.email,
+            self.user_id,
+            last_stream_ordering,
+            self.clock.time_msec(),
         )
 
     def seconds_until(self, ts_msec):
@@ -257,10 +278,10 @@ class EmailPusher(object):
         # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
         # notif, we release the throttle. Otherwise, the throttle is increased.
         time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
-            notified_push_action['stream_ordering']
+            notified_push_action["stream_ordering"]
         )
 
-        time_of_this_notifs = notified_push_action['received_ts']
+        time_of_this_notifs = notified_push_action["received_ts"]
 
         if time_of_previous_notifs is not None and time_of_this_notifs is not None:
             gap = time_of_this_notifs - time_of_previous_notifs
@@ -279,12 +300,11 @@ class EmailPusher(object):
                 new_throttle_ms = THROTTLE_START_MS
             else:
                 new_throttle_ms = min(
-                    current_throttle_ms * THROTTLE_MULTIPLIER,
-                    THROTTLE_MAX_MS
+                    current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS
                 )
         self.throttle_params[room_id] = {
             "last_sent_ts": self.clock.time_msec(),
-            "throttle_ms": new_throttle_ms
+            "throttle_ms": new_throttle_ms,
         }
         yield self.store.set_throttle_params(
             self.pusher_id, room_id, self.throttle_params[room_id]
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index fac05aa44c..4e7b6a5531 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -65,16 +65,16 @@ class HttpPusher(object):
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
         self.state_handler = self.hs.get_state_handler()
-        self.user_id = pusherdict['user_name']
-        self.app_id = pusherdict['app_id']
-        self.app_display_name = pusherdict['app_display_name']
-        self.device_display_name = pusherdict['device_display_name']
-        self.pushkey = pusherdict['pushkey']
-        self.pushkey_ts = pusherdict['ts']
-        self.data = pusherdict['data']
-        self.last_stream_ordering = pusherdict['last_stream_ordering']
+        self.user_id = pusherdict["user_name"]
+        self.app_id = pusherdict["app_id"]
+        self.app_display_name = pusherdict["app_display_name"]
+        self.device_display_name = pusherdict["device_display_name"]
+        self.pushkey = pusherdict["pushkey"]
+        self.pushkey_ts = pusherdict["ts"]
+        self.data = pusherdict["data"]
+        self.last_stream_ordering = pusherdict["last_stream_ordering"]
         self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
-        self.failing_since = pusherdict['failing_since']
+        self.failing_since = pusherdict["failing_since"]
         self.timed_call = None
         self._is_processing = False
 
@@ -85,32 +85,26 @@ class HttpPusher(object):
         # off as None though as we don't know any better.
         self.max_stream_ordering = None
 
-        if 'data' not in pusherdict:
-            raise PusherConfigException(
-                "No 'data' key for HTTP pusher"
-            )
-        self.data = pusherdict['data']
+        if "data" not in pusherdict:
+            raise PusherConfigException("No 'data' key for HTTP pusher")
+        self.data = pusherdict["data"]
 
         self.name = "%s/%s/%s" % (
-            pusherdict['user_name'],
-            pusherdict['app_id'],
-            pusherdict['pushkey'],
+            pusherdict["user_name"],
+            pusherdict["app_id"],
+            pusherdict["pushkey"],
         )
 
         if self.data is None:
-            raise PusherConfigException(
-                "data can not be null for HTTP pusher"
-            )
+            raise PusherConfigException("data can not be null for HTTP pusher")
 
-        if 'url' not in self.data:
-            raise PusherConfigException(
-                "'url' required in data for HTTP pusher"
-            )
-        self.url = self.data['url']
+        if "url" not in self.data:
+            raise PusherConfigException("'url' required in data for HTTP pusher")
+        self.url = self.data["url"]
         self.http_client = hs.get_simple_http_client()
         self.data_minus_url = {}
         self.data_minus_url.update(self.data)
-        del self.data_minus_url['url']
+        del self.data_minus_url["url"]
 
     def on_started(self, should_check_for_notifs):
         """Called when this pusher has been started.
@@ -124,7 +118,9 @@ class HttpPusher(object):
             self._start_processing()
 
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
-        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
+        self.max_stream_ordering = max(
+            max_stream_ordering, self.max_stream_ordering or 0
+        )
         self._start_processing()
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
@@ -192,7 +188,9 @@ class HttpPusher(object):
         logger.info(
             "Processing %i unprocessed push actions for %s starting at "
             "stream_ordering %s",
-            len(unprocessed), self.name, self.last_stream_ordering,
+            len(unprocessed),
+            self.name,
+            self.last_stream_ordering,
         )
 
         for push_action in unprocessed:
@@ -200,71 +198,72 @@ class HttpPusher(object):
             if processed:
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
-                self.last_stream_ordering = push_action['stream_ordering']
+                self.last_stream_ordering = push_action["stream_ordering"]
                 yield self.store.update_pusher_last_stream_ordering_and_success(
-                    self.app_id, self.pushkey, self.user_id,
+                    self.app_id,
+                    self.pushkey,
+                    self.user_id,
                     self.last_stream_ordering,
-                    self.clock.time_msec()
+                    self.clock.time_msec(),
                 )
                 if self.failing_since:
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
-                        self.app_id, self.pushkey, self.user_id,
-                        self.failing_since
+                        self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
             else:
                 http_push_failed_counter.inc()
                 if not self.failing_since:
                     self.failing_since = self.clock.time_msec()
                     yield self.store.update_pusher_failing_since(
-                        self.app_id, self.pushkey, self.user_id,
-                        self.failing_since
+                        self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
 
                 if (
-                    self.failing_since and
-                    self.failing_since <
-                    self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
+                    self.failing_since
+                    and self.failing_since
+                    < self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
                 ):
                     # we really only give up so that if the URL gets
                     # fixed, we don't suddenly deliver a load
                     # of old notifications.
-                    logger.warn("Giving up on a notification to user %s, "
-                                "pushkey %s",
-                                self.user_id, self.pushkey)
+                    logger.warn(
+                        "Giving up on a notification to user %s, " "pushkey %s",
+                        self.user_id,
+                        self.pushkey,
+                    )
                     self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
-                    self.last_stream_ordering = push_action['stream_ordering']
+                    self.last_stream_ordering = push_action["stream_ordering"]
                     yield self.store.update_pusher_last_stream_ordering(
                         self.app_id,
                         self.pushkey,
                         self.user_id,
-                        self.last_stream_ordering
+                        self.last_stream_ordering,
                     )
 
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
-                        self.app_id,
-                        self.pushkey,
-                        self.user_id,
-                        self.failing_since
+                        self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
                 else:
                     logger.info("Push failed: delaying for %ds", self.backoff_delay)
                     self.timed_call = self.hs.get_reactor().callLater(
                         self.backoff_delay, self.on_timer
                     )
-                    self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
+                    self.backoff_delay = min(
+                        self.backoff_delay * 2, self.MAX_BACKOFF_SEC
+                    )
                     break
 
     @defer.inlineCallbacks
     def _process_one(self, push_action):
-        if 'notify' not in push_action['actions']:
+        if "notify" not in push_action["actions"]:
             defer.returnValue(True)
 
-        tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
+        tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"])
         badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
 
-        event = yield self.store.get_event(push_action['event_id'], allow_none=True)
+        event = yield self.store.get_event(push_action["event_id"], allow_none=True)
         if event is None:
             defer.returnValue(True)  # It's been redacted
         rejected = yield self.dispatch_push(event, tweaks, badge)
@@ -277,37 +276,30 @@ class HttpPusher(object):
                     # for sanity, we only remove the pushkey if it
                     # was the one we actually sent...
                     logger.warn(
-                        ("Ignoring rejected pushkey %s because we"
-                         " didn't send it"), pk
+                        ("Ignoring rejected pushkey %s because we" " didn't send it"),
+                        pk,
                     )
                 else:
-                    logger.info(
-                        "Pushkey %s was rejected: removing",
-                        pk
-                    )
-                    yield self.hs.remove_pusher(
-                        self.app_id, pk, self.user_id
-                    )
+                    logger.info("Pushkey %s was rejected: removing", pk)
+                    yield self.hs.remove_pusher(self.app_id, pk, self.user_id)
         defer.returnValue(True)
 
     @defer.inlineCallbacks
     def _build_notification_dict(self, event, tweaks, badge):
-        if self.data.get('format') == 'event_id_only':
+        if self.data.get("format") == "event_id_only":
             d = {
-                'notification': {
-                    'event_id': event.event_id,
-                    'room_id': event.room_id,
-                    'counts': {
-                        'unread': badge,
-                    },
-                    'devices': [
+                "notification": {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "counts": {"unread": badge},
+                    "devices": [
                         {
-                            'app_id': self.app_id,
-                            'pushkey': self.pushkey,
-                            'pushkey_ts': long(self.pushkey_ts / 1000),
-                            'data': self.data_minus_url,
+                            "app_id": self.app_id,
+                            "pushkey": self.pushkey,
+                            "pushkey_ts": long(self.pushkey_ts / 1000),
+                            "data": self.data_minus_url,
                         }
-                    ]
+                    ],
                 }
             }
             defer.returnValue(d)
@@ -317,41 +309,41 @@ class HttpPusher(object):
         )
 
         d = {
-            'notification': {
-                'id': event.event_id,  # deprecated: remove soon
-                'event_id': event.event_id,
-                'room_id': event.room_id,
-                'type': event.type,
-                'sender': event.user_id,
-                'counts': {  # -- we don't mark messages as read yet so
-                             # we have no way of knowing
+            "notification": {
+                "id": event.event_id,  # deprecated: remove soon
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "sender": event.user_id,
+                "counts": {  # -- we don't mark messages as read yet so
+                    # we have no way of knowing
                     # Just set the badge to 1 until we have read receipts
-                    'unread': badge,
+                    "unread": badge,
                     # 'missed_calls': 2
                 },
-                'devices': [
+                "devices": [
                     {
-                        'app_id': self.app_id,
-                        'pushkey': self.pushkey,
-                        'pushkey_ts': long(self.pushkey_ts / 1000),
-                        'data': self.data_minus_url,
-                        'tweaks': tweaks
+                        "app_id": self.app_id,
+                        "pushkey": self.pushkey,
+                        "pushkey_ts": long(self.pushkey_ts / 1000),
+                        "data": self.data_minus_url,
+                        "tweaks": tweaks,
                     }
-                ]
+                ],
             }
         }
-        if event.type == 'm.room.member' and event.is_state():
-            d['notification']['membership'] = event.content['membership']
-            d['notification']['user_is_target'] = event.state_key == self.user_id
+        if event.type == "m.room.member" and event.is_state():
+            d["notification"]["membership"] = event.content["membership"]
+            d["notification"]["user_is_target"] = event.state_key == self.user_id
         if self.hs.config.push_include_content and event.content:
-            d['notification']['content'] = event.content
+            d["notification"]["content"] = event.content
 
         # We no longer send aliases separately, instead, we send the human
         # readable name of the room, which may be an alias.
-        if 'sender_display_name' in ctx and len(ctx['sender_display_name']) > 0:
-            d['notification']['sender_display_name'] = ctx['sender_display_name']
-        if 'name' in ctx and len(ctx['name']) > 0:
-            d['notification']['room_name'] = ctx['name']
+        if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
+            d["notification"]["sender_display_name"] = ctx["sender_display_name"]
+        if "name" in ctx and len(ctx["name"]) > 0:
+            d["notification"]["room_name"] = ctx["name"]
 
         defer.returnValue(d)
 
@@ -361,16 +353,21 @@ class HttpPusher(object):
         if not notification_dict:
             defer.returnValue([])
         try:
-            resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
+            resp = yield self.http_client.post_json_get_json(
+                self.url, notification_dict
+            )
         except Exception as e:
             logger.warning(
                 "Failed to push event %s to %s: %s %s",
-                event.event_id, self.name, type(e), e,
+                event.event_id,
+                self.name,
+                type(e),
+                e,
             )
             defer.returnValue(False)
         rejected = []
-        if 'rejected' in resp:
-            rejected = resp['rejected']
+        if "rejected" in resp:
+            rejected = resp["rejected"]
         defer.returnValue(rejected)
 
     @defer.inlineCallbacks
@@ -381,21 +378,19 @@ class HttpPusher(object):
         """
         logger.info("Sending updated badge count %d to %s", badge, self.name)
         d = {
-            'notification': {
-                'id': '',
-                'type': None,
-                'sender': '',
-                'counts': {
-                    'unread': badge
-                },
-                'devices': [
+            "notification": {
+                "id": "",
+                "type": None,
+                "sender": "",
+                "counts": {"unread": badge},
+                "devices": [
                     {
-                        'app_id': self.app_id,
-                        'pushkey': self.pushkey,
-                        'pushkey_ts': long(self.pushkey_ts / 1000),
-                        'data': self.data_minus_url,
+                        "app_id": self.app_id,
+                        "pushkey": self.pushkey,
+                        "pushkey_ts": long(self.pushkey_ts / 1000),
+                        "data": self.data_minus_url,
                     }
-                ]
+                ],
             }
         }
         try:
@@ -403,7 +398,6 @@ class HttpPusher(object):
             http_badges_processed_counter.inc()
         except Exception as e:
             logger.warning(
-                "Failed to send badge count to %s: %s %s",
-                self.name, type(e), e,
+                "Failed to send badge count to %s: %s %s", self.name, type(e), e
             )
             http_badges_failed_counter.inc()
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index c269bcf4a4..809199fe88 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -42,17 +42,21 @@ from synapse.visibility import filter_events_for_client
 logger = logging.getLogger(__name__)
 
 
-MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \
-                              "in the %(room)s room..."
+MESSAGE_FROM_PERSON_IN_ROOM = (
+    "You have a message on %(app)s from %(person)s " "in the %(room)s room..."
+)
 MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
 MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
 MESSAGES_IN_ROOM = "You have messages on %(app)s in the %(room)s room..."
-MESSAGES_IN_ROOM_AND_OTHERS = \
+MESSAGES_IN_ROOM_AND_OTHERS = (
     "You have messages on %(app)s in the %(room)s room and others..."
-MESSAGES_FROM_PERSON_AND_OTHERS = \
+)
+MESSAGES_FROM_PERSON_AND_OTHERS = (
     "You have messages on %(app)s from %(person)s and others..."
-INVITE_FROM_PERSON_TO_ROOM = "%(person)s has invited you to join the " \
-                             "%(room)s room on %(app)s..."
+)
+INVITE_FROM_PERSON_TO_ROOM = (
+    "%(person)s has invited you to join the " "%(room)s room on %(app)s..."
+)
 INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
 
 CONTEXT_BEFORE = 1
@@ -60,12 +64,38 @@ CONTEXT_AFTER = 1
 
 # From https://github.com/matrix-org/matrix-react-sdk/blob/master/src/HtmlUtils.js
 ALLOWED_TAGS = [
-    'font',  # custom to matrix for IRC-style font coloring
-    'del',  # for markdown
+    "font",  # custom to matrix for IRC-style font coloring
+    "del",  # for markdown
     # deliberately no h1/h2 to stop people shouting.
-    'h3', 'h4', 'h5', 'h6', 'blockquote', 'p', 'a', 'ul', 'ol',
-    'nl', 'li', 'b', 'i', 'u', 'strong', 'em', 'strike', 'code', 'hr', 'br', 'div',
-    'table', 'thead', 'caption', 'tbody', 'tr', 'th', 'td', 'pre'
+    "h3",
+    "h4",
+    "h5",
+    "h6",
+    "blockquote",
+    "p",
+    "a",
+    "ul",
+    "ol",
+    "nl",
+    "li",
+    "b",
+    "i",
+    "u",
+    "strong",
+    "em",
+    "strike",
+    "code",
+    "hr",
+    "br",
+    "div",
+    "table",
+    "thead",
+    "caption",
+    "tbody",
+    "tr",
+    "th",
+    "td",
+    "pre",
 ]
 ALLOWED_ATTRS = {
     # custom ones first:
@@ -80,10 +110,10 @@ ALLOWED_ATTRS = {
 
 
 class Mailer(object):
-    def __init__(self, hs, app_name, notif_template_html, notif_template_text):
+    def __init__(self, hs, app_name, template_html, template_text):
         self.hs = hs
-        self.notif_template_html = notif_template_html
-        self.notif_template_text = notif_template_text
+        self.template_html = template_html
+        self.template_text = template_text
 
         self.sendmail = self.hs.get_sendmail()
         self.store = self.hs.get_datastore()
@@ -94,27 +124,44 @@ class Mailer(object):
         logger.info("Created Mailer for app_name %s" % app_name)
 
     @defer.inlineCallbacks
-    def send_notification_mail(self, app_id, user_id, email_address,
-                               push_actions, reason):
-        try:
-            from_string = self.hs.config.email_notif_from % {
-                "app": self.app_name
-            }
-        except TypeError:
-            from_string = self.hs.config.email_notif_from
-
-        raw_from = email.utils.parseaddr(from_string)[1]
-        raw_to = email.utils.parseaddr(email_address)[1]
+    def send_password_reset_mail(self, email_address, token, client_secret, sid):
+        """Send an email with a password reset link to a user
+
+        Args:
+            email_address (str): Email address we're sending the password
+                reset to
+            token (str): Unique token generated by the server to verify
+                password reset email was received
+            client_secret (str): Unique token generated by the client to
+                group together multiple email sending attempts
+            sid (str): The generated session ID
+        """
+        if email.utils.parseaddr(email_address)[1] == "":
+            raise RuntimeError("Invalid 'to' email address")
+
+        link = (
+            self.hs.config.public_baseurl
+            + "_matrix/client/unstable/password_reset/email/submit_token"
+            "?token=%s&client_secret=%s&sid=%s" % (token, client_secret, sid)
+        )
 
-        if raw_to == '':
-            raise RuntimeError("Invalid 'to' address")
+        template_vars = {"link": link}
 
-        rooms_in_order = deduped_ordered_list(
-            [pa['room_id'] for pa in push_actions]
+        yield self.send_email(
+            email_address,
+            "[%s] Password Reset Email" % self.hs.config.server_name,
+            template_vars,
         )
 
+    @defer.inlineCallbacks
+    def send_notification_mail(
+        self, app_id, user_id, email_address, push_actions, reason
+    ):
+        """Send email regarding a user's room notifications"""
+        rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions])
+
         notif_events = yield self.store.get_events(
-            [pa['event_id'] for pa in push_actions]
+            [pa["event_id"] for pa in push_actions]
         )
 
         notifs_by_room = {}
@@ -144,9 +191,7 @@ class Mailer(object):
         yield concurrently_execute(_fetch_room_state, rooms_in_order, 3)
 
         # actually sort our so-called rooms_in_order list, most recent room first
-        rooms_in_order.sort(
-            key=lambda r: -(notifs_by_room[r][-1]['received_ts'] or 0)
-        )
+        rooms_in_order.sort(key=lambda r: -(notifs_by_room[r][-1]["received_ts"] or 0))
 
         rooms = []
 
@@ -156,9 +201,11 @@ class Mailer(object):
             )
             rooms.append(roomvars)
 
-        reason['room_name'] = yield calculate_room_name(
-            self.store, state_by_room[reason['room_id']], user_id,
-            fallback_to_members=True
+        reason["room_name"] = yield calculate_room_name(
+            self.store,
+            state_by_room[reason["room_id"]],
+            user_id,
+            fallback_to_members=True,
         )
 
         summary_text = yield self.make_summary_text(
@@ -176,33 +223,55 @@ class Mailer(object):
             "reason": reason,
         }
 
-        html_text = self.notif_template_html.render(**template_vars)
+        yield self.send_email(
+            email_address, "[%s] %s" % (self.app_name, summary_text), template_vars
+        )
+
+    @defer.inlineCallbacks
+    def send_email(self, email_address, subject, template_vars):
+        """Send an email with the given information and template text"""
+        try:
+            from_string = self.hs.config.email_notif_from % {"app": self.app_name}
+        except TypeError:
+            from_string = self.hs.config.email_notif_from
+
+        raw_from = email.utils.parseaddr(from_string)[1]
+        raw_to = email.utils.parseaddr(email_address)[1]
+
+        if raw_to == "":
+            raise RuntimeError("Invalid 'to' address")
+
+        html_text = self.template_html.render(**template_vars)
         html_part = MIMEText(html_text, "html", "utf8")
 
-        plain_text = self.notif_template_text.render(**template_vars)
+        plain_text = self.template_text.render(**template_vars)
         text_part = MIMEText(plain_text, "plain", "utf8")
 
-        multipart_msg = MIMEMultipart('alternative')
-        multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
-        multipart_msg['From'] = from_string
-        multipart_msg['To'] = email_address
-        multipart_msg['Date'] = email.utils.formatdate()
-        multipart_msg['Message-ID'] = email.utils.make_msgid()
+        multipart_msg = MIMEMultipart("alternative")
+        multipart_msg["Subject"] = subject
+        multipart_msg["From"] = from_string
+        multipart_msg["To"] = email_address
+        multipart_msg["Date"] = email.utils.formatdate()
+        multipart_msg["Message-ID"] = email.utils.make_msgid()
         multipart_msg.attach(text_part)
         multipart_msg.attach(html_part)
 
-        logger.info("Sending email push notification to %s" % email_address)
-
-        yield make_deferred_yieldable(self.sendmail(
-            self.hs.config.email_smtp_host,
-            raw_from, raw_to, multipart_msg.as_string().encode('utf8'),
-            reactor=self.hs.get_reactor(),
-            port=self.hs.config.email_smtp_port,
-            requireAuthentication=self.hs.config.email_smtp_user is not None,
-            username=self.hs.config.email_smtp_user,
-            password=self.hs.config.email_smtp_pass,
-            requireTransportSecurity=self.hs.config.require_transport_security
-        ))
+        logger.info("Sending email notification to %s" % email_address)
+
+        yield make_deferred_yieldable(
+            self.sendmail(
+                self.hs.config.email_smtp_host,
+                raw_from,
+                raw_to,
+                multipart_msg.as_string().encode("utf8"),
+                reactor=self.hs.get_reactor(),
+                port=self.hs.config.email_smtp_port,
+                requireAuthentication=self.hs.config.email_smtp_user is not None,
+                username=self.hs.config.email_smtp_user,
+                password=self.hs.config.email_smtp_pass,
+                requireTransportSecurity=self.hs.config.require_transport_security,
+            )
+        )
 
     @defer.inlineCallbacks
     def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids):
@@ -223,17 +292,18 @@ class Mailer(object):
         if not is_invite:
             for n in notifs:
                 notifvars = yield self.get_notif_vars(
-                    n, user_id, notif_events[n['event_id']], room_state_ids
+                    n, user_id, notif_events[n["event_id"]], room_state_ids
                 )
 
                 # merge overlapping notifs together.
                 # relies on the notifs being in chronological order.
                 merge = False
-                if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]:
-                    prev_messages = room_vars['notifs'][-1]['messages']
-                    for message in notifvars['messages']:
-                        pm = list(filter(lambda pm: pm['id'] == message['id'],
-                                         prev_messages))
+                if room_vars["notifs"] and "messages" in room_vars["notifs"][-1]:
+                    prev_messages = room_vars["notifs"][-1]["messages"]
+                    for message in notifvars["messages"]:
+                        pm = list(
+                            filter(lambda pm: pm["id"] == message["id"], prev_messages)
+                        )
                         if pm:
                             if not message["is_historical"]:
                                 pm[0]["is_historical"] = False
@@ -244,20 +314,22 @@ class Mailer(object):
                             prev_messages.append(message)
 
                 if not merge:
-                    room_vars['notifs'].append(notifvars)
+                    room_vars["notifs"].append(notifvars)
 
         defer.returnValue(room_vars)
 
     @defer.inlineCallbacks
     def get_notif_vars(self, notif, user_id, notif_event, room_state_ids):
         results = yield self.store.get_events_around(
-            notif['room_id'], notif['event_id'],
-            before_limit=CONTEXT_BEFORE, after_limit=CONTEXT_AFTER
+            notif["room_id"],
+            notif["event_id"],
+            before_limit=CONTEXT_BEFORE,
+            after_limit=CONTEXT_AFTER,
         )
 
         ret = {
             "link": self.make_notif_link(notif),
-            "ts": notif['received_ts'],
+            "ts": notif["received_ts"],
             "messages": [],
         }
 
@@ -269,7 +341,7 @@ class Mailer(object):
         for event in the_events:
             messagevars = yield self.get_message_vars(notif, event, room_state_ids)
             if messagevars is not None:
-                ret['messages'].append(messagevars)
+                ret["messages"].append(messagevars)
 
         defer.returnValue(ret)
 
@@ -291,7 +363,7 @@ class Mailer(object):
 
         ret = {
             "msgtype": msgtype,
-            "is_historical": event.event_id != notif['event_id'],
+            "is_historical": event.event_id != notif["event_id"],
             "id": event.event_id,
             "ts": event.origin_server_ts,
             "sender_name": sender_name,
@@ -330,8 +402,9 @@ class Mailer(object):
         return messagevars
 
     @defer.inlineCallbacks
-    def make_summary_text(self, notifs_by_room, room_state_ids,
-                          notif_events, user_id, reason):
+    def make_summary_text(
+        self, notifs_by_room, room_state_ids, notif_events, user_id, reason
+    ):
         if len(notifs_by_room) == 1:
             # Only one room has new stuff
             room_id = list(notifs_by_room.keys())[0]
@@ -355,16 +428,19 @@ class Mailer(object):
                 inviter_name = name_from_member_event(inviter_member_event)
 
                 if room_name is None:
-                    defer.returnValue(INVITE_FROM_PERSON % {
-                        "person": inviter_name,
-                        "app": self.app_name
-                    })
+                    defer.returnValue(
+                        INVITE_FROM_PERSON
+                        % {"person": inviter_name, "app": self.app_name}
+                    )
                 else:
-                    defer.returnValue(INVITE_FROM_PERSON_TO_ROOM % {
-                        "person": inviter_name,
-                        "room": room_name,
-                        "app": self.app_name,
-                    })
+                    defer.returnValue(
+                        INVITE_FROM_PERSON_TO_ROOM
+                        % {
+                            "person": inviter_name,
+                            "room": room_name,
+                            "app": self.app_name,
+                        }
+                    )
 
             sender_name = None
             if len(notifs_by_room[room_id]) == 1:
@@ -378,67 +454,86 @@ class Mailer(object):
                     sender_name = name_from_member_event(state_event)
 
                 if sender_name is not None and room_name is not None:
-                    defer.returnValue(MESSAGE_FROM_PERSON_IN_ROOM % {
-                        "person": sender_name,
-                        "room": room_name,
-                        "app": self.app_name,
-                    })
+                    defer.returnValue(
+                        MESSAGE_FROM_PERSON_IN_ROOM
+                        % {
+                            "person": sender_name,
+                            "room": room_name,
+                            "app": self.app_name,
+                        }
+                    )
                 elif sender_name is not None:
-                    defer.returnValue(MESSAGE_FROM_PERSON % {
-                        "person": sender_name,
-                        "app": self.app_name,
-                    })
+                    defer.returnValue(
+                        MESSAGE_FROM_PERSON
+                        % {"person": sender_name, "app": self.app_name}
+                    )
             else:
                 # There's more than one notification for this room, so just
                 # say there are several
                 if room_name is not None:
-                    defer.returnValue(MESSAGES_IN_ROOM % {
-                        "room": room_name,
-                        "app": self.app_name,
-                    })
+                    defer.returnValue(
+                        MESSAGES_IN_ROOM % {"room": room_name, "app": self.app_name}
+                    )
                 else:
                     # If the room doesn't have a name, say who the messages
                     # are from explicitly to avoid, "messages in the Bob room"
-                    sender_ids = list(set([
-                        notif_events[n['event_id']].sender
-                        for n in notifs_by_room[room_id]
-                    ]))
-
-                    member_events = yield self.store.get_events([
-                        room_state_ids[room_id][("m.room.member", s)]
-                        for s in sender_ids
-                    ])
-
-                    defer.returnValue(MESSAGES_FROM_PERSON % {
-                        "person": descriptor_from_member_events(member_events.values()),
-                        "app": self.app_name,
-                    })
+                    sender_ids = list(
+                        set(
+                            [
+                                notif_events[n["event_id"]].sender
+                                for n in notifs_by_room[room_id]
+                            ]
+                        )
+                    )
+
+                    member_events = yield self.store.get_events(
+                        [
+                            room_state_ids[room_id][("m.room.member", s)]
+                            for s in sender_ids
+                        ]
+                    )
+
+                    defer.returnValue(
+                        MESSAGES_FROM_PERSON
+                        % {
+                            "person": descriptor_from_member_events(
+                                member_events.values()
+                            ),
+                            "app": self.app_name,
+                        }
+                    )
         else:
             # Stuff's happened in multiple different rooms
 
             # ...but we still refer to the 'reason' room which triggered the mail
-            if reason['room_name'] is not None:
-                defer.returnValue(MESSAGES_IN_ROOM_AND_OTHERS % {
-                    "room": reason['room_name'],
-                    "app": self.app_name,
-                })
+            if reason["room_name"] is not None:
+                defer.returnValue(
+                    MESSAGES_IN_ROOM_AND_OTHERS
+                    % {"room": reason["room_name"], "app": self.app_name}
+                )
             else:
                 # If the reason room doesn't have a name, say who the messages
                 # are from explicitly to avoid, "messages in the Bob room"
-                sender_ids = list(set([
-                    notif_events[n['event_id']].sender
-                    for n in notifs_by_room[reason['room_id']]
-                ]))
+                sender_ids = list(
+                    set(
+                        [
+                            notif_events[n["event_id"]].sender
+                            for n in notifs_by_room[reason["room_id"]]
+                        ]
+                    )
+                )
 
-                member_events = yield self.store.get_events([
-                    room_state_ids[room_id][("m.room.member", s)]
-                    for s in sender_ids
-                ])
+                member_events = yield self.store.get_events(
+                    [room_state_ids[room_id][("m.room.member", s)] for s in sender_ids]
+                )
 
-                defer.returnValue(MESSAGES_FROM_PERSON_AND_OTHERS % {
-                    "person": descriptor_from_member_events(member_events.values()),
-                    "app": self.app_name,
-                })
+                defer.returnValue(
+                    MESSAGES_FROM_PERSON_AND_OTHERS
+                    % {
+                        "person": descriptor_from_member_events(member_events.values()),
+                        "app": self.app_name,
+                    }
+                )
 
     def make_room_link(self, room_id):
         if self.hs.config.email_riot_base_url:
@@ -454,17 +549,17 @@ class Mailer(object):
         if self.hs.config.email_riot_base_url:
             return "%s/#/room/%s/%s" % (
                 self.hs.config.email_riot_base_url,
-                notif['room_id'], notif['event_id']
+                notif["room_id"],
+                notif["event_id"],
             )
         elif self.app_name == "Vector":
             # need /beta for Universal Links to work on iOS
             return "https://vector.im/beta/#/room/%s/%s" % (
-                notif['room_id'], notif['event_id']
+                notif["room_id"],
+                notif["event_id"],
             )
         else:
-            return "https://matrix.to/#/%s/%s" % (
-                notif['room_id'], notif['event_id']
-            )
+            return "https://matrix.to/#/%s/%s" % (notif["room_id"], notif["event_id"])
 
     def make_unsubscribe_link(self, user_id, app_id, email_address):
         params = {
@@ -481,12 +576,18 @@ class Mailer(object):
 
 
 def safe_markup(raw_html):
-    return jinja2.Markup(bleach.linkify(bleach.clean(
-        raw_html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS,
-        # bleach master has this, but it isn't released yet
-        # protocols=ALLOWED_SCHEMES,
-        strip=True
-    )))
+    return jinja2.Markup(
+        bleach.linkify(
+            bleach.clean(
+                raw_html,
+                tags=ALLOWED_TAGS,
+                attributes=ALLOWED_ATTRS,
+                # bleach master has this, but it isn't released yet
+                # protocols=ALLOWED_SCHEMES,
+                strip=True,
+            )
+        )
+    )
 
 
 def safe_text(raw_text):
@@ -494,10 +595,9 @@ def safe_text(raw_text):
     Process text: treat it as HTML but escape any tags (ie. just escape the
     HTML) then linkify it.
     """
-    return jinja2.Markup(bleach.linkify(bleach.clean(
-        raw_text, tags=[], attributes={},
-        strip=False
-    )))
+    return jinja2.Markup(
+        bleach.linkify(bleach.clean(raw_text, tags=[], attributes={}, strip=False))
+    )
 
 
 def deduped_ordered_list(l):
@@ -546,15 +646,11 @@ def _create_mxc_to_http_filter(config):
 
         serverAndMediaId = value[6:]
         fragment = None
-        if '#' in serverAndMediaId:
-            (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1)
+        if "#" in serverAndMediaId:
+            (serverAndMediaId, fragment) = serverAndMediaId.split("#", 1)
             fragment = "#" + fragment
 
-        params = {
-            "width": width,
-            "height": height,
-            "method": resize_method,
-        }
+        params = {"width": width, "height": height, "method": resize_method}
         return "%s_matrix/media/v1/thumbnail/%s?%s%s" % (
             config.public_baseurl,
             serverAndMediaId,
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index eef6e18c2e..06056fbf4f 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -28,8 +28,13 @@ ALL_ALONE = "Empty Room"
 
 
 @defer.inlineCallbacks
-def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True,
-                        fallback_to_single_member=True):
+def calculate_room_name(
+    store,
+    room_state_ids,
+    user_id,
+    fallback_to_members=True,
+    fallback_to_single_member=True,
+):
     """
     Works out a user-facing name for the given room as per Matrix
     spec recommendations.
@@ -58,8 +63,10 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
             room_state_ids[("m.room.canonical_alias", "")], allow_none=True
         )
         if (
-            canon_alias and canon_alias.content and canon_alias.content["alias"] and
-            _looks_like_an_alias(canon_alias.content["alias"])
+            canon_alias
+            and canon_alias.content
+            and canon_alias.content["alias"]
+            and _looks_like_an_alias(canon_alias.content["alias"])
         ):
             defer.returnValue(canon_alias.content["alias"])
 
@@ -71,9 +78,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
     if "m.room.aliases" in room_state_bytype_ids:
         m_room_aliases = room_state_bytype_ids["m.room.aliases"]
         for alias_id in m_room_aliases.values():
-            alias_event = yield store.get_event(
-                alias_id, allow_none=True
-            )
+            alias_event = yield store.get_event(alias_id, allow_none=True)
             if alias_event and alias_event.content.get("aliases"):
                 the_aliases = alias_event.content["aliases"]
                 if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]):
@@ -89,8 +94,8 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
         )
 
     if (
-        my_member_event is not None and
-        my_member_event.content['membership'] == "invite"
+        my_member_event is not None
+        and my_member_event.content["membership"] == "invite"
     ):
         if ("m.room.member", my_member_event.sender) in room_state_ids:
             inviter_member_event = yield store.get_event(
@@ -100,9 +105,8 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
             if inviter_member_event:
                 if fallback_to_single_member:
                     defer.returnValue(
-                        "Invite from %s" % (
-                            name_from_member_event(inviter_member_event),
-                        )
+                        "Invite from %s"
+                        % (name_from_member_event(inviter_member_event),)
                     )
                 else:
                     return
@@ -116,8 +120,10 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
             list(room_state_bytype_ids["m.room.member"].values())
         )
         all_members = [
-            ev for ev in member_events.values()
-            if ev.content['membership'] == "join" or ev.content['membership'] == "invite"
+            ev
+            for ev in member_events.values()
+            if ev.content["membership"] == "join"
+            or ev.content["membership"] == "invite"
         ]
         # Sort the member events oldest-first so the we name people in the
         # order the joined (it should at least be deterministic rather than
@@ -134,9 +140,9 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
             # or inbound invite, or outbound 3PID invite.
             if all_members[0].sender == user_id:
                 if "m.room.third_party_invite" in room_state_bytype_ids:
-                    third_party_invites = (
-                        room_state_bytype_ids["m.room.third_party_invite"].values()
-                    )
+                    third_party_invites = room_state_bytype_ids[
+                        "m.room.third_party_invite"
+                    ].values()
 
                     if len(third_party_invites) > 0:
                         # technically third party invite events are not member
@@ -162,6 +168,17 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
 
 
 def descriptor_from_member_events(member_events):
+    """Get a description of the room based on the member events.
+
+    Args:
+        member_events (Iterable[FrozenEvent])
+
+    Returns:
+        str
+    """
+
+    member_events = list(member_events)
+
     if len(member_events) == 0:
         return "nobody"
     elif len(member_events) == 1:
@@ -180,8 +197,9 @@ def descriptor_from_member_events(member_events):
 
 def name_from_member_event(member_event):
     if (
-        member_event.content and "displayname" in member_event.content and
-        member_event.content["displayname"]
+        member_event.content
+        and "displayname" in member_event.content
+        and member_event.content["displayname"]
     ):
         return member_event.content["displayname"]
     return member_event.state_key
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index cf6c8b875e..5ed9147de4 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -26,8 +26,8 @@ from synapse.util.caches.lrucache import LruCache
 logger = logging.getLogger(__name__)
 
 
-GLOB_REGEX = re.compile(r'\\\[(\\\!|)(.*)\\\]')
-IS_GLOB = re.compile(r'[\?\*\[\]]')
+GLOB_REGEX = re.compile(r"\\\[(\\\!|)(.*)\\\]")
+IS_GLOB = re.compile(r"[\?\*\[\]]")
 INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
 
 
@@ -36,20 +36,20 @@ def _room_member_count(ev, condition, room_member_count):
 
 
 def _sender_notification_permission(ev, condition, sender_power_level, power_levels):
-    notif_level_key = condition.get('key')
+    notif_level_key = condition.get("key")
     if notif_level_key is None:
         return False
 
-    notif_levels = power_levels.get('notifications', {})
+    notif_levels = power_levels.get("notifications", {})
     room_notif_level = notif_levels.get(notif_level_key, 50)
 
     return sender_power_level >= room_notif_level
 
 
 def _test_ineq_condition(condition, number):
-    if 'is' not in condition:
+    if "is" not in condition:
         return False
-    m = INEQUALITY_EXPR.match(condition['is'])
+    m = INEQUALITY_EXPR.match(condition["is"])
     if not m:
         return False
     ineq = m.group(1)
@@ -58,15 +58,15 @@ def _test_ineq_condition(condition, number):
         return False
     rhs = int(rhs)
 
-    if ineq == '' or ineq == '==':
+    if ineq == "" or ineq == "==":
         return number == rhs
-    elif ineq == '<':
+    elif ineq == "<":
         return number < rhs
-    elif ineq == '>':
+    elif ineq == ">":
         return number > rhs
-    elif ineq == '>=':
+    elif ineq == ">=":
         return number >= rhs
-    elif ineq == '<=':
+    elif ineq == "<=":
         return number <= rhs
     else:
         return False
@@ -77,8 +77,8 @@ def tweaks_for_actions(actions):
     for a in actions:
         if not isinstance(a, dict):
             continue
-        if 'set_tweak' in a and 'value' in a:
-            tweaks[a['set_tweak']] = a['value']
+        if "set_tweak" in a and "value" in a:
+            tweaks[a["set_tweak"]] = a["value"]
     return tweaks
 
 
@@ -93,26 +93,24 @@ class PushRuleEvaluatorForEvent(object):
         self._value_cache = _flatten_dict(event)
 
     def matches(self, condition, user_id, display_name):
-        if condition['kind'] == 'event_match':
+        if condition["kind"] == "event_match":
             return self._event_match(condition, user_id)
-        elif condition['kind'] == 'contains_display_name':
+        elif condition["kind"] == "contains_display_name":
             return self._contains_display_name(display_name)
-        elif condition['kind'] == 'room_member_count':
-            return _room_member_count(
-                self._event, condition, self._room_member_count
-            )
-        elif condition['kind'] == 'sender_notification_permission':
+        elif condition["kind"] == "room_member_count":
+            return _room_member_count(self._event, condition, self._room_member_count)
+        elif condition["kind"] == "sender_notification_permission":
             return _sender_notification_permission(
-                self._event, condition, self._sender_power_level, self._power_levels,
+                self._event, condition, self._sender_power_level, self._power_levels
             )
         else:
             return True
 
     def _event_match(self, condition, user_id):
-        pattern = condition.get('pattern', None)
+        pattern = condition.get("pattern", None)
 
         if not pattern:
-            pattern_type = condition.get('pattern_type', None)
+            pattern_type = condition.get("pattern_type", None)
             if pattern_type == "user_id":
                 pattern = user_id
             elif pattern_type == "user_localpart":
@@ -123,14 +121,14 @@ class PushRuleEvaluatorForEvent(object):
             return False
 
         # XXX: optimisation: cache our pattern regexps
-        if condition['key'] == 'content.body':
+        if condition["key"] == "content.body":
             body = self._event.content.get("body", None)
             if not body:
                 return False
 
             return _glob_matches(pattern, body, word_boundary=True)
         else:
-            haystack = self._get_value(condition['key'])
+            haystack = self._get_value(condition["key"])
             if haystack is None:
                 return False
 
@@ -193,16 +191,13 @@ def _glob_to_re(glob, word_boundary):
     if IS_GLOB.search(glob):
         r = re.escape(glob)
 
-        r = r.replace(r'\*', '.*?')
-        r = r.replace(r'\?', '.')
+        r = r.replace(r"\*", ".*?")
+        r = r.replace(r"\?", ".")
 
         # handle [abc], [a-z] and [!a-z] style ranges.
         r = GLOB_REGEX.sub(
             lambda x: (
-                '[%s%s]' % (
-                    x.group(1) and '^' or '',
-                    x.group(2).replace(r'\\\-', '-')
-                )
+                "[%s%s]" % (x.group(1) and "^" or "", x.group(2).replace(r"\\\-", "-"))
             ),
             r,
         )
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 8049c298c2..e37269cdb9 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -23,9 +23,7 @@ def get_badge_count(store, user_id):
     invites = yield store.get_invited_rooms_for_user(user_id)
     joins = yield store.get_rooms_for_user(user_id)
 
-    my_receipts_by_room = yield store.get_receipts_for_user(
-        user_id, "m.read",
-    )
+    my_receipts_by_room = yield store.get_receipts_for_user(user_id, "m.read")
 
     badge = len(invites)
 
@@ -57,10 +55,10 @@ def get_context_for_event(store, state_handler, ev, user_id):
         store, room_state_ids, user_id, fallback_to_single_member=False
     )
     if name:
-        ctx['name'] = name
+        ctx["name"] = name
 
     sender_state_event_id = room_state_ids[("m.room.member", ev.sender)]
     sender_state_event = yield store.get_event(sender_state_event_id)
-    ctx['sender_display_name'] = name_from_member_event(sender_state_event)
+    ctx["sender_display_name"] = name_from_member_event(sender_state_event)
 
     defer.returnValue(ctx)
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 14bc7823cf..a9c64a9c54 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -36,9 +36,7 @@ class PusherFactory(object):
     def __init__(self, hs):
         self.hs = hs
 
-        self.pusher_types = {
-            "http": HttpPusher,
-        }
+        self.pusher_types = {"http": HttpPusher}
 
         logger.info("email enable notifs: %r", hs.config.email_enable_notifs)
         if hs.config.email_enable_notifs:
@@ -56,7 +54,7 @@ class PusherFactory(object):
             logger.info("defined email pusher type")
 
     def create_pusher(self, pusherdict):
-        kind = pusherdict['kind']
+        kind = pusherdict["kind"]
         f = self.pusher_types.get(kind, None)
         if not f:
             return None
@@ -70,15 +68,15 @@ class PusherFactory(object):
             mailer = Mailer(
                 hs=self.hs,
                 app_name=app_name,
-                notif_template_html=self.notif_template_html,
-                notif_template_text=self.notif_template_text,
+                template_html=self.notif_template_html,
+                template_text=self.notif_template_text,
             )
             self.mailers[app_name] = mailer
         return EmailPusher(self.hs, pusherdict, mailer)
 
     def _app_name_from_pusherdict(self, pusherdict):
-        if 'data' in pusherdict and 'brand' in pusherdict['data']:
-            app_name = pusherdict['data']['brand']
+        if "data" in pusherdict and "brand" in pusherdict["data"]:
+            app_name = pusherdict["data"]["brand"]
         else:
             app_name = self.hs.config.email_app_name
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 40a7709c09..df6f670740 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -40,6 +40,7 @@ class PusherPool:
     notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
     Pusher.on_new_receipts are not expected to return deferreds.
     """
+
     def __init__(self, _hs):
         self.hs = _hs
         self.pusher_factory = PusherFactory(_hs)
@@ -57,30 +58,47 @@ class PusherPool:
         run_as_background_process("start_pushers", self._start_pushers)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_id, access_token, kind, app_id,
-                   app_display_name, device_display_name, pushkey, lang, data,
-                   profile_tag=""):
+    def add_pusher(
+        self,
+        user_id,
+        access_token,
+        kind,
+        app_id,
+        app_display_name,
+        device_display_name,
+        pushkey,
+        lang,
+        data,
+        profile_tag="",
+    ):
+        """Creates a new pusher and adds it to the pool
+
+        Returns:
+            Deferred[EmailPusher|HttpPusher]
+        """
         time_now_msec = self.clock.time_msec()
 
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
         # code path adding pushers.
-        self.pusher_factory.create_pusher({
-            "id": None,
-            "user_name": user_id,
-            "kind": kind,
-            "app_id": app_id,
-            "app_display_name": app_display_name,
-            "device_display_name": device_display_name,
-            "pushkey": pushkey,
-            "ts": time_now_msec,
-            "lang": lang,
-            "data": data,
-            "last_stream_ordering": None,
-            "last_success": None,
-            "failing_since": None
-        })
+        self.pusher_factory.create_pusher(
+            {
+                "id": None,
+                "user_name": user_id,
+                "kind": kind,
+                "app_id": app_id,
+                "app_display_name": app_display_name,
+                "device_display_name": device_display_name,
+                "pushkey": pushkey,
+                "ts": time_now_msec,
+                "lang": lang,
+                "data": data,
+                "last_stream_ordering": None,
+                "last_success": None,
+                "failing_since": None,
+            }
+        )
 
         # create the pusher setting last_stream_ordering to the current maximum
         # stream ordering in event_push_actions, so it will process
@@ -103,21 +121,24 @@ class PusherPool:
             last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
         )
-        yield self.start_pusher_by_id(app_id, pushkey, user_id)
+        pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id)
+
+        defer.returnValue(pusher)
 
     @defer.inlineCallbacks
-    def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
-                                                      not_user_id):
-        to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(
-            app_id, pushkey
-        )
+    def remove_pushers_by_app_id_and_pushkey_not_user(
+        self, app_id, pushkey, not_user_id
+    ):
+        to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
         for p in to_remove:
-            if p['user_name'] != not_user_id:
+            if p["user_name"] != not_user_id:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
-                    app_id, pushkey, p['user_name']
+                    app_id,
+                    pushkey,
+                    p["user_name"],
                 )
-                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
     @defer.inlineCallbacks
     def remove_pushers_by_access_token(self, user_id, access_tokens):
@@ -131,14 +152,14 @@ class PusherPool:
         """
         tokens = set(access_tokens)
         for p in (yield self.store.get_pushers_by_user_id(user_id)):
-            if p['access_token'] in tokens:
+            if p["access_token"] in tokens:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
-                    p['app_id'], p['pushkey'], p['user_name']
-                )
-                yield self.remove_pusher(
-                    p['app_id'], p['pushkey'], p['user_name'],
+                    p["app_id"],
+                    p["pushkey"],
+                    p["user_name"],
                 )
+                yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
@@ -184,21 +205,26 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def start_pusher_by_id(self, app_id, pushkey, user_id):
-        """Look up the details for the given pusher, and start it"""
+        """Look up the details for the given pusher, and start it
+
+        Returns:
+            Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any
+        """
         if not self._should_start_pushers:
             return
 
-        resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
-            app_id, pushkey
-        )
+        resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
 
-        p = None
+        pusher_dict = None
         for r in resultlist:
-            if r['user_name'] == user_id:
-                p = r
+            if r["user_name"] == user_id:
+                pusher_dict = r
 
-        if p:
-            yield self._start_pusher(p)
+        pusher = None
+        if pusher_dict:
+            pusher = yield self._start_pusher(pusher_dict)
+
+        defer.returnValue(pusher)
 
     @defer.inlineCallbacks
     def _start_pushers(self):
@@ -224,16 +250,16 @@ class PusherPool:
             pusherdict (dict):
 
         Returns:
-            None
+            Deferred[EmailPusher|HttpPusher]
         """
         try:
             p = self.pusher_factory.create_pusher(pusherdict)
         except PusherConfigException as e:
             logger.warning(
                 "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
-                pusherdict.get('user_name'),
-                pusherdict.get('app_id'),
-                pusherdict.get('pushkey'),
+                pusherdict.get("user_name"),
+                pusherdict.get("app_id"),
+                pusherdict.get("pushkey"),
                 e,
             )
             return
@@ -244,11 +270,8 @@ class PusherPool:
         if not p:
             return
 
-        appid_pushkey = "%s:%s" % (
-            pusherdict['app_id'],
-            pusherdict['pushkey'],
-        )
-        byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+        appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
+        byuser = self.pushers.setdefault(pusherdict["user_name"], {})
 
         if appid_pushkey in byuser:
             byuser[appid_pushkey].on_stop()
@@ -261,7 +284,7 @@ class PusherPool:
         last_stream_ordering = pusherdict["last_stream_ordering"]
         if last_stream_ordering:
             have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
-                user_id, last_stream_ordering,
+                user_id, last_stream_ordering
             )
         else:
             # We always want to default to starting up the pusher rather than
@@ -270,6 +293,8 @@ class PusherPool:
 
         p.on_started(have_notifs)
 
+        defer.returnValue(p)
+
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):
         appid_pushkey = "%s:%s" % (app_id, pushkey)
diff --git a/synapse/push/rulekinds.py b/synapse/push/rulekinds.py
index 4cae48ac07..ce7cc1b4ee 100644
--- a/synapse/push/rulekinds.py
+++ b/synapse/push/rulekinds.py
@@ -13,10 +13,10 @@
 # limitations under the License.
 
 PRIORITY_CLASS_MAP = {
-    'underride': 1,
-    'sender': 2,
-    'room': 3,
-    'content': 4,
-    'override': 5,
+    "underride": 1,
+    "sender": 2,
+    "room": 3,
+    "content": 4,
+    "override": 5,
 }
 PRIORITY_CLASS_INVERSE_MAP = {v: k for k, v in PRIORITY_CLASS_MAP.items()}