1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
From 4eaab31757f096a04f4278d722cdef1eb92a1743 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erikj@element.io>
Date: Tue, 29 Apr 2025 14:08:32 +0100
Subject: [PATCH 15/74] Minor performance improvements to notifier/replication
(#18367)
These are some improvements to `on_new_event` which is a hot path. Not
sure how much this will save, but maybe like ~5%?
Possibly easier to review commit-by-commit
---
changelog.d/18367.misc | 1 +
synapse/notifier.py | 61 +++++++++++++++++++++---------------------
2 files changed, 32 insertions(+), 30 deletions(-)
create mode 100644 changelog.d/18367.misc
diff --git a/changelog.d/18367.misc b/changelog.d/18367.misc
new file mode 100644
index 0000000000..2e8b897fa6
--- /dev/null
+++ b/changelog.d/18367.misc
@@ -0,0 +1 @@
+Minor performance improvements to the notifier.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 88f531182a..1914d0c914 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -66,7 +66,6 @@ from synapse.types import (
from synapse.util.async_helpers import (
timeout_deferred,
)
-from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
@@ -520,20 +519,22 @@ class Notifier:
users = users or []
rooms = rooms or []
- with Measure(self.clock, "on_new_event"):
- user_streams: Set[_NotifierUserStream] = set()
-
- log_kv(
- {
- "waking_up_explicit_users": len(users),
- "waking_up_explicit_rooms": len(rooms),
- "users": shortstr(users),
- "rooms": shortstr(rooms),
- "stream": stream_key,
- "stream_id": new_token,
- }
- )
+ user_streams: Set[_NotifierUserStream] = set()
+
+ log_kv(
+ {
+ "waking_up_explicit_users": len(users),
+ "waking_up_explicit_rooms": len(rooms),
+ "users": shortstr(users),
+ "rooms": shortstr(rooms),
+ "stream": stream_key,
+ "stream_id": new_token,
+ }
+ )
+ # Only calculate which user streams to wake up if there are, in fact,
+ # any user streams registered.
+ if self.user_to_user_stream or self.room_to_user_streams:
for user in users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None:
@@ -565,25 +566,25 @@ class Notifier:
# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
- with PreserveLoggingContext():
- for listener in listeners:
- listener.callback(current_token)
+ if listeners:
+ with PreserveLoggingContext():
+ for listener in listeners:
+ listener.callback(current_token)
- users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
+ if user_streams:
+ users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
- self.notify_replication()
+ self.notify_replication()
- # Notify appservices.
- try:
- self.appservice_handler.notify_interested_services_ephemeral(
- stream_key,
- new_token,
- users,
- )
- except Exception:
- logger.exception(
- "Error notifying application services of ephemeral events"
- )
+ # Notify appservices.
+ try:
+ self.appservice_handler.notify_interested_services_ephemeral(
+ stream_key,
+ new_token,
+ users,
+ )
+ except Exception:
+ logger.exception("Error notifying application services of ephemeral events")
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened
--
2.49.0
|