diff options
author | Erik Johnston <erik@matrix.org> | 2020-08-28 17:12:45 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-28 17:12:45 +0100 |
commit | 3b4556cf87666bb6f40d89c8c7fff42d336237b6 (patch) | |
tree | 7c4e854c4a04289f514dd9eb9b9bbd66a56af5ea /synapse/replication/tcp | |
parent | Convert `event_push_actions`, `registration`, and `roommember` datastores to ... (diff) | |
download | synapse-3b4556cf87666bb6f40d89c8c7fff42d336237b6.tar.xz |
Fix `wait_for_stream_position` for multiple waiters. (#8196)
This fixes a bug where having multiple callers waiting on the same stream and position will cause it to try and compare two deferreds, which fails (due to the sorted list having an entry of `Tuple[int, Deferred]`).
Diffstat (limited to '')
-rw-r--r-- | synapse/replication/tcp/client.py | 6 |
1 files changed, 2 insertions, 4 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index fcf8ebf1e7..d6ecf5b327 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,7 +14,6 @@ # limitations under the License. """A replication client for use by synapse workers. """ -import heapq import logging from typing import TYPE_CHECKING, Dict, List, Tuple @@ -219,9 +218,8 @@ class ReplicationDataHandler: waiting_list = self._streams_to_waiters.setdefault(stream_name, []) - # We insert into the list using heapq as it is more efficient than - # pushing then resorting each time. - heapq.heappush(waiting_list, (position, deferred)) + waiting_list.append((position, deferred)) + waiting_list.sort(key=lambda t: t[0]) # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"): |