diff options
author | Mark Haines <mjark@negativecurvature.net> | 2016-01-04 14:02:50 +0000 |
---|---|---|
committer | Mark Haines <mjark@negativecurvature.net> | 2016-01-04 14:02:50 +0000 |
commit | f35f8d06ea58e2d0cdccd82924c7a44fd93f4c38 (patch) | |
tree | dc5312558565f8ac01264be21d388e563a5c8c58 /synapse/handlers/events.py | |
parent | Added info abou Martin Giess' auto-deployment process with vagrant/ansible (diff) | |
parent | Bump changelog and version for v0.12.0 (diff) | |
download | synapse-f35f8d06ea58e2d0cdccd82924c7a44fd93f4c38.tar.xz |
Merge remote-tracking branch 'origin/release-v0.12.0' v0.12.0
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r-- | synapse/handlers/events.py | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 0e4c0d4d06..576d77e0e7 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -28,6 +28,18 @@ import random logger = logging.getLogger(__name__) +def started_user_eventstream(distributor, user): + return distributor.fire("started_user_eventstream", user) + + +def stopped_user_eventstream(distributor, user): + return distributor.fire("stopped_user_eventstream", user) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class EventStreamHandler(BaseHandler): def __init__(self, hs): @@ -57,7 +69,12 @@ class EventStreamHandler(BaseHandler): A deferred that completes once their presence has been updated. """ if user not in self._streams_per_user: - self._streams_per_user[user] = 0 + # Make sure we set the streams per user to 1 here rather than + # setting it to zero and incrementing the value below. + # Otherwise this may race with stopped_stream causing the + # user to be erased from the map before we have a chance + # to increment it. + self._streams_per_user[user] = 1 if user in self._stop_timer_per_user: try: self.clock.cancel_call_later( @@ -66,9 +83,9 @@ class EventStreamHandler(BaseHandler): except: logger.exception("Failed to cancel event timer") else: - yield self.distributor.fire("started_user_eventstream", user) - - self._streams_per_user[user] += 1 + yield started_user_eventstream(self.distributor, user) + else: + self._streams_per_user[user] += 1 def stopped_stream(self, user): """If there are no streams for a user this starts a timer that will @@ -89,7 +106,7 @@ class EventStreamHandler(BaseHandler): self._stop_timer_per_user.pop(user, None) - return self.distributor.fire("stopped_user_eventstream", user) + return stopped_user_eventstream(self.distributor, user) logger.debug("Scheduling _later: for %s", user) self._stop_timer_per_user[user] = ( @@ -120,9 +137,7 @@ class EventStreamHandler(BaseHandler): timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) if is_guest: - yield self.distributor.fire( - "user_joined_room", user=auth_user, room_id=room_id - ) + yield user_joined_room(self.distributor, auth_user, room_id) events, tokens = yield self.notifier.get_events_for( auth_user, pagin_config, timeout, |