diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index ca7917c989..1e7949a323 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -278,7 +278,8 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
async def get_linearized_receipts_for_all_rooms(
self, to_key: int, from_key: Optional[int] = None
) -> Dict[str, JsonDict]:
- """Get receipts for all rooms between two stream_ids.
+ """Get receipts for all rooms between two stream_ids, up
+ to a limit of the latest 100 read receipts.
Args:
to_key: Max stream id to fetch receipts upto.
@@ -294,12 +295,16 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
sql = """
SELECT * FROM receipts_linearized WHERE
stream_id > ? AND stream_id <= ?
+ ORDER BY stream_id DESC
+ LIMIT 100
"""
txn.execute(sql, [from_key, to_key])
else:
sql = """
SELECT * FROM receipts_linearized WHERE
stream_id <= ?
+ ORDER BY stream_id DESC
+ LIMIT 100
"""
txn.execute(sql, [to_key])
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index e5d07ce72a..fedb8a6c26 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1110,6 +1110,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
token: str,
device_id: Optional[str],
valid_until_ms: Optional[int],
+ puppets_user_id: Optional[str] = None,
) -> int:
"""Adds an access token for the given user.
@@ -1133,6 +1134,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
"token": token,
"device_id": device_id,
"valid_until_ms": valid_until_ms,
+ "puppets_user_id": puppets_user_id,
},
desc="add_access_token_to_user",
)
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index dc0c4b5499..6b89db15c9 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1240,13 +1240,15 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
- async def maybe_store_room_on_invite(self, room_id: str, room_version: RoomVersion):
+ async def maybe_store_room_on_outlier_membership(
+ self, room_id: str, room_version: RoomVersion
+ ):
"""
- When we receive an invite over federation, store the version of the room if we
- don't already know the room version.
+ When we receive an invite or any other event over federation that may relate to a room
+ we are not in, store the version of the room if we don't already know the room version.
"""
await self.db_pool.simple_upsert(
- desc="maybe_store_room_on_invite",
+ desc="maybe_store_room_on_outlier_membership",
table="rooms",
keyvalues={"room_id": room_id},
values={},
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 01d9dbb36f..dcdaf09682 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set
+from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
@@ -350,6 +350,38 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return results
+ async def get_local_current_membership_for_user_in_room(
+ self, user_id: str, room_id: str
+ ) -> Tuple[Optional[str], Optional[str]]:
+ """Retrieve the current local membership state and event ID for a user in a room.
+
+ Args:
+ user_id: The ID of the user.
+ room_id: The ID of the room.
+
+ Returns:
+ A tuple of (membership_type, event_id). Both will be None if a
+ room_id/user_id pair is not found.
+ """
+ # Paranoia check.
+ if not self.hs.is_mine_id(user_id):
+ raise Exception(
+ "Cannot call 'get_local_current_membership_for_user_in_room' on "
+ "non-local user %s" % (user_id,),
+ )
+
+ results_dict = await self.db_pool.simple_select_one(
+ "local_current_membership",
+ {"room_id": room_id, "user_id": user_id},
+ ("membership", "event_id"),
+ allow_none=True,
+ desc="get_local_current_membership_for_user_in_room",
+ )
+ if not results_dict:
+ return None, None
+
+ return results_dict.get("membership"), results_dict.get("event_id")
+
@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user_with_stream_ordering(
self, user_id: str
|