summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py20
1 files changed, 10 insertions, 10 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index fcf8ebf1e7..e165429cad 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -14,7 +14,6 @@ # limitations under the License. """A replication client for use by synapse workers. """ -import heapq import logging from typing import TYPE_CHECKING, Dict, List, Tuple @@ -30,6 +29,7 @@ from synapse.replication.tcp.streams.events import ( EventsStreamEventRow, EventsStreamRow, ) +from synapse.types import PersistedEventPosition, UserID from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure @@ -99,7 +99,6 @@ class ReplicationDataHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() - self.pusher_pool = hs.get_pusherpool() self.notifier = hs.get_notifier() self._reactor = hs.get_reactor() self._clock = hs.get_clock() @@ -149,13 +148,15 @@ class ReplicationDataHandler: if event.rejected_reason: continue - extra_users = () # type: Tuple[str, ...] + extra_users = () # type: Tuple[UserID, ...] if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event(event, token, max_token, extra_users) + extra_users = (UserID.from_string(event.state_key),) - await self.pusher_pool.on_new_notifications(token, token) + max_token = self.store.get_room_max_token() + event_pos = PersistedEventPosition(instance_name, token) + self.notifier.on_new_room_event( + event, event_pos, max_token, extra_users + ) # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is @@ -219,9 +220,8 @@ class ReplicationDataHandler: waiting_list = self._streams_to_waiters.setdefault(stream_name, []) - # We insert into the list using heapq as it is more efficient than - # pushing then resorting each time. - heapq.heappush(waiting_list, (position, deferred)) + waiting_list.append((position, deferred)) + waiting_list.sort(key=lambda t: t[0]) # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"):