summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-06-04 12:58:03 -0500
committerGitHub <noreply@github.com>2024-06-04 12:58:03 -0500
commit7d8f0ef351e99adf602b3acb67b2516a02ff6918 (patch)
treef8fdbcd3578d0866a236e589062b80360f549a3b
parentMerge branch 'release-v1.109' into develop (diff)
downloadsynapse-7d8f0ef351e99adf602b3acb67b2516a02ff6918.tar.xz
Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` (#17265)
Use fully-qualified `PersistedEventPosition` (`instance_name` and `stream_ordering`) when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation.

Spawning from https://github.com/element-hq/synapse/pull/17187 where we want to utilize this change
-rw-r--r--changelog.d/17265.misc1
-rw-r--r--synapse/federation/federation_server.py4
-rw-r--r--synapse/handlers/admin.py10
-rw-r--r--synapse/handlers/initial_sync.py2
-rw-r--r--synapse/handlers/pagination.py3
-rw-r--r--synapse/handlers/room.py60
-rw-r--r--synapse/handlers/sync.py2
-rw-r--r--synapse/storage/databases/main/roommember.py14
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/types/__init__.py57
-rw-r--r--tests/replication/storage/test_events.py5
11 files changed, 85 insertions, 75 deletions
diff --git a/changelog.d/17265.misc b/changelog.d/17265.misc
new file mode 100644
index 0000000000..e6d4d8b4ee
--- /dev/null
+++ b/changelog.d/17265.misc
@@ -0,0 +1 @@
+Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 7ffc650aa1..1932fa82a4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -674,7 +674,7 @@ class FederationServer(FederationBase):
         # This is in addition to the HS-level rate limiting applied by
         # BaseFederationServlet.
         # type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
-        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
+        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
             requester=None,
             key=room_id,
             update=False,
@@ -717,7 +717,7 @@ class FederationServer(FederationBase):
             SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
             caller_supports_partial_state,
         )
-        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
+        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
             requester=None,
             key=room_id,
             update=False,
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 702d40332c..21d3bb37f3 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -126,13 +126,7 @@ class AdminHandler:
         # Get all rooms the user is in or has been in
         rooms = await self._store.get_rooms_for_local_user_where_membership_is(
             user_id,
-            membership_list=(
-                Membership.JOIN,
-                Membership.LEAVE,
-                Membership.BAN,
-                Membership.INVITE,
-                Membership.KNOCK,
-            ),
+            membership_list=Membership.LIST,
         )
 
         # We only try and fetch events for rooms the user has been in. If
@@ -179,7 +173,7 @@ class AdminHandler:
             if room.membership == Membership.JOIN:
                 stream_ordering = self._store.get_room_max_stream_ordering()
             else:
-                stream_ordering = room.stream_ordering
+                stream_ordering = room.event_pos.stream
 
             from_key = RoomStreamToken(topological=0, stream=0)
             to_key = RoomStreamToken(stream=stream_ordering)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index d99fc4bec0..84d6fecf31 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -199,7 +199,7 @@ class InitialSyncHandler:
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = RoomStreamToken(
-                        stream=event.stream_ordering,
+                        stream=event.event_pos.stream,
                     )
                     deferred_room_state = run_in_background(
                         self._state_storage_controller.get_state_for_events,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 6617105cdb..f7447b8ba5 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -27,7 +27,6 @@ from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
 from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -38,6 +37,8 @@ from synapse.types import (
     JsonMapping,
     Requester,
     ScheduledTask,
+    ShutdownRoomParams,
+    ShutdownRoomResponse,
     StreamKeyType,
     TaskStatus,
 )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 51739a2653..7f1b674d10 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -40,7 +40,6 @@ from typing import (
 )
 
 import attr
-from typing_extensions import TypedDict
 
 import synapse.events.snapshot
 from synapse.api.constants import (
@@ -81,6 +80,8 @@ from synapse.types import (
     RoomAlias,
     RoomID,
     RoomStreamToken,
+    ShutdownRoomParams,
+    ShutdownRoomResponse,
     StateMap,
     StrCollection,
     StreamKeyType,
@@ -1780,63 +1781,6 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
         return self.store.get_current_room_stream_token_for_room_id(room_id)
 
 
-class ShutdownRoomParams(TypedDict):
-    """
-    Attributes:
-        requester_user_id:
-            User who requested the action. Will be recorded as putting the room on the
-            blocking list.
-        new_room_user_id:
-            If set, a new room will be created with this user ID
-            as the creator and admin, and all users in the old room will be
-            moved into that room. If not set, no new room will be created
-            and the users will just be removed from the old room.
-        new_room_name:
-            A string representing the name of the room that new users will
-            be invited to. Defaults to `Content Violation Notification`
-        message:
-            A string containing the first message that will be sent as
-            `new_room_user_id` in the new room. Ideally this will clearly
-            convey why the original room was shut down.
-            Defaults to `Sharing illegal content on this server is not
-            permitted and rooms in violation will be blocked.`
-        block:
-            If set to `true`, this room will be added to a blocking list,
-            preventing future attempts to join the room. Defaults to `false`.
-        purge:
-            If set to `true`, purge the given room from the database.
-        force_purge:
-            If set to `true`, the room will be purged from database
-            even if there are still users joined to the room.
-    """
-
-    requester_user_id: Optional[str]
-    new_room_user_id: Optional[str]
-    new_room_name: Optional[str]
-    message: Optional[str]
-    block: bool
-    purge: bool
-    force_purge: bool
-
-
-class ShutdownRoomResponse(TypedDict):
-    """
-    Attributes:
-        kicked_users: An array of users (`user_id`) that were kicked.
-        failed_to_kick_users:
-            An array of users (`user_id`) that that were not kicked.
-        local_aliases:
-            An array of strings representing the local aliases that were
-            migrated from the old room to the new.
-        new_room_id: A string representing the room ID of the new room.
-    """
-
-    kicked_users: List[str]
-    failed_to_kick_users: List[str]
-    local_aliases: List[str]
-    new_room_id: Optional[str]
-
-
 class RoomShutdownHandler:
     DEFAULT_MESSAGE = (
         "Sharing illegal content on this server is not permitted and rooms in"
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1d7d9dfdd0..e815e0ea7f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -2805,7 +2805,7 @@ class SyncHandler:
                             continue
 
                 leave_token = now_token.copy_and_replace(
-                    StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
+                    StreamKeyType.ROOM, RoomStreamToken(stream=event.event_pos.stream)
                 )
                 room_entries.append(
                     RoomSyncResultBuilder(
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 9fddbb2caf..d8b54dc4e3 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         )
 
         sql = """
-            SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version
+            SELECT room_id, e.sender, c.membership, event_id, e.instance_name, e.stream_ordering, r.room_version
             FROM local_current_membership AS c
             INNER JOIN events AS e USING (room_id, event_id)
             INNER JOIN rooms AS r USING (room_id)
@@ -488,7 +488,17 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         )
 
         txn.execute(sql, (user_id, *args))
-        results = [RoomsForUser(*r) for r in txn]
+        results = [
+            RoomsForUser(
+                room_id=room_id,
+                sender=sender,
+                membership=membership,
+                event_id=event_id,
+                event_pos=PersistedEventPosition(instance_name, stream_ordering),
+                room_version_id=room_version,
+            )
+            for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
+        ]
 
         return results
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7471f81a19..80c9630867 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -35,7 +35,7 @@ class RoomsForUser:
     sender: str
     membership: str
     event_id: str
-    stream_ordering: int
+    event_pos: PersistedEventPosition
     room_version_id: str
 
 
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 151658df53..3a89787cab 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1279,3 +1279,60 @@ class ScheduledTask:
     result: Optional[JsonMapping]
     # Optional error that should be assigned a value when the status is FAILED
     error: Optional[str]
+
+
+class ShutdownRoomParams(TypedDict):
+    """
+    Attributes:
+        requester_user_id:
+            User who requested the action. Will be recorded as putting the room on the
+            blocking list.
+        new_room_user_id:
+            If set, a new room will be created with this user ID
+            as the creator and admin, and all users in the old room will be
+            moved into that room. If not set, no new room will be created
+            and the users will just be removed from the old room.
+        new_room_name:
+            A string representing the name of the room that new users will
+            be invited to. Defaults to `Content Violation Notification`
+        message:
+            A string containing the first message that will be sent as
+            `new_room_user_id` in the new room. Ideally this will clearly
+            convey why the original room was shut down.
+            Defaults to `Sharing illegal content on this server is not
+            permitted and rooms in violation will be blocked.`
+        block:
+            If set to `true`, this room will be added to a blocking list,
+            preventing future attempts to join the room. Defaults to `false`.
+        purge:
+            If set to `true`, purge the given room from the database.
+        force_purge:
+            If set to `true`, the room will be purged from database
+            even if there are still users joined to the room.
+    """
+
+    requester_user_id: Optional[str]
+    new_room_user_id: Optional[str]
+    new_room_name: Optional[str]
+    message: Optional[str]
+    block: bool
+    purge: bool
+    force_purge: bool
+
+
+class ShutdownRoomResponse(TypedDict):
+    """
+    Attributes:
+        kicked_users: An array of users (`user_id`) that were kicked.
+        failed_to_kick_users:
+            An array of users (`user_id`) that that were not kicked.
+        local_aliases:
+            An array of strings representing the local aliases that were
+            migrated from the old room to the new.
+        new_room_id: A string representing the room ID of the new room.
+    """
+
+    kicked_users: List[str]
+    failed_to_kick_users: List[str]
+    local_aliases: List[str]
+    new_room_id: Optional[str]
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index 86c8f14d1b..4e41a1c912 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -154,7 +154,10 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
                     USER_ID,
                     "invite",
                     event.event_id,
-                    event.internal_metadata.stream_ordering,
+                    PersistedEventPosition(
+                        self.hs.get_instance_name(),
+                        event.internal_metadata.stream_ordering,
+                    ),
                     RoomVersions.V1.identifier,
                 )
             ],