summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py108
-rw-r--r--synapse/push/action_generator.py15
-rw-r--r--synapse/push/baserules.py23
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py98
-rw-r--r--synapse/push/clientformat.py23
-rw-r--r--synapse/push/emailpusher.py111
-rw-r--r--synapse/push/httppusher.py129
-rw-r--r--synapse/push/mailer.py123
-rw-r--r--synapse/push/presentable_names.py48
-rw-r--r--synapse/push/push_rule_evaluator.py28
-rw-r--r--synapse/push/push_tools.py7
-rw-r--r--synapse/push/pusher.py34
-rw-r--r--synapse/push/pusherpool.py168
13 files changed, 569 insertions, 346 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 5a437f9810..9e7ac149a1 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -13,7 +13,111 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import abc
+from typing import TYPE_CHECKING, Any, Dict, Optional
+
+import attr
+
+from synapse.types import JsonDict, RoomStreamToken
+
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
+
+@attr.s(slots=True)
+class PusherConfig:
+    """Parameters necessary to configure a pusher."""
+
+    id = attr.ib(type=Optional[str])
+    user_name = attr.ib(type=str)
+    access_token = attr.ib(type=Optional[int])
+    profile_tag = attr.ib(type=str)
+    kind = attr.ib(type=str)
+    app_id = attr.ib(type=str)
+    app_display_name = attr.ib(type=str)
+    device_display_name = attr.ib(type=str)
+    pushkey = attr.ib(type=str)
+    ts = attr.ib(type=int)
+    lang = attr.ib(type=Optional[str])
+    data = attr.ib(type=Optional[JsonDict])
+    last_stream_ordering = attr.ib(type=Optional[int])
+    last_success = attr.ib(type=Optional[int])
+    failing_since = attr.ib(type=Optional[int])
+
+    def as_dict(self) -> Dict[str, Any]:
+        """Information that can be retrieved about a pusher after creation."""
+        return {
+            "app_display_name": self.app_display_name,
+            "app_id": self.app_id,
+            "data": self.data,
+            "device_display_name": self.device_display_name,
+            "kind": self.kind,
+            "lang": self.lang,
+            "profile_tag": self.profile_tag,
+            "pushkey": self.pushkey,
+        }
+
+
+@attr.s(slots=True)
+class ThrottleParams:
+    """Parameters for controlling the rate of sending pushes via email."""
+
+    last_sent_ts = attr.ib(type=int)
+    throttle_ms = attr.ib(type=int)
+
+
+class Pusher(metaclass=abc.ABCMeta):
+    def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
+        self.hs = hs
+        self.store = self.hs.get_datastore()
+        self.clock = self.hs.get_clock()
+
+        self.pusher_id = pusher_config.id
+        self.user_id = pusher_config.user_name
+        self.app_id = pusher_config.app_id
+        self.pushkey = pusher_config.pushkey
+
+        self.last_stream_ordering = pusher_config.last_stream_ordering
+
+        # This is the highest stream ordering we know it's safe to process.
+        # When new events arrive, we'll be given a window of new events: we
+        # should honour this rather than just looking for anything higher
+        # because of potential out-of-order event serialisation.
+        self.max_stream_ordering = self.store.get_room_max_stream_ordering()
+
+    def on_new_notifications(self, max_token: RoomStreamToken) -> None:
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
+        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        self._start_processing()
+
+    @abc.abstractmethod
+    def _start_processing(self):
+        """Start processing push notifications."""
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def on_started(self, have_notifs: bool) -> None:
+        """Called when this pusher has been started.
+
+        Args:
+            should_check_for_notifs: Whether we should immediately
+                check for push to send. Set to False only if it's known there
+                is nothing to send
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def on_stop(self) -> None:
+        raise NotImplementedError()
+
 
 class PusherConfigException(Exception):
-    def __init__(self, msg):
-        super().__init__(msg)
+    """An error occurred when creating a pusher."""
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index fabc9ba126..aaed28650d 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -14,19 +14,22 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
+from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
+from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
 from synapse.util.metrics import Measure
 
-from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
 
 logger = logging.getLogger(__name__)
 
 
 class ActionGenerator:
-    def __init__(self, hs):
-        self.hs = hs
+    def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
-        self.store = hs.get_datastore()
         self.bulk_evaluator = BulkPushRuleEvaluator(hs)
         # really we want to get all user ids and all profile tags too,
         # since we want the actions for each profile tag for every user and
@@ -35,6 +38,8 @@ class ActionGenerator:
         # event stream, so we just run the rules for a client with no profile
         # tag (ie. we just need all the users).
 
-    async def handle_push_actions_for_event(self, event, context):
+    async def handle_push_actions_for_event(
+        self, event: EventBase, context: EventContext
+    ) -> None:
         with Measure(self.clock, "action_for_event_by_user"):
             await self.bulk_evaluator.action_for_event_by_user(event, context)
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index f5788c1de7..6211506990 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -15,16 +15,19 @@
 # limitations under the License.
 
 import copy
+from typing import Any, Dict, List
 
 from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
 
 
-def list_with_base_rules(rawrules, use_new_defaults=False):
+def list_with_base_rules(
+    rawrules: List[Dict[str, Any]], use_new_defaults: bool = False
+) -> List[Dict[str, Any]]:
     """Combine the list of rules set by the user with the default push rules
 
     Args:
-        rawrules(list): The rules the user has modified or set.
-        use_new_defaults(bool): Whether to use the new experimental default rules when
+        rawrules: The rules the user has modified or set.
+        use_new_defaults: Whether to use the new experimental default rules when
             appending or prepending default rules.
 
     Returns:
@@ -94,7 +97,11 @@ def list_with_base_rules(rawrules, use_new_defaults=False):
     return ruleslist
 
 
-def make_base_append_rules(kind, modified_base_rules, use_new_defaults=False):
+def make_base_append_rules(
+    kind: str,
+    modified_base_rules: Dict[str, Dict[str, Any]],
+    use_new_defaults: bool = False,
+) -> List[Dict[str, Any]]:
     rules = []
 
     if kind == "override":
@@ -116,6 +123,7 @@ def make_base_append_rules(kind, modified_base_rules, use_new_defaults=False):
     rules = copy.deepcopy(rules)
     for r in rules:
         # Only modify the actions, keep the conditions the same.
+        assert isinstance(r["rule_id"], str)
         modified = modified_base_rules.get(r["rule_id"])
         if modified:
             r["actions"] = modified["actions"]
@@ -123,7 +131,11 @@ def make_base_append_rules(kind, modified_base_rules, use_new_defaults=False):
     return rules
 
 
-def make_base_prepend_rules(kind, modified_base_rules, use_new_defaults=False):
+def make_base_prepend_rules(
+    kind: str,
+    modified_base_rules: Dict[str, Dict[str, Any]],
+    use_new_defaults: bool = False,
+) -> List[Dict[str, Any]]:
     rules = []
 
     if kind == "override":
@@ -133,6 +145,7 @@ def make_base_prepend_rules(kind, modified_base_rules, use_new_defaults=False):
     rules = copy.deepcopy(rules)
     for r in rules:
         # Only modify the actions, keep the conditions the same.
+        assert isinstance(r["rule_id"], str)
         modified = modified_base_rules.get(r["rule_id"])
         if modified:
             r["actions"] = modified["actions"]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 82a72dc34f..10f27e4378 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union
 
 import attr
 from prometheus_client import Counter
@@ -25,16 +26,16 @@ from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.state import POWER_KEY
 from synapse.util.async_helpers import Linearizer
-from synapse.util.caches import register_cache
+from synapse.util.caches import CacheMetric, register_cache
 from synapse.util.caches.descriptors import lru_cache
 from synapse.util.caches.lrucache import LruCache
 
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
-logger = logging.getLogger(__name__)
-
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
 
-rules_by_room = {}
+logger = logging.getLogger(__name__)
 
 
 push_rules_invalidation_counter = Counter(
@@ -101,7 +102,7 @@ class BulkPushRuleEvaluator:
     room at once.
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
@@ -113,7 +114,9 @@ class BulkPushRuleEvaluator:
             resizable=False,
         )
 
-    async def _get_rules_for_event(self, event, context):
+    async def _get_rules_for_event(
+        self, event: EventBase, context: EventContext
+    ) -> Dict[str, List[Dict[str, Any]]]:
         """This gets the rules for all users in the room at the time of the event,
         as well as the push rules for the invitee if the event is an invite.
 
@@ -140,11 +143,8 @@ class BulkPushRuleEvaluator:
         return rules_by_user
 
     @lru_cache()
-    def _get_rules_for_room(self, room_id):
+    def _get_rules_for_room(self, room_id: str) -> "RulesForRoom":
         """Get the current RulesForRoom object for the given room id
-
-        Returns:
-            RulesForRoom
         """
         # It's important that RulesForRoom gets added to self._get_rules_for_room.cache
         # before any lookup methods get called on it as otherwise there may be
@@ -156,20 +156,21 @@ class BulkPushRuleEvaluator:
             self.room_push_rule_cache_metrics,
         )
 
-    async def _get_power_levels_and_sender_level(self, event, context):
+    async def _get_power_levels_and_sender_level(
+        self, event: EventBase, context: EventContext
+    ) -> Tuple[dict, int]:
         prev_state_ids = await context.get_prev_state_ids()
         pl_event_id = prev_state_ids.get(POWER_KEY)
         if pl_event_id:
             # fastpath: if there's a power level event, that's all we need, and
             # not having a power level event is an extreme edge case
-            pl_event = await self.store.get_event(pl_event_id)
-            auth_events = {POWER_KEY: pl_event}
+            auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)}
         else:
             auth_events_ids = self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=False
             )
-            auth_events = await self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
+            auth_events_dict = await self.store.get_events(auth_events_ids)
+            auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()}
 
         sender_level = get_user_power_level(event.sender, auth_events)
 
@@ -177,7 +178,9 @@ class BulkPushRuleEvaluator:
 
         return pl_event.content if pl_event else {}, sender_level
 
-    async def action_for_event_by_user(self, event, context) -> None:
+    async def action_for_event_by_user(
+        self, event: EventBase, context: EventContext
+    ) -> None:
         """Given an event and context, evaluate the push rules, check if the message
         should increment the unread count, and insert the results into the
         event_push_actions_staging table.
@@ -185,7 +188,7 @@ class BulkPushRuleEvaluator:
         count_as_unread = _should_count_as_unread(event, context)
 
         rules_by_user = await self._get_rules_for_event(event, context)
-        actions_by_user = {}
+        actions_by_user = {}  # type: Dict[str, List[Union[dict, str]]]
 
         room_members = await self.store.get_joined_users_from_context(event, context)
 
@@ -198,7 +201,7 @@ class BulkPushRuleEvaluator:
             event, len(room_members), sender_power_level, power_levels
         )
 
-        condition_cache = {}
+        condition_cache = {}  # type: Dict[str, bool]
 
         for uid, rules in rules_by_user.items():
             if event.sender == uid:
@@ -249,7 +252,13 @@ class BulkPushRuleEvaluator:
         )
 
 
-def _condition_checker(evaluator, conditions, uid, display_name, cache):
+def _condition_checker(
+    evaluator: PushRuleEvaluatorForEvent,
+    conditions: List[dict],
+    uid: str,
+    display_name: str,
+    cache: Dict[str, bool],
+) -> bool:
     for cond in conditions:
         _id = cond.get("_id", None)
         if _id:
@@ -277,15 +286,19 @@ class RulesForRoom:
     """
 
     def __init__(
-        self, hs, room_id, rules_for_room_cache: LruCache, room_push_rule_cache_metrics
+        self,
+        hs: "HomeServer",
+        room_id: str,
+        rules_for_room_cache: LruCache,
+        room_push_rule_cache_metrics: CacheMetric,
     ):
         """
         Args:
-            hs (HomeServer)
-            room_id (str)
+            hs: The HomeServer object.
+            room_id: The room ID.
             rules_for_room_cache: The cache object that caches these
                 RoomsForUser objects.
-            room_push_rule_cache_metrics (CacheMetric)
+            room_push_rule_cache_metrics: The metrics object
         """
         self.room_id = room_id
         self.is_mine_id = hs.is_mine_id
@@ -294,8 +307,10 @@ class RulesForRoom:
 
         self.linearizer = Linearizer(name="rules_for_room")
 
-        self.member_map = {}  # event_id -> (user_id, state)
-        self.rules_by_user = {}  # user_id -> rules
+        # event_id -> (user_id, state)
+        self.member_map = {}  # type: Dict[str, Tuple[str, str]]
+        # user_id -> rules
+        self.rules_by_user = {}  # type: Dict[str, List[Dict[str, dict]]]
 
         # The last state group we updated the caches for. If the state_group of
         # a new event comes along, we know that we can just return the cached
@@ -315,7 +330,7 @@ class RulesForRoom:
         # calculate push for)
         # These never need to be invalidated as we will never set up push for
         # them.
-        self.uninteresting_user_set = set()
+        self.uninteresting_user_set = set()  # type: Set[str]
 
         # We need to be clever on the invalidating caches callbacks, as
         # otherwise the invalidation callback holds a reference to the object,
@@ -325,7 +340,9 @@ class RulesForRoom:
         # to self around in the callback.
         self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)
 
-    async def get_rules(self, event, context):
+    async def get_rules(
+        self, event: EventBase, context: EventContext
+    ) -> Dict[str, List[Dict[str, dict]]]:
         """Given an event context return the rules for all users who are
         currently in the room.
         """
@@ -356,6 +373,8 @@ class RulesForRoom:
             else:
                 current_state_ids = await context.get_current_state_ids()
                 push_rules_delta_state_cache_metric.inc_misses()
+            # Ensure the state IDs exist.
+            assert current_state_ids is not None
 
             push_rules_state_size_counter.inc(len(current_state_ids))
 
@@ -420,18 +439,23 @@ class RulesForRoom:
         return ret_rules_by_user
 
     async def _update_rules_with_member_event_ids(
-        self, ret_rules_by_user, member_event_ids, state_group, event
-    ):
+        self,
+        ret_rules_by_user: Dict[str, list],
+        member_event_ids: Dict[str, str],
+        state_group: Optional[int],
+        event: EventBase,
+    ) -> None:
         """Update the partially filled rules_by_user dict by fetching rules for
         any newly joined users in the `member_event_ids` list.
 
         Args:
-            ret_rules_by_user (dict): Partiallly filled dict of push rules. Gets
+            ret_rules_by_user: Partially filled dict of push rules. Gets
                 updated with any new rules.
-            member_event_ids (dict): Dict of user id to event id for membership events
+            member_event_ids: Dict of user id to event id for membership events
                 that have happened since the last time we filled rules_by_user
             state_group: The state group we are currently computing push rules
                 for. Used when updating the cache.
+            event: The event we are currently computing push rules for.
         """
         sequence = self.sequence
 
@@ -449,19 +473,19 @@ class RulesForRoom:
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug("Found members %r: %r", self.room_id, members.values())
 
-        user_ids = {
+        joined_user_ids = {
             user_id
             for user_id, membership in members.values()
             if membership == Membership.JOIN
         }
 
-        logger.debug("Joined: %r", user_ids)
+        logger.debug("Joined: %r", joined_user_ids)
 
         # Previously we only considered users with pushers or read receipts in that
         # room. We can't do this anymore because we use push actions to calculate unread
         # counts, which don't rely on the user having pushers or sent a read receipt into
         # the room. Therefore we just need to filter for local users here.
-        user_ids = list(filter(self.is_mine_id, user_ids))
+        user_ids = list(filter(self.is_mine_id, joined_user_ids))
 
         rules_by_user = await self.store.bulk_get_push_rules(
             user_ids, on_invalidate=self.invalidate_all_cb
@@ -473,7 +497,7 @@ class RulesForRoom:
 
         self.update_cache(sequence, members, ret_rules_by_user, state_group)
 
-    def invalidate_all(self):
+    def invalidate_all(self) -> None:
         # Note: Don't hand this function directly to an invalidation callback
         # as it keeps a reference to self and will stop this instance from being
         # GC'd if it gets dropped from the rules_to_user cache. Instead use
@@ -485,7 +509,7 @@ class RulesForRoom:
         self.rules_by_user = {}
         push_rules_invalidation_counter.inc()
 
-    def update_cache(self, sequence, members, rules_by_user, state_group):
+    def update_cache(self, sequence, members, rules_by_user, state_group) -> None:
         if sequence == self.sequence:
             self.member_map.update(members)
             self.rules_by_user = rules_by_user
@@ -506,7 +530,7 @@ class _Invalidation:
     cache = attr.ib(type=LruCache)
     room_id = attr.ib(type=str)
 
-    def __call__(self):
+    def __call__(self) -> None:
         rules = self.cache.get(self.room_id, None, update_metrics=False)
         if rules:
             rules.invalidate_all()
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index a59b639f15..0cadba761a 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -14,24 +14,27 @@
 # limitations under the License.
 
 import copy
+from typing import Any, Dict, List, Optional
 
 from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+from synapse.types import UserID
 
 
-def format_push_rules_for_user(user, ruleslist):
+def format_push_rules_for_user(user: UserID, ruleslist) -> Dict[str, Dict[str, list]]:
     """Converts a list of rawrules and a enabled map into nested dictionaries
     to match the Matrix client-server format for push rules"""
 
     # We're going to be mutating this a lot, so do a deep copy
     ruleslist = copy.deepcopy(ruleslist)
 
-    rules = {"global": {}, "device": {}}
+    rules = {
+        "global": {},
+        "device": {},
+    }  # type: Dict[str, Dict[str, List[Dict[str, Any]]]]
 
     rules["global"] = _add_empty_priority_class_arrays(rules["global"])
 
     for r in ruleslist:
-        rulearray = None
-
         template_name = _priority_class_to_template_name(r["priority_class"])
 
         # Remove internal stuff.
@@ -57,13 +60,13 @@ def format_push_rules_for_user(user, ruleslist):
     return rules
 
 
-def _add_empty_priority_class_arrays(d):
+def _add_empty_priority_class_arrays(d: Dict[str, list]) -> Dict[str, list]:
     for pc in PRIORITY_CLASS_MAP.keys():
         d[pc] = []
     return d
 
 
-def _rule_to_template(rule):
+def _rule_to_template(rule: Dict[str, Any]) -> Optional[Dict[str, Any]]:
     unscoped_rule_id = None
     if "rule_id" in rule:
         unscoped_rule_id = _rule_id_from_namespaced(rule["rule_id"])
@@ -82,6 +85,10 @@ def _rule_to_template(rule):
             return None
         templaterule = {"actions": rule["actions"]}
         templaterule["pattern"] = thecond["pattern"]
+    else:
+        # This should not be reached unless this function is not kept in sync
+        # with PRIORITY_CLASS_INVERSE_MAP.
+        raise ValueError("Unexpected template_name: %s" % (template_name,))
 
     if unscoped_rule_id:
         templaterule["rule_id"] = unscoped_rule_id
@@ -90,9 +97,9 @@ def _rule_to_template(rule):
     return templaterule
 
 
-def _rule_id_from_namespaced(in_rule_id):
+def _rule_id_from_namespaced(in_rule_id: str) -> str:
     return in_rule_id.split("/")[-1]
 
 
-def _priority_class_to_template_name(pc):
+def _priority_class_to_template_name(pc: int) -> str:
     return PRIORITY_CLASS_INVERSE_MAP[pc]
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index c6763971ee..d2eff75a58 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -14,11 +14,17 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING, Dict, List, Optional
 
+from twisted.internet.base import DelayedCall
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import RoomStreamToken
+from synapse.push import Pusher, PusherConfig, ThrottleParams
+from synapse.push.mailer import Mailer
+
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
 
 logger = logging.getLogger(__name__)
 
@@ -46,7 +52,7 @@ THROTTLE_RESET_AFTER_MS = 12 * 60 * 60 * 1000
 INCLUDE_ALL_UNREAD_NOTIFS = False
 
 
-class EmailPusher:
+class EmailPusher(Pusher):
     """
     A pusher that sends email notifications about events (approximately)
     when they happen.
@@ -54,37 +60,30 @@ class EmailPusher:
     factor out the common parts
     """
 
-    def __init__(self, hs, pusherdict, mailer):
-        self.hs = hs
+    def __init__(self, hs: "HomeServer", pusher_config: PusherConfig, mailer: Mailer):
+        super().__init__(hs, pusher_config)
         self.mailer = mailer
 
         self.store = self.hs.get_datastore()
-        self.clock = self.hs.get_clock()
-        self.pusher_id = pusherdict["id"]
-        self.user_id = pusherdict["user_name"]
-        self.app_id = pusherdict["app_id"]
-        self.email = pusherdict["pushkey"]
-        self.last_stream_ordering = pusherdict["last_stream_ordering"]
-        self.timed_call = None
-        self.throttle_params = None
-
-        # See httppusher
-        self.max_stream_ordering = None
+        self.email = pusher_config.pushkey
+        self.timed_call = None  # type: Optional[DelayedCall]
+        self.throttle_params = {}  # type: Dict[str, ThrottleParams]
+        self._inited = False
 
         self._is_processing = False
 
-    def on_started(self, should_check_for_notifs):
+    def on_started(self, should_check_for_notifs: bool) -> None:
         """Called when this pusher has been started.
 
         Args:
-            should_check_for_notifs (bool): Whether we should immediately
+            should_check_for_notifs: Whether we should immediately
                 check for push to send. Set to False only if it's known there
                 is nothing to send
         """
         if should_check_for_notifs and self.mailer is not None:
             self._start_processing()
 
-    def on_stop(self):
+    def on_stop(self) -> None:
         if self.timed_call:
             try:
                 self.timed_call.cancel()
@@ -92,37 +91,23 @@ class EmailPusher:
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, max_token: RoomStreamToken):
-        # We just use the minimum stream ordering and ignore the vector clock
-        # component. This is safe to do as long as we *always* ignore the vector
-        # clock components.
-        max_stream_ordering = max_token.stream
-
-        if self.max_stream_ordering:
-            self.max_stream_ordering = max(
-                max_stream_ordering, self.max_stream_ordering
-            )
-        else:
-            self.max_stream_ordering = max_stream_ordering
-        self._start_processing()
-
-    def on_new_receipts(self, min_stream_id, max_stream_id):
+    def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
         # We could wake up and cancel the timer but there tend to be quite a
         # lot of read receipts so it's probably less work to just let the
         # timer fire
         pass
 
-    def on_timer(self):
+    def on_timer(self) -> None:
         self.timed_call = None
         self._start_processing()
 
-    def _start_processing(self):
+    def _start_processing(self) -> None:
         if self._is_processing:
             return
 
         run_as_background_process("emailpush.process", self._process)
 
-    def _pause_processing(self):
+    def _pause_processing(self) -> None:
         """Used by tests to temporarily pause processing of events.
 
         Asserts that its not currently processing.
@@ -130,25 +115,27 @@ class EmailPusher:
         assert not self._is_processing
         self._is_processing = True
 
-    def _resume_processing(self):
+    def _resume_processing(self) -> None:
         """Used by tests to resume processing of events after pausing.
         """
         assert self._is_processing
         self._is_processing = False
         self._start_processing()
 
-    async def _process(self):
+    async def _process(self) -> None:
         # we should never get here if we are already processing
         assert not self._is_processing
 
         try:
             self._is_processing = True
 
-            if self.throttle_params is None:
+            if not self._inited:
                 # this is our first loop: load up the throttle params
+                assert self.pusher_id is not None
                 self.throttle_params = await self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
+                self._inited = True
 
             # if the max ordering changes while we're running _unsafe_process,
             # call it again, and so on until we've caught up.
@@ -163,17 +150,19 @@ class EmailPusher:
         finally:
             self._is_processing = False
 
-    async def _unsafe_process(self):
+    async def _unsafe_process(self) -> None:
         """
         Main logic of the push loop without the wrapper function that sets
         up logging, measures and guards against multiple instances of it
         being run.
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
-        fn = self.store.get_unread_push_actions_for_user_in_range_for_email
-        unprocessed = await fn(self.user_id, start, self.max_stream_ordering)
+        assert start is not None
+        unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
+            self.user_id, start, self.max_stream_ordering
+        )
 
-        soonest_due_at = None
+        soonest_due_at = None  # type: Optional[int]
 
         if not unprocessed:
             await self.save_last_stream_ordering_and_success(self.max_stream_ordering)
@@ -230,7 +219,9 @@ class EmailPusher:
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
-    async def save_last_stream_ordering_and_success(self, last_stream_ordering):
+    async def save_last_stream_ordering_and_success(
+        self, last_stream_ordering: Optional[int]
+    ) -> None:
         if last_stream_ordering is None:
             # This happens if we haven't yet processed anything
             return
@@ -248,28 +239,30 @@ class EmailPusher:
             # lets just stop and return.
             self.on_stop()
 
-    def seconds_until(self, ts_msec):
+    def seconds_until(self, ts_msec: int) -> float:
         secs = (ts_msec - self.clock.time_msec()) / 1000
         return max(secs, 0)
 
-    def get_room_throttle_ms(self, room_id):
+    def get_room_throttle_ms(self, room_id: str) -> int:
         if room_id in self.throttle_params:
-            return self.throttle_params[room_id]["throttle_ms"]
+            return self.throttle_params[room_id].throttle_ms
         else:
             return 0
 
-    def get_room_last_sent_ts(self, room_id):
+    def get_room_last_sent_ts(self, room_id: str) -> int:
         if room_id in self.throttle_params:
-            return self.throttle_params[room_id]["last_sent_ts"]
+            return self.throttle_params[room_id].last_sent_ts
         else:
             return 0
 
-    def room_ready_to_notify_at(self, room_id):
+    def room_ready_to_notify_at(self, room_id: str) -> int:
         """
         Determines whether throttling should prevent us from sending an email
         for the given room
-        Returns: The timestamp when we are next allowed to send an email notif
-        for this room
+
+        Returns:
+            The timestamp when we are next allowed to send an email notif
+            for this room
         """
         last_sent_ts = self.get_room_last_sent_ts(room_id)
         throttle_ms = self.get_room_throttle_ms(room_id)
@@ -277,7 +270,9 @@ class EmailPusher:
         may_send_at = last_sent_ts + throttle_ms
         return may_send_at
 
-    async def sent_notif_update_throttle(self, room_id, notified_push_action):
+    async def sent_notif_update_throttle(
+        self, room_id: str, notified_push_action: dict
+    ) -> None:
         # We have sent a notification, so update the throttle accordingly.
         # If the event that triggered the notif happened more than
         # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
@@ -307,15 +302,15 @@ class EmailPusher:
                 new_throttle_ms = min(
                     current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS
                 )
-        self.throttle_params[room_id] = {
-            "last_sent_ts": self.clock.time_msec(),
-            "throttle_ms": new_throttle_ms,
-        }
+        self.throttle_params[room_id] = ThrottleParams(
+            self.clock.time_msec(), new_throttle_ms,
+        )
+        assert self.pusher_id is not None
         await self.store.set_throttle_params(
             self.pusher_id, room_id, self.throttle_params[room_id]
         )
 
-    async def send_notification(self, push_actions, reason):
+    async def send_notification(self, push_actions: List[dict], reason: dict) -> None:
         logger.info("Sending notif email for user %r", self.user_id)
 
         await self.mailer.send_notification_mail(
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index eff0975b6a..417fe0f1f5 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -14,19 +14,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import urllib.parse
+from typing import TYPE_CHECKING, Any, Dict, Iterable, Union
 
 from prometheus_client import Counter
 
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.api.constants import EventTypes
+from synapse.events import EventBase
 from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.push import PusherConfigException
-from synapse.types import RoomStreamToken
+from synapse.push import Pusher, PusherConfig, PusherConfigException
 
 from . import push_rule_evaluator, push_tools
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 http_push_processed_counter = Counter(
@@ -50,91 +55,76 @@ http_badges_failed_counter = Counter(
 )
 
 
-class HttpPusher:
+class HttpPusher(Pusher):
     INITIAL_BACKOFF_SEC = 1  # in seconds because that's what Twisted takes
     MAX_BACKOFF_SEC = 60 * 60
 
     # This one's in ms because we compare it against the clock
     GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
 
-    def __init__(self, hs, pusherdict):
-        self.hs = hs
-        self.store = self.hs.get_datastore()
+    def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
+        super().__init__(hs, pusher_config)
         self.storage = self.hs.get_storage()
-        self.clock = self.hs.get_clock()
-        self.state_handler = self.hs.get_state_handler()
-        self.user_id = pusherdict["user_name"]
-        self.app_id = pusherdict["app_id"]
-        self.app_display_name = pusherdict["app_display_name"]
-        self.device_display_name = pusherdict["device_display_name"]
-        self.pushkey = pusherdict["pushkey"]
-        self.pushkey_ts = pusherdict["ts"]
-        self.data = pusherdict["data"]
-        self.last_stream_ordering = pusherdict["last_stream_ordering"]
+        self.app_display_name = pusher_config.app_display_name
+        self.device_display_name = pusher_config.device_display_name
+        self.pushkey_ts = pusher_config.ts
+        self.data = pusher_config.data
         self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
-        self.failing_since = pusherdict["failing_since"]
+        self.failing_since = pusher_config.failing_since
         self.timed_call = None
         self._is_processing = False
         self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room
 
-        # This is the highest stream ordering we know it's safe to process.
-        # When new events arrive, we'll be given a window of new events: we
-        # should honour this rather than just looking for anything higher
-        # because of potential out-of-order event serialisation. This starts
-        # off as None though as we don't know any better.
-        self.max_stream_ordering = None
-
-        if "data" not in pusherdict:
-            raise PusherConfigException("No 'data' key for HTTP pusher")
-        self.data = pusherdict["data"]
+        self.data = pusher_config.data
+        if self.data is None:
+            raise PusherConfigException("'data' key can not be null for HTTP pusher")
 
         self.name = "%s/%s/%s" % (
-            pusherdict["user_name"],
-            pusherdict["app_id"],
-            pusherdict["pushkey"],
+            pusher_config.user_name,
+            pusher_config.app_id,
+            pusher_config.pushkey,
         )
 
-        if self.data is None:
-            raise PusherConfigException("data can not be null for HTTP pusher")
-
+        # Validate that there's a URL and it is of the proper form.
         if "url" not in self.data:
             raise PusherConfigException("'url' required in data for HTTP pusher")
-        self.url = self.data["url"]
-        self.http_client = hs.get_proxied_http_client()
+
+        url = self.data["url"]
+        if not isinstance(url, str):
+            raise PusherConfigException("'url' must be a string")
+        url_parts = urllib.parse.urlparse(url)
+        # Note that the specification also says the scheme must be HTTPS, but
+        # it isn't up to the homeserver to verify that.
+        if url_parts.path != "/_matrix/push/v1/notify":
+            raise PusherConfigException(
+                "'url' must have a path of '/_matrix/push/v1/notify'"
+            )
+
+        self.url = url
+        self.http_client = hs.get_proxied_blacklisted_http_client()
         self.data_minus_url = {}
         self.data_minus_url.update(self.data)
         del self.data_minus_url["url"]
 
-    def on_started(self, should_check_for_notifs):
+    def on_started(self, should_check_for_notifs: bool) -> None:
         """Called when this pusher has been started.
 
         Args:
-            should_check_for_notifs (bool): Whether we should immediately
+            should_check_for_notifs: Whether we should immediately
                 check for push to send. Set to False only if it's known there
                 is nothing to send
         """
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, max_token: RoomStreamToken):
-        # We just use the minimum stream ordering and ignore the vector clock
-        # component. This is safe to do as long as we *always* ignore the vector
-        # clock components.
-        max_stream_ordering = max_token.stream
-
-        self.max_stream_ordering = max(
-            max_stream_ordering, self.max_stream_ordering or 0
-        )
-        self._start_processing()
-
-    def on_new_receipts(self, min_stream_id, max_stream_id):
+    def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
         # Note that the min here shouldn't be relied upon to be accurate.
 
         # We could check the receipts are actually m.read receipts here,
         # but currently that's the only type of receipt anyway...
         run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
 
-    async def _update_badge(self):
+    async def _update_badge(self) -> None:
         # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
         # to be largely redundant. perhaps we can remove it.
         badge = await push_tools.get_badge_count(
@@ -144,10 +134,10 @@ class HttpPusher:
         )
         await self._send_badge(badge)
 
-    def on_timer(self):
+    def on_timer(self) -> None:
         self._start_processing()
 
-    def on_stop(self):
+    def on_stop(self) -> None:
         if self.timed_call:
             try:
                 self.timed_call.cancel()
@@ -155,13 +145,13 @@ class HttpPusher:
                 pass
             self.timed_call = None
 
-    def _start_processing(self):
+    def _start_processing(self) -> None:
         if self._is_processing:
             return
 
         run_as_background_process("httppush.process", self._process)
 
-    async def _process(self):
+    async def _process(self) -> None:
         # we should never get here if we are already processing
         assert not self._is_processing
 
@@ -180,15 +170,14 @@ class HttpPusher:
         finally:
             self._is_processing = False
 
-    async def _unsafe_process(self):
+    async def _unsafe_process(self) -> None:
         """
         Looks for unset notifications and dispatch them, in order
         Never call this directly: use _process which will only allow this to
         run once per pusher.
         """
-
-        fn = self.store.get_unread_push_actions_for_user_in_range_for_http
-        unprocessed = await fn(
+        assert self.last_stream_ordering is not None
+        unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
 
@@ -216,6 +205,7 @@ class HttpPusher:
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
+                assert self.last_stream_ordering is not None
                 pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
                     self.app_id,
                     self.pushkey,
@@ -257,17 +247,12 @@ class HttpPusher:
                     )
                     self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                     self.last_stream_ordering = push_action["stream_ordering"]
-                    pusher_still_exists = await self.store.update_pusher_last_stream_ordering(
+                    await self.store.update_pusher_last_stream_ordering(
                         self.app_id,
                         self.pushkey,
                         self.user_id,
                         self.last_stream_ordering,
                     )
-                    if not pusher_still_exists:
-                        # The pusher has been deleted while we were processing, so
-                        # lets just stop and return.
-                        self.on_stop()
-                        return
 
                     self.failing_since = None
                     await self.store.update_pusher_failing_since(
@@ -283,7 +268,7 @@ class HttpPusher:
                     )
                     break
 
-    async def _process_one(self, push_action):
+    async def _process_one(self, push_action: dict) -> bool:
         if "notify" not in push_action["actions"]:
             return True
 
@@ -314,7 +299,9 @@ class HttpPusher:
                     await self.hs.remove_pusher(self.app_id, pk, self.user_id)
         return True
 
-    async def _build_notification_dict(self, event, tweaks, badge):
+    async def _build_notification_dict(
+        self, event: EventBase, tweaks: Dict[str, bool], badge: int
+    ) -> Dict[str, Any]:
         priority = "low"
         if (
             event.type == EventTypes.Encrypted
@@ -325,6 +312,8 @@ class HttpPusher:
             #  or may do so (i.e. is encrypted so has unknown effects).
             priority = "high"
 
+        # This was checked in the __init__, but mypy doesn't seem to know that.
+        assert self.data is not None
         if self.data.get("format") == "event_id_only":
             d = {
                 "notification": {
@@ -344,9 +333,7 @@ class HttpPusher:
             }
             return d
 
-        ctx = await push_tools.get_context_for_event(
-            self.storage, self.state_handler, event, self.user_id
-        )
+        ctx = await push_tools.get_context_for_event(self.storage, event, self.user_id)
 
         d = {
             "notification": {
@@ -386,7 +373,9 @@ class HttpPusher:
 
         return d
 
-    async def dispatch_push(self, event, tweaks, badge):
+    async def dispatch_push(
+        self, event: EventBase, tweaks: Dict[str, bool], badge: int
+    ) -> Union[bool, Iterable[str]]:
         notification_dict = await self._build_notification_dict(event, tweaks, badge)
         if not notification_dict:
             return []
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 38195c8eea..9ff092e8bb 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -19,7 +19,7 @@ import logging
 import urllib.parse
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
-from typing import Iterable, List, TypeVar
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, TypeVar
 
 import bleach
 import jinja2
@@ -27,16 +27,20 @@ import jinja2
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import StoreError
 from synapse.config.emailconfig import EmailSubjectConfig
+from synapse.events import EventBase
 from synapse.logging.context import make_deferred_yieldable
 from synapse.push.presentable_names import (
     calculate_room_name,
     descriptor_from_member_events,
     name_from_member_event,
 )
-from synapse.types import UserID
+from synapse.types import StateMap, UserID
 from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 T = TypeVar("T")
@@ -93,7 +97,13 @@ ALLOWED_ATTRS = {
 
 
 class Mailer:
-    def __init__(self, hs, app_name, template_html, template_text):
+    def __init__(
+        self,
+        hs: "HomeServer",
+        app_name: str,
+        template_html: jinja2.Template,
+        template_text: jinja2.Template,
+    ):
         self.hs = hs
         self.template_html = template_html
         self.template_text = template_text
@@ -108,17 +118,19 @@ class Mailer:
 
         logger.info("Created Mailer for app_name %s" % app_name)
 
-    async def send_password_reset_mail(self, email_address, token, client_secret, sid):
+    async def send_password_reset_mail(
+        self, email_address: str, token: str, client_secret: str, sid: str
+    ) -> None:
         """Send an email with a password reset link to a user
 
         Args:
-            email_address (str): Email address we're sending the password
+            email_address: Email address we're sending the password
                 reset to
-            token (str): Unique token generated by the server to verify
+            token: Unique token generated by the server to verify
                 the email was received
-            client_secret (str): Unique token generated by the client to
+            client_secret: Unique token generated by the client to
                 group together multiple email sending attempts
-            sid (str): The generated session ID
+            sid: The generated session ID
         """
         params = {"token": token, "client_secret": client_secret, "sid": sid}
         link = (
@@ -136,17 +148,19 @@ class Mailer:
             template_vars,
         )
 
-    async def send_registration_mail(self, email_address, token, client_secret, sid):
+    async def send_registration_mail(
+        self, email_address: str, token: str, client_secret: str, sid: str
+    ) -> None:
         """Send an email with a registration confirmation link to a user
 
         Args:
-            email_address (str): Email address we're sending the registration
+            email_address: Email address we're sending the registration
                 link to
-            token (str): Unique token generated by the server to verify
+            token: Unique token generated by the server to verify
                 the email was received
-            client_secret (str): Unique token generated by the client to
+            client_secret: Unique token generated by the client to
                 group together multiple email sending attempts
-            sid (str): The generated session ID
+            sid: The generated session ID
         """
         params = {"token": token, "client_secret": client_secret, "sid": sid}
         link = (
@@ -164,18 +178,20 @@ class Mailer:
             template_vars,
         )
 
-    async def send_add_threepid_mail(self, email_address, token, client_secret, sid):
+    async def send_add_threepid_mail(
+        self, email_address: str, token: str, client_secret: str, sid: str
+    ) -> None:
         """Send an email with a validation link to a user for adding a 3pid to their account
 
         Args:
-            email_address (str): Email address we're sending the validation link to
+            email_address: Email address we're sending the validation link to
 
-            token (str): Unique token generated by the server to verify the email was received
+            token: Unique token generated by the server to verify the email was received
 
-            client_secret (str): Unique token generated by the client to group together
+            client_secret: Unique token generated by the client to group together
                 multiple email sending attempts
 
-            sid (str): The generated session ID
+            sid: The generated session ID
         """
         params = {"token": token, "client_secret": client_secret, "sid": sid}
         link = (
@@ -194,8 +210,13 @@ class Mailer:
         )
 
     async def send_notification_mail(
-        self, app_id, user_id, email_address, push_actions, reason
-    ):
+        self,
+        app_id: str,
+        user_id: str,
+        email_address: str,
+        push_actions: Iterable[Dict[str, Any]],
+        reason: Dict[str, Any],
+    ) -> None:
         """Send email regarding a user's room notifications"""
         rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions])
 
@@ -203,7 +224,7 @@ class Mailer:
             [pa["event_id"] for pa in push_actions]
         )
 
-        notifs_by_room = {}
+        notifs_by_room = {}  # type: Dict[str, List[Dict[str, Any]]]
         for pa in push_actions:
             notifs_by_room.setdefault(pa["room_id"], []).append(pa)
 
@@ -262,7 +283,9 @@ class Mailer:
 
         await self.send_email(email_address, summary_text, template_vars)
 
-    async def send_email(self, email_address, subject, extra_template_vars):
+    async def send_email(
+        self, email_address: str, subject: str, extra_template_vars: Dict[str, Any]
+    ) -> None:
         """Send an email with the given information and template text"""
         try:
             from_string = self.hs.config.email_notif_from % {"app": self.app_name}
@@ -315,8 +338,13 @@ class Mailer:
         )
 
     async def get_room_vars(
-        self, room_id, user_id, notifs, notif_events, room_state_ids
-    ):
+        self,
+        room_id: str,
+        user_id: str,
+        notifs: Iterable[Dict[str, Any]],
+        notif_events: Dict[str, EventBase],
+        room_state_ids: StateMap[str],
+    ) -> Dict[str, Any]:
         # Check if one of the notifs is an invite event for the user.
         is_invite = False
         for n in notifs:
@@ -334,7 +362,7 @@ class Mailer:
             "notifs": [],
             "invite": is_invite,
             "link": self.make_room_link(room_id),
-        }
+        }  # type: Dict[str, Any]
 
         if not is_invite:
             for n in notifs:
@@ -365,7 +393,13 @@ class Mailer:
 
         return room_vars
 
-    async def get_notif_vars(self, notif, user_id, notif_event, room_state_ids):
+    async def get_notif_vars(
+        self,
+        notif: Dict[str, Any],
+        user_id: str,
+        notif_event: EventBase,
+        room_state_ids: StateMap[str],
+    ) -> Dict[str, Any]:
         results = await self.store.get_events_around(
             notif["room_id"],
             notif["event_id"],
@@ -391,7 +425,9 @@ class Mailer:
 
         return ret
 
-    async def get_message_vars(self, notif, event, room_state_ids):
+    async def get_message_vars(
+        self, notif: Dict[str, Any], event: EventBase, room_state_ids: StateMap[str]
+    ) -> Optional[Dict[str, Any]]:
         if event.type != EventTypes.Message and event.type != EventTypes.Encrypted:
             return None
 
@@ -432,7 +468,9 @@ class Mailer:
 
         return ret
 
-    def add_text_message_vars(self, messagevars, event):
+    def add_text_message_vars(
+        self, messagevars: Dict[str, Any], event: EventBase
+    ) -> None:
         msgformat = event.content.get("format")
 
         messagevars["format"] = msgformat
@@ -445,15 +483,18 @@ class Mailer:
         elif body:
             messagevars["body_text_html"] = safe_text(body)
 
-        return messagevars
-
-    def add_image_message_vars(self, messagevars, event):
+    def add_image_message_vars(
+        self, messagevars: Dict[str, Any], event: EventBase
+    ) -> None:
         messagevars["image_url"] = event.content["url"]
 
-        return messagevars
-
     async def make_summary_text(
-        self, notifs_by_room, room_state_ids, notif_events, user_id, reason
+        self,
+        notifs_by_room: Dict[str, List[Dict[str, Any]]],
+        room_state_ids: Dict[str, StateMap[str]],
+        notif_events: Dict[str, EventBase],
+        user_id: str,
+        reason: Dict[str, Any],
     ):
         if len(notifs_by_room) == 1:
             # Only one room has new stuff
@@ -580,7 +621,7 @@ class Mailer:
                     "app": self.app_name,
                 }
 
-    def make_room_link(self, room_id):
+    def make_room_link(self, room_id: str) -> str:
         if self.hs.config.email_riot_base_url:
             base_url = "%s/#/room" % (self.hs.config.email_riot_base_url)
         elif self.app_name == "Vector":
@@ -590,7 +631,7 @@ class Mailer:
             base_url = "https://matrix.to/#"
         return "%s/%s" % (base_url, room_id)
 
-    def make_notif_link(self, notif):
+    def make_notif_link(self, notif: Dict[str, str]) -> str:
         if self.hs.config.email_riot_base_url:
             return "%s/#/room/%s/%s" % (
                 self.hs.config.email_riot_base_url,
@@ -606,7 +647,9 @@ class Mailer:
         else:
             return "https://matrix.to/#/%s/%s" % (notif["room_id"], notif["event_id"])
 
-    def make_unsubscribe_link(self, user_id, app_id, email_address):
+    def make_unsubscribe_link(
+        self, user_id: str, app_id: str, email_address: str
+    ) -> str:
         params = {
             "access_token": self.macaroon_gen.generate_delete_pusher_token(user_id),
             "app_id": app_id,
@@ -620,7 +663,7 @@ class Mailer:
         )
 
 
-def safe_markup(raw_html):
+def safe_markup(raw_html: str) -> jinja2.Markup:
     return jinja2.Markup(
         bleach.linkify(
             bleach.clean(
@@ -635,7 +678,7 @@ def safe_markup(raw_html):
     )
 
 
-def safe_text(raw_text):
+def safe_text(raw_text: str) -> jinja2.Markup:
     """
     Process text: treat it as HTML but escape any tags (ie. just escape the
     HTML) then linkify it.
@@ -655,7 +698,7 @@ def deduped_ordered_list(it: Iterable[T]) -> List[T]:
     return ret
 
 
-def string_ordinal_total(s):
+def string_ordinal_total(s: str) -> int:
     tot = 0
     for c in s:
         tot += ord(c)
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index d8f4a453cd..7e50341d74 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -15,8 +15,14 @@
 
 import logging
 import re
+from typing import TYPE_CHECKING, Dict, Iterable, Optional
 
 from synapse.api.constants import EventTypes
+from synapse.events import EventBase
+from synapse.types import StateMap
+
+if TYPE_CHECKING:
+    from synapse.storage.databases.main import DataStore
 
 logger = logging.getLogger(__name__)
 
@@ -28,25 +34,29 @@ ALL_ALONE = "Empty Room"
 
 
 async def calculate_room_name(
-    store,
-    room_state_ids,
-    user_id,
-    fallback_to_members=True,
-    fallback_to_single_member=True,
-):
+    store: "DataStore",
+    room_state_ids: StateMap[str],
+    user_id: str,
+    fallback_to_members: bool = True,
+    fallback_to_single_member: bool = True,
+) -> Optional[str]:
     """
     Works out a user-facing name for the given room as per Matrix
     spec recommendations.
     Does not yet support internationalisation.
     Args:
-        room_state: Dictionary of the room's state
+        store: The data store to query.
+        room_state_ids: Dictionary of the room's state IDs.
         user_id: The ID of the user to whom the room name is being presented
         fallback_to_members: If False, return None instead of generating a name
                              based on the room's members if the room has no
                              title or aliases.
+        fallback_to_single_member: If False, return None instead of generating a
+            name based on the user who invited this user to the room if the room
+            has no title or aliases.
 
     Returns:
-        (string or None) A human readable name for the room.
+        A human readable name for the room, if possible.
     """
     # does it have a name?
     if (EventTypes.Name, "") in room_state_ids:
@@ -97,7 +107,7 @@ async def calculate_room_name(
                         name_from_member_event(inviter_member_event),
                     )
                 else:
-                    return
+                    return None
         else:
             return "Room Invite"
 
@@ -150,19 +160,19 @@ async def calculate_room_name(
         else:
             return ALL_ALONE
     elif len(other_members) == 1 and not fallback_to_single_member:
-        return
-    else:
-        return descriptor_from_member_events(other_members)
+        return None
+
+    return descriptor_from_member_events(other_members)
 
 
-def descriptor_from_member_events(member_events):
+def descriptor_from_member_events(member_events: Iterable[EventBase]) -> str:
     """Get a description of the room based on the member events.
 
     Args:
-        member_events (Iterable[FrozenEvent])
+        member_events: The events of a room.
 
     Returns:
-        str
+        The room description
     """
 
     member_events = list(member_events)
@@ -183,7 +193,7 @@ def descriptor_from_member_events(member_events):
         )
 
 
-def name_from_member_event(member_event):
+def name_from_member_event(member_event: EventBase) -> str:
     if (
         member_event.content
         and "displayname" in member_event.content
@@ -193,12 +203,12 @@ def name_from_member_event(member_event):
     return member_event.state_key
 
 
-def _state_as_two_level_dict(state):
-    ret = {}
+def _state_as_two_level_dict(state: StateMap[str]) -> Dict[str, Dict[str, str]]:
+    ret = {}  # type: Dict[str, Dict[str, str]]
     for k, v in state.items():
         ret.setdefault(k[0], {})[k[1]] = v
     return ret
 
 
-def _looks_like_an_alias(string):
+def _looks_like_an_alias(string: str) -> bool:
     return ALIAS_RE.match(string) is not None
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 2ce9e444ab..ba1877adcd 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -30,22 +30,30 @@ IS_GLOB = re.compile(r"[\?\*\[\]]")
 INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
 
 
-def _room_member_count(ev, condition, room_member_count):
+def _room_member_count(
+    ev: EventBase, condition: Dict[str, Any], room_member_count: int
+) -> bool:
     return _test_ineq_condition(condition, room_member_count)
 
 
-def _sender_notification_permission(ev, condition, sender_power_level, power_levels):
+def _sender_notification_permission(
+    ev: EventBase,
+    condition: Dict[str, Any],
+    sender_power_level: int,
+    power_levels: Dict[str, Union[int, Dict[str, int]]],
+) -> bool:
     notif_level_key = condition.get("key")
     if notif_level_key is None:
         return False
 
     notif_levels = power_levels.get("notifications", {})
+    assert isinstance(notif_levels, dict)
     room_notif_level = notif_levels.get(notif_level_key, 50)
 
     return sender_power_level >= room_notif_level
 
 
-def _test_ineq_condition(condition, number):
+def _test_ineq_condition(condition: Dict[str, Any], number: int) -> bool:
     if "is" not in condition:
         return False
     m = INEQUALITY_EXPR.match(condition["is"])
@@ -110,7 +118,7 @@ class PushRuleEvaluatorForEvent:
         event: EventBase,
         room_member_count: int,
         sender_power_level: int,
-        power_levels: dict,
+        power_levels: Dict[str, Union[int, Dict[str, int]]],
     ):
         self._event = event
         self._room_member_count = room_member_count
@@ -120,7 +128,9 @@ class PushRuleEvaluatorForEvent:
         # Maps strings of e.g. 'content.body' -> event["content"]["body"]
         self._value_cache = _flatten_dict(event)
 
-    def matches(self, condition: dict, user_id: str, display_name: str) -> bool:
+    def matches(
+        self, condition: Dict[str, Any], user_id: str, display_name: str
+    ) -> bool:
         if condition["kind"] == "event_match":
             return self._event_match(condition, user_id)
         elif condition["kind"] == "contains_display_name":
@@ -261,7 +271,13 @@ def _re_word_boundary(r: str) -> str:
     return r"(^|\W)%s(\W|$)" % (r,)
 
 
-def _flatten_dict(d, prefix=[], result=None):
+def _flatten_dict(
+    d: Union[EventBase, dict],
+    prefix: Optional[List[str]] = None,
+    result: Optional[Dict[str, str]] = None,
+) -> Dict[str, str]:
+    if prefix is None:
+        prefix = []
     if result is None:
         result = {}
     for key, value in d.items():
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 6e7c880dc0..df34103224 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -12,6 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Dict
+
+from synapse.events import EventBase
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 from synapse.storage import Storage
 from synapse.storage.databases.main import DataStore
@@ -46,7 +49,9 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
     return badge
 
 
-async def get_context_for_event(storage: Storage, state_handler, ev, user_id):
+async def get_context_for_event(
+    storage: Storage, ev: EventBase, user_id: str
+) -> Dict[str, str]:
     ctx = {}
 
     room_state_ids = await storage.state.get_state_ids_for_event(ev.event_id)
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 2a52e226e3..2aa7918fb4 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -14,25 +14,31 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING, Callable, Dict, Optional
 
+from synapse.push import Pusher, PusherConfig
 from synapse.push.emailpusher import EmailPusher
+from synapse.push.httppusher import HttpPusher
 from synapse.push.mailer import Mailer
 
-from .httppusher import HttpPusher
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
 
 logger = logging.getLogger(__name__)
 
 
 class PusherFactory:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.config = hs.config
 
-        self.pusher_types = {"http": HttpPusher}
+        self.pusher_types = {
+            "http": HttpPusher
+        }  # type: Dict[str, Callable[[HomeServer, PusherConfig], Pusher]]
 
         logger.info("email enable notifs: %r", hs.config.email_enable_notifs)
         if hs.config.email_enable_notifs:
-            self.mailers = {}  # app_name -> Mailer
+            self.mailers = {}  # type: Dict[str, Mailer]
 
             self._notif_template_html = hs.config.email_notif_template_html
             self._notif_template_text = hs.config.email_notif_template_text
@@ -41,16 +47,18 @@ class PusherFactory:
 
             logger.info("defined email pusher type")
 
-    def create_pusher(self, pusherdict):
-        kind = pusherdict["kind"]
+    def create_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
+        kind = pusher_config.kind
         f = self.pusher_types.get(kind, None)
         if not f:
             return None
-        logger.debug("creating %s pusher for %r", kind, pusherdict)
-        return f(self.hs, pusherdict)
+        logger.debug("creating %s pusher for %r", kind, pusher_config)
+        return f(self.hs, pusher_config)
 
-    def _create_email_pusher(self, _hs, pusherdict):
-        app_name = self._app_name_from_pusherdict(pusherdict)
+    def _create_email_pusher(
+        self, _hs: "HomeServer", pusher_config: PusherConfig
+    ) -> EmailPusher:
+        app_name = self._app_name_from_pusherdict(pusher_config)
         mailer = self.mailers.get(app_name)
         if not mailer:
             mailer = Mailer(
@@ -60,10 +68,10 @@ class PusherFactory:
                 template_text=self._notif_template_text,
             )
             self.mailers[app_name] = mailer
-        return EmailPusher(self.hs, pusherdict, mailer)
+        return EmailPusher(self.hs, pusher_config, mailer)
 
-    def _app_name_from_pusherdict(self, pusherdict):
-        data = pusherdict["data"]
+    def _app_name_from_pusherdict(self, pusher_config: PusherConfig) -> str:
+        data = pusher_config.data
 
         if isinstance(data, dict):
             brand = data.get("brand")
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index f325964983..8158356d40 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, Dict, Union
+from typing import TYPE_CHECKING, Dict, Iterable, Optional
 
 from prometheus_client import Gauge
 
@@ -23,11 +23,9 @@ from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
 )
-from synapse.push import PusherConfigException
-from synapse.push.emailpusher import EmailPusher
-from synapse.push.httppusher import HttpPusher
+from synapse.push import Pusher, PusherConfig, PusherConfigException
 from synapse.push.pusher import PusherFactory
-from synapse.types import RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 
 if TYPE_CHECKING:
@@ -77,9 +75,9 @@ class PusherPool:
         self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering()
 
         # map from user id to app_id:pushkey to pusher
-        self.pushers = {}  # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
+        self.pushers = {}  # type: Dict[str, Dict[str, Pusher]]
 
-    def start(self):
+    def start(self) -> None:
         """Starts the pushers off in a background process.
         """
         if not self._should_start_pushers:
@@ -89,21 +87,21 @@ class PusherPool:
 
     async def add_pusher(
         self,
-        user_id,
-        access_token,
-        kind,
-        app_id,
-        app_display_name,
-        device_display_name,
-        pushkey,
-        lang,
-        data,
-        profile_tag="",
-    ):
+        user_id: str,
+        access_token: Optional[int],
+        kind: str,
+        app_id: str,
+        app_display_name: str,
+        device_display_name: str,
+        pushkey: str,
+        lang: Optional[str],
+        data: JsonDict,
+        profile_tag: str = "",
+    ) -> Optional[Pusher]:
         """Creates a new pusher and adds it to the pool
 
         Returns:
-            EmailPusher|HttpPusher
+            The newly created pusher.
         """
 
         time_now_msec = self.clock.time_msec()
@@ -113,27 +111,28 @@ class PusherPool:
         # recreated, added and started: this means we have only one
         # code path adding pushers.
         self.pusher_factory.create_pusher(
-            {
-                "id": None,
-                "user_name": user_id,
-                "kind": kind,
-                "app_id": app_id,
-                "app_display_name": app_display_name,
-                "device_display_name": device_display_name,
-                "pushkey": pushkey,
-                "ts": time_now_msec,
-                "lang": lang,
-                "data": data,
-                "last_stream_ordering": None,
-                "last_success": None,
-                "failing_since": None,
-            }
+            PusherConfig(
+                id=None,
+                user_name=user_id,
+                access_token=access_token,
+                profile_tag=profile_tag,
+                kind=kind,
+                app_id=app_id,
+                app_display_name=app_display_name,
+                device_display_name=device_display_name,
+                pushkey=pushkey,
+                ts=time_now_msec,
+                lang=lang,
+                data=data,
+                last_stream_ordering=None,
+                last_success=None,
+                failing_since=None,
+            )
         )
 
         # create the pusher setting last_stream_ordering to the current maximum
-        # stream ordering in event_push_actions, so it will process
-        # pushes from this point onwards.
-        last_stream_ordering = await self.store.get_latest_push_action_stream_ordering()
+        # stream ordering, so it will process pushes from this point onwards.
+        last_stream_ordering = self.store.get_room_max_stream_ordering()
 
         await self.store.add_pusher(
             user_id=user_id,
@@ -154,43 +153,44 @@ class PusherPool:
         return pusher
 
     async def remove_pushers_by_app_id_and_pushkey_not_user(
-        self, app_id, pushkey, not_user_id
-    ):
+        self, app_id: str, pushkey: str, not_user_id: str
+    ) -> None:
         to_remove = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
         for p in to_remove:
-            if p["user_name"] != not_user_id:
+            if p.user_name != not_user_id:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
                     app_id,
                     pushkey,
-                    p["user_name"],
+                    p.user_name,
                 )
-                await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
+                await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
-    async def remove_pushers_by_access_token(self, user_id, access_tokens):
+    async def remove_pushers_by_access_token(
+        self, user_id: str, access_tokens: Iterable[int]
+    ) -> None:
         """Remove the pushers for a given user corresponding to a set of
         access_tokens.
 
         Args:
-            user_id (str): user to remove pushers for
-            access_tokens (Iterable[int]): access token *ids* to remove pushers
-                for
+            user_id: user to remove pushers for
+            access_tokens: access token *ids* to remove pushers for
         """
         if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
             return
 
         tokens = set(access_tokens)
         for p in await self.store.get_pushers_by_user_id(user_id):
-            if p["access_token"] in tokens:
+            if p.access_token in tokens:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
-                    p["app_id"],
-                    p["pushkey"],
-                    p["user_name"],
+                    p.app_id,
+                    p.pushkey,
+                    p.user_name,
                 )
-                await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
+                await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
-    def on_new_notifications(self, max_token: RoomStreamToken):
+    def on_new_notifications(self, max_token: RoomStreamToken) -> None:
         if not self.pushers:
             # nothing to do here.
             return
@@ -209,7 +209,7 @@ class PusherPool:
         self._on_new_notifications(max_token)
 
     @wrap_as_background_process("on_new_notifications")
-    async def _on_new_notifications(self, max_token: RoomStreamToken):
+    async def _on_new_notifications(self, max_token: RoomStreamToken) -> None:
         # We just use the minimum stream ordering and ignore the vector clock
         # component. This is safe to do as long as we *always* ignore the vector
         # clock components.
@@ -239,7 +239,9 @@ class PusherPool:
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
-    async def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+    async def on_new_receipts(
+        self, min_stream_id: int, max_stream_id: int, affected_room_ids: Iterable[str]
+    ) -> None:
         if not self.pushers:
             # nothing to do here.
             return
@@ -267,28 +269,30 @@ class PusherPool:
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
-    async def start_pusher_by_id(self, app_id, pushkey, user_id):
+    async def start_pusher_by_id(
+        self, app_id: str, pushkey: str, user_id: str
+    ) -> Optional[Pusher]:
         """Look up the details for the given pusher, and start it
 
         Returns:
-            EmailPusher|HttpPusher|None: The pusher started, if any
+            The pusher started, if any
         """
         if not self._should_start_pushers:
-            return
+            return None
 
         if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
-            return
+            return None
 
         resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
 
-        pusher_dict = None
+        pusher_config = None
         for r in resultlist:
-            if r["user_name"] == user_id:
-                pusher_dict = r
+            if r.user_name == user_id:
+                pusher_config = r
 
         pusher = None
-        if pusher_dict:
-            pusher = await self._start_pusher(pusher_dict)
+        if pusher_config:
+            pusher = await self._start_pusher(pusher_config)
 
         return pusher
 
@@ -303,44 +307,44 @@ class PusherPool:
 
         logger.info("Started pushers")
 
-    async def _start_pusher(self, pusherdict):
+    async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
         """Start the given pusher
 
         Args:
-            pusherdict (dict): dict with the values pulled from the db table
+            pusher_config: The pusher configuration with the values pulled from the db table
 
         Returns:
-            EmailPusher|HttpPusher
+            The newly created pusher or None.
         """
         if not self._pusher_shard_config.should_handle(
-            self._instance_name, pusherdict["user_name"]
+            self._instance_name, pusher_config.user_name
         ):
-            return
+            return None
 
         try:
-            p = self.pusher_factory.create_pusher(pusherdict)
+            p = self.pusher_factory.create_pusher(pusher_config)
         except PusherConfigException as e:
             logger.warning(
                 "Pusher incorrectly configured id=%i, user=%s, appid=%s, pushkey=%s: %s",
-                pusherdict["id"],
-                pusherdict.get("user_name"),
-                pusherdict.get("app_id"),
-                pusherdict.get("pushkey"),
+                pusher_config.id,
+                pusher_config.user_name,
+                pusher_config.app_id,
+                pusher_config.pushkey,
                 e,
             )
-            return
+            return None
         except Exception:
             logger.exception(
-                "Couldn't start pusher id %i: caught Exception", pusherdict["id"],
+                "Couldn't start pusher id %i: caught Exception", pusher_config.id,
             )
-            return
+            return None
 
         if not p:
-            return
+            return None
 
-        appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
+        appid_pushkey = "%s:%s" % (pusher_config.app_id, pusher_config.pushkey)
 
-        byuser = self.pushers.setdefault(pusherdict["user_name"], {})
+        byuser = self.pushers.setdefault(pusher_config.user_name, {})
         if appid_pushkey in byuser:
             byuser[appid_pushkey].on_stop()
         byuser[appid_pushkey] = p
@@ -350,8 +354,8 @@ class PusherPool:
         # Check if there *may* be push to process. We do this as this check is a
         # lot cheaper to do than actually fetching the exact rows we need to
         # push.
-        user_id = pusherdict["user_name"]
-        last_stream_ordering = pusherdict["last_stream_ordering"]
+        user_id = pusher_config.user_name
+        last_stream_ordering = pusher_config.last_stream_ordering
         if last_stream_ordering:
             have_notifs = await self.store.get_if_maybe_push_in_range_for_user(
                 user_id, last_stream_ordering
@@ -365,7 +369,7 @@ class PusherPool:
 
         return p
 
-    async def remove_pusher(self, app_id, pushkey, user_id):
+    async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
         appid_pushkey = "%s:%s" % (app_id, pushkey)
 
         byuser = self.pushers.get(user_id, {})