summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2020-10-01 11:22:36 +0100
committerRichard van der Hoff <richard@matrix.org>2020-10-01 11:22:36 +0100
commitdb13a8607e94b2a31734fd964fd92292c5015057 (patch)
treea3e2e229550295a75958e12519bd4af002cf19ee /synapse/replication/tcp/client.py
parentRevert federation-transaction-transmission backoff hacks (diff)
parentAdd prometheus metrics to track federation delays (#8430) (diff)
downloadsynapse-db13a8607e94b2a31734fd964fd92292c5015057.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py14
1 files changed, 8 insertions, 6 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index d6ecf5b327..e165429cad 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -29,6 +29,7 @@ from synapse.replication.tcp.streams.events import ( EventsStreamEventRow, EventsStreamRow, ) +from synapse.types import PersistedEventPosition, UserID from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure @@ -98,7 +99,6 @@ class ReplicationDataHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() - self.pusher_pool = hs.get_pusherpool() self.notifier = hs.get_notifier() self._reactor = hs.get_reactor() self._clock = hs.get_clock() @@ -148,13 +148,15 @@ class ReplicationDataHandler: if event.rejected_reason: continue - extra_users = () # type: Tuple[str, ...] + extra_users = () # type: Tuple[UserID, ...] if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event(event, token, max_token, extra_users) + extra_users = (UserID.from_string(event.state_key),) - await self.pusher_pool.on_new_notifications(token, token) + max_token = self.store.get_room_max_token() + event_pos = PersistedEventPosition(instance_name, token) + self.notifier.on_new_room_event( + event, event_pos, max_token, extra_users + ) # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is