diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index ba9704a065..97fd1fd427 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -171,8 +171,8 @@ class AdminHandler:
else:
stream_ordering = room.stream_ordering
- from_key = RoomStreamToken(0, 0)
- to_key = RoomStreamToken(None, stream_ordering)
+ from_key = RoomStreamToken(topological=0, stream=0)
+ to_key = RoomStreamToken(stream=stream_ordering)
# Events that we've processed in this room
written_events: Set[str] = set()
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7de7bd3289..c200a45f3a 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -216,7 +216,7 @@ class ApplicationServicesHandler:
def notify_interested_services_ephemeral(
self,
- stream_key: str,
+ stream_key: StreamKeyType,
new_token: Union[int, RoomStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
@@ -326,7 +326,7 @@ class ApplicationServicesHandler:
async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
- stream_key: str,
+ stream_key: StreamKeyType,
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 86ad96d030..50df4f2b06 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -845,7 +845,6 @@ class DeviceHandler(DeviceWorkerHandler):
else:
assert max_stream_id == stream_id
# Avoid moving `room_id` backwards.
- pass
if self._handle_new_device_update_new_data:
continue
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 29cd45550a..9d72794e8b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -868,19 +868,10 @@ class FederationHandler:
# This is a bit of a hack and is cribbing off of invites. Basically we
# store the room state here and retrieve it again when this event appears
# in the invitee's sync stream. It is stripped out for all other local users.
- stripped_room_state = (
- knock_response.get("knock_room_state")
- # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
- # Thus, we also check for a 'knock_state_events' to support old instances.
- # See https://github.com/matrix-org/synapse/issues/14088.
- or knock_response.get("knock_state_events")
- )
+ stripped_room_state = knock_response.get("knock_room_state")
if stripped_room_state is None:
- raise KeyError(
- "Missing 'knock_room_state' (or legacy 'knock_state_events') field in "
- "send_knock response"
- )
+ raise KeyError("Missing 'knock_room_state' field in send_knock response")
event.unsigned["knock_room_state"] = stripped_room_state
@@ -1506,7 +1497,6 @@ class FederationHandler:
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
- pass
else:
destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)}
@@ -1582,7 +1572,6 @@ class FederationHandler:
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
- pass
async def add_display_name_to_third_party_invite(
self,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 5737f8014d..c34bd7db95 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -192,8 +192,7 @@ class InitialSyncHandler:
)
elif event.membership == Membership.LEAVE:
room_end_token = RoomStreamToken(
- None,
- event.stream_ordering,
+ stream=event.stream_ordering,
)
deferred_room_state = run_in_background(
self._state_storage_controller.get_state_for_events,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 44dbbf81dd..8de4b8e816 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -693,13 +693,9 @@ class EventCreationHandler:
if require_consent and not is_exempt:
await self.assert_accepted_privacy_policy(requester)
- # Save the access token ID, the device ID and the transaction ID in the event
- # internal metadata. This is useful to determine if we should echo the
- # transaction_id in events.
+ # Save the the device ID and the transaction ID in the event internal metadata.
+ # This is useful to determine if we should echo the transaction_id in events.
# See `synapse.events.utils.EventClientSerializer.serialize_event`
- if requester.access_token_id is not None:
- builder.internal_metadata.token_id = requester.access_token_id
-
if requester.device_id is not None:
builder.internal_metadata.device_id = requester.device_id
@@ -1133,7 +1129,6 @@ class EventCreationHandler:
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
- pass
# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
@@ -2038,7 +2033,6 @@ class EventCreationHandler:
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
- pass
return True
except AuthError:
logger.info(
diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 7ed88a3611..87b428ab1c 100644
--- a/synapse/handlers/push_rules.py
+++ b/synapse/handlers/push_rules.py
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, UnrecognizedRequestError
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.push_rule import RuleNotFoundException
from synapse.synapse_rust.push import get_base_rule_ids
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -114,7 +114,9 @@ class PushRulesHandler:
user_id: the user ID the change is for.
"""
stream_id = self._main_store.get_max_push_rules_stream_id()
- self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id])
+ self._notifier.on_new_event(
+ StreamKeyType.PUSH_RULES, stream_id, users=[user_id]
+ )
async def push_rules_for_user(
self, user: UserID
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a7a29b758b..69ac468f75 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -130,11 +130,10 @@ class ReceiptsHandler:
async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
"""Takes a list of receipts, stores them and informs the notifier."""
- min_batch_id: Optional[int] = None
- max_batch_id: Optional[int] = None
+ receipts_persisted: List[ReadReceipt] = []
for receipt in receipts:
- res = await self.store.insert_receipt(
+ stream_id = await self.store.insert_receipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
@@ -143,30 +142,26 @@ class ReceiptsHandler:
receipt.data,
)
- if not res:
- # res will be None if this receipt is 'old'
+ if stream_id is None:
+ # stream_id will be None if this receipt is 'old'
continue
- stream_id, max_persisted_id = res
+ receipts_persisted.append(receipt)
- if min_batch_id is None or stream_id < min_batch_id:
- min_batch_id = stream_id
- if max_batch_id is None or max_persisted_id > max_batch_id:
- max_batch_id = max_persisted_id
-
- # Either both of these should be None or neither.
- if min_batch_id is None or max_batch_id is None:
+ if not receipts_persisted:
# no new receipts
return False
- affected_room_ids = list({r.room_id for r in receipts})
+ max_batch_id = self.store.get_max_receipt_stream_id()
+
+ affected_room_ids = list({r.room_id for r in receipts_persisted})
self.notifier.on_new_event(
StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
await self.hs.get_pusherpool().on_new_receipts(
- min_batch_id, max_batch_id, affected_room_ids
+ {r.user_id for r in receipts_persisted}
)
return True
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a0c3b16819..97c9f01245 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -261,7 +261,6 @@ class RoomCreationHandler:
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
- pass
# This is to satisfy mypy and should never happen
raise PartialStateConflictError()
@@ -1708,7 +1707,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
if from_key.topological:
logger.warning("Stream has topological part!!!! %r", from_key)
- from_key = RoomStreamToken(None, from_key.stream)
+ from_key = RoomStreamToken(stream=from_key.stream)
app_service = self.store.get_app_service_by_user_id(user.to_string())
if app_service:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 90343c2306..130eee7e1d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -382,8 +382,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
and persist a new event for the new membership change.
Args:
- requester:
- target:
+ requester: User requesting the membership change, i.e. the sender of the
+ desired membership event.
+ target: Use whose membership should change, i.e. the state_key of the
+ desired membership event.
room_id:
membership:
@@ -415,7 +417,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
Returns:
Tuple of event ID and stream ordering position
"""
-
user_id = target.to_string()
if content is None:
@@ -475,21 +476,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
(EventTypes.Member, user_id), None
)
- if event.membership == Membership.JOIN:
- newly_joined = True
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(
- prev_member_event_id
- )
- newly_joined = prev_member_event.membership != Membership.JOIN
-
- # Only rate-limit if the user actually joined the room, otherwise we'll end
- # up blocking profile updates.
- if newly_joined and ratelimit:
- await self._join_rate_limiter_local.ratelimit(requester)
- await self._join_rate_per_room_limiter.ratelimit(
- requester, key=room_id, update=False
- )
with opentracing.start_active_span("handle_new_client_event"):
result_event = (
await self.event_creation_handler.handle_new_client_event(
@@ -514,7 +500,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
- pass
# we know it was persisted, so should have a stream ordering
assert result_event.internal_metadata.stream_ordering
@@ -618,6 +603,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
Raises:
ShadowBanError if a shadow-banned requester attempts to send an invite.
"""
+ if ratelimit:
+ if action == Membership.JOIN:
+ # Only rate-limit if the user isn't already joined to the room, otherwise
+ # we'll end up blocking profile updates.
+ (
+ current_membership,
+ _,
+ ) = await self.store.get_local_current_membership_for_user_in_room(
+ requester.user.to_string(),
+ room_id,
+ )
+ if current_membership != Membership.JOIN:
+ await self._join_rate_limiter_local.ratelimit(requester)
+ await self._join_rate_per_room_limiter.ratelimit(
+ requester, key=room_id, update=False
+ )
+ elif action == Membership.INVITE:
+ await self.ratelimit_invite(requester, room_id, target.to_string())
+
if action == Membership.INVITE and requester.shadow_banned:
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
@@ -794,8 +798,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if effective_membership_state == Membership.INVITE:
target_id = target.to_string()
- if ratelimit:
- await self.ratelimit_invite(requester, room_id, target_id)
# block any attempts to invite the server notices mxid
if target_id == self._server_notices_mxid:
@@ -2002,7 +2004,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
- pass
# we know it was persisted, so must have a stream ordering
assert result_event.internal_metadata.stream_ordering
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7bd42f635f..744e080309 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -2333,7 +2333,7 @@ class SyncHandler:
continue
leave_token = now_token.copy_and_replace(
- StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
+ StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
)
room_entries.append(
RoomSyncResultBuilder(
|