summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/admin.py4
-rw-r--r--synapse/handlers/appservice.py4
-rw-r--r--synapse/handlers/device.py1
-rw-r--r--synapse/handlers/federation.py15
-rw-r--r--synapse/handlers/initial_sync.py3
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/push_rules.py6
-rw-r--r--synapse/handlers/receipts.py25
-rw-r--r--synapse/handlers/room.py3
-rw-r--r--synapse/handlers/room_member.py45
-rw-r--r--synapse/handlers/sync.py2
11 files changed, 48 insertions, 70 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py

index ba9704a065..97fd1fd427 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py
@@ -171,8 +171,8 @@ class AdminHandler: else: stream_ordering = room.stream_ordering - from_key = RoomStreamToken(0, 0) - to_key = RoomStreamToken(None, stream_ordering) + from_key = RoomStreamToken(topological=0, stream=0) + to_key = RoomStreamToken(stream=stream_ordering) # Events that we've processed in this room written_events: Set[str] = set() diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7de7bd3289..c200a45f3a 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py
@@ -216,7 +216,7 @@ class ApplicationServicesHandler: def notify_interested_services_ephemeral( self, - stream_key: str, + stream_key: StreamKeyType, new_token: Union[int, RoomStreamToken], users: Collection[Union[str, UserID]], ) -> None: @@ -326,7 +326,7 @@ class ApplicationServicesHandler: async def _notify_interested_services_ephemeral( self, services: List[ApplicationService], - stream_key: str, + stream_key: StreamKeyType, new_token: int, users: Collection[Union[str, UserID]], ) -> None: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 86ad96d030..50df4f2b06 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -845,7 +845,6 @@ class DeviceHandler(DeviceWorkerHandler): else: assert max_stream_id == stream_id # Avoid moving `room_id` backwards. - pass if self._handle_new_device_update_new_data: continue diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 29cd45550a..9d72794e8b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -868,19 +868,10 @@ class FederationHandler: # This is a bit of a hack and is cribbing off of invites. Basically we # store the room state here and retrieve it again when this event appears # in the invitee's sync stream. It is stripped out for all other local users. - stripped_room_state = ( - knock_response.get("knock_room_state") - # Since v1.37, Synapse incorrectly used "knock_state_events" for this field. - # Thus, we also check for a 'knock_state_events' to support old instances. - # See https://github.com/matrix-org/synapse/issues/14088. - or knock_response.get("knock_state_events") - ) + stripped_room_state = knock_response.get("knock_room_state") if stripped_room_state is None: - raise KeyError( - "Missing 'knock_room_state' (or legacy 'knock_state_events') field in " - "send_knock response" - ) + raise KeyError("Missing 'knock_room_state' field in send_knock response") event.unsigned["knock_room_state"] = stripped_room_state @@ -1506,7 +1497,6 @@ class FederationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass else: destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)} @@ -1582,7 +1572,6 @@ class FederationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass async def add_display_name_to_third_party_invite( self, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 5737f8014d..c34bd7db95 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py
@@ -192,8 +192,7 @@ class InitialSyncHandler: ) elif event.membership == Membership.LEAVE: room_end_token = RoomStreamToken( - None, - event.stream_ordering, + stream=event.stream_ordering, ) deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 44dbbf81dd..8de4b8e816 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -693,13 +693,9 @@ class EventCreationHandler: if require_consent and not is_exempt: await self.assert_accepted_privacy_policy(requester) - # Save the access token ID, the device ID and the transaction ID in the event - # internal metadata. This is useful to determine if we should echo the - # transaction_id in events. + # Save the the device ID and the transaction ID in the event internal metadata. + # This is useful to determine if we should echo the transaction_id in events. # See `synapse.events.utils.EventClientSerializer.serialize_event` - if requester.access_token_id is not None: - builder.internal_metadata.token_id = requester.access_token_id - if requester.device_id is not None: builder.internal_metadata.device_id = requester.device_id @@ -1133,7 +1129,6 @@ class EventCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so must have a stream ordering assert ev.internal_metadata.stream_ordering @@ -2038,7 +2033,6 @@ class EventCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass return True except AuthError: logger.info( diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 7ed88a3611..87b428ab1c 100644 --- a/synapse/handlers/push_rules.py +++ b/synapse/handlers/push_rules.py
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, UnrecognizedRequestError from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.push_rule import RuleNotFoundException from synapse.synapse_rust.push import get_base_rule_ids -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -114,7 +114,9 @@ class PushRulesHandler: user_id: the user ID the change is for. """ stream_id = self._main_store.get_max_push_rules_stream_id() - self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) + self._notifier.on_new_event( + StreamKeyType.PUSH_RULES, stream_id, users=[user_id] + ) async def push_rules_for_user( self, user: UserID diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a7a29b758b..69ac468f75 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py
@@ -130,11 +130,10 @@ class ReceiptsHandler: async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool: """Takes a list of receipts, stores them and informs the notifier.""" - min_batch_id: Optional[int] = None - max_batch_id: Optional[int] = None + receipts_persisted: List[ReadReceipt] = [] for receipt in receipts: - res = await self.store.insert_receipt( + stream_id = await self.store.insert_receipt( receipt.room_id, receipt.receipt_type, receipt.user_id, @@ -143,30 +142,26 @@ class ReceiptsHandler: receipt.data, ) - if not res: - # res will be None if this receipt is 'old' + if stream_id is None: + # stream_id will be None if this receipt is 'old' continue - stream_id, max_persisted_id = res + receipts_persisted.append(receipt) - if min_batch_id is None or stream_id < min_batch_id: - min_batch_id = stream_id - if max_batch_id is None or max_persisted_id > max_batch_id: - max_batch_id = max_persisted_id - - # Either both of these should be None or neither. - if min_batch_id is None or max_batch_id is None: + if not receipts_persisted: # no new receipts return False - affected_room_ids = list({r.room_id for r in receipts}) + max_batch_id = self.store.get_max_receipt_stream_id() + + affected_room_ids = list({r.room_id for r in receipts_persisted}) self.notifier.on_new_event( StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids ) # Note that the min here shouldn't be relied upon to be accurate. await self.hs.get_pusherpool().on_new_receipts( - min_batch_id, max_batch_id, affected_room_ids + {r.user_id for r in receipts_persisted} ) return True diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a0c3b16819..97c9f01245 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -261,7 +261,6 @@ class RoomCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # This is to satisfy mypy and should never happen raise PartialStateConflictError() @@ -1708,7 +1707,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): if from_key.topological: logger.warning("Stream has topological part!!!! %r", from_key) - from_key = RoomStreamToken(None, from_key.stream) + from_key = RoomStreamToken(stream=from_key.stream) app_service = self.store.get_app_service_by_user_id(user.to_string()) if app_service: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 90343c2306..130eee7e1d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -382,8 +382,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): and persist a new event for the new membership change. Args: - requester: - target: + requester: User requesting the membership change, i.e. the sender of the + desired membership event. + target: Use whose membership should change, i.e. the state_key of the + desired membership event. room_id: membership: @@ -415,7 +417,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): Returns: Tuple of event ID and stream ordering position """ - user_id = target.to_string() if content is None: @@ -475,21 +476,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): (EventTypes.Member, user_id), None ) - if event.membership == Membership.JOIN: - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event( - prev_member_event_id - ) - newly_joined = prev_member_event.membership != Membership.JOIN - - # Only rate-limit if the user actually joined the room, otherwise we'll end - # up blocking profile updates. - if newly_joined and ratelimit: - await self._join_rate_limiter_local.ratelimit(requester) - await self._join_rate_per_room_limiter.ratelimit( - requester, key=room_id, update=False - ) with opentracing.start_active_span("handle_new_client_event"): result_event = ( await self.event_creation_handler.handle_new_client_event( @@ -514,7 +500,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so should have a stream ordering assert result_event.internal_metadata.stream_ordering @@ -618,6 +603,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): Raises: ShadowBanError if a shadow-banned requester attempts to send an invite. """ + if ratelimit: + if action == Membership.JOIN: + # Only rate-limit if the user isn't already joined to the room, otherwise + # we'll end up blocking profile updates. + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + requester.user.to_string(), + room_id, + ) + if current_membership != Membership.JOIN: + await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, key=room_id, update=False + ) + elif action == Membership.INVITE: + await self.ratelimit_invite(requester, room_id, target.to_string()) + if action == Membership.INVITE and requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. await self.clock.sleep(random.randint(1, 10)) @@ -794,8 +798,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if effective_membership_state == Membership.INVITE: target_id = target.to_string() - if ratelimit: - await self.ratelimit_invite(requester, room_id, target_id) # block any attempts to invite the server notices mxid if target_id == self._server_notices_mxid: @@ -2002,7 +2004,6 @@ class RoomMemberMasterHandler(RoomMemberHandler): # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so must have a stream ordering assert result_event.internal_metadata.stream_ordering diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7bd42f635f..744e080309 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -2333,7 +2333,7 @@ class SyncHandler: continue leave_token = now_token.copy_and_replace( - StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering) ) room_entries.append( RoomSyncResultBuilder(