summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <1342360+anoadragon453@users.noreply.github.com>2022-05-16 16:35:31 +0100
committerGitHub <noreply@github.com>2022-05-16 15:35:31 +0000
commit83be72d76ca171ceb0fc381aa4548c1d9fea0dc7 (patch)
treec7d4fee54f2b7ba7993c15b6d892d9579da3d91b
parentAvoid unnecessary copies when filtering private read receipts. (#12711) (diff)
downloadsynapse-83be72d76ca171ceb0fc381aa4548c1d9fea0dc7.tar.xz
Add `StreamKeyType` class and replace string literals with constants (#12567)
-rw-r--r--changelog.d/12567.misc1
-rw-r--r--synapse/handlers/account_data.py10
-rw-r--r--synapse/handlers/appservice.py39
-rw-r--r--synapse/handlers/device.py7
-rw-r--r--synapse/handlers/devicemessage.py6
-rw-r--r--synapse/handlers/initial_sync.py13
-rw-r--r--synapse/handlers/pagination.py6
-rw-r--r--synapse/handlers/presence.py6
-rw-r--r--synapse/handlers/receipts.py12
-rw-r--r--synapse/handlers/room.py9
-rw-r--r--synapse/handlers/search.py10
-rw-r--r--synapse/handlers/sync.py23
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/notifier.py5
-rw-r--r--synapse/replication/tcp/client.py18
-rw-r--r--synapse/server_notices/server_notices_manager.py4
-rw-r--r--synapse/storage/databases/main/e2e_room_keys.py4
-rw-r--r--synapse/storage/databases/main/relations.py6
-rw-r--r--synapse/types.py22
19 files changed, 125 insertions, 80 deletions
diff --git a/changelog.d/12567.misc b/changelog.d/12567.misc
new file mode 100644
index 0000000000..35f08569ba
--- /dev/null
+++ b/changelog.d/12567.misc
@@ -0,0 +1 @@
+Replace string literal instances of stream key types with typed constants.
\ No newline at end of file
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 4af9fbc5d1..0478448b47 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -23,7 +23,7 @@ from synapse.replication.http.account_data import (
     ReplicationUserAccountDataRestServlet,
 )
 from synapse.streams import EventSource
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -105,7 +105,7 @@ class AccountDataHandler:
             )
 
             self._notifier.on_new_event(
-                "account_data_key", max_stream_id, users=[user_id]
+                StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
             )
 
             await self._notify_modules(user_id, room_id, account_data_type, content)
@@ -141,7 +141,7 @@ class AccountDataHandler:
             )
 
             self._notifier.on_new_event(
-                "account_data_key", max_stream_id, users=[user_id]
+                StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
             )
 
             await self._notify_modules(user_id, None, account_data_type, content)
@@ -176,7 +176,7 @@ class AccountDataHandler:
             )
 
             self._notifier.on_new_event(
-                "account_data_key", max_stream_id, users=[user_id]
+                StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
             )
             return max_stream_id
         else:
@@ -201,7 +201,7 @@ class AccountDataHandler:
             )
 
             self._notifier.on_new_event(
-                "account_data_key", max_stream_id, users=[user_id]
+                StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
             )
             return max_stream_id
         else:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 85bd5e4768..1da7bcc85b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -38,6 +38,7 @@ from synapse.types import (
     JsonDict,
     RoomAlias,
     RoomStreamToken,
+    StreamKeyType,
     UserID,
 )
 from synapse.util.async_helpers import Linearizer
@@ -213,8 +214,8 @@ class ApplicationServicesHandler:
         Args:
             stream_key: The stream the event came from.
 
-                `stream_key` can be "typing_key", "receipt_key", "presence_key",
-                "to_device_key" or "device_list_key". Any other value for `stream_key`
+                `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE,
+                StreamKeyType.TO_DEVICE or StreamKeyType.DEVICE_LIST. Any other value for `stream_key`
                 will cause this function to return early.
 
                 Ephemeral events will only be pushed to appservices that have opted into
@@ -235,11 +236,11 @@ class ApplicationServicesHandler:
         # Only the following streams are currently supported.
         # FIXME: We should use constants for these values.
         if stream_key not in (
-            "typing_key",
-            "receipt_key",
-            "presence_key",
-            "to_device_key",
-            "device_list_key",
+            StreamKeyType.TYPING,
+            StreamKeyType.RECEIPT,
+            StreamKeyType.PRESENCE,
+            StreamKeyType.TO_DEVICE,
+            StreamKeyType.DEVICE_LIST,
         ):
             return
 
@@ -258,14 +259,14 @@ class ApplicationServicesHandler:
 
         # Ignore to-device messages if the feature flag is not enabled
         if (
-            stream_key == "to_device_key"
+            stream_key == StreamKeyType.TO_DEVICE
             and not self._msc2409_to_device_messages_enabled
         ):
             return
 
         # Ignore device lists if the feature flag is not enabled
         if (
-            stream_key == "device_list_key"
+            stream_key == StreamKeyType.DEVICE_LIST
             and not self._msc3202_transaction_extensions_enabled
         ):
             return
@@ -283,15 +284,15 @@ class ApplicationServicesHandler:
             if (
                 stream_key
                 in (
-                    "typing_key",
-                    "receipt_key",
-                    "presence_key",
-                    "to_device_key",
+                    StreamKeyType.TYPING,
+                    StreamKeyType.RECEIPT,
+                    StreamKeyType.PRESENCE,
+                    StreamKeyType.TO_DEVICE,
                 )
                 and service.supports_ephemeral
             )
             or (
-                stream_key == "device_list_key"
+                stream_key == StreamKeyType.DEVICE_LIST
                 and service.msc3202_transaction_extensions
             )
         ]
@@ -317,7 +318,7 @@ class ApplicationServicesHandler:
         logger.debug("Checking interested services for %s", stream_key)
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
-                if stream_key == "typing_key":
+                if stream_key == StreamKeyType.TYPING:
                     # Note that we don't persist the token (via set_appservice_stream_type_pos)
                     # for typing_key due to performance reasons and due to their highly
                     # ephemeral nature.
@@ -333,7 +334,7 @@ class ApplicationServicesHandler:
                 async with self._ephemeral_events_linearizer.queue(
                     (service.id, stream_key)
                 ):
-                    if stream_key == "receipt_key":
+                    if stream_key == StreamKeyType.RECEIPT:
                         events = await self._handle_receipts(service, new_token)
                         self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
@@ -342,7 +343,7 @@ class ApplicationServicesHandler:
                             service, "read_receipt", new_token
                         )
 
-                    elif stream_key == "presence_key":
+                    elif stream_key == StreamKeyType.PRESENCE:
                         events = await self._handle_presence(service, users, new_token)
                         self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
@@ -351,7 +352,7 @@ class ApplicationServicesHandler:
                             service, "presence", new_token
                         )
 
-                    elif stream_key == "to_device_key":
+                    elif stream_key == StreamKeyType.TO_DEVICE:
                         # Retrieve a list of to-device message events, as well as the
                         # maximum stream token of the messages we were able to retrieve.
                         to_device_messages = await self._get_to_device_messages(
@@ -366,7 +367,7 @@ class ApplicationServicesHandler:
                             service, "to_device", new_token
                         )
 
-                    elif stream_key == "device_list_key":
+                    elif stream_key == StreamKeyType.DEVICE_LIST:
                         device_list_summary = await self._get_device_list_summary(
                             service, new_token
                         )
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a91b1ee4d5..1d6d1f8a92 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -43,6 +43,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.types import (
     JsonDict,
+    StreamKeyType,
     StreamToken,
     UserID,
     get_domain_from_id,
@@ -502,7 +503,7 @@ class DeviceHandler(DeviceWorkerHandler):
         # specify the user ID too since the user should always get their own device list
         # updates, even if they aren't in any rooms.
         self.notifier.on_new_event(
-            "device_list_key", position, users={user_id}, rooms=room_ids
+            StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids
         )
 
         # We may need to do some processing asynchronously for local user IDs.
@@ -523,7 +524,9 @@ class DeviceHandler(DeviceWorkerHandler):
             from_user_id, user_ids
         )
 
-        self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
+        self.notifier.on_new_event(
+            StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
+        )
 
     async def user_left_room(self, user: UserID, room_id: str) -> None:
         user_id = user.to_string()
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 4cb725d027..53668cce3b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -26,7 +26,7 @@ from synapse.logging.opentracing import (
     set_tag,
 )
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
-from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
 from synapse.util import json_encoder
 from synapse.util.stringutils import random_string
 
@@ -151,7 +151,7 @@ class DeviceMessageHandler:
         # Notify listeners that there are new to-device messages to process,
         # handing them the latest stream id.
         self.notifier.on_new_event(
-            "to_device_key", last_stream_id, users=local_messages.keys()
+            StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
         )
 
     async def _check_for_unknown_devices(
@@ -285,7 +285,7 @@ class DeviceMessageHandler:
         # Notify listeners that there are new to-device messages to process,
         # handing them the latest stream id.
         self.notifier.on_new_event(
-            "to_device_key", last_stream_id, users=local_messages.keys()
+            StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
         )
 
         if self.federation_sender:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index de09aed3a3..d79248ad90 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -30,6 +30,7 @@ from synapse.types import (
     Requester,
     RoomStreamToken,
     StateMap,
+    StreamKeyType,
     StreamToken,
     UserID,
 )
@@ -220,8 +221,10 @@ class InitialSyncHandler:
                     self.storage, user_id, messages
                 )
 
-                start_token = now_token.copy_and_replace("room_key", token)
-                end_token = now_token.copy_and_replace("room_key", room_end_token)
+                start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
+                end_token = now_token.copy_and_replace(
+                    StreamKeyType.ROOM, room_end_token
+                )
                 time_now = self.clock.time_msec()
 
                 d["messages"] = {
@@ -369,8 +372,8 @@ class InitialSyncHandler:
             self.storage, user_id, messages, is_peeking=is_peeking
         )
 
-        start_token = StreamToken.START.copy_and_replace("room_key", token)
-        end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
+        start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
+        end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)
 
         time_now = self.clock.time_msec()
 
@@ -474,7 +477,7 @@ class InitialSyncHandler:
             self.storage, user_id, messages, is_peeking=is_peeking
         )
 
-        start_token = now_token.copy_and_replace("room_key", token)
+        start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
         end_token = now_token
 
         time_now = self.clock.time_msec()
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 2e30180094..6ae88add95 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -27,7 +27,7 @@ from synapse.handlers.room import ShutdownRoomResponse
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester
+from synapse.types import JsonDict, Requester, StreamKeyType
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
@@ -491,7 +491,7 @@ class PaginationHandler:
 
                     if leave_token.topological < curr_topo:
                         from_token = from_token.copy_and_replace(
-                            "room_key", leave_token
+                            StreamKeyType.ROOM, leave_token
                         )
 
                 await self.hs.get_federation_handler().maybe_backfill(
@@ -513,7 +513,7 @@ class PaginationHandler:
                 event_filter=event_filter,
             )
 
-            next_token = from_token.copy_and_replace("room_key", next_key)
+            next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
 
         if events:
             if event_filter:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 268481ec19..dd84e6c88b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -66,7 +66,7 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand
 from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
 from synapse.storage.databases.main import DataStore
 from synapse.streams import EventSource
-from synapse.types import JsonDict, UserID, get_domain_from_id
+from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
@@ -522,7 +522,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key",
+            StreamKeyType.PRESENCE,
             stream_id,
             rooms=room_ids_to_states.keys(),
             users=users_to_states.keys(),
@@ -1145,7 +1145,7 @@ class PresenceHandler(BasePresenceHandler):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key",
+            StreamKeyType.PRESENCE,
             stream_id,
             rooms=room_ids_to_states.keys(),
             users=[UserID.from_string(u) for u in users_to_states],
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 550d58b0e1..e6a35f1d09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -17,7 +17,13 @@ from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 from synapse.api.constants import ReceiptTypes
 from synapse.appservice import ApplicationService
 from synapse.streams import EventSource
-from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
+from synapse.types import (
+    JsonDict,
+    ReadReceipt,
+    StreamKeyType,
+    UserID,
+    get_domain_from_id,
+)
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -129,7 +135,9 @@ class ReceiptsHandler:
 
         affected_room_ids = list({r.room_id for r in receipts})
 
-        self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
+        self.notifier.on_new_event(
+            StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
+        )
         # Note that the min here shouldn't be relied upon to be accurate.
         await self.hs.get_pusherpool().on_new_receipts(
             min_batch_id, max_batch_id, affected_room_ids
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 23baa50d03..a2973109ad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -73,6 +73,7 @@ from synapse.types import (
     RoomID,
     RoomStreamToken,
     StateMap,
+    StreamKeyType,
     StreamToken,
     UserID,
     create_requester,
@@ -1292,10 +1293,10 @@ class RoomContextHandler:
             events_after=events_after,
             state=await filter_evts(state_events),
             aggregations=aggregations,
-            start=await token.copy_and_replace("room_key", results.start).to_string(
-                self.store
-            ),
-            end=await token.copy_and_replace("room_key", results.end).to_string(
+            start=await token.copy_and_replace(
+                StreamKeyType.ROOM, results.start
+            ).to_string(self.store),
+            end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string(
                 self.store
             ),
         )
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 5619f8f50e..cd1c47dae8 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -24,7 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError
 from synapse.api.filtering import Filter
 from synapse.events import EventBase
 from synapse.storage.state import StateFilter
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -655,11 +655,11 @@ class SearchHandler:
                 "events_before": events_before,
                 "events_after": events_after,
                 "start": await now_token.copy_and_replace(
-                    "room_key", res.start
+                    StreamKeyType.ROOM, res.start
+                ).to_string(self.store),
+                "end": await now_token.copy_and_replace(
+                    StreamKeyType.ROOM, res.end
                 ).to_string(self.store),
-                "end": await now_token.copy_and_replace("room_key", res.end).to_string(
-                    self.store
-                ),
             }
 
             if include_profile:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c555a66d0..4be08fe7cb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -37,6 +37,7 @@ from synapse.types import (
     Requester,
     RoomStreamToken,
     StateMap,
+    StreamKeyType,
     StreamToken,
     UserID,
 )
@@ -449,7 +450,7 @@ class SyncHandler:
                 room_ids=room_ids,
                 is_guest=sync_config.is_guest,
             )
-            now_token = now_token.copy_and_replace("typing_key", typing_key)
+            now_token = now_token.copy_and_replace(StreamKeyType.TYPING, typing_key)
 
             ephemeral_by_room: JsonDict = {}
 
@@ -471,7 +472,7 @@ class SyncHandler:
                 room_ids=room_ids,
                 is_guest=sync_config.is_guest,
             )
-            now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+            now_token = now_token.copy_and_replace(StreamKeyType.RECEIPT, receipt_key)
 
             for event in receipts:
                 room_id = event["room_id"]
@@ -537,7 +538,9 @@ class SyncHandler:
                 prev_batch_token = now_token
                 if recents:
                     room_key = recents[0].internal_metadata.before
-                    prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+                    prev_batch_token = now_token.copy_and_replace(
+                        StreamKeyType.ROOM, room_key
+                    )
 
                 return TimelineBatch(
                     events=recents, prev_batch=prev_batch_token, limited=False
@@ -611,7 +614,7 @@ class SyncHandler:
                 recents = recents[-timeline_limit:]
                 room_key = recents[0].internal_metadata.before
 
-            prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+            prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
 
         # Don't bother to bundle aggregations if the timeline is unlimited,
         # as clients will have all the necessary information.
@@ -1398,7 +1401,7 @@ class SyncHandler:
                 now_token.to_device_key,
             )
             sync_result_builder.now_token = now_token.copy_and_replace(
-                "to_device_key", stream_id
+                StreamKeyType.TO_DEVICE, stream_id
             )
             sync_result_builder.to_device = messages
         else:
@@ -1503,7 +1506,7 @@ class SyncHandler:
         )
         assert presence_key
         sync_result_builder.now_token = now_token.copy_and_replace(
-            "presence_key", presence_key
+            StreamKeyType.PRESENCE, presence_key
         )
 
         extra_users_ids = set(newly_joined_or_invited_users)
@@ -1826,7 +1829,7 @@ class SyncHandler:
                 # stream token as it'll only be used in the context of this
                 # room. (c.f. the docstring of `to_room_stream_token`).
                 leave_token = since_token.copy_and_replace(
-                    "room_key", leave_position.to_room_stream_token()
+                    StreamKeyType.ROOM, leave_position.to_room_stream_token()
                 )
 
                 # If this is an out of band message, like a remote invite
@@ -1875,7 +1878,9 @@ class SyncHandler:
             if room_entry:
                 events, start_key = room_entry
 
-                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+                prev_batch_token = now_token.copy_and_replace(
+                    StreamKeyType.ROOM, start_key
+                )
 
                 entry = RoomSyncResultBuilder(
                     room_id=room_id,
@@ -1972,7 +1977,7 @@ class SyncHandler:
                             continue
 
                 leave_token = now_token.copy_and_replace(
-                    "room_key", RoomStreamToken(None, event.stream_ordering)
+                    StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
                 )
                 room_entries.append(
                     RoomSyncResultBuilder(
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 6854428b7c..bb00750bfd 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.replication.tcp.streams import TypingStream
 from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -382,7 +382,7 @@ class TypingWriterHandler(FollowerTypingHandler):
         )
 
         self.notifier.on_new_event(
-            "typing_key", self._latest_room_serial, rooms=[member.room_id]
+            StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id]
         )
 
     async def get_all_typing_updates(
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 01a50b9d62..ba23257f54 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -46,6 +46,7 @@ from synapse.types import (
     JsonDict,
     PersistedEventPosition,
     RoomStreamToken,
+    StreamKeyType,
     StreamToken,
     UserID,
 )
@@ -370,7 +371,7 @@ class Notifier:
 
         if users or rooms:
             self.on_new_event(
-                "room_key",
+                StreamKeyType.ROOM,
                 max_room_stream_token,
                 users=users,
                 rooms=rooms,
@@ -440,7 +441,7 @@ class Notifier:
             for room in rooms:
                 user_streams |= self.room_to_user_streams.get(room, set())
 
-            if stream_key == "to_device_key":
+            if stream_key == StreamKeyType.TO_DEVICE:
                 issue9533_logger.debug(
                     "to-device messages stream id %s, awaking streams for %s",
                     new_token,
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 350762f494..a52e25c1af 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -43,7 +43,7 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
     EventsStreamRow,
 )
-from synapse.types import PersistedEventPosition, ReadReceipt, UserID
+from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
 from synapse.util.async_helpers import Linearizer, timeout_deferred
 from synapse.util.metrics import Measure
 
@@ -153,19 +153,19 @@ class ReplicationDataHandler:
         if stream_name == TypingStream.NAME:
             self._typing_handler.process_replication_rows(token, rows)
             self.notifier.on_new_event(
-                "typing_key", token, rooms=[row.room_id for row in rows]
+                StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows]
             )
         elif stream_name == PushRulesStream.NAME:
             self.notifier.on_new_event(
-                "push_rules_key", token, users=[row.user_id for row in rows]
+                StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows]
             )
         elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
             self.notifier.on_new_event(
-                "account_data_key", token, users=[row.user_id for row in rows]
+                StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
             )
         elif stream_name == ReceiptsStream.NAME:
             self.notifier.on_new_event(
-                "receipt_key", token, rooms=[row.room_id for row in rows]
+                StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows]
             )
             await self._pusher_pool.on_new_receipts(
                 token, token, {row.room_id for row in rows}
@@ -173,14 +173,18 @@ class ReplicationDataHandler:
         elif stream_name == ToDeviceStream.NAME:
             entities = [row.entity for row in rows if row.entity.startswith("@")]
             if entities:
-                self.notifier.on_new_event("to_device_key", token, users=entities)
+                self.notifier.on_new_event(
+                    StreamKeyType.TO_DEVICE, token, users=entities
+                )
         elif stream_name == DeviceListsStream.NAME:
             all_room_ids: Set[str] = set()
             for row in rows:
                 if row.entity.startswith("@"):
                     room_ids = await self.store.get_rooms_for_user(row.entity)
                     all_room_ids.update(room_ids)
-            self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
+            self.notifier.on_new_event(
+                StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids
+            )
         elif stream_name == GroupServerStream.NAME:
             self.notifier.on_new_event(
                 "groups_key", token, users=[row.user_id for row in rows]
diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py
index c2c37e1015..8ecab86ec7 100644
--- a/synapse/server_notices/server_notices_manager.py
+++ b/synapse/server_notices/server_notices_manager.py
@@ -16,7 +16,7 @@ from typing import TYPE_CHECKING, Optional
 
 from synapse.api.constants import EventTypes, Membership, RoomCreationPreset
 from synapse.events import EventBase
-from synapse.types import Requester, UserID, create_requester
+from synapse.types import Requester, StreamKeyType, UserID, create_requester
 from synapse.util.caches.descriptors import cached
 
 if TYPE_CHECKING:
@@ -189,7 +189,7 @@ class ServerNoticesManager:
         max_id = await self._account_data_handler.add_tag_to_room(
             user_id, room_id, SERVER_NOTICE_ROOM_TAG, {}
         )
-        self._notifier.on_new_event("account_data_key", max_id, users=[user_id])
+        self._notifier.on_new_event(StreamKeyType.ACCOUNT_DATA, max_id, users=[user_id])
 
         logger.info("Created server notices room %s for %s", room_id, user_id)
         return room_id
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index b789a588a5..af59be6b48 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -21,7 +21,7 @@ from synapse.api.errors import StoreError
 from synapse.logging.opentracing import log_kv, trace
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import LoggingTransaction
-from synapse.types import JsonDict, JsonSerializable
+from synapse.types import JsonDict, JsonSerializable, StreamKeyType
 from synapse.util import json_encoder
 
 
@@ -126,7 +126,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                     "message": "Set room key",
                     "room_id": room_id,
                     "session_id": session_id,
-                    "room_key": room_key,
+                    StreamKeyType.ROOM: room_key,
                 }
             )
 
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 484976ca6b..fe8fded88b 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -34,7 +34,7 @@ from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
 from synapse.storage.databases.main.stream import generate_pagination_where_clause
 from synapse.storage.engines import PostgresEngine
-from synapse.types import JsonDict, RoomStreamToken, StreamToken
+from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken
 from synapse.util.caches.descriptors import cached, cachedList
 
 logger = logging.getLogger(__name__)
@@ -161,7 +161,9 @@ class RelationsWorkerStore(SQLBaseStore):
             if len(events) > limit and last_topo_id and last_stream_id:
                 next_key = RoomStreamToken(last_topo_id, last_stream_id)
                 if from_token:
-                    next_token = from_token.copy_and_replace("room_key", next_key)
+                    next_token = from_token.copy_and_replace(
+                        StreamKeyType.ROOM, next_key
+                    )
                 else:
                     next_token = StreamToken(
                         room_key=next_key,
diff --git a/synapse/types.py b/synapse/types.py
index 325332a6e0..bd8071d51d 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -37,7 +37,7 @@ import attr
 from frozendict import frozendict
 from signedjson.key import decode_verify_key_bytes
 from signedjson.types import VerifyKey
-from typing_extensions import TypedDict
+from typing_extensions import Final, TypedDict
 from unpaddedbase64 import decode_base64
 from zope.interface import Interface
 
@@ -630,6 +630,22 @@ class RoomStreamToken:
             return "s%d" % (self.stream,)
 
 
+class StreamKeyType:
+    """Known stream types.
+
+    A stream is a list of entities ordered by an incrementing "stream token".
+    """
+
+    ROOM: Final = "room_key"
+    PRESENCE: Final = "presence_key"
+    TYPING: Final = "typing_key"
+    RECEIPT: Final = "receipt_key"
+    ACCOUNT_DATA: Final = "account_data_key"
+    PUSH_RULES: Final = "push_rules_key"
+    TO_DEVICE: Final = "to_device_key"
+    DEVICE_LIST: Final = "device_list_key"
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class StreamToken:
     """A collection of keys joined together by underscores in the following
@@ -743,9 +759,9 @@ class StreamToken:
 
         :raises TypeError: if `key` is not the one of the keys tracked by a StreamToken.
         """
-        if key == "room_key":
+        if key == StreamKeyType.ROOM:
             new_token = self.copy_and_replace(
-                "room_key", self.room_key.copy_and_advance(new_value)
+                StreamKeyType.ROOM, self.room_key.copy_and_advance(new_value)
             )
             return new_token