diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 0e4c0d4d06..576d77e0e7 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -28,6 +28,18 @@ import random
logger = logging.getLogger(__name__)
+def started_user_eventstream(distributor, user):
+ return distributor.fire("started_user_eventstream", user)
+
+
+def stopped_user_eventstream(distributor, user):
+ return distributor.fire("stopped_user_eventstream", user)
+
+
+def user_joined_room(distributor, user, room_id):
+ return distributor.fire("user_joined_room", user, room_id)
+
+
class EventStreamHandler(BaseHandler):
def __init__(self, hs):
@@ -57,7 +69,12 @@ class EventStreamHandler(BaseHandler):
A deferred that completes once their presence has been updated.
"""
if user not in self._streams_per_user:
- self._streams_per_user[user] = 0
+ # Make sure we set the streams per user to 1 here rather than
+ # setting it to zero and incrementing the value below.
+ # Otherwise this may race with stopped_stream causing the
+ # user to be erased from the map before we have a chance
+ # to increment it.
+ self._streams_per_user[user] = 1
if user in self._stop_timer_per_user:
try:
self.clock.cancel_call_later(
@@ -66,9 +83,9 @@ class EventStreamHandler(BaseHandler):
except:
logger.exception("Failed to cancel event timer")
else:
- yield self.distributor.fire("started_user_eventstream", user)
-
- self._streams_per_user[user] += 1
+ yield started_user_eventstream(self.distributor, user)
+ else:
+ self._streams_per_user[user] += 1
def stopped_stream(self, user):
"""If there are no streams for a user this starts a timer that will
@@ -89,7 +106,7 @@ class EventStreamHandler(BaseHandler):
self._stop_timer_per_user.pop(user, None)
- return self.distributor.fire("stopped_user_eventstream", user)
+ return stopped_user_eventstream(self.distributor, user)
logger.debug("Scheduling _later: for %s", user)
self._stop_timer_per_user[user] = (
@@ -120,9 +137,7 @@ class EventStreamHandler(BaseHandler):
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
if is_guest:
- yield self.distributor.fire(
- "user_joined_room", user=auth_user, room_id=room_id
- )
+ yield user_joined_room(self.distributor, auth_user, room_id)
events, tokens = yield self.notifier.get_events_for(
auth_user, pagin_config, timeout,
|