diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 6317f22d3c..c016a83909 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -144,8 +144,7 @@ class BulkPushRuleEvaluator:
@lru_cache()
def _get_rules_for_room(self, room_id: str) -> "RulesForRoom":
- """Get the current RulesForRoom object for the given room id
- """
+ """Get the current RulesForRoom object for the given room id"""
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
# before any lookup methods get called on it as otherwise there may be
# a race if invalidate_all gets called (which assumes its in the cache)
@@ -252,7 +251,9 @@ class BulkPushRuleEvaluator:
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(
- event.event_id, actions_by_user, count_as_unread,
+ event.event_id,
+ actions_by_user,
+ count_as_unread,
)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 4ac1b31748..5fec2aaf5d 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -116,8 +116,7 @@ class EmailPusher(Pusher):
self._is_processing = True
def _resume_processing(self) -> None:
- """Used by tests to resume processing of events after pausing.
- """
+ """Used by tests to resume processing of events after pausing."""
assert self._is_processing
self._is_processing = False
self._start_processing()
@@ -157,8 +156,10 @@ class EmailPusher(Pusher):
being run.
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
- unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
- self.user_id, start, self.max_stream_ordering
+ unprocessed = (
+ await self.store.get_unread_push_actions_for_user_in_range_for_email(
+ self.user_id, start, self.max_stream_ordering
+ )
)
soonest_due_at = None # type: Optional[int]
@@ -222,12 +223,14 @@ class EmailPusher(Pusher):
self, last_stream_ordering: int
) -> None:
self.last_stream_ordering = last_stream_ordering
- pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
- self.app_id,
- self.email,
- self.user_id,
- last_stream_ordering,
- self.clock.time_msec(),
+ pusher_still_exists = (
+ await self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id,
+ self.email,
+ self.user_id,
+ last_stream_ordering,
+ self.clock.time_msec(),
+ )
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
@@ -298,7 +301,8 @@ class EmailPusher(Pusher):
current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS
)
self.throttle_params[room_id] = ThrottleParams(
- self.clock.time_msec(), new_throttle_ms,
+ self.clock.time_msec(),
+ new_throttle_ms,
)
assert self.pusher_id is not None
await self.store.set_throttle_params(
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index e048b0d59e..b9d3da2e0a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -176,8 +176,10 @@ class HttpPusher(Pusher):
Never call this directly: use _process which will only allow this to
run once per pusher.
"""
- unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
- self.user_id, self.last_stream_ordering, self.max_stream_ordering
+ unprocessed = (
+ await self.store.get_unread_push_actions_for_user_in_range_for_http(
+ self.user_id, self.last_stream_ordering, self.max_stream_ordering
+ )
)
logger.info(
@@ -204,12 +206,14 @@ class HttpPusher(Pusher):
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
- pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_stream_ordering,
- self.clock.time_msec(),
+ pusher_still_exists = (
+ await self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_stream_ordering,
+ self.clock.time_msec(),
+ )
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
@@ -290,7 +294,8 @@ class HttpPusher(Pusher):
# for sanity, we only remove the pushkey if it
# was the one we actually sent...
logger.warning(
- ("Ignoring rejected pushkey %s because we didn't send it"), pk,
+ ("Ignoring rejected pushkey %s because we didn't send it"),
+ pk,
)
else:
logger.info("Pushkey %s was rejected: removing", pk)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index eed16dbfb5..ae1145be0e 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -78,8 +78,7 @@ class PusherPool:
self.pushers = {} # type: Dict[str, Dict[str, Pusher]]
def start(self) -> None:
- """Starts the pushers off in a background process.
- """
+ """Starts the pushers off in a background process."""
if not self._should_start_pushers:
logger.info("Not starting pushers because they are disabled in the config")
return
@@ -297,8 +296,7 @@ class PusherPool:
return pusher
async def _start_pushers(self) -> None:
- """Start all the pushers
- """
+ """Start all the pushers"""
pushers = await self.store.get_all_pushers()
# Stagger starting up the pushers so we don't completely drown the
@@ -335,7 +333,8 @@ class PusherPool:
return None
except Exception:
logger.exception(
- "Couldn't start pusher id %i: caught Exception", pusher_config.id,
+ "Couldn't start pusher id %i: caught Exception",
+ pusher_config.id,
)
return None
|