summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-05-18 13:17:36 +0100
committerMark Haines <mark.haines@matrix.org>2015-05-18 13:17:36 +0100
commit1e90715a3d5f8a910c187dec888283e110a3c04a (patch)
treeefbb5d06143cd39692842f07fb9a542a9f85e9fa
parentAdd some doc-strings to notifier (diff)
downloadsynapse-1e90715a3d5f8a910c187dec888283e110a3c04a.tar.xz
Make sure the notifier stream token goes forward when it is updated. Sort the pending events by the correct room_stream_id
-rw-r--r--synapse/notifier.py8
-rw-r--r--synapse/types.py17
2 files changed, 21 insertions, 4 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 862b42cfc8..0b5d97521e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -88,7 +88,7 @@ class _NotifierUserStream(object):
             stream_id(str): The new id for the stream the event came from.
             time_now_ms(int): The current time in milliseconds.
         """
-        self.current_token = self.current_token.copy_and_replace(
+        self.current_token = self.current_token.copy_and_advance(
             stream_key, stream_id
         )
         if self.listeners:
@@ -192,7 +192,7 @@ class Notifier(object):
         yield run_on_reactor()
 
         self.pending_new_room_events.append((
-            event, room_stream_id, extra_users
+            room_stream_id, event, extra_users
         ))
         self._notify_pending_new_room_events(max_room_stream_id)
 
@@ -205,10 +205,10 @@ class Notifier(object):
         """
         pending = sorted(self.pending_new_room_events)
         self.pending_new_room_events = []
-        for event, room_stream_id, extra_users in pending:
+        for room_stream_id, event, extra_users in pending:
             if room_stream_id > max_room_stream_id:
                 self.pending_new_room_events.append((
-                    event, room_stream_id, extra_users
+                    room_stream_id, event, extra_users
                 ))
             else:
                 self._on_new_room_event(event, room_stream_id, extra_users)
diff --git a/synapse/types.py b/synapse/types.py
index d89a04f7c3..1b21160c57 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -119,6 +119,7 @@ class StreamToken(
     @property
     def room_stream_id(self):
         # TODO(markjh): Awful hack to work around hacks in the presence tests
+        # which assume that the keys are integers.
         if type(self.room_key) is int:
             return self.room_key
         else:
@@ -132,6 +133,22 @@ class StreamToken(
             or (int(other_token.typing_key) < int(self.typing_key))
         )
 
+    def copy_and_advance(self, key, new_value):
+        """Advance the given key in the token to a new value if and only if the
+        new value is after the old value.
+        """
+        new_token = self.copy_and_replace(key, new_value)
+        if key == "room_key":
+            new_id = new_token.room_stream_id
+            old_id = self.room_stream_id
+        else:
+            new_id = int(getattr(new_token, key))
+            old_id = int(getattr(self, key))
+        if old_id < new_id:
+            return new_token
+        else:
+            return self
+
     def copy_and_replace(self, key, new_value):
         d = self._asdict()
         d[key] = new_value