1 files changed, 16 insertions, 2 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0080c68ce2..f325964983 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Union
from prometheus_client import Gauge
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
@@ -187,7 +190,7 @@ class PusherPool:
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
- async def on_new_notifications(self, max_token: RoomStreamToken):
+ def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return
@@ -201,6 +204,17 @@ class PusherPool:
# Nothing to do
return
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._on_new_notifications(max_token)
+
+ @wrap_as_background_process("on_new_notifications")
+ async def _on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_id = max_token.stream
+
prev_stream_id = self._last_room_stream_id_seen
self._last_room_stream_id_seen = max_stream_id
|