diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 9c12d81cfb..8158356d40 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -15,7 +15,7 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Dict, Iterable, Optional
from prometheus_client import Gauge
@@ -23,9 +23,9 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
-from synapse.push import Pusher, PusherConfigException
+from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
-from synapse.types import RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
if TYPE_CHECKING:
@@ -77,7 +77,7 @@ class PusherPool:
# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Pusher]]
- def start(self):
+ def start(self) -> None:
"""Starts the pushers off in a background process.
"""
if not self._should_start_pushers:
@@ -87,16 +87,16 @@ class PusherPool:
async def add_pusher(
self,
- user_id,
- access_token,
- kind,
- app_id,
- app_display_name,
- device_display_name,
- pushkey,
- lang,
- data,
- profile_tag="",
+ user_id: str,
+ access_token: Optional[int],
+ kind: str,
+ app_id: str,
+ app_display_name: str,
+ device_display_name: str,
+ pushkey: str,
+ lang: Optional[str],
+ data: JsonDict,
+ profile_tag: str = "",
) -> Optional[Pusher]:
"""Creates a new pusher and adds it to the pool
@@ -111,21 +111,23 @@ class PusherPool:
# recreated, added and started: this means we have only one
# code path adding pushers.
self.pusher_factory.create_pusher(
- {
- "id": None,
- "user_name": user_id,
- "kind": kind,
- "app_id": app_id,
- "app_display_name": app_display_name,
- "device_display_name": device_display_name,
- "pushkey": pushkey,
- "ts": time_now_msec,
- "lang": lang,
- "data": data,
- "last_stream_ordering": None,
- "last_success": None,
- "failing_since": None,
- }
+ PusherConfig(
+ id=None,
+ user_name=user_id,
+ access_token=access_token,
+ profile_tag=profile_tag,
+ kind=kind,
+ app_id=app_id,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ pushkey=pushkey,
+ ts=time_now_msec,
+ lang=lang,
+ data=data,
+ last_stream_ordering=None,
+ last_success=None,
+ failing_since=None,
+ )
)
# create the pusher setting last_stream_ordering to the current maximum
@@ -151,43 +153,44 @@ class PusherPool:
return pusher
async def remove_pushers_by_app_id_and_pushkey_not_user(
- self, app_id, pushkey, not_user_id
- ):
+ self, app_id: str, pushkey: str, not_user_id: str
+ ) -> None:
to_remove = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
for p in to_remove:
- if p["user_name"] != not_user_id:
+ if p.user_name != not_user_id:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
app_id,
pushkey,
- p["user_name"],
+ p.user_name,
)
- await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
+ await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
- async def remove_pushers_by_access_token(self, user_id, access_tokens):
+ async def remove_pushers_by_access_token(
+ self, user_id: str, access_tokens: Iterable[int]
+ ) -> None:
"""Remove the pushers for a given user corresponding to a set of
access_tokens.
Args:
- user_id (str): user to remove pushers for
- access_tokens (Iterable[int]): access token *ids* to remove pushers
- for
+ user_id: user to remove pushers for
+ access_tokens: access token *ids* to remove pushers for
"""
if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
return
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
- if p["access_token"] in tokens:
+ if p.access_token in tokens:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
- p["app_id"],
- p["pushkey"],
- p["user_name"],
+ p.app_id,
+ p.pushkey,
+ p.user_name,
)
- await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
+ await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
- def on_new_notifications(self, max_token: RoomStreamToken):
+ def on_new_notifications(self, max_token: RoomStreamToken) -> None:
if not self.pushers:
# nothing to do here.
return
@@ -206,7 +209,7 @@ class PusherPool:
self._on_new_notifications(max_token)
@wrap_as_background_process("on_new_notifications")
- async def _on_new_notifications(self, max_token: RoomStreamToken):
+ async def _on_new_notifications(self, max_token: RoomStreamToken) -> None:
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
@@ -236,7 +239,9 @@ class PusherPool:
except Exception:
logger.exception("Exception in pusher on_new_notifications")
- async def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ async def on_new_receipts(
+ self, min_stream_id: int, max_stream_id: int, affected_room_ids: Iterable[str]
+ ) -> None:
if not self.pushers:
# nothing to do here.
return
@@ -280,14 +285,14 @@ class PusherPool:
resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
- pusher_dict = None
+ pusher_config = None
for r in resultlist:
- if r["user_name"] == user_id:
- pusher_dict = r
+ if r.user_name == user_id:
+ pusher_config = r
pusher = None
- if pusher_dict:
- pusher = await self._start_pusher(pusher_dict)
+ if pusher_config:
+ pusher = await self._start_pusher(pusher_config)
return pusher
@@ -302,44 +307,44 @@ class PusherPool:
logger.info("Started pushers")
- async def _start_pusher(self, pusherdict: Dict[str, Any]) -> Optional[Pusher]:
+ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
"""Start the given pusher
Args:
- pusherdict: dict with the values pulled from the db table
+ pusher_config: The pusher configuration with the values pulled from the db table
Returns:
The newly created pusher or None.
"""
if not self._pusher_shard_config.should_handle(
- self._instance_name, pusherdict["user_name"]
+ self._instance_name, pusher_config.user_name
):
return None
try:
- p = self.pusher_factory.create_pusher(pusherdict)
+ p = self.pusher_factory.create_pusher(pusher_config)
except PusherConfigException as e:
logger.warning(
"Pusher incorrectly configured id=%i, user=%s, appid=%s, pushkey=%s: %s",
- pusherdict["id"],
- pusherdict.get("user_name"),
- pusherdict.get("app_id"),
- pusherdict.get("pushkey"),
+ pusher_config.id,
+ pusher_config.user_name,
+ pusher_config.app_id,
+ pusher_config.pushkey,
e,
)
return None
except Exception:
logger.exception(
- "Couldn't start pusher id %i: caught Exception", pusherdict["id"],
+ "Couldn't start pusher id %i: caught Exception", pusher_config.id,
)
return None
if not p:
return None
- appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
+ appid_pushkey = "%s:%s" % (pusher_config.app_id, pusher_config.pushkey)
- byuser = self.pushers.setdefault(pusherdict["user_name"], {})
+ byuser = self.pushers.setdefault(pusher_config.user_name, {})
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
@@ -349,8 +354,8 @@ class PusherPool:
# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
# push.
- user_id = pusherdict["user_name"]
- last_stream_ordering = pusherdict["last_stream_ordering"]
+ user_id = pusher_config.user_name
+ last_stream_ordering = pusher_config.last_stream_ordering
if last_stream_ordering:
have_notifs = await self.store.get_if_maybe_push_in_range_for_user(
user_id, last_stream_ordering
@@ -364,7 +369,7 @@ class PusherPool:
return p
- async def remove_pusher(self, app_id, pushkey, user_id):
+ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
appid_pushkey = "%s:%s" % (app_id, pushkey)
byuser = self.pushers.get(user_id, {})
|