summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/client.py18
1 files changed, 11 insertions, 7 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 350762f494..a52e25c1af 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -43,7 +43,7 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
     EventsStreamRow,
 )
-from synapse.types import PersistedEventPosition, ReadReceipt, UserID
+from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
 from synapse.util.async_helpers import Linearizer, timeout_deferred
 from synapse.util.metrics import Measure
 
@@ -153,19 +153,19 @@ class ReplicationDataHandler:
         if stream_name == TypingStream.NAME:
             self._typing_handler.process_replication_rows(token, rows)
             self.notifier.on_new_event(
-                "typing_key", token, rooms=[row.room_id for row in rows]
+                StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows]
             )
         elif stream_name == PushRulesStream.NAME:
             self.notifier.on_new_event(
-                "push_rules_key", token, users=[row.user_id for row in rows]
+                StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows]
             )
         elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
             self.notifier.on_new_event(
-                "account_data_key", token, users=[row.user_id for row in rows]
+                StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
             )
         elif stream_name == ReceiptsStream.NAME:
             self.notifier.on_new_event(
-                "receipt_key", token, rooms=[row.room_id for row in rows]
+                StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows]
             )
             await self._pusher_pool.on_new_receipts(
                 token, token, {row.room_id for row in rows}
@@ -173,14 +173,18 @@ class ReplicationDataHandler:
         elif stream_name == ToDeviceStream.NAME:
             entities = [row.entity for row in rows if row.entity.startswith("@")]
             if entities:
-                self.notifier.on_new_event("to_device_key", token, users=entities)
+                self.notifier.on_new_event(
+                    StreamKeyType.TO_DEVICE, token, users=entities
+                )
         elif stream_name == DeviceListsStream.NAME:
             all_room_ids: Set[str] = set()
             for row in rows:
                 if row.entity.startswith("@"):
                     room_ids = await self.store.get_rooms_for_user(row.entity)
                     all_room_ids.update(room_ids)
-            self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
+            self.notifier.on_new_event(
+                StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids
+            )
         elif stream_name == GroupServerStream.NAME:
             self.notifier.on_new_event(
                 "groups_key", token, users=[row.user_id for row in rows]