From 1e90715a3d5f8a910c187dec888283e110a3c04a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 13:17:36 +0100 Subject: Make sure the notifier stream token goes forward when it is updated. Sort the pending events by the correct room_stream_id --- synapse/notifier.py | 8 ++++---- synapse/types.py | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index 862b42cfc8..0b5d97521e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -88,7 +88,7 @@ class _NotifierUserStream(object): stream_id(str): The new id for the stream the event came from. time_now_ms(int): The current time in milliseconds. """ - self.current_token = self.current_token.copy_and_replace( + self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) if self.listeners: @@ -192,7 +192,7 @@ class Notifier(object): yield run_on_reactor() self.pending_new_room_events.append(( - event, room_stream_id, extra_users + room_stream_id, event, extra_users )) self._notify_pending_new_room_events(max_room_stream_id) @@ -205,10 +205,10 @@ class Notifier(object): """ pending = sorted(self.pending_new_room_events) self.pending_new_room_events = [] - for event, room_stream_id, extra_users in pending: + for room_stream_id, event, extra_users in pending: if room_stream_id > max_room_stream_id: self.pending_new_room_events.append(( - event, room_stream_id, extra_users + room_stream_id, event, extra_users )) else: self._on_new_room_event(event, room_stream_id, extra_users) diff --git a/synapse/types.py b/synapse/types.py index d89a04f7c3..1b21160c57 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -119,6 +119,7 @@ class StreamToken( @property def room_stream_id(self): # TODO(markjh): Awful hack to work around hacks in the presence tests + # which assume that the keys are integers. if type(self.room_key) is int: return self.room_key else: @@ -132,6 +133,22 @@ class StreamToken( or (int(other_token.typing_key) < int(self.typing_key)) ) + def copy_and_advance(self, key, new_value): + """Advance the given key in the token to a new value if and only if the + new value is after the old value. + """ + new_token = self.copy_and_replace(key, new_value) + if key == "room_key": + new_id = new_token.room_stream_id + old_id = self.room_stream_id + else: + new_id = int(getattr(new_token, key)) + old_id = int(getattr(self, key)) + if old_id < new_id: + return new_token + else: + return self + def copy_and_replace(self, key, new_value): d = self._asdict() d[key] = new_value -- cgit 1.4.1