diff options
author | Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> | 2022-05-16 16:35:31 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-16 15:35:31 +0000 |
commit | 83be72d76ca171ceb0fc381aa4548c1d9fea0dc7 (patch) | |
tree | c7d4fee54f2b7ba7993c15b6d892d9579da3d91b /synapse/replication/tcp | |
parent | Avoid unnecessary copies when filtering private read receipts. (#12711) (diff) | |
download | synapse-83be72d76ca171ceb0fc381aa4548c1d9fea0dc7.tar.xz |
Add `StreamKeyType` class and replace string literals with constants (#12567)
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/client.py | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 350762f494..a52e25c1af 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -43,7 +43,7 @@ from synapse.replication.tcp.streams.events import ( EventsStreamEventRow, EventsStreamRow, ) -from synapse.types import PersistedEventPosition, ReadReceipt, UserID +from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure @@ -153,19 +153,19 @@ class ReplicationDataHandler: if stream_name == TypingStream.NAME: self._typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] + StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows] ) elif stream_name == PushRulesStream.NAME: self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows] + StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows] ) elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows] + StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows] ) elif stream_name == ReceiptsStream.NAME: self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows] + StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows] ) await self._pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} @@ -173,14 +173,18 @@ class ReplicationDataHandler: elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: - self.notifier.on_new_event("to_device_key", token, users=entities) + self.notifier.on_new_event( + StreamKeyType.TO_DEVICE, token, users=entities + ) elif stream_name == DeviceListsStream.NAME: all_room_ids: Set[str] = set() for row in rows: if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) - self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids + ) elif stream_name == GroupServerStream.NAME: self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows] |