diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b5e40da533..322d695bc7 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -33,7 +33,6 @@ from synapse.replication.tcp.streams import (
PushersStream,
PushRulesStream,
ReceiptsStream,
- TagAccountDataStream,
ToDeviceStream,
TypingStream,
UnPartialStatedEventStream,
@@ -168,7 +167,7 @@ class ReplicationDataHandler:
self.notifier.on_new_event(
StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows]
)
- elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
+ elif stream_name in AccountDataStream.NAME:
self.notifier.on_new_event(
StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
)
@@ -188,7 +187,7 @@ class ReplicationDataHandler:
elif stream_name == DeviceListsStream.NAME:
all_room_ids: Set[str] = set()
for row in rows:
- if row.entity.startswith("@"):
+ if row.entity.startswith("@") and not row.is_signature:
room_ids = await self.store.get_rooms_for_user(row.entity)
all_room_ids.update(room_ids)
self.notifier.on_new_event(
@@ -326,7 +325,7 @@ class ReplicationDataHandler:
# anyway in that case we don't need to wait.
return
- current_position = self._streams[stream_name].current_token(self._instance_name)
+ current_position = self._streams[stream_name].current_token(instance_name)
if position <= current_position:
# We're already past the position
return
@@ -423,7 +422,11 @@ class FederationSenderHandler:
# The entities are either user IDs (starting with '@') whose devices
# have changed, or remote servers that we need to tell about
# changes.
- hosts = {row.entity for row in rows if not row.entity.startswith("@")}
+ hosts = {
+ row.entity
+ for row in rows
+ if not row.entity.startswith("@") and not row.is_signature
+ }
for host in hosts:
self.federation_sender.send_device_messages(host, immediate=False)
|