diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/account_data.py | 10 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 39 | ||||
-rw-r--r-- | synapse/handlers/device.py | 7 | ||||
-rw-r--r-- | synapse/handlers/devicemessage.py | 6 | ||||
-rw-r--r-- | synapse/handlers/initial_sync.py | 13 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 6 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 6 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 12 | ||||
-rw-r--r-- | synapse/handlers/room.py | 9 | ||||
-rw-r--r-- | synapse/handlers/search.py | 10 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 23 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 4 |
12 files changed, 83 insertions, 62 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 4af9fbc5d1..0478448b47 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -23,7 +23,7 @@ from synapse.replication.http.account_data import ( ReplicationUserAccountDataRestServlet, ) from synapse.streams import EventSource -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -105,7 +105,7 @@ class AccountDataHandler: ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) await self._notify_modules(user_id, room_id, account_data_type, content) @@ -141,7 +141,7 @@ class AccountDataHandler: ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) await self._notify_modules(user_id, None, account_data_type, content) @@ -176,7 +176,7 @@ class AccountDataHandler: ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) return max_stream_id else: @@ -201,7 +201,7 @@ class AccountDataHandler: ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) return max_stream_id else: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 85bd5e4768..1da7bcc85b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -38,6 +38,7 @@ from synapse.types import ( JsonDict, RoomAlias, RoomStreamToken, + StreamKeyType, UserID, ) from synapse.util.async_helpers import Linearizer @@ -213,8 +214,8 @@ class ApplicationServicesHandler: Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key", "presence_key", - "to_device_key" or "device_list_key". Any other value for `stream_key` + `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, + StreamKeyType.TO_DEVICE or StreamKeyType.DEVICE_LIST. Any other value for `stream_key` will cause this function to return early. Ephemeral events will only be pushed to appservices that have opted into @@ -235,11 +236,11 @@ class ApplicationServicesHandler: # Only the following streams are currently supported. # FIXME: We should use constants for these values. if stream_key not in ( - "typing_key", - "receipt_key", - "presence_key", - "to_device_key", - "device_list_key", + StreamKeyType.TYPING, + StreamKeyType.RECEIPT, + StreamKeyType.PRESENCE, + StreamKeyType.TO_DEVICE, + StreamKeyType.DEVICE_LIST, ): return @@ -258,14 +259,14 @@ class ApplicationServicesHandler: # Ignore to-device messages if the feature flag is not enabled if ( - stream_key == "to_device_key" + stream_key == StreamKeyType.TO_DEVICE and not self._msc2409_to_device_messages_enabled ): return # Ignore device lists if the feature flag is not enabled if ( - stream_key == "device_list_key" + stream_key == StreamKeyType.DEVICE_LIST and not self._msc3202_transaction_extensions_enabled ): return @@ -283,15 +284,15 @@ class ApplicationServicesHandler: if ( stream_key in ( - "typing_key", - "receipt_key", - "presence_key", - "to_device_key", + StreamKeyType.TYPING, + StreamKeyType.RECEIPT, + StreamKeyType.PRESENCE, + StreamKeyType.TO_DEVICE, ) and service.supports_ephemeral ) or ( - stream_key == "device_list_key" + stream_key == StreamKeyType.DEVICE_LIST and service.msc3202_transaction_extensions ) ] @@ -317,7 +318,7 @@ class ApplicationServicesHandler: logger.debug("Checking interested services for %s", stream_key) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: - if stream_key == "typing_key": + if stream_key == StreamKeyType.TYPING: # Note that we don't persist the token (via set_appservice_stream_type_pos) # for typing_key due to performance reasons and due to their highly # ephemeral nature. @@ -333,7 +334,7 @@ class ApplicationServicesHandler: async with self._ephemeral_events_linearizer.queue( (service.id, stream_key) ): - if stream_key == "receipt_key": + if stream_key == StreamKeyType.RECEIPT: events = await self._handle_receipts(service, new_token) self.scheduler.enqueue_for_appservice(service, ephemeral=events) @@ -342,7 +343,7 @@ class ApplicationServicesHandler: service, "read_receipt", new_token ) - elif stream_key == "presence_key": + elif stream_key == StreamKeyType.PRESENCE: events = await self._handle_presence(service, users, new_token) self.scheduler.enqueue_for_appservice(service, ephemeral=events) @@ -351,7 +352,7 @@ class ApplicationServicesHandler: service, "presence", new_token ) - elif stream_key == "to_device_key": + elif stream_key == StreamKeyType.TO_DEVICE: # Retrieve a list of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. to_device_messages = await self._get_to_device_messages( @@ -366,7 +367,7 @@ class ApplicationServicesHandler: service, "to_device", new_token ) - elif stream_key == "device_list_key": + elif stream_key == StreamKeyType.DEVICE_LIST: device_list_summary = await self._get_device_list_summary( service, new_token ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a91b1ee4d5..1d6d1f8a92 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -43,6 +43,7 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.types import ( JsonDict, + StreamKeyType, StreamToken, UserID, get_domain_from_id, @@ -502,7 +503,7 @@ class DeviceHandler(DeviceWorkerHandler): # specify the user ID too since the user should always get their own device list # updates, even if they aren't in any rooms. self.notifier.on_new_event( - "device_list_key", position, users={user_id}, rooms=room_ids + StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids ) # We may need to do some processing asynchronously for local user IDs. @@ -523,7 +524,9 @@ class DeviceHandler(DeviceWorkerHandler): from_user_id, user_ids ) - self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, position, users=[from_user_id] + ) async def user_left_room(self, user: UserID, room_id: str) -> None: user_id = user.to_string() diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 4cb725d027..53668cce3b 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -26,7 +26,7 @@ from synapse.logging.opentracing import ( set_tag, ) from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet -from synapse.types import JsonDict, Requester, UserID, get_domain_from_id +from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util import json_encoder from synapse.util.stringutils import random_string @@ -151,7 +151,7 @@ class DeviceMessageHandler: # Notify listeners that there are new to-device messages to process, # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", last_stream_id, users=local_messages.keys() + StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys() ) async def _check_for_unknown_devices( @@ -285,7 +285,7 @@ class DeviceMessageHandler: # Notify listeners that there are new to-device messages to process, # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", last_stream_id, users=local_messages.keys() + StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys() ) if self.federation_sender: diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index de09aed3a3..d79248ad90 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -30,6 +30,7 @@ from synapse.types import ( Requester, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, ) @@ -220,8 +221,10 @@ class InitialSyncHandler: self.storage, user_id, messages ) - start_token = now_token.copy_and_replace("room_key", token) - end_token = now_token.copy_and_replace("room_key", room_end_token) + start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token) + end_token = now_token.copy_and_replace( + StreamKeyType.ROOM, room_end_token + ) time_now = self.clock.time_msec() d["messages"] = { @@ -369,8 +372,8 @@ class InitialSyncHandler: self.storage, user_id, messages, is_peeking=is_peeking ) - start_token = StreamToken.START.copy_and_replace("room_key", token) - end_token = StreamToken.START.copy_and_replace("room_key", stream_token) + start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token) + end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token) time_now = self.clock.time_msec() @@ -474,7 +477,7 @@ class InitialSyncHandler: self.storage, user_id, messages, is_peeking=is_peeking ) - start_token = now_token.copy_and_replace("room_key", token) + start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token) end_token = now_token time_now = self.clock.time_msec() diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 2e30180094..6ae88add95 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -27,7 +27,7 @@ from synapse.handlers.room import ShutdownRoomResponse from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester +from synapse.types import JsonDict, Requester, StreamKeyType from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client @@ -491,7 +491,7 @@ class PaginationHandler: if leave_token.topological < curr_topo: from_token = from_token.copy_and_replace( - "room_key", leave_token + StreamKeyType.ROOM, leave_token ) await self.hs.get_federation_handler().maybe_backfill( @@ -513,7 +513,7 @@ class PaginationHandler: event_filter=event_filter, ) - next_token = from_token.copy_and_replace("room_key", next_key) + next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key) if events: if event_filter: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 268481ec19..dd84e6c88b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -66,7 +66,7 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream from synapse.storage.databases.main import DataStore from synapse.streams import EventSource -from synapse.types import JsonDict, UserID, get_domain_from_id +from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.metrics import Measure @@ -522,7 +522,7 @@ class WorkerPresenceHandler(BasePresenceHandler): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", + StreamKeyType.PRESENCE, stream_id, rooms=room_ids_to_states.keys(), users=users_to_states.keys(), @@ -1145,7 +1145,7 @@ class PresenceHandler(BasePresenceHandler): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", + StreamKeyType.PRESENCE, stream_id, rooms=room_ids_to_states.keys(), users=[UserID.from_string(u) for u in users_to_states], diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 550d58b0e1..e6a35f1d09 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -17,7 +17,13 @@ from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple from synapse.api.constants import ReceiptTypes from synapse.appservice import ApplicationService from synapse.streams import EventSource -from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id +from synapse.types import ( + JsonDict, + ReadReceipt, + StreamKeyType, + UserID, + get_domain_from_id, +) if TYPE_CHECKING: from synapse.server import HomeServer @@ -129,7 +135,9 @@ class ReceiptsHandler: affected_room_ids = list({r.room_id for r in receipts}) - self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids) + self.notifier.on_new_event( + StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids + ) # Note that the min here shouldn't be relied upon to be accurate. await self.hs.get_pusherpool().on_new_receipts( min_batch_id, max_batch_id, affected_room_ids diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 23baa50d03..a2973109ad 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -73,6 +73,7 @@ from synapse.types import ( RoomID, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, create_requester, @@ -1292,10 +1293,10 @@ class RoomContextHandler: events_after=events_after, state=await filter_evts(state_events), aggregations=aggregations, - start=await token.copy_and_replace("room_key", results.start).to_string( - self.store - ), - end=await token.copy_and_replace("room_key", results.end).to_string( + start=await token.copy_and_replace( + StreamKeyType.ROOM, results.start + ).to_string(self.store), + end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string( self.store ), ) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 5619f8f50e..cd1c47dae8 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -24,7 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.storage.state import StateFilter -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -655,11 +655,11 @@ class SearchHandler: "events_before": events_before, "events_after": events_after, "start": await now_token.copy_and_replace( - "room_key", res.start + StreamKeyType.ROOM, res.start + ).to_string(self.store), + "end": await now_token.copy_and_replace( + StreamKeyType.ROOM, res.end ).to_string(self.store), - "end": await now_token.copy_and_replace("room_key", res.end).to_string( - self.store - ), } if include_profile: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2c555a66d0..4be08fe7cb 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -37,6 +37,7 @@ from synapse.types import ( Requester, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, ) @@ -449,7 +450,7 @@ class SyncHandler: room_ids=room_ids, is_guest=sync_config.is_guest, ) - now_token = now_token.copy_and_replace("typing_key", typing_key) + now_token = now_token.copy_and_replace(StreamKeyType.TYPING, typing_key) ephemeral_by_room: JsonDict = {} @@ -471,7 +472,7 @@ class SyncHandler: room_ids=room_ids, is_guest=sync_config.is_guest, ) - now_token = now_token.copy_and_replace("receipt_key", receipt_key) + now_token = now_token.copy_and_replace(StreamKeyType.RECEIPT, receipt_key) for event in receipts: room_id = event["room_id"] @@ -537,7 +538,9 @@ class SyncHandler: prev_batch_token = now_token if recents: room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace("room_key", room_key) + prev_batch_token = now_token.copy_and_replace( + StreamKeyType.ROOM, room_key + ) return TimelineBatch( events=recents, prev_batch=prev_batch_token, limited=False @@ -611,7 +614,7 @@ class SyncHandler: recents = recents[-timeline_limit:] room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace("room_key", room_key) + prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key) # Don't bother to bundle aggregations if the timeline is unlimited, # as clients will have all the necessary information. @@ -1398,7 +1401,7 @@ class SyncHandler: now_token.to_device_key, ) sync_result_builder.now_token = now_token.copy_and_replace( - "to_device_key", stream_id + StreamKeyType.TO_DEVICE, stream_id ) sync_result_builder.to_device = messages else: @@ -1503,7 +1506,7 @@ class SyncHandler: ) assert presence_key sync_result_builder.now_token = now_token.copy_and_replace( - "presence_key", presence_key + StreamKeyType.PRESENCE, presence_key ) extra_users_ids = set(newly_joined_or_invited_users) @@ -1826,7 +1829,7 @@ class SyncHandler: # stream token as it'll only be used in the context of this # room. (c.f. the docstring of `to_room_stream_token`). leave_token = since_token.copy_and_replace( - "room_key", leave_position.to_room_stream_token() + StreamKeyType.ROOM, leave_position.to_room_stream_token() ) # If this is an out of band message, like a remote invite @@ -1875,7 +1878,9 @@ class SyncHandler: if room_entry: events, start_key = room_entry - prev_batch_token = now_token.copy_and_replace("room_key", start_key) + prev_batch_token = now_token.copy_and_replace( + StreamKeyType.ROOM, start_key + ) entry = RoomSyncResultBuilder( room_id=room_id, @@ -1972,7 +1977,7 @@ class SyncHandler: continue leave_token = now_token.copy_and_replace( - "room_key", RoomStreamToken(None, event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering) ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 6854428b7c..bb00750bfd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.replication.tcp.streams import TypingStream from synapse.streams import EventSource -from synapse.types import JsonDict, Requester, UserID, get_domain_from_id +from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -382,7 +382,7 @@ class TypingWriterHandler(FollowerTypingHandler): ) self.notifier.on_new_event( - "typing_key", self._latest_room_serial, rooms=[member.room_id] + StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id] ) async def get_all_typing_updates( |