summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py35
-rw-r--r--synapse/push/emailpusher.py52
-rw-r--r--synapse/push/httppusher.py41
-rw-r--r--synapse/push/mailer.py155
-rw-r--r--synapse/push/presentable_names.py36
-rw-r--r--synapse/push/push_rule_evaluator.py77
-rw-r--r--synapse/push/push_tools.py11
-rw-r--r--synapse/push/pusher.py12
-rw-r--r--synapse/push/pusherpool.py59
9 files changed, 258 insertions, 220 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 22491f3700..e75d964ac8 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -51,6 +51,7 @@ push_rules_delta_state_cache_metric = register_cache(
     "cache",
     "push_rules_delta_state_cache_metric",
     cache=[],  # Meaningless size, as this isn't a cache that stores values
+    resizable=False,
 )
 
 
@@ -67,7 +68,8 @@ class BulkPushRuleEvaluator(object):
         self.room_push_rule_cache_metrics = register_cache(
             "cache",
             "room_push_rule_cache",
-            cache=[],  # Meaningless size, as this isn't a cache that stores values
+            cache=[],  # Meaningless size, as this isn't a cache that stores values,
+            resizable=False,
         )
 
     @defer.inlineCallbacks
@@ -79,7 +81,7 @@ class BulkPushRuleEvaluator(object):
             dict of user_id -> push_rules
         """
         room_id = event.room_id
-        rules_for_room = self._get_rules_for_room(room_id)
+        rules_for_room = yield self._get_rules_for_room(room_id)
 
         rules_by_user = yield rules_for_room.get_rules(event, context)
 
@@ -116,7 +118,7 @@ class BulkPushRuleEvaluator(object):
 
     @defer.inlineCallbacks
     def _get_power_levels_and_sender_level(self, event, context):
-        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_state_ids = yield context.get_prev_state_ids()
         pl_event_id = prev_state_ids.get(POWER_KEY)
         if pl_event_id:
             # fastpath: if there's a power level event, that's all we need, and
@@ -149,9 +151,10 @@ class BulkPushRuleEvaluator(object):
 
         room_members = yield self.store.get_joined_users_from_context(event, context)
 
-        (power_levels, sender_power_level) = (
-            yield self._get_power_levels_and_sender_level(event, context)
-        )
+        (
+            power_levels,
+            sender_power_level,
+        ) = yield self._get_power_levels_and_sender_level(event, context)
 
         evaluator = PushRuleEvaluatorForEvent(
             event, len(room_members), sender_power_level, power_levels
@@ -303,7 +306,7 @@ class RulesForRoom(object):
 
                 push_rules_delta_state_cache_metric.inc_hits()
             else:
-                current_state_ids = yield context.get_current_state_ids(self.store)
+                current_state_ids = yield context.get_current_state_ids()
                 push_rules_delta_state_cache_metric.inc_misses()
 
             push_rules_state_size_counter.inc(len(current_state_ids))
@@ -385,15 +388,7 @@ class RulesForRoom(object):
         """
         sequence = self.sequence
 
-        rows = yield self.store._simple_select_many_batch(
-            table="room_memberships",
-            column="event_id",
-            iterable=member_event_ids.values(),
-            retcols=("user_id", "membership", "event_id"),
-            keyvalues={},
-            batch_size=500,
-            desc="_get_rules_for_member_event_ids",
-        )
+        rows = yield self.store.get_membership_from_event_ids(member_event_ids.values())
 
         members = {row["event_id"]: (row["user_id"], row["membership"]) for row in rows}
 
@@ -407,11 +402,11 @@ class RulesForRoom(object):
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug("Found members %r: %r", self.room_id, members.values())
 
-        interested_in_user_ids = set(
+        interested_in_user_ids = {
             user_id
             for user_id, membership in itervalues(members)
             if membership == Membership.JOIN
-        )
+        }
 
         logger.debug("Joined: %r", interested_in_user_ids)
 
@@ -419,9 +414,9 @@ class RulesForRoom(object):
             interested_in_user_ids, on_invalidate=self.invalidate_all_cb
         )
 
-        user_ids = set(
+        user_ids = {
             uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
-        )
+        }
 
         logger.debug("With pushers: %r", user_ids)
 
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 42e5b0c0a5..568c13eaea 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -15,7 +15,6 @@
 
 import logging
 
-from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -132,8 +131,7 @@ class EmailPusher(object):
         self._is_processing = False
         self._start_processing()
 
-    @defer.inlineCallbacks
-    def _process(self):
+    async def _process(self):
         # we should never get here if we are already processing
         assert not self._is_processing
 
@@ -142,7 +140,7 @@ class EmailPusher(object):
 
             if self.throttle_params is None:
                 # this is our first loop: load up the throttle params
-                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                self.throttle_params = await self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
 
@@ -151,7 +149,7 @@ class EmailPusher(object):
             while True:
                 starting_max_ordering = self.max_stream_ordering
                 try:
-                    yield self._unsafe_process()
+                    await self._unsafe_process()
                 except Exception:
                     logger.exception("Exception processing notifs")
                 if self.max_stream_ordering == starting_max_ordering:
@@ -159,8 +157,7 @@ class EmailPusher(object):
         finally:
             self._is_processing = False
 
-    @defer.inlineCallbacks
-    def _unsafe_process(self):
+    async def _unsafe_process(self):
         """
         Main logic of the push loop without the wrapper function that sets
         up logging, measures and guards against multiple instances of it
@@ -168,12 +165,12 @@ class EmailPusher(object):
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
         fn = self.store.get_unread_push_actions_for_user_in_range_for_email
-        unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
+        unprocessed = await fn(self.user_id, start, self.max_stream_ordering)
 
         soonest_due_at = None
 
         if not unprocessed:
-            yield self.save_last_stream_ordering_and_success(self.max_stream_ordering)
+            await self.save_last_stream_ordering_and_success(self.max_stream_ordering)
             return
 
         for push_action in unprocessed:
@@ -201,15 +198,15 @@ class EmailPusher(object):
                     "throttle_ms": self.get_room_throttle_ms(push_action["room_id"]),
                 }
 
-                yield self.send_notification(unprocessed, reason)
+                await self.send_notification(unprocessed, reason)
 
-                yield self.save_last_stream_ordering_and_success(
-                    max([ea["stream_ordering"] for ea in unprocessed])
+                await self.save_last_stream_ordering_and_success(
+                    max(ea["stream_ordering"] for ea in unprocessed)
                 )
 
                 # we update the throttle on all the possible unprocessed push actions
                 for ea in unprocessed:
-                    yield self.sent_notif_update_throttle(ea["room_id"], ea)
+                    await self.sent_notif_update_throttle(ea["room_id"], ea)
                 break
             else:
                 if soonest_due_at is None or should_notify_at < soonest_due_at:
@@ -227,21 +224,18 @@ class EmailPusher(object):
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
-    @defer.inlineCallbacks
-    def save_last_stream_ordering_and_success(self, last_stream_ordering):
+    async def save_last_stream_ordering_and_success(self, last_stream_ordering):
         if last_stream_ordering is None:
             # This happens if we haven't yet processed anything
             return
 
         self.last_stream_ordering = last_stream_ordering
-        pusher_still_exists = (
-            yield self.store.update_pusher_last_stream_ordering_and_success(
-                self.app_id,
-                self.email,
-                self.user_id,
-                last_stream_ordering,
-                self.clock.time_msec(),
-            )
+        pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
+            self.app_id,
+            self.email,
+            self.user_id,
+            last_stream_ordering,
+            self.clock.time_msec(),
         )
         if not pusher_still_exists:
             # The pusher has been deleted while we were processing, so
@@ -277,13 +271,12 @@ class EmailPusher(object):
         may_send_at = last_sent_ts + throttle_ms
         return may_send_at
 
-    @defer.inlineCallbacks
-    def sent_notif_update_throttle(self, room_id, notified_push_action):
+    async def sent_notif_update_throttle(self, room_id, notified_push_action):
         # We have sent a notification, so update the throttle accordingly.
         # If the event that triggered the notif happened more than
         # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
         # notif, we release the throttle. Otherwise, the throttle is increased.
-        time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
+        time_of_previous_notifs = await self.store.get_time_of_last_push_action_before(
             notified_push_action["stream_ordering"]
         )
 
@@ -312,14 +305,13 @@ class EmailPusher(object):
             "last_sent_ts": self.clock.time_msec(),
             "throttle_ms": new_throttle_ms,
         }
-        yield self.store.set_throttle_params(
+        await self.store.set_throttle_params(
             self.pusher_id, room_id, self.throttle_params[room_id]
         )
 
-    @defer.inlineCallbacks
-    def send_notification(self, push_actions, reason):
+    async def send_notification(self, push_actions, reason):
         logger.info("Sending notif email for user %r", self.user_id)
 
-        yield self.mailer.send_notification_mail(
+        await self.mailer.send_notification_mail(
             self.app_id, self.user_id, self.email, push_actions, reason
         )
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 6299587808..eaaa7afc91 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -15,8 +15,6 @@
 # limitations under the License.
 import logging
 
-import six
-
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -28,9 +26,6 @@ from synapse.push import PusherConfigException
 
 from . import push_rule_evaluator, push_tools
 
-if six.PY3:
-    long = int
-
 logger = logging.getLogger(__name__)
 
 http_push_processed_counter = Counter(
@@ -64,6 +59,7 @@ class HttpPusher(object):
     def __init__(self, hs, pusherdict):
         self.hs = hs
         self.store = self.hs.get_datastore()
+        self.storage = self.hs.get_storage()
         self.clock = self.hs.get_clock()
         self.state_handler = self.hs.get_state_handler()
         self.user_id = pusherdict["user_name"]
@@ -102,7 +98,7 @@ class HttpPusher(object):
         if "url" not in self.data:
             raise PusherConfigException("'url' required in data for HTTP pusher")
         self.url = self.data["url"]
-        self.http_client = hs.get_simple_http_client()
+        self.http_client = hs.get_proxied_http_client()
         self.data_minus_url = {}
         self.data_minus_url.update(self.data)
         del self.data_minus_url["url"]
@@ -210,14 +206,12 @@ class HttpPusher(object):
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
-                pusher_still_exists = (
-                    yield self.store.update_pusher_last_stream_ordering_and_success(
-                        self.app_id,
-                        self.pushkey,
-                        self.user_id,
-                        self.last_stream_ordering,
-                        self.clock.time_msec(),
-                    )
+                pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success(
+                    self.app_id,
+                    self.pushkey,
+                    self.user_id,
+                    self.last_stream_ordering,
+                    self.clock.time_msec(),
                 )
                 if not pusher_still_exists:
                     # The pusher has been deleted while we were processing, so
@@ -246,8 +240,8 @@ class HttpPusher(object):
                     # we really only give up so that if the URL gets
                     # fixed, we don't suddenly deliver a load
                     # of old notifications.
-                    logger.warn(
-                        "Giving up on a notification to user %s, " "pushkey %s",
+                    logger.warning(
+                        "Giving up on a notification to user %s, pushkey %s",
                         self.user_id,
                         self.pushkey,
                     )
@@ -299,9 +293,8 @@ class HttpPusher(object):
                 if pk != self.pushkey:
                     # for sanity, we only remove the pushkey if it
                     # was the one we actually sent...
-                    logger.warn(
-                        ("Ignoring rejected pushkey %s because we" " didn't send it"),
-                        pk,
+                    logger.warning(
+                        ("Ignoring rejected pushkey %s because we didn't send it"), pk,
                     )
                 else:
                     logger.info("Pushkey %s was rejected: removing", pk)
@@ -320,7 +313,7 @@ class HttpPusher(object):
                         {
                             "app_id": self.app_id,
                             "pushkey": self.pushkey,
-                            "pushkey_ts": long(self.pushkey_ts / 1000),
+                            "pushkey_ts": int(self.pushkey_ts / 1000),
                             "data": self.data_minus_url,
                         }
                     ],
@@ -329,7 +322,7 @@ class HttpPusher(object):
             return d
 
         ctx = yield push_tools.get_context_for_event(
-            self.store, self.state_handler, event, self.user_id
+            self.storage, self.state_handler, event, self.user_id
         )
 
         d = {
@@ -349,7 +342,7 @@ class HttpPusher(object):
                     {
                         "app_id": self.app_id,
                         "pushkey": self.pushkey,
-                        "pushkey_ts": long(self.pushkey_ts / 1000),
+                        "pushkey_ts": int(self.pushkey_ts / 1000),
                         "data": self.data_minus_url,
                         "tweaks": tweaks,
                     }
@@ -400,7 +393,7 @@ class HttpPusher(object):
         Args:
             badge (int): number of unread messages
         """
-        logger.info("Sending updated badge count %d to %s", badge, self.name)
+        logger.debug("Sending updated badge count %d to %s", badge, self.name)
         d = {
             "notification": {
                 "id": "",
@@ -411,7 +404,7 @@ class HttpPusher(object):
                     {
                         "app_id": self.app_id,
                         "pushkey": self.pushkey,
-                        "pushkey_ts": long(self.pushkey_ts / 1000),
+                        "pushkey_ts": int(self.pushkey_ts / 1000),
                         "data": self.data_minus_url,
                     }
                 ],
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 3dfd527849..d57a66a697 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -19,14 +19,13 @@ import logging
 import time
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
+from typing import Iterable, List, TypeVar
 
 from six.moves import urllib
 
 import bleach
 import jinja2
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
 from synapse.logging.context import make_deferred_yieldable
@@ -41,9 +40,11 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+T = TypeVar("T")
+
 
 MESSAGE_FROM_PERSON_IN_ROOM = (
-    "You have a message on %(app)s from %(person)s " "in the %(room)s room..."
+    "You have a message on %(app)s from %(person)s in the %(room)s room..."
 )
 MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
 MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
@@ -55,7 +56,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = (
     "You have messages on %(app)s from %(person)s and others..."
 )
 INVITE_FROM_PERSON_TO_ROOM = (
-    "%(person)s has invited you to join the " "%(room)s room on %(app)s..."
+    "%(person)s has invited you to join the %(room)s room on %(app)s..."
 )
 INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
 
@@ -119,12 +120,12 @@ class Mailer(object):
         self.store = self.hs.get_datastore()
         self.macaroon_gen = self.hs.get_macaroon_generator()
         self.state_handler = self.hs.get_state_handler()
+        self.storage = hs.get_storage()
         self.app_name = app_name
 
         logger.info("Created Mailer for app_name %s" % app_name)
 
-    @defer.inlineCallbacks
-    def send_password_reset_mail(self, email_address, token, client_secret, sid):
+    async def send_password_reset_mail(self, email_address, token, client_secret, sid):
         """Send an email with a password reset link to a user
 
         Args:
@@ -136,22 +137,22 @@ class Mailer(object):
                 group together multiple email sending attempts
             sid (str): The generated session ID
         """
+        params = {"token": token, "client_secret": client_secret, "sid": sid}
         link = (
             self.hs.config.public_baseurl
-            + "_matrix/client/unstable/password_reset/email/submit_token"
-            "?token=%s&client_secret=%s&sid=%s" % (token, client_secret, sid)
+            + "_matrix/client/unstable/password_reset/email/submit_token?%s"
+            % urllib.parse.urlencode(params)
         )
 
         template_vars = {"link": link}
 
-        yield self.send_email(
+        await self.send_email(
             email_address,
             "[%s] Password Reset" % self.hs.config.server_name,
             template_vars,
         )
 
-    @defer.inlineCallbacks
-    def send_registration_mail(self, email_address, token, client_secret, sid):
+    async def send_registration_mail(self, email_address, token, client_secret, sid):
         """Send an email with a registration confirmation link to a user
 
         Args:
@@ -163,28 +164,56 @@ class Mailer(object):
                 group together multiple email sending attempts
             sid (str): The generated session ID
         """
+        params = {"token": token, "client_secret": client_secret, "sid": sid}
         link = (
             self.hs.config.public_baseurl
-            + "_matrix/client/unstable/registration/email/submit_token"
-            "?token=%s&client_secret=%s&sid=%s" % (token, client_secret, sid)
+            + "_matrix/client/unstable/registration/email/submit_token?%s"
+            % urllib.parse.urlencode(params)
         )
 
         template_vars = {"link": link}
 
-        yield self.send_email(
+        await self.send_email(
             email_address,
             "[%s] Register your Email Address" % self.hs.config.server_name,
             template_vars,
         )
 
-    @defer.inlineCallbacks
-    def send_notification_mail(
+    async def send_add_threepid_mail(self, email_address, token, client_secret, sid):
+        """Send an email with a validation link to a user for adding a 3pid to their account
+
+        Args:
+            email_address (str): Email address we're sending the validation link to
+
+            token (str): Unique token generated by the server to verify the email was received
+
+            client_secret (str): Unique token generated by the client to group together
+                multiple email sending attempts
+
+            sid (str): The generated session ID
+        """
+        params = {"token": token, "client_secret": client_secret, "sid": sid}
+        link = (
+            self.hs.config.public_baseurl
+            + "_matrix/client/unstable/add_threepid/email/submit_token?%s"
+            % urllib.parse.urlencode(params)
+        )
+
+        template_vars = {"link": link}
+
+        await self.send_email(
+            email_address,
+            "[%s] Validate Your Email" % self.hs.config.server_name,
+            template_vars,
+        )
+
+    async def send_notification_mail(
         self, app_id, user_id, email_address, push_actions, reason
     ):
         """Send email regarding a user's room notifications"""
         rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions])
 
-        notif_events = yield self.store.get_events(
+        notif_events = await self.store.get_events(
             [pa["event_id"] for pa in push_actions]
         )
 
@@ -197,7 +226,7 @@ class Mailer(object):
         state_by_room = {}
 
         try:
-            user_display_name = yield self.store.get_profile_displayname(
+            user_display_name = await self.store.get_profile_displayname(
                 UserID.from_string(user_id).localpart
             )
             if user_display_name is None:
@@ -205,14 +234,13 @@ class Mailer(object):
         except StoreError:
             user_display_name = user_id
 
-        @defer.inlineCallbacks
-        def _fetch_room_state(room_id):
-            room_state = yield self.store.get_current_state_ids(room_id)
+        async def _fetch_room_state(room_id):
+            room_state = await self.store.get_current_state_ids(room_id)
             state_by_room[room_id] = room_state
 
         # Run at most 3 of these at once: sync does 10 at a time but email
         # notifs are much less realtime than sync so we can afford to wait a bit.
-        yield concurrently_execute(_fetch_room_state, rooms_in_order, 3)
+        await concurrently_execute(_fetch_room_state, rooms_in_order, 3)
 
         # actually sort our so-called rooms_in_order list, most recent room first
         rooms_in_order.sort(key=lambda r: -(notifs_by_room[r][-1]["received_ts"] or 0))
@@ -220,19 +248,19 @@ class Mailer(object):
         rooms = []
 
         for r in rooms_in_order:
-            roomvars = yield self.get_room_vars(
+            roomvars = await self.get_room_vars(
                 r, user_id, notifs_by_room[r], notif_events, state_by_room[r]
             )
             rooms.append(roomvars)
 
-        reason["room_name"] = yield calculate_room_name(
+        reason["room_name"] = await calculate_room_name(
             self.store,
             state_by_room[reason["room_id"]],
             user_id,
             fallback_to_members=True,
         )
 
-        summary_text = yield self.make_summary_text(
+        summary_text = await self.make_summary_text(
             notifs_by_room, state_by_room, notif_events, user_id, reason
         )
 
@@ -247,12 +275,11 @@ class Mailer(object):
             "reason": reason,
         }
 
-        yield self.send_email(
+        await self.send_email(
             email_address, "[%s] %s" % (self.app_name, summary_text), template_vars
         )
 
-    @defer.inlineCallbacks
-    def send_email(self, email_address, subject, template_vars):
+    async def send_email(self, email_address, subject, template_vars):
         """Send an email with the given information and template text"""
         try:
             from_string = self.hs.config.email_notif_from % {"app": self.app_name}
@@ -280,9 +307,9 @@ class Mailer(object):
         multipart_msg.attach(text_part)
         multipart_msg.attach(html_part)
 
-        logger.info("Sending email notification to %s" % email_address)
+        logger.info("Sending email to %s" % email_address)
 
-        yield make_deferred_yieldable(
+        await make_deferred_yieldable(
             self.sendmail(
                 self.hs.config.email_smtp_host,
                 raw_from,
@@ -297,13 +324,14 @@ class Mailer(object):
             )
         )
 
-    @defer.inlineCallbacks
-    def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids):
+    async def get_room_vars(
+        self, room_id, user_id, notifs, notif_events, room_state_ids
+    ):
         my_member_event_id = room_state_ids[("m.room.member", user_id)]
-        my_member_event = yield self.store.get_event(my_member_event_id)
+        my_member_event = await self.store.get_event(my_member_event_id)
         is_invite = my_member_event.content["membership"] == "invite"
 
-        room_name = yield calculate_room_name(self.store, room_state_ids, user_id)
+        room_name = await calculate_room_name(self.store, room_state_ids, user_id)
 
         room_vars = {
             "title": room_name,
@@ -315,7 +343,7 @@ class Mailer(object):
 
         if not is_invite:
             for n in notifs:
-                notifvars = yield self.get_notif_vars(
+                notifvars = await self.get_notif_vars(
                     n, user_id, notif_events[n["event_id"]], room_state_ids
                 )
 
@@ -342,9 +370,8 @@ class Mailer(object):
 
         return room_vars
 
-    @defer.inlineCallbacks
-    def get_notif_vars(self, notif, user_id, notif_event, room_state_ids):
-        results = yield self.store.get_events_around(
+    async def get_notif_vars(self, notif, user_id, notif_event, room_state_ids):
+        results = await self.store.get_events_around(
             notif["room_id"],
             notif["event_id"],
             before_limit=CONTEXT_BEFORE,
@@ -357,25 +384,24 @@ class Mailer(object):
             "messages": [],
         }
 
-        the_events = yield filter_events_for_client(
-            self.store, user_id, results["events_before"]
+        the_events = await filter_events_for_client(
+            self.storage, user_id, results["events_before"]
         )
         the_events.append(notif_event)
 
         for event in the_events:
-            messagevars = yield self.get_message_vars(notif, event, room_state_ids)
+            messagevars = await self.get_message_vars(notif, event, room_state_ids)
             if messagevars is not None:
                 ret["messages"].append(messagevars)
 
         return ret
 
-    @defer.inlineCallbacks
-    def get_message_vars(self, notif, event, room_state_ids):
+    async def get_message_vars(self, notif, event, room_state_ids):
         if event.type != EventTypes.Message:
             return
 
         sender_state_event_id = room_state_ids[("m.room.member", event.sender)]
-        sender_state_event = yield self.store.get_event(sender_state_event_id)
+        sender_state_event = await self.store.get_event(sender_state_event_id)
         sender_name = name_from_member_event(sender_state_event)
         sender_avatar_url = sender_state_event.content.get("avatar_url")
 
@@ -425,8 +451,7 @@ class Mailer(object):
 
         return messagevars
 
-    @defer.inlineCallbacks
-    def make_summary_text(
+    async def make_summary_text(
         self, notifs_by_room, room_state_ids, notif_events, user_id, reason
     ):
         if len(notifs_by_room) == 1:
@@ -436,17 +461,17 @@ class Mailer(object):
             # If the room has some kind of name, use it, but we don't
             # want the generated-from-names one here otherwise we'll
             # end up with, "new message from Bob in the Bob room"
-            room_name = yield calculate_room_name(
+            room_name = await calculate_room_name(
                 self.store, room_state_ids[room_id], user_id, fallback_to_members=False
             )
 
             my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)]
-            my_member_event = yield self.store.get_event(my_member_event_id)
+            my_member_event = await self.store.get_event(my_member_event_id)
             if my_member_event.content["membership"] == "invite":
                 inviter_member_event_id = room_state_ids[room_id][
                     ("m.room.member", my_member_event.sender)
                 ]
-                inviter_member_event = yield self.store.get_event(
+                inviter_member_event = await self.store.get_event(
                     inviter_member_event_id
                 )
                 inviter_name = name_from_member_event(inviter_member_event)
@@ -471,7 +496,7 @@ class Mailer(object):
                     state_event_id = room_state_ids[room_id][
                         ("m.room.member", event.sender)
                     ]
-                    state_event = yield self.store.get_event(state_event_id)
+                    state_event = await self.store.get_event(state_event_id)
                     sender_name = name_from_member_event(state_event)
 
                 if sender_name is not None and room_name is not None:
@@ -494,15 +519,13 @@ class Mailer(object):
                     # If the room doesn't have a name, say who the messages
                     # are from explicitly to avoid, "messages in the Bob room"
                     sender_ids = list(
-                        set(
-                            [
-                                notif_events[n["event_id"]].sender
-                                for n in notifs_by_room[room_id]
-                            ]
-                        )
+                        {
+                            notif_events[n["event_id"]].sender
+                            for n in notifs_by_room[room_id]
+                        }
                     )
 
-                    member_events = yield self.store.get_events(
+                    member_events = await self.store.get_events(
                         [
                             room_state_ids[room_id][("m.room.member", s)]
                             for s in sender_ids
@@ -525,16 +548,16 @@ class Mailer(object):
             else:
                 # If the reason room doesn't have a name, say who the messages
                 # are from explicitly to avoid, "messages in the Bob room"
+                room_id = reason["room_id"]
+
                 sender_ids = list(
-                    set(
-                        [
-                            notif_events[n["event_id"]].sender
-                            for n in notifs_by_room[reason["room_id"]]
-                        ]
-                    )
+                    {
+                        notif_events[n["event_id"]].sender
+                        for n in notifs_by_room[room_id]
+                    }
                 )
 
-                member_events = yield self.store.get_events(
+                member_events = await self.store.get_events(
                     [room_state_ids[room_id][("m.room.member", s)] for s in sender_ids]
                 )
 
@@ -608,10 +631,10 @@ def safe_text(raw_text):
     )
 
 
-def deduped_ordered_list(l):
+def deduped_ordered_list(it: Iterable[T]) -> List[T]:
     seen = set()
     ret = []
-    for item in l:
+    for item in it:
         if item not in seen:
             seen.add(item)
             ret.append(item)
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index 16a7e8e31d..0644a13cfc 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -18,6 +18,8 @@ import re
 
 from twisted.internet import defer
 
+from synapse.api.constants import EventTypes
+
 logger = logging.getLogger(__name__)
 
 # intentionally looser than what aliases we allow to be registered since
@@ -50,17 +52,17 @@ def calculate_room_name(
         (string or None) A human readable name for the room.
     """
     # does it have a name?
-    if ("m.room.name", "") in room_state_ids:
+    if (EventTypes.Name, "") in room_state_ids:
         m_room_name = yield store.get_event(
-            room_state_ids[("m.room.name", "")], allow_none=True
+            room_state_ids[(EventTypes.Name, "")], allow_none=True
         )
         if m_room_name and m_room_name.content and m_room_name.content["name"]:
             return m_room_name.content["name"]
 
     # does it have a canonical alias?
-    if ("m.room.canonical_alias", "") in room_state_ids:
+    if (EventTypes.CanonicalAlias, "") in room_state_ids:
         canon_alias = yield store.get_event(
-            room_state_ids[("m.room.canonical_alias", "")], allow_none=True
+            room_state_ids[(EventTypes.CanonicalAlias, "")], allow_none=True
         )
         if (
             canon_alias
@@ -74,32 +76,22 @@ def calculate_room_name(
     # for an event type, so rearrange the data structure
     room_state_bytype_ids = _state_as_two_level_dict(room_state_ids)
 
-    # right then, any aliases at all?
-    if "m.room.aliases" in room_state_bytype_ids:
-        m_room_aliases = room_state_bytype_ids["m.room.aliases"]
-        for alias_id in m_room_aliases.values():
-            alias_event = yield store.get_event(alias_id, allow_none=True)
-            if alias_event and alias_event.content.get("aliases"):
-                the_aliases = alias_event.content["aliases"]
-                if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]):
-                    return the_aliases[0]
-
     if not fallback_to_members:
         return None
 
     my_member_event = None
-    if ("m.room.member", user_id) in room_state_ids:
+    if (EventTypes.Member, user_id) in room_state_ids:
         my_member_event = yield store.get_event(
-            room_state_ids[("m.room.member", user_id)], allow_none=True
+            room_state_ids[(EventTypes.Member, user_id)], allow_none=True
         )
 
     if (
         my_member_event is not None
         and my_member_event.content["membership"] == "invite"
     ):
-        if ("m.room.member", my_member_event.sender) in room_state_ids:
+        if (EventTypes.Member, my_member_event.sender) in room_state_ids:
             inviter_member_event = yield store.get_event(
-                room_state_ids[("m.room.member", my_member_event.sender)],
+                room_state_ids[(EventTypes.Member, my_member_event.sender)],
                 allow_none=True,
             )
             if inviter_member_event:
@@ -114,9 +106,9 @@ def calculate_room_name(
 
     # we're going to have to generate a name based on who's in the room,
     # so find out who is in the room that isn't the user.
-    if "m.room.member" in room_state_bytype_ids:
+    if EventTypes.Member in room_state_bytype_ids:
         member_events = yield store.get_events(
-            list(room_state_bytype_ids["m.room.member"].values())
+            list(room_state_bytype_ids[EventTypes.Member].values())
         )
         all_members = [
             ev
@@ -138,9 +130,9 @@ def calculate_room_name(
             # self-chat, peeked room with 1 participant,
             # or inbound invite, or outbound 3PID invite.
             if all_members[0].sender == user_id:
-                if "m.room.third_party_invite" in room_state_bytype_ids:
+                if EventTypes.ThirdPartyInvite in room_state_bytype_ids:
                     third_party_invites = room_state_bytype_ids[
-                        "m.room.third_party_invite"
+                        EventTypes.ThirdPartyInvite
                     ].values()
 
                     if len(third_party_invites) > 0:
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 5ed9147de4..11032491af 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -16,11 +16,13 @@
 
 import logging
 import re
+from typing import Pattern
 
 from six import string_types
 
+from synapse.events import EventBase
 from synapse.types import UserID
-from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
+from synapse.util.caches import register_cache
 from synapse.util.caches.lrucache import LruCache
 
 logger = logging.getLogger(__name__)
@@ -56,18 +58,18 @@ def _test_ineq_condition(condition, number):
     rhs = m.group(2)
     if not rhs.isdigit():
         return False
-    rhs = int(rhs)
+    rhs_int = int(rhs)
 
     if ineq == "" or ineq == "==":
-        return number == rhs
+        return number == rhs_int
     elif ineq == "<":
-        return number < rhs
+        return number < rhs_int
     elif ineq == ">":
-        return number > rhs
+        return number > rhs_int
     elif ineq == ">=":
-        return number >= rhs
+        return number >= rhs_int
     elif ineq == "<=":
-        return number <= rhs
+        return number <= rhs_int
     else:
         return False
 
@@ -83,7 +85,13 @@ def tweaks_for_actions(actions):
 
 
 class PushRuleEvaluatorForEvent(object):
-    def __init__(self, event, room_member_count, sender_power_level, power_levels):
+    def __init__(
+        self,
+        event: EventBase,
+        room_member_count: int,
+        sender_power_level: int,
+        power_levels: dict,
+    ):
         self._event = event
         self._room_member_count = room_member_count
         self._sender_power_level = sender_power_level
@@ -92,7 +100,7 @@ class PushRuleEvaluatorForEvent(object):
         # Maps strings of e.g. 'content.body' -> event["content"]["body"]
         self._value_cache = _flatten_dict(event)
 
-    def matches(self, condition, user_id, display_name):
+    def matches(self, condition: dict, user_id: str, display_name: str) -> bool:
         if condition["kind"] == "event_match":
             return self._event_match(condition, user_id)
         elif condition["kind"] == "contains_display_name":
@@ -106,7 +114,7 @@ class PushRuleEvaluatorForEvent(object):
         else:
             return True
 
-    def _event_match(self, condition, user_id):
+    def _event_match(self, condition: dict, user_id: str) -> bool:
         pattern = condition.get("pattern", None)
 
         if not pattern:
@@ -117,7 +125,7 @@ class PushRuleEvaluatorForEvent(object):
                 pattern = UserID.from_string(user_id).localpart
 
         if not pattern:
-            logger.warn("event_match condition with no pattern")
+            logger.warning("event_match condition with no pattern")
             return False
 
         # XXX: optimisation: cache our pattern regexps
@@ -134,7 +142,7 @@ class PushRuleEvaluatorForEvent(object):
 
             return _glob_matches(pattern, haystack)
 
-    def _contains_display_name(self, display_name):
+    def _contains_display_name(self, display_name: str) -> bool:
         if not display_name:
             return False
 
@@ -142,51 +150,52 @@ class PushRuleEvaluatorForEvent(object):
         if not body:
             return False
 
-        return _glob_matches(display_name, body, word_boundary=True)
+        # Similar to _glob_matches, but do not treat display_name as a glob.
+        r = regex_cache.get((display_name, False, True), None)
+        if not r:
+            r = re.escape(display_name)
+            r = _re_word_boundary(r)
+            r = re.compile(r, flags=re.IGNORECASE)
+            regex_cache[(display_name, False, True)] = r
+
+        return r.search(body)
 
-    def _get_value(self, dotted_key):
+    def _get_value(self, dotted_key: str) -> str:
         return self._value_cache.get(dotted_key, None)
 
 
-# Caches (glob, word_boundary) -> regex for push. See _glob_matches
-regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
+# Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches
+regex_cache = LruCache(50000)
 register_cache("cache", "regex_push_cache", regex_cache)
 
 
-def _glob_matches(glob, value, word_boundary=False):
+def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
     """Tests if value matches glob.
 
     Args:
-        glob (string)
-        value (string): String to test against glob.
-        word_boundary (bool): Whether to match against word boundaries or entire
+        glob
+        value: String to test against glob.
+        word_boundary: Whether to match against word boundaries or entire
             string. Defaults to False.
-
-    Returns:
-        bool
     """
 
     try:
-        r = regex_cache.get((glob, word_boundary), None)
+        r = regex_cache.get((glob, True, word_boundary), None)
         if not r:
             r = _glob_to_re(glob, word_boundary)
-            regex_cache[(glob, word_boundary)] = r
+            regex_cache[(glob, True, word_boundary)] = r
         return r.search(value)
     except re.error:
-        logger.warn("Failed to parse glob to regex: %r", glob)
+        logger.warning("Failed to parse glob to regex: %r", glob)
         return False
 
 
-def _glob_to_re(glob, word_boundary):
+def _glob_to_re(glob: str, word_boundary: bool) -> Pattern:
     """Generates regex for a given glob.
 
     Args:
-        glob (string)
-        word_boundary (bool): Whether to match against word boundaries or entire
-            string. Defaults to False.
-
-    Returns:
-        regex object
+        glob
+        word_boundary: Whether to match against word boundaries or entire string.
     """
     if IS_GLOB.search(glob):
         r = re.escape(glob)
@@ -219,7 +228,7 @@ def _glob_to_re(glob, word_boundary):
         return re.compile(r, flags=re.IGNORECASE)
 
 
-def _re_word_boundary(r):
+def _re_word_boundary(r: str) -> str:
     """
     Adds word boundary characters to the start and end of an
     expression to require that the match occur as a whole word,
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index a54051a726..5dae4648c0 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -16,11 +16,12 @@
 from twisted.internet import defer
 
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
+from synapse.storage import Storage
 
 
 @defer.inlineCallbacks
 def get_badge_count(store, user_id):
-    invites = yield store.get_invited_rooms_for_user(user_id)
+    invites = yield store.get_invited_rooms_for_local_user(user_id)
     joins = yield store.get_rooms_for_user(user_id)
 
     my_receipts_by_room = yield store.get_receipts_for_user(user_id, "m.read")
@@ -43,22 +44,22 @@ def get_badge_count(store, user_id):
 
 
 @defer.inlineCallbacks
-def get_context_for_event(store, state_handler, ev, user_id):
+def get_context_for_event(storage: Storage, state_handler, ev, user_id):
     ctx = {}
 
-    room_state_ids = yield store.get_state_ids_for_event(ev.event_id)
+    room_state_ids = yield storage.state.get_state_ids_for_event(ev.event_id)
 
     # we no longer bother setting room_alias, and make room_name the
     # human-readable name instead, be that m.room.name, an alias or
     # a list of people in the room
     name = yield calculate_room_name(
-        store, room_state_ids, user_id, fallback_to_single_member=False
+        storage.main, room_state_ids, user_id, fallback_to_single_member=False
     )
     if name:
         ctx["name"] = name
 
     sender_state_event_id = room_state_ids[("m.room.member", ev.sender)]
-    sender_state_event = yield store.get_event(sender_state_event_id)
+    sender_state_event = yield storage.main.get_event(sender_state_event_id)
     ctx["sender_display_name"] = name_from_member_event(sender_state_event)
 
     return ctx
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index f277aeb131..8ad0bf5936 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -80,9 +80,11 @@ class PusherFactory(object):
         return EmailPusher(self.hs, pusherdict, mailer)
 
     def _app_name_from_pusherdict(self, pusherdict):
-        if "data" in pusherdict and "brand" in pusherdict["data"]:
-            app_name = pusherdict["data"]["brand"]
-        else:
-            app_name = self.config.email_app_name
+        data = pusherdict["data"]
 
-        return app_name
+        if isinstance(data, dict):
+            brand = data.get("brand")
+            if isinstance(brand, str):
+                return brand
+
+        return self.config.email_app_name
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 08e840fdc2..88d203aa44 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -15,11 +15,17 @@
 # limitations under the License.
 
 import logging
+from collections import defaultdict
+from threading import Lock
+from typing import Dict, Tuple, Union
 
 from twisted.internet import defer
 
+from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
+from synapse.push.emailpusher import EmailPusher
+from synapse.push.httppusher import HttpPusher
 from synapse.push.pusher import PusherFactory
 from synapse.util.async_helpers import concurrently_execute
 
@@ -47,7 +53,29 @@ class PusherPool:
         self._should_start_pushers = _hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
-        self.pushers = {}
+
+        # map from user id to app_id:pushkey to pusher
+        self.pushers = {}  # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
+
+        # a lock for the pushers dict, since `count_pushers` is called from an different
+        # and we otherwise get concurrent modification errors
+        self._pushers_lock = Lock()
+
+        def count_pushers():
+            results = defaultdict(int)  # type: Dict[Tuple[str, str], int]
+            with self._pushers_lock:
+                for pushers in self.pushers.values():
+                    for pusher in pushers.values():
+                        k = (type(pusher).__name__, pusher.app_id)
+                        results[k] += 1
+            return results
+
+        LaterGauge(
+            name="synapse_pushers",
+            desc="the number of active pushers",
+            labels=["kind", "app_id"],
+            caller=count_pushers,
+        )
 
     def start(self):
         """Starts the pushers off in a background process.
@@ -103,9 +131,7 @@ class PusherPool:
         # create the pusher setting last_stream_ordering to the current maximum
         # stream ordering in event_push_actions, so it will process
         # pushes from this point onwards.
-        last_stream_ordering = (
-            yield self.store.get_latest_push_action_stream_ordering()
-        )
+        last_stream_ordering = yield self.store.get_latest_push_action_stream_ordering()
 
         yield self.store.add_pusher(
             user_id=user_id,
@@ -193,7 +219,7 @@ class PusherPool:
                 min_stream_id - 1, max_stream_id
             )
             # This returns a tuple, user_id is at index 3
-            users_affected = set([r[3] for r in updated_receipts])
+            users_affected = {r[3] for r in updated_receipts}
 
             for u in users_affected:
                 if u in self.pushers:
@@ -234,7 +260,6 @@ class PusherPool:
             Deferred
         """
         pushers = yield self.store.get_all_pushers()
-        logger.info("Starting %d pushers", len(pushers))
 
         # Stagger starting up the pushers so we don't completely drown the
         # process on start up.
@@ -247,7 +272,7 @@ class PusherPool:
         """Start the given pusher
 
         Args:
-            pusherdict (dict):
+            pusherdict (dict): dict with the values pulled from the db table
 
         Returns:
             Deferred[EmailPusher|HttpPusher]
@@ -256,7 +281,8 @@ class PusherPool:
             p = self.pusher_factory.create_pusher(pusherdict)
         except PusherConfigException as e:
             logger.warning(
-                "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
+                "Pusher incorrectly configured id=%i, user=%s, appid=%s, pushkey=%s: %s",
+                pusherdict["id"],
                 pusherdict.get("user_name"),
                 pusherdict.get("app_id"),
                 pusherdict.get("pushkey"),
@@ -264,18 +290,21 @@ class PusherPool:
             )
             return
         except Exception:
-            logger.exception("Couldn't start a pusher: caught Exception")
+            logger.exception(
+                "Couldn't start pusher id %i: caught Exception", pusherdict["id"],
+            )
             return
 
         if not p:
             return
 
         appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
-        byuser = self.pushers.setdefault(pusherdict["user_name"], {})
 
-        if appid_pushkey in byuser:
-            byuser[appid_pushkey].on_stop()
-        byuser[appid_pushkey] = p
+        with self._pushers_lock:
+            byuser = self.pushers.setdefault(pusherdict["user_name"], {})
+            if appid_pushkey in byuser:
+                byuser[appid_pushkey].on_stop()
+            byuser[appid_pushkey] = p
 
         # Check if there *may* be push to process. We do this as this check is a
         # lot cheaper to do than actually fetching the exact rows we need to
@@ -304,7 +333,9 @@ class PusherPool:
         if appid_pushkey in byuser:
             logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
             byuser[appid_pushkey].on_stop()
-            del byuser[appid_pushkey]
+            with self._pushers_lock:
+                del byuser[appid_pushkey]
+
         yield self.store.delete_pusher_by_app_id_pushkey_user_id(
             app_id, pushkey, user_id
         )