diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index eadbf4901c..c43f31353b 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -763,16 +763,33 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
txn, self.get_user_by_external_id, (auth_provider, external_id)
)
- self.db_pool.simple_insert_txn(
+ # This INSERT ... ON CONFLICT DO NOTHING statement will cause a
+ # 'could not serialize access due to concurrent update'
+ # if the row is added concurrently by another transaction.
+ # This is exactly what we want, as it makes the transaction get retried
+ # in a new snapshot where we can check for a genuine conflict.
+ was_inserted = self.db_pool.simple_upsert_txn(
txn,
table="user_external_ids",
- values={
- "auth_provider": auth_provider,
- "external_id": external_id,
- "user_id": user_id,
- },
+ keyvalues={"auth_provider": auth_provider, "external_id": external_id},
+ values={},
+ insertion_values={"user_id": user_id},
)
+ if not was_inserted:
+ existing_id = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="user_external_ids",
+ keyvalues={"auth_provider": auth_provider, "user_id": user_id},
+ retcol="external_id",
+ allow_none=True,
+ )
+
+ if existing_id != external_id:
+ raise ExternalIDReuseException(
+ f"{user_id!r} has external id {existing_id!r} for {auth_provider} but trying to add {external_id!r}"
+ )
+
async def remove_user_external_id(
self, auth_provider: str, external_id: str, user_id: str
) -> None:
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index a0a6dcd04e..dfa7dd48d9 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -1622,14 +1622,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
sql = """
UPDATE room_memberships
SET participant = true
- WHERE (user_id, room_id) IN (
- SELECT user_id, room_id
- FROM room_memberships
- WHERE user_id = ?
- AND room_id = ?
- ORDER BY event_stream_ordering DESC
- LIMIT 1
+ WHERE event_id IN (
+ SELECT event_id FROM local_current_membership
+ WHERE user_id = ? AND room_id = ?
)
+ AND NOT participant
"""
txn.execute(sql, (user_id, room_id))
@@ -1651,11 +1648,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
) -> bool:
sql = """
SELECT participant
- FROM room_memberships
- WHERE user_id = ?
- AND room_id = ?
- ORDER BY event_stream_ordering DESC
- LIMIT 1
+ FROM local_current_membership AS l
+ INNER JOIN room_memberships AS r USING (event_id)
+ WHERE l.user_id = ?
+ AND l.room_id = ?
"""
txn.execute(sql, (user_id, room_id))
res = txn.fetchone()
|