diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index cdc078cf11..136babe6ce 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -65,12 +65,23 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams._base import (
+from synapse.replication.tcp.streams import (
+ AccountDataStream,
DeviceListsStream,
+ GroupServerStream,
+ PresenceStream,
+ PushersStream,
+ PushRulesStream,
ReceiptsStream,
+ TagAccountDataStream,
ToDeviceStream,
+ TypingStream,
+)
+from synapse.replication.tcp.streams.events import (
+ EventsStream,
+ EventsStreamEventRow,
+ EventsStreamRow,
)
-from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -626,7 +637,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
if self.send_handler:
self.send_handler.process_replication_rows(stream_name, token, rows)
- if stream_name == "events":
+ if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
@@ -649,44 +660,44 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
)
await self.pusher_pool.on_new_notifications(token, token)
- elif stream_name == "push_rules":
+ elif stream_name == PushRulesStream.NAME:
self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows]
)
- elif stream_name in ("account_data", "tag_account_data"):
+ elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
self.notifier.on_new_event(
"account_data_key", token, users=[row.user_id for row in rows]
)
- elif stream_name == "receipts":
+ elif stream_name == ReceiptsStream.NAME:
self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows]
)
await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
- elif stream_name == "typing":
+ elif stream_name == TypingStream.NAME:
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)
- elif stream_name == "to_device":
+ elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
self.notifier.on_new_event("to_device_key", token, users=entities)
- elif stream_name == "device_lists":
+ elif stream_name == DeviceListsStream.NAME:
all_room_ids = set()
for row in rows:
if row.entity.startswith("@"):
room_ids = await self.store.get_rooms_for_user(row.entity)
all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
- elif stream_name == "presence":
+ elif stream_name == PresenceStream.NAME:
await self.presence_handler.process_replication_rows(token, rows)
- elif stream_name == "receipts":
+ elif stream_name == GroupServerStream.NAME:
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]
)
- elif stream_name == "pushers":
+ elif stream_name == PushersStream.NAME:
for row in rows:
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
|