diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index e8834b2162..384c39f4d7 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -15,7 +15,7 @@
# limitations under the License.
import logging
-from typing import List
+from typing import Dict, List, Union
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
@@ -383,19 +383,20 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# Now return the first `limit`
return notifs[:limit]
- def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
+ async def get_if_maybe_push_in_range_for_user(
+ self, user_id: str, min_stream_ordering: int
+ ) -> bool:
"""A fast check to see if there might be something to push for the
user since the given stream ordering. May return false positives.
Useful to know whether to bother starting a pusher on start up or not.
Args:
- user_id (str)
- min_stream_ordering (int)
+ user_id
+ min_stream_ordering
Returns:
- Deferred[bool]: True if there may be push to process, False if
- there definitely isn't.
+ True if there may be push to process, False if there definitely isn't.
"""
def _get_if_maybe_push_in_range_for_user_txn(txn):
@@ -408,22 +409,20 @@ class EventPushActionsWorkerStore(SQLBaseStore):
txn.execute(sql, (user_id, min_stream_ordering))
return bool(txn.fetchone())
- return self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
"get_if_maybe_push_in_range_for_user",
_get_if_maybe_push_in_range_for_user_txn,
)
- async def add_push_actions_to_staging(self, event_id, user_id_actions):
+ async def add_push_actions_to_staging(
+ self, event_id: str, user_id_actions: Dict[str, List[Union[dict, str]]]
+ ) -> None:
"""Add the push actions for the event to the push action staging area.
Args:
- event_id (str)
- user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
- user_id to list of push actions, where an action can either be
- a string or dict.
-
- Returns:
- Deferred
+ event_id
+ user_id_actions: A mapping of user_id to list of push actions, where
+ an action can either be a string or dict.
"""
if not user_id_actions:
@@ -507,7 +506,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
"Found stream ordering 1 day ago: it's %d", self.stream_ordering_day_ago
)
- def find_first_stream_ordering_after_ts(self, ts):
+ async def find_first_stream_ordering_after_ts(self, ts: int) -> int:
"""Gets the stream ordering corresponding to a given timestamp.
Specifically, finds the stream_ordering of the first event that was
@@ -516,13 +515,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
relatively slow.
Args:
- ts (int): timestamp in millis
+ ts: timestamp in millis
Returns:
- Deferred[int]: stream ordering of the first event received on/after
- the timestamp
+ stream ordering of the first event received on/after the timestamp
"""
- return self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
"_find_first_stream_ordering_after_ts_txn",
self._find_first_stream_ordering_after_ts_txn,
ts,
|