diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6fcb7767a0..344dd03172 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -71,14 +71,17 @@ class _NotifierUserStream(object):
so that it can remove itself from the indexes in the Notifier class.
"""
- def __init__(self, user, rooms, current_token, appservice=None):
+ def __init__(self, user, rooms, current_token, time_now_ms,
+ appservice=None):
self.user = str(user)
self.appservice = appservice
self.listeners = set()
self.rooms = set(rooms)
self.current_token = current_token
+ self.last_notified_ms = time_now_ms
- def notify(self, stream_key, stream_id):
+ def notify(self, stream_key, stream_id, time_now_ms):
+ self.last_notified_ms = time_now_ms
self.current_token = self.current_token.copy_and_replace(
stream_key, stream_id
)
@@ -96,7 +99,7 @@ class _NotifierUserStream(object):
lst = notifier.room_to_user_streams.get(room, set())
lst.discard(self)
- notifier.user_to_user_streams.get(self.user, set()).discard(self)
+ notifier.user_to_user_stream.pop(self.user)
if self.appservice:
notifier.appservice_to_user_streams.get(
@@ -111,6 +114,8 @@ class Notifier(object):
Primarily used from the /events stream.
"""
+ UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
+
def __init__(self, hs):
self.hs = hs
@@ -128,6 +133,10 @@ class Notifier(object):
"user_joined_room", self._user_joined_room
)
+ self.clock.looping_call(
+ self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
+ )
+
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
@@ -221,9 +230,12 @@ class Notifier(object):
logger.debug("on_new_room_event listeners %s", user_streams)
+ time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:
- user_stream.notify("room_key", "s%d" % (room_stream_id,))
+ user_stream.notify(
+ "room_key", "s%d" % (room_stream_id,), time_now_ms
+ )
except:
logger.exception("Failed to notify listener")
@@ -246,9 +258,10 @@ class Notifier(object):
for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set())
+ time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:
- user_stream.notify(stream_key, new_token)
+ user_stream.notify(stream_key, new_token, time_now_ms)
except:
logger.exception("Failed to notify listener")
@@ -260,6 +273,7 @@ class Notifier(object):
"""
deferred = defer.Deferred()
+ time_now_ms = self.clock.time_msec()
user = str(user)
user_stream = self.user_to_user_stream.get(user)
@@ -272,6 +286,7 @@ class Notifier(object):
rooms=rooms,
appservice=appservice,
current_token=current_token,
+ time_now_ms=time_now_ms,
)
self._register_with_keys(user_stream)
else:
@@ -366,6 +381,20 @@ class Notifier(object):
defer.returnValue(result)
@log_function
+ def remove_expired_streams(self):
+ time_now_ms = self.clock.time_msec()
+ expired_streams = []
+ expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
+ for stream in self.user_to_user_stream.values():
+ if stream.listeners:
+ continue
+ if stream.last_notified_ms < expire_before_ts:
+ expired_streams.append(stream)
+
+ for expired_stream in expired_streams:
+ expired_stream.remove(self)
+
+ @log_function
def _register_with_keys(self, user_stream):
self.user_to_user_stream[user_stream.user] = user_stream
@@ -385,14 +414,3 @@ class Notifier(object):
room_streams = self.room_to_user_streams.setdefault(room_id, set())
room_streams.add(new_user_stream)
new_user_stream.rooms.add(room_id)
-
-
-def _discard_if_notified(listener_set):
- """Remove any 'stale' listeners from the given set.
- """
- to_discard = set()
- for l in listener_set:
- if l.notified():
- to_discard.add(l)
-
- listener_set -= to_discard
|