diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 5964835ea3..19d9d315f6 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -23,7 +23,6 @@ from typing import (
List,
Mapping,
Optional,
- Set,
Tuple,
)
@@ -37,6 +36,7 @@ from synapse.storage.util.partial_state_events_tracker import (
PartialStateEventsTracker,
)
from synapse.types import MutableStateMap, StateMap
+from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -230,6 +230,7 @@ class StateStorageController:
@trace
@tag_args
+ @cancellable
async def get_state_ids_for_events(
self,
event_ids: Collection[str],
@@ -351,6 +352,7 @@ class StateStorageController:
@trace
@tag_args
+ @cancellable
async def get_state_group_for_events(
self,
event_ids: Collection[str],
@@ -399,6 +401,7 @@ class StateStorageController:
event_id, room_id, prev_group, delta_ids, current_state_ids
)
+ @cancellable
async def get_current_state_ids(
self,
room_id: str,
@@ -520,7 +523,7 @@ class StateStorageController:
)
return state_map.get(key)
- async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
+ async def get_current_hosts_in_room(self, room_id: str) -> List[str]:
"""Get current hosts in room based on current state."""
await self._partial_state_room_tracker.await_full_state(room_id)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ca0f606797..15d5da3fa5 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -533,15 +533,14 @@ class DatabasePool:
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")
- if self.engine.can_native_upsert:
- # Check ASAP (and then later, every 1s) to see if we have finished
- # background updates of tables that aren't safe to update.
- self._clock.call_later(
- 0.0,
- run_as_background_process,
- "upsert_safety_check",
- self._check_safe_to_upsert,
- )
+ # Check ASAP (and then later, every 1s) to see if we have finished
+ # background updates of tables that aren't safe to update.
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert,
+ )
def name(self) -> str:
"Return the name of this database"
@@ -1156,11 +1155,8 @@ class DatabasePool:
attempts = 0
while True:
try:
- # We can autocommit if we are going to use native upserts
- autocommit = (
- self.engine.can_native_upsert
- and table not in self._unsafe_to_upsert_tables
- )
+ # We can autocommit if it is safe to upsert
+ autocommit = table not in self._unsafe_to_upsert_tables
return await self.runInteraction(
desc,
@@ -1195,7 +1191,7 @@ class DatabasePool:
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
- native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+ native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
Args:
txn: The transaction to use.
@@ -1203,14 +1199,15 @@ class DatabasePool:
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
- lock: True to lock the table when doing the upsert.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}
- if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+ if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
)
@@ -1361,14 +1358,12 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused if the database engine
- supports native upserts.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
"""
- # We can autocommit if we are going to use native upserts
- autocommit = (
- self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
- )
+ # We can autocommit if it safe to upsert
+ autocommit = table not in self._unsafe_to_upsert_tables
await self.runInteraction(
desc,
@@ -1402,10 +1397,10 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused if the database engine
- supports native upserts.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
"""
- if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+ if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 9af9f4f18e..c38b8a9e5a 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -650,9 +650,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
txn, self.get_account_data_for_room, (user_id,)
)
self._invalidate_cache_and_stream(txn, self.get_push_rules_for_user, (user_id,))
- self._invalidate_cache_and_stream(
- txn, self.get_push_rules_enabled_for_user, (user_id,)
- )
# This user might be contained in the ignored_by cache for other users,
# so we have to invalidate it all.
self._invalidate_all_cache_and_stream(txn, self.ignored_by)
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 7ceb7a202b..dfca34550d 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -53,6 +53,7 @@ from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr
@@ -668,6 +669,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
...
@trace
+ @cancellable
async def get_user_devices_from_cache(
self, query_list: List[Tuple[str, Optional[str]]]
) -> Tuple[Set[str], Dict[str, Dict[str, JsonDict]]]:
@@ -743,6 +745,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
return self._device_list_stream_cache.get_all_entities_changed(from_key)
+ @cancellable
async def get_users_whose_devices_changed(
self,
from_key: int,
@@ -1221,6 +1224,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
desc="get_min_device_lists_changes_in_room",
)
+ @cancellable
async def get_device_list_changes_in_rooms(
self, room_ids: Collection[str], from_id: int
) -> Optional[Set[str]]:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 2df8101390..210cfab073 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -50,6 +50,7 @@ from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
@@ -135,6 +136,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
return now_stream_id, []
@trace
+ @cancellable
async def get_e2e_device_keys_for_cs_api(
self, query_list: List[Tuple[str, Optional[str]]]
) -> Dict[str, Dict[str, JsonDict]]:
@@ -197,6 +199,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
...
@trace
+ @cancellable
async def get_e2e_device_keys_and_signatures(
self,
query_list: Collection[Tuple[str, Optional[str]]],
@@ -887,6 +890,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
return keys
+ @cancellable
async def get_e2e_cross_signing_keys_bulk(
self, user_ids: List[str], from_user_id: Optional[str] = None
) -> Dict[str, Optional[Dict[str, JsonDict]]]:
@@ -902,7 +906,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
keys were not found, either their user ID will not be in the dict,
or their user ID will map to None.
"""
-
result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
if from_user_id:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 41b015dba1..271e7a71c1 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -48,6 +48,7 @@ from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
+from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
@@ -976,6 +977,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return int(min_depth) if min_depth is not None else None
+ @cancellable
async def get_forward_extremities_for_room_at_stream_ordering(
self, room_id: str, stream_ordering: int
) -> List[str]:
@@ -1606,7 +1608,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
logger.info("Invalid prev_events for %s", event_id)
continue
- if room_version.event_format == EventFormatVersions.V1:
+ if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
for prev_event_tuple in prev_events:
if (
not isinstance(prev_event_tuple, list)
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index eabf9c9739..f4a07de2a3 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -274,7 +274,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
receipt_types=(
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
- ReceiptTypes.UNSTABLE_READ_PRIVATE,
),
)
@@ -459,6 +458,31 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
return await self.db_pool.runInteraction("get_push_action_users_in_range", f)
+ def _get_receipts_by_room_txn(
+ self, txn: LoggingTransaction, user_id: str
+ ) -> List[Tuple[str, int]]:
+ receipt_types_clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "receipt_type",
+ (
+ ReceiptTypes.READ,
+ ReceiptTypes.READ_PRIVATE,
+ ),
+ )
+
+ sql = f"""
+ SELECT room_id, MAX(stream_ordering)
+ FROM receipts_linearized
+ INNER JOIN events USING (room_id, event_id)
+ WHERE {receipt_types_clause}
+ AND user_id = ?
+ GROUP BY room_id
+ """
+
+ args.extend((user_id,))
+ txn.execute(sql, args)
+ return cast(List[Tuple[str, int]], txn.fetchall())
+
async def get_unread_push_actions_for_user_in_range_for_http(
self,
user_id: str,
@@ -482,106 +506,45 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will have between 0~limit entries.
"""
- # find rooms that have a read receipt in them and return the next
- # push actions
- def get_after_receipt(
- txn: LoggingTransaction,
- ) -> List[Tuple[str, str, int, str, bool]]:
- # find rooms that have a read receipt in them and return the next
- # push actions
-
- receipt_types_clause, args = make_in_list_sql_clause(
- self.database_engine,
- "receipt_type",
- (
- ReceiptTypes.READ,
- ReceiptTypes.READ_PRIVATE,
- ReceiptTypes.UNSTABLE_READ_PRIVATE,
- ),
- )
-
- sql = f"""
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
- ep.highlight
- FROM (
- SELECT room_id,
- MAX(stream_ordering) as stream_ordering
- FROM events
- INNER JOIN receipts_linearized USING (room_id, event_id)
- WHERE {receipt_types_clause} AND user_id = ?
- GROUP BY room_id
- ) AS rl,
- event_push_actions AS ep
- WHERE
- ep.room_id = rl.room_id
- AND ep.stream_ordering > rl.stream_ordering
- AND ep.user_id = ?
- AND ep.stream_ordering > ?
- AND ep.stream_ordering <= ?
- AND ep.notif = 1
- ORDER BY ep.stream_ordering ASC LIMIT ?
- """
- args.extend(
- (user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
- )
- txn.execute(sql, args)
- return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
-
- after_read_receipt = await self.db_pool.runInteraction(
- "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
+ receipts_by_room = dict(
+ await self.db_pool.runInteraction(
+ "get_unread_push_actions_for_user_in_range_http_receipts",
+ self._get_receipts_by_room_txn,
+ user_id=user_id,
+ ),
)
- # There are rooms with push actions in them but you don't have a read receipt in
- # them e.g. rooms you've been invited to, so get push actions for rooms which do
- # not have read receipts in them too.
- def get_no_receipt(
+ def get_push_actions_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool]]:
- receipt_types_clause, args = make_in_list_sql_clause(
- self.database_engine,
- "receipt_type",
- (
- ReceiptTypes.READ,
- ReceiptTypes.READ_PRIVATE,
- ReceiptTypes.UNSTABLE_READ_PRIVATE,
- ),
- )
-
- sql = f"""
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
- ep.highlight
+ sql = """
+ SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
FROM event_push_actions AS ep
- INNER JOIN events AS e USING (room_id, event_id)
WHERE
- ep.room_id NOT IN (
- SELECT room_id FROM receipts_linearized
- WHERE {receipt_types_clause} AND user_id = ?
- GROUP BY room_id
- )
- AND ep.user_id = ?
+ ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
- args.extend(
- (user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
- )
- txn.execute(sql, args)
+ txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
- no_read_receipt = await self.db_pool.runInteraction(
- "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
+ push_actions = await self.db_pool.runInteraction(
+ "get_unread_push_actions_for_user_in_range_http", get_push_actions_txn
)
notifs = [
HttpPushAction(
- event_id=row[0],
- room_id=row[1],
- stream_ordering=row[2],
- actions=_deserialize_action(row[3], row[4]),
+ event_id=event_id,
+ room_id=room_id,
+ stream_ordering=stream_ordering,
+ actions=_deserialize_action(actions, highlight),
)
- for row in after_read_receipt + no_read_receipt
+ for event_id, room_id, stream_ordering, actions, highlight in push_actions
+ # Only include push actions with a stream ordering after any receipt, or without any
+ # receipt present (invited to but never read rooms).
+ if stream_ordering > receipts_by_room.get(room_id, 0)
]
# Now sort it so it's ordered correctly, since currently it will
@@ -617,106 +580,49 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will have between 0~limit entries.
"""
- # find rooms that have a read receipt in them and return the most recent
- # push actions
- def get_after_receipt(
- txn: LoggingTransaction,
- ) -> List[Tuple[str, str, int, str, bool, int]]:
- receipt_types_clause, args = make_in_list_sql_clause(
- self.database_engine,
- "receipt_type",
- (
- ReceiptTypes.READ,
- ReceiptTypes.READ_PRIVATE,
- ReceiptTypes.UNSTABLE_READ_PRIVATE,
- ),
- )
-
- sql = f"""
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
- ep.highlight, e.received_ts
- FROM (
- SELECT room_id,
- MAX(stream_ordering) as stream_ordering
- FROM events
- INNER JOIN receipts_linearized USING (room_id, event_id)
- WHERE {receipt_types_clause} AND user_id = ?
- GROUP BY room_id
- ) AS rl,
- event_push_actions AS ep
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- ep.room_id = rl.room_id
- AND ep.stream_ordering > rl.stream_ordering
- AND ep.user_id = ?
- AND ep.stream_ordering > ?
- AND ep.stream_ordering <= ?
- AND ep.notif = 1
- ORDER BY ep.stream_ordering DESC LIMIT ?
- """
- args.extend(
- (user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
- )
- txn.execute(sql, args)
- return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
-
- after_read_receipt = await self.db_pool.runInteraction(
- "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
+ receipts_by_room = dict(
+ await self.db_pool.runInteraction(
+ "get_unread_push_actions_for_user_in_range_email_receipts",
+ self._get_receipts_by_room_txn,
+ user_id=user_id,
+ ),
)
- # There are rooms with push actions in them but you don't have a read receipt in
- # them e.g. rooms you've been invited to, so get push actions for rooms which do
- # not have read receipts in them too.
- def get_no_receipt(
+ def get_push_actions_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
- receipt_types_clause, args = make_in_list_sql_clause(
- self.database_engine,
- "receipt_type",
- (
- ReceiptTypes.READ,
- ReceiptTypes.READ_PRIVATE,
- ReceiptTypes.UNSTABLE_READ_PRIVATE,
- ),
- )
-
- sql = f"""
+ sql = """
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
FROM event_push_actions AS ep
INNER JOIN events AS e USING (room_id, event_id)
WHERE
- ep.room_id NOT IN (
- SELECT room_id FROM receipts_linearized
- WHERE {receipt_types_clause} AND user_id = ?
- GROUP BY room_id
- )
- AND ep.user_id = ?
+ ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
- args.extend(
- (user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
- )
- txn.execute(sql, args)
+ txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
- no_read_receipt = await self.db_pool.runInteraction(
- "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
+ push_actions = await self.db_pool.runInteraction(
+ "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn
)
# Make a list of dicts from the two sets of results.
notifs = [
EmailPushAction(
- event_id=row[0],
- room_id=row[1],
- stream_ordering=row[2],
- actions=_deserialize_action(row[3], row[4]),
- received_ts=row[5],
+ event_id=event_id,
+ room_id=room_id,
+ stream_ordering=stream_ordering,
+ actions=_deserialize_action(actions, highlight),
+ received_ts=received_ts,
)
- for row in after_read_receipt + no_read_receipt
+ for event_id, room_id, stream_ordering, actions, highlight, received_ts in push_actions
+ # Only include push actions with a stream ordering after any receipt, or without any
+ # receipt present (invited to but never read rooms).
+ if stream_ordering > receipts_by_room.get(room_id, 0)
]
# Now sort it so it's ordered correctly, since currently it will
@@ -792,26 +698,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
int(count_as_unread), # unread column
)
- def _add_push_actions_to_staging_txn(txn: LoggingTransaction) -> None:
- # We don't use simple_insert_many here to avoid the overhead
- # of generating lists of dicts.
-
- sql = """
- INSERT INTO event_push_actions_staging
- (event_id, user_id, actions, notif, highlight, unread)
- VALUES (?, ?, ?, ?, ?, ?)
- """
-
- txn.execute_batch(
- sql,
- (
- _gen_entry(user_id, actions)
- for user_id, actions in user_id_actions.items()
- ),
- )
-
- return await self.db_pool.runInteraction(
- "add_push_actions_to_staging", _add_push_actions_to_staging_txn
+ await self.db_pool.simple_insert_many(
+ "event_push_actions_staging",
+ keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
+ values=[
+ _gen_entry(user_id, actions)
+ for user_id, actions in user_id_actions.items()
+ ],
+ desc="add_push_actions_to_staging",
)
async def remove_push_actions_from_staging(self, event_id: str) -> None:
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 90e6d82058..9f6b1fcef1 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -81,6 +81,7 @@ from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import AsyncLruCache
+from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -339,6 +340,7 @@ class EventsWorkerStore(SQLBaseStore):
) -> Optional[EventBase]:
...
+ @cancellable
async def get_event(
self,
event_id: str,
@@ -433,6 +435,7 @@ class EventsWorkerStore(SQLBaseStore):
@trace
@tag_args
+ @cancellable
async def get_events_as_list(
self,
event_ids: Collection[str],
@@ -584,6 +587,7 @@ class EventsWorkerStore(SQLBaseStore):
return events
+ @cancellable
async def _get_events_from_cache_or_db(
self, event_ids: Iterable[str], allow_rejected: bool = False
) -> Dict[str, EventCacheEntry]:
@@ -1156,7 +1160,7 @@ class EventsWorkerStore(SQLBaseStore):
if format_version is None:
# This means that we stored the event before we had the concept
# of a event format version, so it must be a V1 event.
- format_version = EventFormatVersions.V1
+ format_version = EventFormatVersions.ROOM_V1_V2
room_version_id = row.room_version_id
@@ -1186,10 +1190,10 @@ class EventsWorkerStore(SQLBaseStore):
#
# So, the following approximations should be adequate.
- if format_version == EventFormatVersions.V1:
+ if format_version == EventFormatVersions.ROOM_V1_V2:
# if it's event format v1 then it must be room v1 or v2
room_version = RoomVersions.V1
- elif format_version == EventFormatVersions.V2:
+ elif format_version == EventFormatVersions.ROOM_V3:
# if it's event format v2 then it must be room v3
room_version = RoomVersions.V3
else:
@@ -2111,7 +2115,14 @@ class EventsWorkerStore(SQLBaseStore):
AND room_id = ?
/* Make sure event is not rejected */
AND rejections.event_id IS NULL
- ORDER BY origin_server_ts %s
+ /**
+ * First sort by the message timestamp. If the message timestamps are the
+ * same, we want the message that logically comes "next" (before/after
+ * the given timestamp) based on the DAG and its topological order (`depth`).
+ * Finally, we can tie-break based on when it was received on the server
+ * (`stream_ordering`).
+ */
+ ORDER BY origin_server_ts %s, depth %s, stream_ordering %s
LIMIT 1;
"""
@@ -2130,7 +2141,8 @@ class EventsWorkerStore(SQLBaseStore):
order = "ASC"
txn.execute(
- sql_template % (comparison_operator, order), (timestamp, room_id)
+ sql_template % (comparison_operator, order, order, order),
+ (timestamp, room_id),
)
row = txn.fetchone()
if row:
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
now = self._clock.time_msec()
token = random_string(6)
- if self.db_pool.engine.can_native_upsert:
-
- def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
- # We take out the lock if either a) there is no row for the lock
- # already, b) the existing row has timed out, or c) the row is
- # for this instance (which means the process got killed and
- # restarted)
- sql = """
- INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
- VALUES (?, ?, ?, ?, ?)
- ON CONFLICT (lock_name, lock_key)
- DO UPDATE
- SET
- token = EXCLUDED.token,
- instance_name = EXCLUDED.instance_name,
- last_renewed_ts = EXCLUDED.last_renewed_ts
- WHERE
- worker_locks.last_renewed_ts < ?
- OR worker_locks.instance_name = EXCLUDED.instance_name
- """
- txn.execute(
- sql,
- (
- lock_name,
- lock_key,
- self._instance_name,
- token,
- now,
- now - _LOCK_TIMEOUT_MS,
- ),
- )
-
- # We only acquired the lock if we inserted or updated the table.
- return bool(txn.rowcount)
-
- did_lock = await self.db_pool.runInteraction(
- "try_acquire_lock",
- _try_acquire_lock_txn,
- # We can autocommit here as we're executing a single query, this
- # will avoid serialization errors.
- db_autocommit=True,
+ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+ # We take out the lock if either a) there is no row for the lock
+ # already, b) the existing row has timed out, or c) the row is
+ # for this instance (which means the process got killed and
+ # restarted)
+ sql = """
+ INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+ VALUES (?, ?, ?, ?, ?)
+ ON CONFLICT (lock_name, lock_key)
+ DO UPDATE
+ SET
+ token = EXCLUDED.token,
+ instance_name = EXCLUDED.instance_name,
+ last_renewed_ts = EXCLUDED.last_renewed_ts
+ WHERE
+ worker_locks.last_renewed_ts < ?
+ OR worker_locks.instance_name = EXCLUDED.instance_name
+ """
+ txn.execute(
+ sql,
+ (
+ lock_name,
+ lock_key,
+ self._instance_name,
+ token,
+ now,
+ now - _LOCK_TIMEOUT_MS,
+ ),
)
- if not did_lock:
- return None
-
- else:
- # If we're on an old SQLite we emulate the above logic by first
- # clearing out any existing stale locks and then upserting.
-
- def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
- sql = """
- DELETE FROM worker_locks
- WHERE
- lock_name = ?
- AND lock_key = ?
- AND (last_renewed_ts < ? OR instance_name = ?)
- """
- txn.execute(
- sql,
- (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
- )
-
- inserted = self.db_pool.simple_upsert_txn_emulated(
- txn,
- table="worker_locks",
- keyvalues={
- "lock_name": lock_name,
- "lock_key": lock_key,
- },
- values={},
- insertion_values={
- "token": token,
- "last_renewed_ts": self._clock.time_msec(),
- "instance_name": self._instance_name,
- },
- )
-
- return inserted
- did_lock = await self.db_pool.runInteraction(
- "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
- )
+ # We only acquired the lock if we inserted or updated the table.
+ return bool(txn.rowcount)
- if not did_lock:
- return None
+ did_lock = await self.db_pool.runInteraction(
+ "try_acquire_lock",
+ _try_acquire_lock_txn,
+ # We can autocommit here as we're executing a single query, this
+ # will avoid serialization errors.
+ db_autocommit=True,
+ )
+ if not did_lock:
+ return None
lock = Lock(
self._reactor,
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 255620f996..5079edd1e0 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -165,7 +165,6 @@ class PushRulesWorkerStore(
return _load_rules(rows, enabled_map, self.hs.config.experimental)
- @cached(max_entries=5000)
async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]:
results = await self.db_pool.simple_select_list(
table="push_rules_enable",
@@ -229,9 +228,6 @@ class PushRulesWorkerStore(
return results
- @cachedList(
- cached_method_name="get_push_rules_enabled_for_user", list_name="user_ids"
- )
async def bulk_get_push_rules_enabled(
self, user_ids: Collection[str]
) -> Dict[str, Dict[str, bool]]:
@@ -246,6 +242,7 @@ class PushRulesWorkerStore(
iterable=user_ids,
retcols=("user_name", "rule_id", "enabled"),
desc="bulk_get_push_rules_enabled",
+ batch_size=1000,
)
for row in rows:
enabled = bool(row["enabled"])
@@ -792,7 +789,6 @@ class PushRuleStore(PushRulesWorkerStore):
self.db_pool.simple_insert_txn(txn, "push_rules_stream", values=values)
txn.call_after(self.get_push_rules_for_user.invalidate, (user_id,))
- txn.call_after(self.get_push_rules_enabled_for_user.invalidate, (user_id,))
txn.call_after(
self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
)
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 124c70ad37..3838409519 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -812,7 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
- self.db_pool.simple_delete_txn(
+ self.db_pool.simple_upsert_txn(
txn,
table="receipts_graph",
keyvalues={
@@ -820,17 +820,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
"receipt_type": receipt_type,
"user_id": user_id,
},
- )
- self.db_pool.simple_insert_txn(
- txn,
- table="receipts_graph",
values={
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
},
+ # receipts_graph has a unique constraint on
+ # (user_id, room_id, receipt_type), so no need to lock
+ lock=False,
)
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index cb63cd9b7d..ac821878b0 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -69,9 +69,9 @@ class TokenLookupResult:
"""
user_id: str
+ token_id: int
is_guest: bool = False
shadow_banned: bool = False
- token_id: Optional[int] = None
device_id: Optional[str] = None
valid_until_ms: Optional[int] = None
token_owner: str = attr.ib()
@@ -175,6 +175,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"is_guest",
"admin",
"consent_version",
+ "consent_ts",
"consent_server_notice_sent",
"appservice_id",
"creation_ts",
@@ -2227,7 +2228,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
txn,
table="users",
keyvalues={"name": user_id},
- updatevalues={"consent_version": consent_version},
+ updatevalues={
+ "consent_version": consent_version,
+ "consent_ts": self._clock.time_msec(),
+ },
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index b7d4baa6bb..bef66f1992 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -641,8 +641,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
"version": room[5],
"creator": room[6],
"encryption": room[7],
- "federatable": room[8],
- "public": room[9],
+ # room_stats_state.federatable is an integer on sqlite.
+ "federatable": bool(room[8]),
+ # rooms.is_public is an integer on sqlite.
+ "public": bool(room[9]),
"join_rules": room[10],
"guest_access": room[11],
"history_visibility": room[12],
@@ -1183,8 +1185,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
return False
- @staticmethod
- def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None:
+ def _clear_partial_state_room_txn(
+ self, txn: LoggingTransaction, room_id: str
+ ) -> None:
DatabasePool.simple_delete_txn(
txn,
table="partial_state_rooms_servers",
@@ -1195,7 +1198,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
table="partial_state_rooms",
keyvalues={"room_id": room_id},
)
+ self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
+ @cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
@@ -1769,9 +1774,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
servers,
)
- @staticmethod
def _store_partial_state_room_txn(
- txn: LoggingTransaction, room_id: str, servers: Collection[str]
+ self, txn: LoggingTransaction, room_id: str, servers: Collection[str]
) -> None:
DatabasePool.simple_insert_txn(
txn,
@@ -1786,6 +1790,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
keys=("room_id", "server_name"),
values=((room_id, s) for s in servers),
)
+ self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
async def maybe_store_room_on_outlier_membership(
self, room_id: str, room_version: RoomVersion
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 827c1f1efd..6e1ff5626b 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -31,7 +31,6 @@ from typing import (
import attr
from synapse.api.constants import EventTypes, Membership
-from synapse.events import EventBase
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import (
run_as_background_process,
@@ -56,6 +55,7 @@ from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
+from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -187,27 +187,55 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(max_entries=100000, iterable=True)
async def get_users_in_room(self, room_id: str) -> List[str]:
+ """
+ Returns a list of users in the room sorted by longest in the room first
+ (aka. with the lowest depth). This is done to match the sort in
+ `get_current_hosts_in_room()` and so we can re-use the cache but it's
+ not horrible to have here either.
+
+ Uses `m.room.member`s in the room state at the current forward extremities to
+ determine which users are in the room.
+
+ Will return inaccurate results for rooms with partial state, since the state for
+ the forward extremities of those rooms will exclude most members. We may also
+ calculate room state incorrectly for such rooms and believe that a member is or
+ is not in the room when the opposite is true.
+ """
return await self.db_pool.runInteraction(
"get_users_in_room", self.get_users_in_room_txn, room_id
)
def get_users_in_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[str]:
+ """
+ Returns a list of users in the room sorted by longest in the room first
+ (aka. with the lowest depth). This is done to match the sort in
+ `get_current_hosts_in_room()` and so we can re-use the cache but it's
+ not horrible to have here either.
+ """
# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
- SELECT state_key FROM current_state_events
- WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+ SELECT c.state_key FROM current_state_events as c
+ /* Get the depth of the event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
+ /* Sorted by lowest depth first */
+ ORDER BY e.depth ASC;
"""
else:
sql = """
- SELECT state_key FROM room_memberships as m
+ SELECT c.state_key FROM room_memberships as m
+ /* Get the depth of the event from the events table */
+ INNER JOIN events AS e USING (event_id)
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+ /* Sorted by lowest depth first */
+ ORDER BY e.depth ASC;
"""
txn.execute(sql, (room_id, Membership.JOIN))
@@ -534,6 +562,32 @@ class RoomMemberWorkerStore(EventsWorkerStore):
desc="get_local_users_in_room",
)
+ async def check_local_user_in_room(self, user_id: str, room_id: str) -> bool:
+ """
+ Check whether a given local user is currently joined to the given room.
+
+ Returns:
+ A boolean indicating whether the user is currently joined to the room
+
+ Raises:
+ Exeption when called with a non-local user to this homeserver
+ """
+ if not self.hs.is_mine_id(user_id):
+ raise Exception(
+ "Cannot call 'check_local_user_in_room' on "
+ "non-local user %s" % (user_id,),
+ )
+
+ (
+ membership,
+ member_event_id,
+ ) = await self.get_local_current_membership_for_user_in_room(
+ user_id=user_id,
+ room_id=room_id,
+ )
+
+ return membership == Membership.JOIN
+
async def get_local_current_membership_for_user_in_room(
self, user_id: str, room_id: str
) -> Tuple[Optional[str], Optional[str]]:
@@ -724,6 +778,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
_get_users_server_still_shares_room_with_txn,
)
+ @cancellable
async def get_rooms_for_user(
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
) -> FrozenSet[str]:
@@ -835,144 +890,92 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return shared_room_ids or frozenset()
- async def get_joined_users_from_state(
- self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry"
- ) -> Dict[str, ProfileInfo]:
- state_group: Union[object, int] = state_entry.state_group
- if not state_group:
- # If state_group is None it means it has yet to be assigned a
- # state group, i.e. we need to make sure that calls with a state_group
- # of None don't hit previous cached calls with a None state_group.
- # To do this we set the state_group to a new object as object() != object()
- state_group = object()
-
- assert state_group is not None
- with Measure(self._clock, "get_joined_users_from_state"):
- return await self._get_joined_users_from_context(
- room_id, state_group, state, context=state_entry
- )
+ async def get_joined_user_ids_from_state(
+ self, room_id: str, state: StateMap[str]
+ ) -> Set[str]:
+ """
+ For a given set of state IDs, get a set of user IDs in the room.
- @cached(num_args=2, iterable=True, max_entries=100000)
- async def _get_joined_users_from_context(
- self,
- room_id: str,
- state_group: Union[object, int],
- current_state_ids: StateMap[str],
- event: Optional[EventBase] = None,
- context: Optional["_StateCacheEntry"] = None,
- ) -> Dict[str, ProfileInfo]:
- # We don't use `state_group`, it's there so that we can cache based
- # on it. However, it's important that it's never None, since two current_states
- # with a state_group of None are likely to be different.
- assert state_group is not None
+ This method checks the local event cache, before calling
+ `_get_user_ids_from_membership_event_ids` for any uncached events.
+ """
- users_in_room = {}
- member_event_ids = [
- e_id
- for key, e_id in current_state_ids.items()
- if key[0] == EventTypes.Member
- ]
-
- if context is not None:
- # If we have a context with a delta from a previous state group,
- # check if we also have the result from the previous group in cache.
- # If we do then we can reuse that result and simply update it with
- # any membership changes in `delta_ids`
- if context.prev_group and context.delta_ids:
- prev_res = self._get_joined_users_from_context.cache.get_immediate(
- (room_id, context.prev_group), None
- )
- if prev_res and isinstance(prev_res, dict):
- users_in_room = dict(prev_res)
- member_event_ids = [
- e_id
- for key, e_id in context.delta_ids.items()
- if key[0] == EventTypes.Member
- ]
- for etype, state_key in context.delta_ids:
- if etype == EventTypes.Member:
- users_in_room.pop(state_key, None)
-
- # We check if we have any of the member event ids in the event cache
- # before we ask the DB
-
- # We don't update the event cache hit ratio as it completely throws off
- # the hit ratio counts. After all, we don't populate the cache if we
- # miss it here
- event_map = self._get_events_from_local_cache(
- member_event_ids, update_metrics=False
- )
+ with Measure(self._clock, "get_joined_user_ids_from_state"):
+ users_in_room = set()
+ member_event_ids = [
+ e_id for key, e_id in state.items() if key[0] == EventTypes.Member
+ ]
- missing_member_event_ids = []
- for event_id in member_event_ids:
- ev_entry = event_map.get(event_id)
- if ev_entry and not ev_entry.event.rejected_reason:
- if ev_entry.event.membership == Membership.JOIN:
- users_in_room[ev_entry.event.state_key] = ProfileInfo(
- display_name=ev_entry.event.content.get("displayname", None),
- avatar_url=ev_entry.event.content.get("avatar_url", None),
- )
- else:
- missing_member_event_ids.append(event_id)
+ # We check if we have any of the member event ids in the event cache
+ # before we ask the DB
- if missing_member_event_ids:
- event_to_memberships = await self._get_joined_profiles_from_event_ids(
- missing_member_event_ids
+ # We don't update the event cache hit ratio as it completely throws off
+ # the hit ratio counts. After all, we don't populate the cache if we
+ # miss it here
+ event_map = self._get_events_from_local_cache(
+ member_event_ids, update_metrics=False
)
- users_in_room.update(row for row in event_to_memberships.values() if row)
-
- if event is not None and event.type == EventTypes.Member:
- if event.membership == Membership.JOIN:
- if event.event_id in member_event_ids:
- users_in_room[event.state_key] = ProfileInfo(
- display_name=event.content.get("displayname", None),
- avatar_url=event.content.get("avatar_url", None),
+
+ missing_member_event_ids = []
+ for event_id in member_event_ids:
+ ev_entry = event_map.get(event_id)
+ if ev_entry and not ev_entry.event.rejected_reason:
+ if ev_entry.event.membership == Membership.JOIN:
+ users_in_room.add(ev_entry.event.state_key)
+ else:
+ missing_member_event_ids.append(event_id)
+
+ if missing_member_event_ids:
+ event_to_memberships = (
+ await self._get_user_ids_from_membership_event_ids(
+ missing_member_event_ids
)
+ )
+ users_in_room.update(
+ user_id for user_id in event_to_memberships.values() if user_id
+ )
- return users_in_room
+ return users_in_room
- @cached(max_entries=10000)
- def _get_joined_profile_from_event_id(
+ @cached(
+ max_entries=10000,
+ # This name matches the old function that has been replaced - the cache name
+ # is kept here to maintain backwards compatibility.
+ name="_get_joined_profile_from_event_id",
+ )
+ def _get_user_id_from_membership_event_id(
self, event_id: str
) -> Optional[Tuple[str, ProfileInfo]]:
raise NotImplementedError()
@cachedList(
- cached_method_name="_get_joined_profile_from_event_id",
+ cached_method_name="_get_user_id_from_membership_event_id",
list_name="event_ids",
)
- async def _get_joined_profiles_from_event_ids(
+ async def _get_user_ids_from_membership_event_ids(
self, event_ids: Iterable[str]
- ) -> Dict[str, Optional[Tuple[str, ProfileInfo]]]:
+ ) -> Dict[str, Optional[str]]:
"""For given set of member event_ids check if they point to a join
- event and if so return the associated user and profile info.
+ event.
Args:
event_ids: The member event IDs to lookup
Returns:
- Map from event ID to `user_id` and ProfileInfo (or None if not join event).
+ Map from event ID to `user_id`, or None if event is not a join.
"""
rows = await self.db_pool.simple_select_many_batch(
table="room_memberships",
column="event_id",
iterable=event_ids,
- retcols=("user_id", "display_name", "avatar_url", "event_id"),
+ retcols=("user_id", "event_id"),
keyvalues={"membership": Membership.JOIN},
batch_size=1000,
- desc="_get_joined_profiles_from_event_ids",
+ desc="_get_user_ids_from_membership_event_ids",
)
- return {
- row["event_id"]: (
- row["user_id"],
- ProfileInfo(
- avatar_url=row["avatar_url"], display_name=row["display_name"]
- ),
- )
- for row in rows
- }
+ return {row["event_id"]: row["user_id"] for row in rows}
@cached(max_entries=10000)
async def is_host_joined(self, room_id: str, host: str) -> bool:
@@ -1018,37 +1021,81 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return True
@cached(iterable=True, max_entries=10000)
- async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
- """Get current hosts in room based on current state."""
+ async def get_current_hosts_in_room(self, room_id: str) -> List[str]:
+ """
+ Get current hosts in room based on current state.
+
+ The heuristic of sorting by servers who have been in the room the
+ longest is good because they're most likely to have anything we ask
+ about.
+
+ Uses `m.room.member`s in the room state at the current forward extremities to
+ determine which hosts are in the room.
+
+ Will return inaccurate results for rooms with partial state, since the state for
+ the forward extremities of those rooms will exclude most members. We may also
+ calculate room state incorrectly for such rooms and believe that a host is or
+ is not in the room when the opposite is true.
+
+ Returns:
+ Returns a list of servers sorted by longest in the room first. (aka.
+ sorted by join with the lowest depth first).
+ """
# First we check if we already have `get_users_in_room` in the cache, as
# we can just calculate result from that
users = self.get_users_in_room.cache.get_immediate(
(room_id,), None, update_metrics=False
)
- if users is not None:
- return {get_domain_from_id(u) for u in users}
-
- if isinstance(self.database_engine, Sqlite3Engine):
+ if users is None and isinstance(self.database_engine, Sqlite3Engine):
# If we're using SQLite then let's just always use
# `get_users_in_room` rather than funky SQL.
users = await self.get_users_in_room(room_id)
- return {get_domain_from_id(u) for u in users}
+
+ if users is not None:
+ # Because `users` is sorted from lowest -> highest depth, the list
+ # of domains will also be sorted that way.
+ domains: List[str] = []
+ # We use a `Set` just for fast lookups
+ domain_set: Set[str] = set()
+ for u in users:
+ if ":" not in u:
+ continue
+ domain = get_domain_from_id(u)
+ if domain not in domain_set:
+ domain_set.add(domain)
+ domains.append(domain)
+ return domains
# For PostgreSQL we can use a regex to pull out the domains from the
# joined users in `current_state_events` via regex.
- def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> Set[str]:
+ def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> List[str]:
+ # Returns a list of servers currently joined in the room sorted by
+ # longest in the room first (aka. with the lowest depth). The
+ # heuristic of sorting by servers who have been in the room the
+ # longest is good because they're most likely to have anything we
+ # ask about.
sql = """
- SELECT DISTINCT substring(state_key FROM '@[^:]*:(.*)$')
- FROM current_state_events
+ SELECT
+ /* Match the domain part of the MXID */
+ substring(c.state_key FROM '@[^:]*:(.*)$') as server_domain
+ FROM current_state_events c
+ /* Get the depth of the event from the events table */
+ INNER JOIN events AS e USING (event_id)
WHERE
- type = 'm.room.member'
- AND membership = 'join'
- AND room_id = ?
+ /* Find any join state events in the room */
+ c.type = 'm.room.member'
+ AND c.membership = 'join'
+ AND c.room_id = ?
+ /* Group all state events from the same domain into their own buckets (groups) */
+ GROUP BY server_domain
+ /* Sorted by lowest depth first */
+ ORDER BY min(e.depth) ASC;
"""
txn.execute(sql, (room_id,))
- return {d for d, in txn}
+ # `server_domain` will be `NULL` for malformed MXIDs with no colons.
+ return [d for d, in txn if d is not None]
return await self.db_pool.runInteraction(
"get_current_hosts_in_room", get_current_hosts_in_room_txn
@@ -1131,12 +1178,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
else:
# The cache doesn't match the state group or prev state group,
# so we calculate the result from first principles.
- joined_users = await self.get_joined_users_from_state(
- room_id, state, state_entry
+ joined_user_ids = await self.get_joined_user_ids_from_state(
+ room_id, state
)
cache.hosts_to_joined_users = {}
- for user_id in joined_users:
+ for user_id in joined_user_ids:
host = intern_string(get_domain_from_id(user_id))
cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 0b10af0e58..32095d7969 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -23,6 +23,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.logging.tracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -36,6 +37,7 @@ from synapse.storage.state import StateFilter
from synapse.types import JsonDict, JsonMapping, StateMap
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
@@ -142,6 +144,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return room_version
+ @trace
async def get_metadata_for_events(
self, event_ids: Collection[str]
) -> Dict[str, EventMetadata]:
@@ -281,6 +284,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
# FIXME: how should this be cached?
+ @cancellable
async def get_partial_filtered_current_state_ids(
self, room_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[str]:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
absolutes: Absolute (set) fields
additive_relatives: Fields that will be added onto if existing row present.
"""
- if self.database_engine.can_native_upsert:
- absolute_updates = [
- "%(field)s = EXCLUDED.%(field)s" % {"field": field}
- for field in absolutes.keys()
- ]
-
- relative_updates = [
- "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
- % {"table": table, "field": field}
- for field in additive_relatives.keys()
- ]
-
- insert_cols = []
- qargs = []
-
- for (key, val) in chain(
- keyvalues.items(), absolutes.items(), additive_relatives.items()
- ):
- insert_cols.append(key)
- qargs.append(val)
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+ qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+ "table": table,
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "key_columns": ", ".join(keyvalues),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
- sql = """
- INSERT INTO %(table)s (%(insert_cols_cs)s)
- VALUES (%(insert_vals_qs)s)
- ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
- """ % {
- "table": table,
- "insert_cols_cs": ", ".join(insert_cols),
- "insert_vals_qs": ", ".join(
- ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
- ),
- "key_columns": ", ".join(keyvalues),
- "updates": ", ".join(chain(absolute_updates, relative_updates)),
- }
-
- txn.execute(sql, qargs)
- else:
- self.database_engine.lock_table(txn, table)
- retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
- current_row = self.db_pool.simple_select_one_txn(
- txn, table, keyvalues, retcols, allow_none=True
- )
- if current_row is None:
- merged_dict = {**keyvalues, **absolutes, **additive_relatives}
- self.db_pool.simple_insert_txn(txn, table, merged_dict)
- else:
- for (key, val) in additive_relatives.items():
- if current_row[key] is None:
- current_row[key] = val
- else:
- current_row[key] += val
- current_row.update(absolutes)
- self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
+ txn.execute(sql, qargs)
async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
"""Calculate and insert an entry into room_stats_current.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index f61f290547..f0b179eea5 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -72,6 +72,7 @@ from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -597,6 +598,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return ret, key
+ @cancellable
async def get_membership_changes_for_user(
self,
user_id: str,
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_interval: how long until next retry in ms
"""
- if self.database_engine.can_native_upsert:
- await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings_native,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- db_autocommit=True, # Safe as its a single upsert
- )
- else:
- await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings_emulated,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- )
+ await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_native,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ db_autocommit=True, # Safe as it's a single upsert
+ )
def _set_destination_retry_timings_native(
self,
@@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_last_ts: int,
retry_interval: int,
) -> None:
- assert self.database_engine.can_native_upsert
-
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
#
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index bb64543c1f..f8cfcaca83 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -31,6 +31,7 @@ from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import MutableStateMap, StateKey, StateMap
from synapse.util.caches.descriptors import cached
from synapse.util.caches.dictionary_cache import DictionaryCache
+from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -156,6 +157,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"get_state_group_delta", _get_state_group_delta_txn
)
+ @cancellable
async def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
) -> Dict[int, StateMap[str]]:
@@ -235,6 +237,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return state_filter.filter_state(state_dict_ids), not missing_types
+ @cancellable
async def _get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Dict[int, MutableStateMap[str]]:
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 971ff82693..0d16a419a4 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -45,14 +45,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
- def can_native_upsert(self) -> bool:
- """
- Do we support native UPSERTs?
- """
- ...
-
- @property
- @abc.abstractmethod
def supports_using_any_list(self) -> bool:
"""
Do we support using `a = ANY(?)` and passing a list
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 517f9d5f98..7f7d006ac2 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -159,13 +159,6 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
db_conn.commit()
@property
- def can_native_upsert(self) -> bool:
- """
- Can we use native UPSERTs?
- """
- return True
-
- @property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""
return True
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 621f2c5efe..095ae0a096 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -49,14 +49,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
return True
@property
- def can_native_upsert(self) -> bool:
- """
- Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
- more work we haven't done yet to tell what was inserted vs updated.
- """
- return sqlite3.sqlite_version_info >= (3, 24, 0)
-
- @property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""
return False
@@ -70,12 +62,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
) -> None:
if not allow_outdated_version:
- version = sqlite3.sqlite_version_info
# Synapse is untested against older SQLite versions, and we don't want
# to let users upgrade to a version of Synapse with broken support for their
# sqlite version, because it risks leaving them with a half-upgraded db.
- if version < (3, 22, 0):
- raise RuntimeError("Synapse requires sqlite 3.22 or above.")
+ if sqlite3.sqlite_version_info < (3, 27, 0):
+ raise RuntimeError("Synapse requires sqlite 3.27 or above.")
def check_new_database(self, txn: Cursor) -> None:
"""Gets called when setting up a brand new database. This allows us to
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index dd187f7422..b0458643e6 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -75,6 +75,7 @@ Changes in SCHEMA_VERSION = 71:
Changes in SCHEMA_VERSION = 72:
- event_edges.(room_id, is_state) are no longer written to.
- Tables related to groups are dropped.
+ - Unused column application_services_state.last_txn is dropped
- Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room`
from `opentracing_context` to generalized `tracing_context`.
"""
diff --git a/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.postgres b/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.postgres
new file mode 100644
index 0000000000..13d47de9e6
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.postgres
@@ -0,0 +1,17 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop unused column application_services_state.last_txn
+ALTER table application_services_state DROP COLUMN last_txn;
\ No newline at end of file
diff --git a/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.sqlite b/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.sqlite
new file mode 100644
index 0000000000..3be1c88d72
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.sqlite
@@ -0,0 +1,40 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop unused column application_services_state.last_txn
+
+CREATE TABLE application_services_state2 (
+ as_id TEXT PRIMARY KEY NOT NULL,
+ state VARCHAR(5),
+ read_receipt_stream_id BIGINT,
+ presence_stream_id BIGINT,
+ to_device_stream_id BIGINT,
+ device_list_stream_id BIGINT
+);
+
+
+INSERT INTO application_services_state2 (
+ as_id,
+ state,
+ read_receipt_stream_id,
+ presence_stream_id,
+ to_device_stream_id,
+ device_list_stream_id
+)
+SELECT as_id, state, read_receipt_stream_id, presence_stream_id, to_device_stream_id, device_list_stream_id
+FROM application_services_state;
+
+DROP TABLE application_services_state;
+ALTER TABLE application_services_state2 RENAME TO application_services_state;
diff --git a/synapse/storage/schema/main/delta/72/05remove_unstable_private_read_receipts.sql b/synapse/storage/schema/main/delta/72/05remove_unstable_private_read_receipts.sql
new file mode 100644
index 0000000000..36b41049cd
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/05remove_unstable_private_read_receipts.sql
@@ -0,0 +1,19 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop previously received private read receipts so they do not accidentally
+-- get leaked to other users.
+DELETE FROM receipts_linearized WHERE receipt_type = 'org.matrix.msc2285.read.private';
+DELETE FROM receipts_graph WHERE receipt_type = 'org.matrix.msc2285.read.private';
diff --git a/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
new file mode 100644
index 0000000000..609eb1750f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
@@ -0,0 +1,16 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD consent_ts bigint;
diff --git a/synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql b/synapse/storage/schema/main/delta/72/07rename_opentelemtetry_tracing_context.sql
index ae904863f8..ae904863f8 100644
--- a/synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql
+++ b/synapse/storage/schema/main/delta/72/07rename_opentelemtetry_tracing_context.sql
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 3c13859faa..2dfe4c0b66 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -460,8 +460,17 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
# Cast safety: this corresponds to the types returned by the query above.
rows.extend(cast(Iterable[Tuple[str, int]], cur))
- # Sort so that we handle rows in order for each instance.
- rows.sort()
+ # Sort by stream_id (ascending, lowest -> highest) so that we handle
+ # rows in order for each instance because we don't want to overwrite
+ # the current_position of an instance to a lower stream ID than
+ # we're actually at.
+ def sort_by_stream_id_key_func(row: Tuple[str, int]) -> int:
+ (instance, stream_id) = row
+ # If `stream_id` is ever `None`, we will see a `TypeError: '<'
+ # not supported between instances of 'NoneType' and 'X'` error.
+ return stream_id
+
+ rows.sort(key=sort_by_stream_id_key_func)
with self._lock:
for (
diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py
index 07af89ee31..9bdb68c0c1 100644
--- a/synapse/storage/util/partial_state_events_tracker.py
+++ b/synapse/storage/util/partial_state_events_tracker.py
@@ -24,6 +24,7 @@ from synapse.logging.tracing import trace_with_opname
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.util import unwrapFirstError
+from synapse.util.cancellation import cancellable
logger = logging.getLogger(__name__)
@@ -60,6 +61,7 @@ class PartialStateEventsTracker:
o.callback(None)
@trace_with_opname("PartialStateEventsTracker.await_full_state")
+ @cancellable
async def await_full_state(self, event_ids: Collection[str]) -> None:
"""Wait for all the given events to have full state.
@@ -154,6 +156,7 @@ class PartialCurrentStateTracker:
o.callback(None)
@trace_with_opname("PartialCurrentStateTracker.await_full_state")
+ @cancellable
async def await_full_state(self, room_id: str) -> None:
# We add the deferred immediately so that the DB call to check for
# partial state doesn't race when we unpartial the room.
|