diff --git a/changelog.d/8196.misc b/changelog.d/8196.misc
new file mode 100644
index 0000000000..c42baf0e56
--- /dev/null
+++ b/changelog.d/8196.misc
@@ -0,0 +1 @@
+Fix `wait_for_stream_position` to allow multiple waiters on same stream ID.
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index dd77a44b8d..2d995ec456 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -66,7 +66,9 @@ REQUIREMENTS = [
"msgpack>=0.5.2",
"phonenumbers>=8.2.0",
"prometheus_client>=0.0.18,<0.9.0",
- # we use attr.validators.deep_iterable, which arrived in 19.1.0
+ # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
+ # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
+ # is out in November.)
"attrs>=19.1.0",
"netaddr>=0.7.18",
"Jinja2>=2.9",
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index fcf8ebf1e7..d6ecf5b327 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -14,7 +14,6 @@
# limitations under the License.
"""A replication client for use by synapse workers.
"""
-import heapq
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple
@@ -219,9 +218,8 @@ class ReplicationDataHandler:
waiting_list = self._streams_to_waiters.setdefault(stream_name, [])
- # We insert into the list using heapq as it is more efficient than
- # pushing then resorting each time.
- heapq.heappush(waiting_list, (position, deferred))
+ waiting_list.append((position, deferred))
+ waiting_list.sort(key=lambda t: t[0])
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
|