diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 4af9fbc5d1..0478448b47 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -23,7 +23,7 @@ from synapse.replication.http.account_data import (
ReplicationUserAccountDataRestServlet,
)
from synapse.streams import EventSource
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -105,7 +105,7 @@ class AccountDataHandler:
)
self._notifier.on_new_event(
- "account_data_key", max_stream_id, users=[user_id]
+ StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
await self._notify_modules(user_id, room_id, account_data_type, content)
@@ -141,7 +141,7 @@ class AccountDataHandler:
)
self._notifier.on_new_event(
- "account_data_key", max_stream_id, users=[user_id]
+ StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
await self._notify_modules(user_id, None, account_data_type, content)
@@ -176,7 +176,7 @@ class AccountDataHandler:
)
self._notifier.on_new_event(
- "account_data_key", max_stream_id, users=[user_id]
+ StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
return max_stream_id
else:
@@ -201,7 +201,7 @@ class AccountDataHandler:
)
self._notifier.on_new_event(
- "account_data_key", max_stream_id, users=[user_id]
+ StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
return max_stream_id
else:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 85bd5e4768..1da7bcc85b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -38,6 +38,7 @@ from synapse.types import (
JsonDict,
RoomAlias,
RoomStreamToken,
+ StreamKeyType,
UserID,
)
from synapse.util.async_helpers import Linearizer
@@ -213,8 +214,8 @@ class ApplicationServicesHandler:
Args:
stream_key: The stream the event came from.
- `stream_key` can be "typing_key", "receipt_key", "presence_key",
- "to_device_key" or "device_list_key". Any other value for `stream_key`
+ `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE,
+ StreamKeyType.TO_DEVICE or StreamKeyType.DEVICE_LIST. Any other value for `stream_key`
will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
@@ -235,11 +236,11 @@ class ApplicationServicesHandler:
# Only the following streams are currently supported.
# FIXME: We should use constants for these values.
if stream_key not in (
- "typing_key",
- "receipt_key",
- "presence_key",
- "to_device_key",
- "device_list_key",
+ StreamKeyType.TYPING,
+ StreamKeyType.RECEIPT,
+ StreamKeyType.PRESENCE,
+ StreamKeyType.TO_DEVICE,
+ StreamKeyType.DEVICE_LIST,
):
return
@@ -258,14 +259,14 @@ class ApplicationServicesHandler:
# Ignore to-device messages if the feature flag is not enabled
if (
- stream_key == "to_device_key"
+ stream_key == StreamKeyType.TO_DEVICE
and not self._msc2409_to_device_messages_enabled
):
return
# Ignore device lists if the feature flag is not enabled
if (
- stream_key == "device_list_key"
+ stream_key == StreamKeyType.DEVICE_LIST
and not self._msc3202_transaction_extensions_enabled
):
return
@@ -283,15 +284,15 @@ class ApplicationServicesHandler:
if (
stream_key
in (
- "typing_key",
- "receipt_key",
- "presence_key",
- "to_device_key",
+ StreamKeyType.TYPING,
+ StreamKeyType.RECEIPT,
+ StreamKeyType.PRESENCE,
+ StreamKeyType.TO_DEVICE,
)
and service.supports_ephemeral
)
or (
- stream_key == "device_list_key"
+ stream_key == StreamKeyType.DEVICE_LIST
and service.msc3202_transaction_extensions
)
]
@@ -317,7 +318,7 @@ class ApplicationServicesHandler:
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
- if stream_key == "typing_key":
+ if stream_key == StreamKeyType.TYPING:
# Note that we don't persist the token (via set_appservice_stream_type_pos)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
@@ -333,7 +334,7 @@ class ApplicationServicesHandler:
async with self._ephemeral_events_linearizer.queue(
(service.id, stream_key)
):
- if stream_key == "receipt_key":
+ if stream_key == StreamKeyType.RECEIPT:
events = await self._handle_receipts(service, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
@@ -342,7 +343,7 @@ class ApplicationServicesHandler:
service, "read_receipt", new_token
)
- elif stream_key == "presence_key":
+ elif stream_key == StreamKeyType.PRESENCE:
events = await self._handle_presence(service, users, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
@@ -351,7 +352,7 @@ class ApplicationServicesHandler:
service, "presence", new_token
)
- elif stream_key == "to_device_key":
+ elif stream_key == StreamKeyType.TO_DEVICE:
# Retrieve a list of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
to_device_messages = await self._get_to_device_messages(
@@ -366,7 +367,7 @@ class ApplicationServicesHandler:
service, "to_device", new_token
)
- elif stream_key == "device_list_key":
+ elif stream_key == StreamKeyType.DEVICE_LIST:
device_list_summary = await self._get_device_list_summary(
service, new_token
)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a91b1ee4d5..1d6d1f8a92 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -43,6 +43,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.types import (
JsonDict,
+ StreamKeyType,
StreamToken,
UserID,
get_domain_from_id,
@@ -502,7 +503,7 @@ class DeviceHandler(DeviceWorkerHandler):
# specify the user ID too since the user should always get their own device list
# updates, even if they aren't in any rooms.
self.notifier.on_new_event(
- "device_list_key", position, users={user_id}, rooms=room_ids
+ StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids
)
# We may need to do some processing asynchronously for local user IDs.
@@ -523,7 +524,9 @@ class DeviceHandler(DeviceWorkerHandler):
from_user_id, user_ids
)
- self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
+ self.notifier.on_new_event(
+ StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
+ )
async def user_left_room(self, user: UserID, room_id: str) -> None:
user_id = user.to_string()
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 4cb725d027..53668cce3b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -26,7 +26,7 @@ from synapse.logging.opentracing import (
set_tag,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
-from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.stringutils import random_string
@@ -151,7 +151,7 @@ class DeviceMessageHandler:
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
- "to_device_key", last_stream_id, users=local_messages.keys()
+ StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
)
async def _check_for_unknown_devices(
@@ -285,7 +285,7 @@ class DeviceMessageHandler:
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
- "to_device_key", last_stream_id, users=local_messages.keys()
+ StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
)
if self.federation_sender:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index de09aed3a3..d79248ad90 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -30,6 +30,7 @@ from synapse.types import (
Requester,
RoomStreamToken,
StateMap,
+ StreamKeyType,
StreamToken,
UserID,
)
@@ -220,8 +221,10 @@ class InitialSyncHandler:
self.storage, user_id, messages
)
- start_token = now_token.copy_and_replace("room_key", token)
- end_token = now_token.copy_and_replace("room_key", room_end_token)
+ start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
+ end_token = now_token.copy_and_replace(
+ StreamKeyType.ROOM, room_end_token
+ )
time_now = self.clock.time_msec()
d["messages"] = {
@@ -369,8 +372,8 @@ class InitialSyncHandler:
self.storage, user_id, messages, is_peeking=is_peeking
)
- start_token = StreamToken.START.copy_and_replace("room_key", token)
- end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
+ start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
+ end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)
time_now = self.clock.time_msec()
@@ -474,7 +477,7 @@ class InitialSyncHandler:
self.storage, user_id, messages, is_peeking=is_peeking
)
- start_token = now_token.copy_and_replace("room_key", token)
+ start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
end_token = now_token
time_now = self.clock.time_msec()
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 2e30180094..6ae88add95 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -27,7 +27,7 @@ from synapse.handlers.room import ShutdownRoomResponse
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester
+from synapse.types import JsonDict, Requester, StreamKeyType
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -491,7 +491,7 @@ class PaginationHandler:
if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
- "room_key", leave_token
+ StreamKeyType.ROOM, leave_token
)
await self.hs.get_federation_handler().maybe_backfill(
@@ -513,7 +513,7 @@ class PaginationHandler:
event_filter=event_filter,
)
- next_token = from_token.copy_and_replace("room_key", next_key)
+ next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
if events:
if event_filter:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 268481ec19..dd84e6c88b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -66,7 +66,7 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
from synapse.streams import EventSource
-from synapse.types import JsonDict, UserID, get_domain_from_id
+from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
@@ -522,7 +522,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
- "presence_key",
+ StreamKeyType.PRESENCE,
stream_id,
rooms=room_ids_to_states.keys(),
users=users_to_states.keys(),
@@ -1145,7 +1145,7 @@ class PresenceHandler(BasePresenceHandler):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
- "presence_key",
+ StreamKeyType.PRESENCE,
stream_id,
rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states],
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 550d58b0e1..e6a35f1d09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -17,7 +17,13 @@ from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse.api.constants import ReceiptTypes
from synapse.appservice import ApplicationService
from synapse.streams import EventSource
-from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
+from synapse.types import (
+ JsonDict,
+ ReadReceipt,
+ StreamKeyType,
+ UserID,
+ get_domain_from_id,
+)
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -129,7 +135,9 @@ class ReceiptsHandler:
affected_room_ids = list({r.room_id for r in receipts})
- self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
+ self.notifier.on_new_event(
+ StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
+ )
# Note that the min here shouldn't be relied upon to be accurate.
await self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 23baa50d03..a2973109ad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -73,6 +73,7 @@ from synapse.types import (
RoomID,
RoomStreamToken,
StateMap,
+ StreamKeyType,
StreamToken,
UserID,
create_requester,
@@ -1292,10 +1293,10 @@ class RoomContextHandler:
events_after=events_after,
state=await filter_evts(state_events),
aggregations=aggregations,
- start=await token.copy_and_replace("room_key", results.start).to_string(
- self.store
- ),
- end=await token.copy_and_replace("room_key", results.end).to_string(
+ start=await token.copy_and_replace(
+ StreamKeyType.ROOM, results.start
+ ).to_string(self.store),
+ end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string(
self.store
),
)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 5619f8f50e..cd1c47dae8 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -24,7 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.storage.state import StateFilter
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -655,11 +655,11 @@ class SearchHandler:
"events_before": events_before,
"events_after": events_after,
"start": await now_token.copy_and_replace(
- "room_key", res.start
+ StreamKeyType.ROOM, res.start
+ ).to_string(self.store),
+ "end": await now_token.copy_and_replace(
+ StreamKeyType.ROOM, res.end
).to_string(self.store),
- "end": await now_token.copy_and_replace("room_key", res.end).to_string(
- self.store
- ),
}
if include_profile:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c555a66d0..4be08fe7cb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -37,6 +37,7 @@ from synapse.types import (
Requester,
RoomStreamToken,
StateMap,
+ StreamKeyType,
StreamToken,
UserID,
)
@@ -449,7 +450,7 @@ class SyncHandler:
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
- now_token = now_token.copy_and_replace("typing_key", typing_key)
+ now_token = now_token.copy_and_replace(StreamKeyType.TYPING, typing_key)
ephemeral_by_room: JsonDict = {}
@@ -471,7 +472,7 @@ class SyncHandler:
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
- now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+ now_token = now_token.copy_and_replace(StreamKeyType.RECEIPT, receipt_key)
for event in receipts:
room_id = event["room_id"]
@@ -537,7 +538,9 @@ class SyncHandler:
prev_batch_token = now_token
if recents:
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+ prev_batch_token = now_token.copy_and_replace(
+ StreamKeyType.ROOM, room_key
+ )
return TimelineBatch(
events=recents, prev_batch=prev_batch_token, limited=False
@@ -611,7 +614,7 @@ class SyncHandler:
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+ prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
@@ -1398,7 +1401,7 @@ class SyncHandler:
now_token.to_device_key,
)
sync_result_builder.now_token = now_token.copy_and_replace(
- "to_device_key", stream_id
+ StreamKeyType.TO_DEVICE, stream_id
)
sync_result_builder.to_device = messages
else:
@@ -1503,7 +1506,7 @@ class SyncHandler:
)
assert presence_key
sync_result_builder.now_token = now_token.copy_and_replace(
- "presence_key", presence_key
+ StreamKeyType.PRESENCE, presence_key
)
extra_users_ids = set(newly_joined_or_invited_users)
@@ -1826,7 +1829,7 @@ class SyncHandler:
# stream token as it'll only be used in the context of this
# room. (c.f. the docstring of `to_room_stream_token`).
leave_token = since_token.copy_and_replace(
- "room_key", leave_position.to_room_stream_token()
+ StreamKeyType.ROOM, leave_position.to_room_stream_token()
)
# If this is an out of band message, like a remote invite
@@ -1875,7 +1878,9 @@ class SyncHandler:
if room_entry:
events, start_key = room_entry
- prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+ prev_batch_token = now_token.copy_and_replace(
+ StreamKeyType.ROOM, start_key
+ )
entry = RoomSyncResultBuilder(
room_id=room_id,
@@ -1972,7 +1977,7 @@ class SyncHandler:
continue
leave_token = now_token.copy_and_replace(
- "room_key", RoomStreamToken(None, event.stream_ordering)
+ StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
)
room_entries.append(
RoomSyncResultBuilder(
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 6854428b7c..bb00750bfd 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.replication.tcp.streams import TypingStream
from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -382,7 +382,7 @@ class TypingWriterHandler(FollowerTypingHandler):
)
self.notifier.on_new_event(
- "typing_key", self._latest_room_serial, rooms=[member.room_id]
+ StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id]
)
async def get_all_typing_updates(
|