summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/account_data.py10
-rw-r--r--synapse/handlers/appservice.py39
-rw-r--r--synapse/handlers/device.py7
-rw-r--r--synapse/handlers/devicemessage.py6
-rw-r--r--synapse/handlers/initial_sync.py13
-rw-r--r--synapse/handlers/pagination.py6
-rw-r--r--synapse/handlers/presence.py6
-rw-r--r--synapse/handlers/receipts.py12
-rw-r--r--synapse/handlers/room.py9
-rw-r--r--synapse/handlers/search.py10
-rw-r--r--synapse/handlers/sync.py23
-rw-r--r--synapse/handlers/typing.py4
12 files changed, 83 insertions, 62 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 4af9fbc5d1..0478448b47 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -23,7 +23,7 @@ from synapse.replication.http.account_data import (
     ReplicationUserAccountDataRestServlet,
 )
 from synapse.streams import EventSource
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -105,7 +105,7 @@ class AccountDataHandler:
             )
 
             self._notifier.on_new_event(
-                "account_data_key", max_stream_id, users=[user_id]
+                StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
             )
 
             await self._notify_modules(user_id, room_id, account_data_type, content)
@@ -141,7 +141,7 @@ class AccountDataHandler:
             )
 
             self._notifier.on_new_event(
-                "account_data_key", max_stream_id, users=[user_id]
+                StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
             )
 
             await self._notify_modules(user_id, None, account_data_type, content)
@@ -176,7 +176,7 @@ class AccountDataHandler:
             )
 
             self._notifier.on_new_event(
-                "account_data_key", max_stream_id, users=[user_id]
+                StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
             )
             return max_stream_id
         else:
@@ -201,7 +201,7 @@ class AccountDataHandler:
             )
 
             self._notifier.on_new_event(
-                "account_data_key", max_stream_id, users=[user_id]
+                StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
             )
             return max_stream_id
         else:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 85bd5e4768..1da7bcc85b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -38,6 +38,7 @@ from synapse.types import (
     JsonDict,
     RoomAlias,
     RoomStreamToken,
+    StreamKeyType,
     UserID,
 )
 from synapse.util.async_helpers import Linearizer
@@ -213,8 +214,8 @@ class ApplicationServicesHandler:
         Args:
             stream_key: The stream the event came from.
 
-                `stream_key` can be "typing_key", "receipt_key", "presence_key",
-                "to_device_key" or "device_list_key". Any other value for `stream_key`
+                `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE,
+                StreamKeyType.TO_DEVICE or StreamKeyType.DEVICE_LIST. Any other value for `stream_key`
                 will cause this function to return early.
 
                 Ephemeral events will only be pushed to appservices that have opted into
@@ -235,11 +236,11 @@ class ApplicationServicesHandler:
         # Only the following streams are currently supported.
         # FIXME: We should use constants for these values.
         if stream_key not in (
-            "typing_key",
-            "receipt_key",
-            "presence_key",
-            "to_device_key",
-            "device_list_key",
+            StreamKeyType.TYPING,
+            StreamKeyType.RECEIPT,
+            StreamKeyType.PRESENCE,
+            StreamKeyType.TO_DEVICE,
+            StreamKeyType.DEVICE_LIST,
         ):
             return
 
@@ -258,14 +259,14 @@ class ApplicationServicesHandler:
 
         # Ignore to-device messages if the feature flag is not enabled
         if (
-            stream_key == "to_device_key"
+            stream_key == StreamKeyType.TO_DEVICE
             and not self._msc2409_to_device_messages_enabled
         ):
             return
 
         # Ignore device lists if the feature flag is not enabled
         if (
-            stream_key == "device_list_key"
+            stream_key == StreamKeyType.DEVICE_LIST
             and not self._msc3202_transaction_extensions_enabled
         ):
             return
@@ -283,15 +284,15 @@ class ApplicationServicesHandler:
             if (
                 stream_key
                 in (
-                    "typing_key",
-                    "receipt_key",
-                    "presence_key",
-                    "to_device_key",
+                    StreamKeyType.TYPING,
+                    StreamKeyType.RECEIPT,
+                    StreamKeyType.PRESENCE,
+                    StreamKeyType.TO_DEVICE,
                 )
                 and service.supports_ephemeral
             )
             or (
-                stream_key == "device_list_key"
+                stream_key == StreamKeyType.DEVICE_LIST
                 and service.msc3202_transaction_extensions
             )
         ]
@@ -317,7 +318,7 @@ class ApplicationServicesHandler:
         logger.debug("Checking interested services for %s", stream_key)
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
-                if stream_key == "typing_key":
+                if stream_key == StreamKeyType.TYPING:
                     # Note that we don't persist the token (via set_appservice_stream_type_pos)
                     # for typing_key due to performance reasons and due to their highly
                     # ephemeral nature.
@@ -333,7 +334,7 @@ class ApplicationServicesHandler:
                 async with self._ephemeral_events_linearizer.queue(
                     (service.id, stream_key)
                 ):
-                    if stream_key == "receipt_key":
+                    if stream_key == StreamKeyType.RECEIPT:
                         events = await self._handle_receipts(service, new_token)
                         self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
@@ -342,7 +343,7 @@ class ApplicationServicesHandler:
                             service, "read_receipt", new_token
                         )
 
-                    elif stream_key == "presence_key":
+                    elif stream_key == StreamKeyType.PRESENCE:
                         events = await self._handle_presence(service, users, new_token)
                         self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
@@ -351,7 +352,7 @@ class ApplicationServicesHandler:
                             service, "presence", new_token
                         )
 
-                    elif stream_key == "to_device_key":
+                    elif stream_key == StreamKeyType.TO_DEVICE:
                         # Retrieve a list of to-device message events, as well as the
                         # maximum stream token of the messages we were able to retrieve.
                         to_device_messages = await self._get_to_device_messages(
@@ -366,7 +367,7 @@ class ApplicationServicesHandler:
                             service, "to_device", new_token
                         )
 
-                    elif stream_key == "device_list_key":
+                    elif stream_key == StreamKeyType.DEVICE_LIST:
                         device_list_summary = await self._get_device_list_summary(
                             service, new_token
                         )
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a91b1ee4d5..1d6d1f8a92 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -43,6 +43,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.types import (
     JsonDict,
+    StreamKeyType,
     StreamToken,
     UserID,
     get_domain_from_id,
@@ -502,7 +503,7 @@ class DeviceHandler(DeviceWorkerHandler):
         # specify the user ID too since the user should always get their own device list
         # updates, even if they aren't in any rooms.
         self.notifier.on_new_event(
-            "device_list_key", position, users={user_id}, rooms=room_ids
+            StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids
         )
 
         # We may need to do some processing asynchronously for local user IDs.
@@ -523,7 +524,9 @@ class DeviceHandler(DeviceWorkerHandler):
             from_user_id, user_ids
         )
 
-        self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
+        self.notifier.on_new_event(
+            StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
+        )
 
     async def user_left_room(self, user: UserID, room_id: str) -> None:
         user_id = user.to_string()
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 4cb725d027..53668cce3b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -26,7 +26,7 @@ from synapse.logging.opentracing import (
     set_tag,
 )
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
-from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
 from synapse.util import json_encoder
 from synapse.util.stringutils import random_string
 
@@ -151,7 +151,7 @@ class DeviceMessageHandler:
         # Notify listeners that there are new to-device messages to process,
         # handing them the latest stream id.
         self.notifier.on_new_event(
-            "to_device_key", last_stream_id, users=local_messages.keys()
+            StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
         )
 
     async def _check_for_unknown_devices(
@@ -285,7 +285,7 @@ class DeviceMessageHandler:
         # Notify listeners that there are new to-device messages to process,
         # handing them the latest stream id.
         self.notifier.on_new_event(
-            "to_device_key", last_stream_id, users=local_messages.keys()
+            StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
         )
 
         if self.federation_sender:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index de09aed3a3..d79248ad90 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -30,6 +30,7 @@ from synapse.types import (
     Requester,
     RoomStreamToken,
     StateMap,
+    StreamKeyType,
     StreamToken,
     UserID,
 )
@@ -220,8 +221,10 @@ class InitialSyncHandler:
                     self.storage, user_id, messages
                 )
 
-                start_token = now_token.copy_and_replace("room_key", token)
-                end_token = now_token.copy_and_replace("room_key", room_end_token)
+                start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
+                end_token = now_token.copy_and_replace(
+                    StreamKeyType.ROOM, room_end_token
+                )
                 time_now = self.clock.time_msec()
 
                 d["messages"] = {
@@ -369,8 +372,8 @@ class InitialSyncHandler:
             self.storage, user_id, messages, is_peeking=is_peeking
         )
 
-        start_token = StreamToken.START.copy_and_replace("room_key", token)
-        end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
+        start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
+        end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)
 
         time_now = self.clock.time_msec()
 
@@ -474,7 +477,7 @@ class InitialSyncHandler:
             self.storage, user_id, messages, is_peeking=is_peeking
         )
 
-        start_token = now_token.copy_and_replace("room_key", token)
+        start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
         end_token = now_token
 
         time_now = self.clock.time_msec()
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 2e30180094..6ae88add95 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -27,7 +27,7 @@ from synapse.handlers.room import ShutdownRoomResponse
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester
+from synapse.types import JsonDict, Requester, StreamKeyType
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
@@ -491,7 +491,7 @@ class PaginationHandler:
 
                     if leave_token.topological < curr_topo:
                         from_token = from_token.copy_and_replace(
-                            "room_key", leave_token
+                            StreamKeyType.ROOM, leave_token
                         )
 
                 await self.hs.get_federation_handler().maybe_backfill(
@@ -513,7 +513,7 @@ class PaginationHandler:
                 event_filter=event_filter,
             )
 
-            next_token = from_token.copy_and_replace("room_key", next_key)
+            next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
 
         if events:
             if event_filter:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 268481ec19..dd84e6c88b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -66,7 +66,7 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand
 from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
 from synapse.storage.databases.main import DataStore
 from synapse.streams import EventSource
-from synapse.types import JsonDict, UserID, get_domain_from_id
+from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
@@ -522,7 +522,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key",
+            StreamKeyType.PRESENCE,
             stream_id,
             rooms=room_ids_to_states.keys(),
             users=users_to_states.keys(),
@@ -1145,7 +1145,7 @@ class PresenceHandler(BasePresenceHandler):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key",
+            StreamKeyType.PRESENCE,
             stream_id,
             rooms=room_ids_to_states.keys(),
             users=[UserID.from_string(u) for u in users_to_states],
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 550d58b0e1..e6a35f1d09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -17,7 +17,13 @@ from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 from synapse.api.constants import ReceiptTypes
 from synapse.appservice import ApplicationService
 from synapse.streams import EventSource
-from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
+from synapse.types import (
+    JsonDict,
+    ReadReceipt,
+    StreamKeyType,
+    UserID,
+    get_domain_from_id,
+)
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -129,7 +135,9 @@ class ReceiptsHandler:
 
         affected_room_ids = list({r.room_id for r in receipts})
 
-        self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
+        self.notifier.on_new_event(
+            StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
+        )
         # Note that the min here shouldn't be relied upon to be accurate.
         await self.hs.get_pusherpool().on_new_receipts(
             min_batch_id, max_batch_id, affected_room_ids
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 23baa50d03..a2973109ad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -73,6 +73,7 @@ from synapse.types import (
     RoomID,
     RoomStreamToken,
     StateMap,
+    StreamKeyType,
     StreamToken,
     UserID,
     create_requester,
@@ -1292,10 +1293,10 @@ class RoomContextHandler:
             events_after=events_after,
             state=await filter_evts(state_events),
             aggregations=aggregations,
-            start=await token.copy_and_replace("room_key", results.start).to_string(
-                self.store
-            ),
-            end=await token.copy_and_replace("room_key", results.end).to_string(
+            start=await token.copy_and_replace(
+                StreamKeyType.ROOM, results.start
+            ).to_string(self.store),
+            end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string(
                 self.store
             ),
         )
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 5619f8f50e..cd1c47dae8 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -24,7 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError
 from synapse.api.filtering import Filter
 from synapse.events import EventBase
 from synapse.storage.state import StateFilter
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -655,11 +655,11 @@ class SearchHandler:
                 "events_before": events_before,
                 "events_after": events_after,
                 "start": await now_token.copy_and_replace(
-                    "room_key", res.start
+                    StreamKeyType.ROOM, res.start
+                ).to_string(self.store),
+                "end": await now_token.copy_and_replace(
+                    StreamKeyType.ROOM, res.end
                 ).to_string(self.store),
-                "end": await now_token.copy_and_replace("room_key", res.end).to_string(
-                    self.store
-                ),
             }
 
             if include_profile:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c555a66d0..4be08fe7cb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -37,6 +37,7 @@ from synapse.types import (
     Requester,
     RoomStreamToken,
     StateMap,
+    StreamKeyType,
     StreamToken,
     UserID,
 )
@@ -449,7 +450,7 @@ class SyncHandler:
                 room_ids=room_ids,
                 is_guest=sync_config.is_guest,
             )
-            now_token = now_token.copy_and_replace("typing_key", typing_key)
+            now_token = now_token.copy_and_replace(StreamKeyType.TYPING, typing_key)
 
             ephemeral_by_room: JsonDict = {}
 
@@ -471,7 +472,7 @@ class SyncHandler:
                 room_ids=room_ids,
                 is_guest=sync_config.is_guest,
             )
-            now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+            now_token = now_token.copy_and_replace(StreamKeyType.RECEIPT, receipt_key)
 
             for event in receipts:
                 room_id = event["room_id"]
@@ -537,7 +538,9 @@ class SyncHandler:
                 prev_batch_token = now_token
                 if recents:
                     room_key = recents[0].internal_metadata.before
-                    prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+                    prev_batch_token = now_token.copy_and_replace(
+                        StreamKeyType.ROOM, room_key
+                    )
 
                 return TimelineBatch(
                     events=recents, prev_batch=prev_batch_token, limited=False
@@ -611,7 +614,7 @@ class SyncHandler:
                 recents = recents[-timeline_limit:]
                 room_key = recents[0].internal_metadata.before
 
-            prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+            prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
 
         # Don't bother to bundle aggregations if the timeline is unlimited,
         # as clients will have all the necessary information.
@@ -1398,7 +1401,7 @@ class SyncHandler:
                 now_token.to_device_key,
             )
             sync_result_builder.now_token = now_token.copy_and_replace(
-                "to_device_key", stream_id
+                StreamKeyType.TO_DEVICE, stream_id
             )
             sync_result_builder.to_device = messages
         else:
@@ -1503,7 +1506,7 @@ class SyncHandler:
         )
         assert presence_key
         sync_result_builder.now_token = now_token.copy_and_replace(
-            "presence_key", presence_key
+            StreamKeyType.PRESENCE, presence_key
         )
 
         extra_users_ids = set(newly_joined_or_invited_users)
@@ -1826,7 +1829,7 @@ class SyncHandler:
                 # stream token as it'll only be used in the context of this
                 # room. (c.f. the docstring of `to_room_stream_token`).
                 leave_token = since_token.copy_and_replace(
-                    "room_key", leave_position.to_room_stream_token()
+                    StreamKeyType.ROOM, leave_position.to_room_stream_token()
                 )
 
                 # If this is an out of band message, like a remote invite
@@ -1875,7 +1878,9 @@ class SyncHandler:
             if room_entry:
                 events, start_key = room_entry
 
-                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+                prev_batch_token = now_token.copy_and_replace(
+                    StreamKeyType.ROOM, start_key
+                )
 
                 entry = RoomSyncResultBuilder(
                     room_id=room_id,
@@ -1972,7 +1977,7 @@ class SyncHandler:
                             continue
 
                 leave_token = now_token.copy_and_replace(
-                    "room_key", RoomStreamToken(None, event.stream_ordering)
+                    StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
                 )
                 room_entries.append(
                     RoomSyncResultBuilder(
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 6854428b7c..bb00750bfd 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.replication.tcp.streams import TypingStream
 from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -382,7 +382,7 @@ class TypingWriterHandler(FollowerTypingHandler):
         )
 
         self.notifier.on_new_event(
-            "typing_key", self._latest_room_serial, rooms=[member.room_id]
+            StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id]
         )
 
     async def get_all_typing_updates(