diff --git a/synapse/notifier.py b/synapse/notifier.py
index 862b42cfc8..0b5d97521e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -88,7 +88,7 @@ class _NotifierUserStream(object):
stream_id(str): The new id for the stream the event came from.
time_now_ms(int): The current time in milliseconds.
"""
- self.current_token = self.current_token.copy_and_replace(
+ self.current_token = self.current_token.copy_and_advance(
stream_key, stream_id
)
if self.listeners:
@@ -192,7 +192,7 @@ class Notifier(object):
yield run_on_reactor()
self.pending_new_room_events.append((
- event, room_stream_id, extra_users
+ room_stream_id, event, extra_users
))
self._notify_pending_new_room_events(max_room_stream_id)
@@ -205,10 +205,10 @@ class Notifier(object):
"""
pending = sorted(self.pending_new_room_events)
self.pending_new_room_events = []
- for event, room_stream_id, extra_users in pending:
+ for room_stream_id, event, extra_users in pending:
if room_stream_id > max_room_stream_id:
self.pending_new_room_events.append((
- event, room_stream_id, extra_users
+ room_stream_id, event, extra_users
))
else:
self._on_new_room_event(event, room_stream_id, extra_users)
diff --git a/synapse/types.py b/synapse/types.py
index d89a04f7c3..1b21160c57 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -119,6 +119,7 @@ class StreamToken(
@property
def room_stream_id(self):
# TODO(markjh): Awful hack to work around hacks in the presence tests
+ # which assume that the keys are integers.
if type(self.room_key) is int:
return self.room_key
else:
@@ -132,6 +133,22 @@ class StreamToken(
or (int(other_token.typing_key) < int(self.typing_key))
)
+ def copy_and_advance(self, key, new_value):
+ """Advance the given key in the token to a new value if and only if the
+ new value is after the old value.
+ """
+ new_token = self.copy_and_replace(key, new_value)
+ if key == "room_key":
+ new_id = new_token.room_stream_id
+ old_id = self.room_stream_id
+ else:
+ new_id = int(getattr(new_token, key))
+ old_id = int(getattr(self, key))
+ if old_id < new_id:
+ return new_token
+ else:
+ return self
+
def copy_and_replace(self, key, new_value):
d = self._asdict()
d[key] = new_value
|