diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 7 | ||||
-rw-r--r-- | synapse/handlers/device.py | 2 | ||||
-rw-r--r-- | synapse/handlers/devicemessage.py | 36 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 4 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 16 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 32 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 8 |
7 files changed, 65 insertions, 40 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 66f5b8d108..5d1d21cdc8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -578,9 +578,6 @@ class ApplicationServicesHandler: device_id, ), messages in recipient_device_to_messages.items(): for message_json in messages: - # Remove 'message_id' from the to-device message, as it's an internal ID - message_json.pop("message_id", None) - message_payload.append( { "to_user_id": user_id, @@ -615,8 +612,8 @@ class ApplicationServicesHandler: ) # Fetch the users who have modified their device list since then. - users_with_changed_device_lists = ( - await self.store.get_users_whose_devices_changed(from_key, to_key=new_key) + users_with_changed_device_lists = await self.store.get_all_devices_changed( + from_key, to_key=new_key ) # Filter out any users the application service is not interested in diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index b1e55e1b9e..d4750a32e6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -996,7 +996,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): # Check if we are partially joining any rooms. If so we need to store # all device list updates so that we can handle them correctly once we # know who is in the room. - # TODO(faster joins): this fetches and processes a bunch of data that we don't + # TODO(faster_joins): this fetches and processes a bunch of data that we don't # use. Could be replaced by a tighter query e.g. # SELECT EXISTS(SELECT 1 FROM partial_state_rooms) partial_rooms = await self.store.get_partial_state_room_resync_info() diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 444c08bc2e..75e89850f5 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -15,7 +15,7 @@ import logging from typing import TYPE_CHECKING, Any, Dict -from synapse.api.constants import EduTypes, ToDeviceEventTypes +from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes from synapse.api.errors import SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.logging.context import run_in_background @@ -216,14 +216,24 @@ class DeviceMessageHandler: """ sender_user_id = requester.user.to_string() - message_id = random_string(16) - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) - - log_kv({"number_of_to_device_messages": len(messages)}) - set_tag("sender", sender_user_id) + set_tag(SynapseTags.TO_DEVICE_TYPE, message_type) + set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id) local_messages = {} remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {} for user_id, by_device in messages.items(): + # add an opentracing log entry for each message + for device_id, message_content in by_device.items(): + log_kv( + { + "event": "send_to_device_message", + "user_id": user_id, + "device_id": device_id, + EventContentFields.TO_DEVICE_MSGID: message_content.get( + EventContentFields.TO_DEVICE_MSGID + ), + } + ) + # Ratelimit local cross-user key requests by the sending device. if ( message_type == ToDeviceEventTypes.RoomKeyRequest @@ -233,6 +243,7 @@ class DeviceMessageHandler: requester, (sender_user_id, requester.device_id) ) if not allowed: + log_kv({"message": f"dropping key requests to {user_id}"}) logger.info( "Dropping room_key_request from %s to %s due to rate limit", sender_user_id, @@ -247,18 +258,11 @@ class DeviceMessageHandler: "content": message_content, "type": message_type, "sender": sender_user_id, - "message_id": message_id, } for device_id, message_content in by_device.items() } if messages_by_device: local_messages[user_id] = messages_by_device - log_kv( - { - "user_id": user_id, - "device_id": list(messages_by_device), - } - ) else: destination = get_domain_from_id(user_id) remote_messages.setdefault(destination, {})[user_id] = by_device @@ -267,7 +271,11 @@ class DeviceMessageHandler: remote_edu_contents = {} for destination, messages in remote_messages.items(): - log_kv({"destination": destination}) + # The EDU contains a "message_id" property which is used for + # idempotence. Make up a random one. + message_id = random_string(16) + log_kv({"destination": destination, "message_id": message_id}) + remote_edu_contents[destination] = { "messages": messages, "sender": sender_user_id, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d92582fd5c..3398fcaf7d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -152,6 +152,7 @@ class FederationHandler: self._federation_event_handler = hs.get_federation_event_handler() self._device_handler = hs.get_device_handler() self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() + self._notifier = hs.get_notifier() self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( hs @@ -1692,6 +1693,9 @@ class FederationHandler: self._storage_controllers.state.notify_room_un_partial_stated( room_id ) + # Poke the notifier so that other workers see the write to + # the un-partial-stated rooms stream. + self._notifier.notify_replication() # TODO(faster_joins) update room stats and user directory? # https://github.com/matrix-org/synapse/issues/12814 diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cf08737d11..2af90b25a3 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1692,10 +1692,12 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): if from_key is not None: # First get all users that have had a presence update - updated_users = stream_change_cache.get_all_entities_changed(from_key) + result = stream_change_cache.get_all_entities_changed(from_key) # Cross-reference users we're interested in with those that have had updates. - if updated_users is not None: + if result.hit: + updated_users = result.entities + # If we have the full list of changes for presence we can # simply check which ones share a room with the user. get_updates_counter.labels("stream").inc() @@ -1764,14 +1766,14 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): Returns: A list of presence states for the given user to receive. """ + updated_users = None if from_key: # Only return updates since the last sync - updated_users = self.store.presence_stream_cache.get_all_entities_changed( - from_key - ) - if not updated_users: - updated_users = [] + result = self.store.presence_stream_cache.get_all_entities_changed(from_key) + if result.hit: + updated_users = result.entities + if updated_users is not None: # Get the actual presence update for each change users_to_state = await self.get_presence_handler().current_state_for_users( updated_users diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c8858b22dd..dace9b606f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -31,14 +31,20 @@ from typing import ( import attr from prometheus_client import Counter -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.handlers.relations import BundledAggregations from synapse.logging.context import current_context -from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span +from synapse.logging.opentracing import ( + SynapseTags, + log_kv, + set_tag, + start_active_span, + trace, +) from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.databases.main.event_push_actions import RoomNotifCounts from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary @@ -1528,10 +1534,12 @@ class SyncHandler: # # If we don't have that info cached then we get all the users that # share a room with our user and check if those users have changed. - changed_users = self.store.get_cached_device_list_changes( + cache_result = self.store.get_cached_device_list_changes( since_token.device_list_key ) - if changed_users is not None: + if cache_result.hit: + changed_users = cache_result.entities + result = await self.store.get_rooms_for_users(changed_users) for changed_user_id, entries in result.items(): @@ -1584,6 +1592,7 @@ class SyncHandler: else: return DeviceListUpdates() + @trace async def _generate_sync_entry_for_to_device( self, sync_result_builder: "SyncResultBuilder" ) -> None: @@ -1603,11 +1612,16 @@ class SyncHandler: ) for message in messages: - # We pop here as we shouldn't be sending the message ID down - # `/sync` - message_id = message.pop("message_id", None) - if message_id: - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + log_kv( + { + "event": "to_device_message", + "sender": message["sender"], + "type": message["type"], + EventContentFields.TO_DEVICE_MSGID: message["content"].get( + EventContentFields.TO_DEVICE_MSGID + ), + } + ) logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a0ea719430..3f656ea4f5 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -420,11 +420,11 @@ class TypingWriterHandler(FollowerTypingHandler): if last_id == current_id: return [], current_id, False - changed_rooms: Optional[ - Iterable[str] - ] = self._typing_stream_change_cache.get_all_entities_changed(last_id) + result = self._typing_stream_change_cache.get_all_entities_changed(last_id) - if changed_rooms is None: + if result.hit: + changed_rooms: Iterable[str] = result.entities + else: changed_rooms = self._room_serials rows = [] |