summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2017-03-15 12:16:55 +0000
committerRichard van der Hoff <richard@matrix.org>2017-03-15 12:16:55 +0000
commit29ed09e80a8c7ddeebe3f257a336e4c387a06c88 (patch)
tree9fb244c8c00d9d01cd0856f0efc028caf53f4688
parentMerge pull request #2008 from matrix-org/erikj/notifier_stats (diff)
downloadsynapse-29ed09e80a8c7ddeebe3f257a336e4c387a06c88.tar.xz
Fix assertion to stop transaction queue getting wedged
... and update some docstrings to correctly reflect the types being used.

get_new_device_msgs_for_remote can return a long under some circumstances,
which was being stored in last_device_list_stream_id_by_dest, and was then
upsetting things on the next loop.
-rw-r--r--synapse/federation/transaction_queue.py5
-rw-r--r--synapse/replication/slave/storage/_slaved_id_tracker.py5
-rw-r--r--synapse/storage/deviceinbox.py6
-rw-r--r--synapse/storage/devices.py2
-rw-r--r--synapse/storage/util/id_generators.py14
-rw-r--r--synapse/util/caches/stream_change_cache.py2
6 files changed, 29 insertions, 5 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 90235ff098..c802dd67a3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -99,7 +99,12 @@ class TransactionQueue(object):
         # destination -> list of tuple(failure, deferred)
         self.pending_failures_by_dest = {}
 
+        # destination -> stream_id of last successfully sent to-device message.
+        # NB: may be a long or an int.
         self.last_device_stream_id_by_dest = {}
+
+        # destination -> stream_id of last successfully sent device list
+        # update.
         self.last_device_list_stream_id_by_dest = {}
 
         # HACK to get unique tx id
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
index 24b5c79d4a..9d1d173b2f 100644
--- a/synapse/replication/slave/storage/_slaved_id_tracker.py
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -27,4 +27,9 @@ class SlavedIdTracker(object):
         self._current = (max if self.step > 0 else min)(self._current, new_id)
 
     def get_current_token(self):
+        """
+
+        Returns:
+            int
+        """
         return self._current
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 5c7db5e5f6..7925cb5f1b 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore):
         """
         Args:
             destination(str): The name of the remote server.
-            last_stream_id(int): The last position of the device message stream
+            last_stream_id(int|long): The last position of the device message stream
                 that the server sent up to.
-            current_stream_id(int): The current position of the device
+            current_stream_id(int|long): The current position of the device
                 message stream.
         Returns:
-            Deferred ([dict], int): List of messages for the device and where
+            Deferred ([dict], int|long): List of messages for the device and where
                 in the stream the messages got to.
         """
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 563071b7a9..e545b62e39 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -308,7 +308,7 @@ class DeviceStore(SQLBaseStore):
         """Get stream of updates to send to remote servers
 
         Returns:
-            (now_stream_id, [ { updates }, .. ])
+            (int, list[dict]): current stream id and list of updates
         """
         now_stream_id = self._device_list_id_gen.get_current_token()
 
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 46cf93ff87..95031dc9ec 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -30,6 +30,17 @@ class IdGenerator(object):
 
 
 def _load_current_id(db_conn, table, column, step=1):
+    """
+
+    Args:
+        db_conn (object):
+        table (str):
+        column (str):
+        step (int):
+
+    Returns:
+        int
+    """
     cur = db_conn.cursor()
     if step == 1:
         cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
@@ -131,6 +142,9 @@ class StreamIdGenerator(object):
     def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
+
+        Returns:
+            int
         """
         with self._lock:
             if self._unfinished_ids:
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b72bb0ff02..70fe00ce0b 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -50,7 +50,7 @@ class StreamChangeCache(object):
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) is int
+        assert type(stream_pos) is int or type(stream_pos) is long
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()