summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/8196.misc1
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/replication/tcp/client.py6
3 files changed, 6 insertions, 5 deletions
diff --git a/changelog.d/8196.misc b/changelog.d/8196.misc
new file mode 100644
index 0000000000..c42baf0e56
--- /dev/null
+++ b/changelog.d/8196.misc
@@ -0,0 +1 @@
+Fix `wait_for_stream_position` to allow multiple waiters on same stream ID.
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index dd77a44b8d..2d995ec456 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -66,7 +66,9 @@ REQUIREMENTS = [
     "msgpack>=0.5.2",
     "phonenumbers>=8.2.0",
     "prometheus_client>=0.0.18,<0.9.0",
-    # we use attr.validators.deep_iterable, which arrived in 19.1.0
+    # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
+    # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
+    # is out in November.)
     "attrs>=19.1.0",
     "netaddr>=0.7.18",
     "Jinja2>=2.9",
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index fcf8ebf1e7..d6ecf5b327 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 """A replication client for use by synapse workers.
 """
-import heapq
 import logging
 from typing import TYPE_CHECKING, Dict, List, Tuple
 
@@ -219,9 +218,8 @@ class ReplicationDataHandler:
 
         waiting_list = self._streams_to_waiters.setdefault(stream_name, [])
 
-        # We insert into the list using heapq as it is more efficient than
-        # pushing then resorting each time.
-        heapq.heappush(waiting_list, (position, deferred))
+        waiting_list.append((position, deferred))
+        waiting_list.sort(key=lambda t: t[0])
 
         # We measure here to get in flight counts and average waiting time.
         with Measure(self._clock, "repl.wait_for_stream_position"):