summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-08-28 17:12:45 +0100
committerGitHub <noreply@github.com>2020-08-28 17:12:45 +0100
commit3b4556cf87666bb6f40d89c8c7fff42d336237b6 (patch)
tree7c4e854c4a04289f514dd9eb9b9bbd66a56af5ea
parentConvert `event_push_actions`, `registration`, and `roommember` datastores to ... (diff)
downloadsynapse-3b4556cf87666bb6f40d89c8c7fff42d336237b6.tar.xz
Fix `wait_for_stream_position` for multiple waiters. (#8196)
This fixes a bug where having multiple callers waiting on the same
stream and position will cause it to try and compare two deferreds,
which fails (due to the sorted list having an entry of `Tuple[int,
Deferred]`).
-rw-r--r--changelog.d/8196.misc1
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/replication/tcp/client.py6
3 files changed, 6 insertions, 5 deletions
diff --git a/changelog.d/8196.misc b/changelog.d/8196.misc
new file mode 100644
index 0000000000..c42baf0e56
--- /dev/null
+++ b/changelog.d/8196.misc
@@ -0,0 +1 @@
+Fix `wait_for_stream_position` to allow multiple waiters on same stream ID.
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index dd77a44b8d..2d995ec456 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -66,7 +66,9 @@ REQUIREMENTS = [
     "msgpack>=0.5.2",
     "phonenumbers>=8.2.0",
     "prometheus_client>=0.0.18,<0.9.0",
-    # we use attr.validators.deep_iterable, which arrived in 19.1.0
+    # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
+    # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
+    # is out in November.)
     "attrs>=19.1.0",
     "netaddr>=0.7.18",
     "Jinja2>=2.9",
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index fcf8ebf1e7..d6ecf5b327 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 """A replication client for use by synapse workers.
 """
-import heapq
 import logging
 from typing import TYPE_CHECKING, Dict, List, Tuple
 
@@ -219,9 +218,8 @@ class ReplicationDataHandler:
 
         waiting_list = self._streams_to_waiters.setdefault(stream_name, [])
 
-        # We insert into the list using heapq as it is more efficient than
-        # pushing then resorting each time.
-        heapq.heappush(waiting_list, (position, deferred))
+        waiting_list.append((position, deferred))
+        waiting_list.sort(key=lambda t: t[0])
 
         # We measure here to get in flight counts and average waiting time.
         with Measure(self._clock, "repl.wait_for_stream_position"):