diff --git a/changelog.d/17265.misc b/changelog.d/17265.misc
new file mode 100644
index 0000000000..e6d4d8b4ee
--- /dev/null
+++ b/changelog.d/17265.misc
@@ -0,0 +1 @@
+Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 7ffc650aa1..1932fa82a4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -674,7 +674,7 @@ class FederationServer(FederationBase):
# This is in addition to the HS-level rate limiting applied by
# BaseFederationServlet.
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
- await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
+ await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
requester=None,
key=room_id,
update=False,
@@ -717,7 +717,7 @@ class FederationServer(FederationBase):
SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
caller_supports_partial_state,
)
- await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
+ await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
requester=None,
key=room_id,
update=False,
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 702d40332c..21d3bb37f3 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -126,13 +126,7 @@ class AdminHandler:
# Get all rooms the user is in or has been in
rooms = await self._store.get_rooms_for_local_user_where_membership_is(
user_id,
- membership_list=(
- Membership.JOIN,
- Membership.LEAVE,
- Membership.BAN,
- Membership.INVITE,
- Membership.KNOCK,
- ),
+ membership_list=Membership.LIST,
)
# We only try and fetch events for rooms the user has been in. If
@@ -179,7 +173,7 @@ class AdminHandler:
if room.membership == Membership.JOIN:
stream_ordering = self._store.get_room_max_stream_ordering()
else:
- stream_ordering = room.stream_ordering
+ stream_ordering = room.event_pos.stream
from_key = RoomStreamToken(topological=0, stream=0)
to_key = RoomStreamToken(stream=stream_ordering)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index d99fc4bec0..84d6fecf31 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -199,7 +199,7 @@ class InitialSyncHandler:
)
elif event.membership == Membership.LEAVE:
room_end_token = RoomStreamToken(
- stream=event.stream_ordering,
+ stream=event.event_pos.stream,
)
deferred_room_state = run_in_background(
self._state_storage_controller.get_state_for_events,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 6617105cdb..f7447b8ba5 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -27,7 +27,6 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -38,6 +37,8 @@ from synapse.types import (
JsonMapping,
Requester,
ScheduledTask,
+ ShutdownRoomParams,
+ ShutdownRoomResponse,
StreamKeyType,
TaskStatus,
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 51739a2653..7f1b674d10 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -40,7 +40,6 @@ from typing import (
)
import attr
-from typing_extensions import TypedDict
import synapse.events.snapshot
from synapse.api.constants import (
@@ -81,6 +80,8 @@ from synapse.types import (
RoomAlias,
RoomID,
RoomStreamToken,
+ ShutdownRoomParams,
+ ShutdownRoomResponse,
StateMap,
StrCollection,
StreamKeyType,
@@ -1780,63 +1781,6 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
return self.store.get_current_room_stream_token_for_room_id(room_id)
-class ShutdownRoomParams(TypedDict):
- """
- Attributes:
- requester_user_id:
- User who requested the action. Will be recorded as putting the room on the
- blocking list.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
- purge:
- If set to `true`, purge the given room from the database.
- force_purge:
- If set to `true`, the room will be purged from database
- even if there are still users joined to the room.
- """
-
- requester_user_id: Optional[str]
- new_room_user_id: Optional[str]
- new_room_name: Optional[str]
- message: Optional[str]
- block: bool
- purge: bool
- force_purge: bool
-
-
-class ShutdownRoomResponse(TypedDict):
- """
- Attributes:
- kicked_users: An array of users (`user_id`) that were kicked.
- failed_to_kick_users:
- An array of users (`user_id`) that that were not kicked.
- local_aliases:
- An array of strings representing the local aliases that were
- migrated from the old room to the new.
- new_room_id: A string representing the room ID of the new room.
- """
-
- kicked_users: List[str]
- failed_to_kick_users: List[str]
- local_aliases: List[str]
- new_room_id: Optional[str]
-
-
class RoomShutdownHandler:
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1d7d9dfdd0..e815e0ea7f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -2805,7 +2805,7 @@ class SyncHandler:
continue
leave_token = now_token.copy_and_replace(
- StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
+ StreamKeyType.ROOM, RoomStreamToken(stream=event.event_pos.stream)
)
room_entries.append(
RoomSyncResultBuilder(
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 9fddbb2caf..d8b54dc4e3 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
)
sql = """
- SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version
+ SELECT room_id, e.sender, c.membership, event_id, e.instance_name, e.stream_ordering, r.room_version
FROM local_current_membership AS c
INNER JOIN events AS e USING (room_id, event_id)
INNER JOIN rooms AS r USING (room_id)
@@ -488,7 +488,17 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
)
txn.execute(sql, (user_id, *args))
- results = [RoomsForUser(*r) for r in txn]
+ results = [
+ RoomsForUser(
+ room_id=room_id,
+ sender=sender,
+ membership=membership,
+ event_id=event_id,
+ event_pos=PersistedEventPosition(instance_name, stream_ordering),
+ room_version_id=room_version,
+ )
+ for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
+ ]
return results
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7471f81a19..80c9630867 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -35,7 +35,7 @@ class RoomsForUser:
sender: str
membership: str
event_id: str
- stream_ordering: int
+ event_pos: PersistedEventPosition
room_version_id: str
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 151658df53..3a89787cab 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1279,3 +1279,60 @@ class ScheduledTask:
result: Optional[JsonMapping]
# Optional error that should be assigned a value when the status is FAILED
error: Optional[str]
+
+
+class ShutdownRoomParams(TypedDict):
+ """
+ Attributes:
+ requester_user_id:
+ User who requested the action. Will be recorded as putting the room on the
+ blocking list.
+ new_room_user_id:
+ If set, a new room will be created with this user ID
+ as the creator and admin, and all users in the old room will be
+ moved into that room. If not set, no new room will be created
+ and the users will just be removed from the old room.
+ new_room_name:
+ A string representing the name of the room that new users will
+ be invited to. Defaults to `Content Violation Notification`
+ message:
+ A string containing the first message that will be sent as
+ `new_room_user_id` in the new room. Ideally this will clearly
+ convey why the original room was shut down.
+ Defaults to `Sharing illegal content on this server is not
+ permitted and rooms in violation will be blocked.`
+ block:
+ If set to `true`, this room will be added to a blocking list,
+ preventing future attempts to join the room. Defaults to `false`.
+ purge:
+ If set to `true`, purge the given room from the database.
+ force_purge:
+ If set to `true`, the room will be purged from database
+ even if there are still users joined to the room.
+ """
+
+ requester_user_id: Optional[str]
+ new_room_user_id: Optional[str]
+ new_room_name: Optional[str]
+ message: Optional[str]
+ block: bool
+ purge: bool
+ force_purge: bool
+
+
+class ShutdownRoomResponse(TypedDict):
+ """
+ Attributes:
+ kicked_users: An array of users (`user_id`) that were kicked.
+ failed_to_kick_users:
+ An array of users (`user_id`) that that were not kicked.
+ local_aliases:
+ An array of strings representing the local aliases that were
+ migrated from the old room to the new.
+ new_room_id: A string representing the room ID of the new room.
+ """
+
+ kicked_users: List[str]
+ failed_to_kick_users: List[str]
+ local_aliases: List[str]
+ new_room_id: Optional[str]
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index 86c8f14d1b..4e41a1c912 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -154,7 +154,10 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
USER_ID,
"invite",
event.event_id,
- event.internal_metadata.stream_ordering,
+ PersistedEventPosition(
+ self.hs.get_instance_name(),
+ event.internal_metadata.stream_ordering,
+ ),
RoomVersions.V1.identifier,
)
],
|