diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index b9c39b1718..a3b6c8ae8e 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -244,9 +244,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
set_tag("include_all_devices", include_all_devices)
set_tag("include_deleted_devices", include_deleted_devices)
- result = await self.db_pool.runInteraction(
- "get_e2e_device_keys",
- self._get_e2e_device_keys_txn,
+ result = await self._get_e2e_device_keys(
query_list,
include_all_devices,
include_deleted_devices,
@@ -285,9 +283,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
log_kv(result)
return result
- def _get_e2e_device_keys_txn(
+ async def _get_e2e_device_keys(
self,
- txn: LoggingTransaction,
query_list: Collection[Tuple[str, Optional[str]]],
include_all_devices: bool = False,
include_deleted_devices: bool = False,
@@ -319,7 +316,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
if user_list:
user_id_in_list_clause, user_args = make_in_list_sql_clause(
- txn.database_engine, "user_id", user_list
+ self.database_engine, "user_id", user_list
)
query_clauses.append(user_id_in_list_clause)
query_params_list.append(user_args)
@@ -332,13 +329,16 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
user_device_id_in_list_clause,
user_device_args,
) = make_tuple_in_list_sql_clause(
- txn.database_engine, ("user_id", "device_id"), user_device_batch
+ self.database_engine, ("user_id", "device_id"), user_device_batch
)
query_clauses.append(user_device_id_in_list_clause)
query_params_list.append(user_device_args)
result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
- for query_clause, query_params in zip(query_clauses, query_params_list):
+
+ def get_e2e_device_keys_txn(
+ txn: LoggingTransaction, query_clause: str, query_params: list
+ ) -> None:
sql = (
"SELECT user_id, device_id, "
" d.display_name, "
@@ -361,6 +361,14 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
display_name, db_to_json(key_json) if key_json else None
)
+ for query_clause, query_params in zip(query_clauses, query_params_list):
+ await self.db_pool.runInteraction(
+ "_get_e2e_device_keys",
+ get_e2e_device_keys_txn,
+ query_clause,
+ query_params,
+ )
+
if include_deleted_devices:
for user_id, device_id in deleted_devices:
if device_id is None:
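Taken together, this hunk turns one monolithic device-key read transaction into a loop of smaller interactions, one per IN-list batch, with results accumulated through a closure. Below is a minimal, hedged sketch of that pattern; the `run_interaction` helper, the `device_keys` table and the sqlite backing are illustrative stand-ins, not Synapse's actual API.

import sqlite3
from typing import Callable, Dict, Sequence, Tuple

DB_PATH = "devices.db"  # illustrative only

async def run_interaction(desc: str, func: Callable, *args):
    # Stand-in for a db_pool.runInteraction-style helper: each call is its own
    # short transaction (sqlite3's connection context manager commits on success).
    with sqlite3.connect(DB_PATH) as conn:
        return func(conn.cursor(), *args)

async def fetch_device_keys(
    batches: Sequence[Tuple[str, Sequence[str]]]
) -> Dict[str, str]:
    # One small read transaction per IN-list batch, instead of a single large
    # transaction over every batch, mirroring the _get_e2e_device_keys refactor.
    result: Dict[str, str] = {}

    def fetch_batch_txn(txn, clause: str, params: Sequence[str]) -> None:
        # Accumulate into the shared dict via the closure, like the nested
        # get_e2e_device_keys_txn in the hunk above.
        txn.execute(
            "SELECT device_id, key_json FROM device_keys WHERE " + clause, params
        )
        for device_id, key_json in txn.fetchall():
            result[device_id] = key_json

    for clause, params in batches:
        await run_interaction("fetch_device_keys_batch", fetch_batch_txn, clause, params)
    return result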
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 9c41d01e13..7a7c0d9c75 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -325,6 +325,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# We then run the same purge a second time without this isolation level to
# purge any of those rows which were added during the first.
+        logger.info("[purge] Starting initial main purge [1/2]")
state_groups_to_delete = await self.db_pool.runInteraction(
"purge_room",
self._purge_room_txn,
@@ -332,6 +333,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
isolation_level=IsolationLevel.READ_COMMITTED,
)
+        logger.info("[purge] Starting secondary main purge [2/2]")
state_groups_to_delete.extend(
await self.db_pool.runInteraction(
"purge_room",
@@ -339,6 +341,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
room_id=room_id,
),
)
+ logger.info("[purge] Done with main purge")
return state_groups_to_delete
@@ -376,7 +379,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
)
referenced_chain_id_tuples = list(txn)
- logger.info("[purge] removing events from event_auth_chain_links")
+ logger.info("[purge] removing from event_auth_chain_links")
txn.executemany(
"""
DELETE FROM event_auth_chain_links WHERE
@@ -399,7 +402,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"rejections",
"state_events",
):
- logger.info("[purge] removing %s from %s", room_id, table)
+ logger.info("[purge] removing from %s", table)
txn.execute(
"""
@@ -454,7 +457,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# happy
"rooms",
):
- logger.info("[purge] removing %s from %s", room_id, table)
+ logger.info("[purge] removing from %s", table)
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
# Other tables we do NOT need to clear out:
@@ -486,6 +489,4 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# that already exist.
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
- logger.info("[purge] done")
-
return state_groups
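For reference, the new log lines bracket the existing two-pass purge described in the comment above: a first pass at READ_COMMITTED isolation (so concurrent writes do not abort it with serialization errors), then a second pass at the default level to sweep up rows inserted while the first pass ran. A rough sketch of that control flow, with `db_pool` and the per-table delete logic reduced to placeholders:

from typing import Any, List

async def purge_room_in_two_passes(db_pool: Any, room_id: str) -> List[int]:
    def purge_room_txn(txn: Any, room_id: str) -> List[int]:
        # Placeholder for _purge_room_txn's real per-table DELETEs; returns the
        # state groups that still need deleting afterwards.
        txn.execute("DELETE FROM events WHERE room_id = ?", (room_id,))
        return []

    # Pass 1: READ_COMMITTED, so rows written concurrently by other workers do
    # not trigger serialization errors that would abort the purge.
    state_groups = await db_pool.runInteraction(
        "purge_room",
        purge_room_txn,
        room_id=room_id,
        isolation_level="READ COMMITTED",  # stand-in for IsolationLevel.READ_COMMITTED
    )
    # Pass 2: default isolation, to catch anything inserted during pass 1.
    state_groups.extend(
        await db_pool.runInteraction("purge_room", purge_room_txn, room_id=room_id)
    )
    return state_groups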
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 6d72bd9f67..c3bd36efc9 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -224,7 +224,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
await self.db_pool.runInteraction(
"set_destination_retry_timings",
- self._set_destination_retry_timings_native,
+ self._set_destination_retry_timings_txn,
destination,
failure_ts,
retry_last_ts,
@@ -232,7 +232,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
db_autocommit=True, # Safe as it's a single upsert
)
- def _set_destination_retry_timings_native(
+ def _set_destination_retry_timings_txn(
self,
txn: LoggingTransaction,
destination: str,
@@ -266,58 +266,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
txn, self.get_destination_retry_timings, (destination,)
)
- def _set_destination_retry_timings_emulated(
- self,
- txn: LoggingTransaction,
- destination: str,
- failure_ts: Optional[int],
- retry_last_ts: int,
- retry_interval: int,
- ) -> None:
- self.database_engine.lock_table(txn, "destinations")
-
- # We need to be careful here as the data may have changed from under us
- # due to a worker setting the timings.
-
- prev_row = self.db_pool.simple_select_one_txn(
- txn,
- table="destinations",
- keyvalues={"destination": destination},
- retcols=("failure_ts", "retry_last_ts", "retry_interval"),
- allow_none=True,
- )
-
- if not prev_row:
- self.db_pool.simple_insert_txn(
- txn,
- table="destinations",
- values={
- "destination": destination,
- "failure_ts": failure_ts,
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- },
- )
- elif (
- retry_interval == 0
- or prev_row["retry_interval"] is None
- or prev_row["retry_interval"] < retry_interval
- ):
- self.db_pool.simple_update_one_txn(
- txn,
- "destinations",
- keyvalues={"destination": destination},
- updatevalues={
- "failure_ts": failure_ts,
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- },
- )
-
- self._invalidate_cache_and_stream(
- txn, self.get_destination_retry_timings, (destination,)
- )
-
async def store_destination_rooms_entries(
self,
destinations: Iterable[str],
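The deleted `_set_destination_retry_timings_emulated` was the lock-table, select, then insert-or-conditionally-update fallback; the retained `_set_destination_retry_timings_txn` (its body is not shown in this diff) is the path that relies on a native upsert, presumably kept now that both supported databases provide one (`INSERT ... ON CONFLICT` needs PostgreSQL 9.5+ / SQLite 3.24+). A hedged sketch of a native upsert with the same conditional update as the deleted code; this mirrors the removed logic and is not necessarily the exact SQL Synapse uses:

from typing import Any, Optional

def set_destination_retry_timings_upsert_txn(
    txn: Any,
    destination: str,
    failure_ts: Optional[int],
    retry_last_ts: int,
    retry_interval: int,
) -> None:
    # Insert the row, or update it only under the same condition the emulated
    # path checked: a reset (retry_interval == 0) or a longer interval than the
    # one already stored.
    sql = """
        INSERT INTO destinations (destination, failure_ts, retry_last_ts, retry_interval)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (destination) DO UPDATE SET
            failure_ts = EXCLUDED.failure_ts,
            retry_last_ts = EXCLUDED.retry_last_ts,
            retry_interval = EXCLUDED.retry_interval
        WHERE EXCLUDED.retry_interval = 0
            OR destinations.retry_interval IS NULL
            OR destinations.retry_interval < EXCLUDED.retry_interval
    """
    txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))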
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index f16a509ac4..97f09b73dd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -54,6 +54,7 @@ from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import (
JsonDict,
+ UserID,
UserProfile,
get_domain_from_id,
get_localpart_from_id,
@@ -473,11 +474,116 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return False
+ async def set_remote_user_profile_in_user_dir_stale(
+ self, user_id: str, next_try_at_ms: int, retry_counter: int
+ ) -> None:
+ """
+ Marks a remote user as having a possibly-stale user directory profile.
+
+ Args:
+ user_id: the remote user who may have a stale profile on this server.
+ next_try_at_ms: timestamp in ms after which the user directory profile can be
+ refreshed.
+ retry_counter: number of failures in refreshing the profile so far. Used for
+ exponential backoff calculations.
+ """
+ assert not self.hs.is_mine_id(
+ user_id
+ ), "Can't mark a local user as a stale remote user."
+
+ server_name = UserID.from_string(user_id).domain
+
+ await self.db_pool.simple_upsert(
+ table="user_directory_stale_remote_users",
+ keyvalues={"user_id": user_id},
+ values={
+ "next_try_at_ts": next_try_at_ms,
+ "retry_counter": retry_counter,
+ "user_server_name": server_name,
+ },
+ desc="set_remote_user_profile_in_user_dir_stale",
+ )
+
+ async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
+ """
+ Marks a remote user as no longer having a possibly-stale user directory profile.
+
+ Args:
+ user_id: the remote user who no longer has a stale profile on this server.
+ """
+ await self.db_pool.simple_delete(
+ table="user_directory_stale_remote_users",
+ keyvalues={"user_id": user_id},
+ desc="clear_remote_user_profile_in_user_dir_stale",
+ )
+
+ async def get_remote_servers_with_profiles_to_refresh(
+ self, now_ts: int, limit: int
+ ) -> List[str]:
+ """
+        Get a list of up to `limit` server names which have users whose
+        locally-cached profiles we believe to be stale and due for a refresh
+        at the current time `now_ts` (in milliseconds).
+ """
+
+ def _get_remote_servers_with_refreshable_profiles_txn(
+ txn: LoggingTransaction,
+ ) -> List[str]:
+ sql = """
+ SELECT user_server_name
+ FROM user_directory_stale_remote_users
+ WHERE next_try_at_ts < ?
+ GROUP BY user_server_name
+ ORDER BY MIN(next_try_at_ts), user_server_name
+ LIMIT ?
+ """
+ txn.execute(sql, (now_ts, limit))
+ return [row[0] for row in txn]
+
+ return await self.db_pool.runInteraction(
+ "get_remote_servers_with_profiles_to_refresh",
+ _get_remote_servers_with_refreshable_profiles_txn,
+ )
+
+ async def get_remote_users_to_refresh_on_server(
+ self, server_name: str, now_ts: int, limit: int
+ ) -> List[Tuple[str, int, int]]:
+ """
+        Get a list of up to `limit` user IDs from the server `server_name`
+        whose locally-cached profiles we believe to be stale and due for a
+        refresh at the current time `now_ts` (in milliseconds).
+
+ Returns:
+            A list of tuples, each containing:
+ - User ID
+ - Retry counter (number of failures so far)
+ - Time the retry is scheduled for, in milliseconds
+ """
+
+ def _get_remote_users_to_refresh_on_server_txn(
+ txn: LoggingTransaction,
+ ) -> List[Tuple[str, int, int]]:
+ sql = """
+ SELECT user_id, retry_counter, next_try_at_ts
+ FROM user_directory_stale_remote_users
+ WHERE user_server_name = ? AND next_try_at_ts < ?
+ ORDER BY next_try_at_ts
+ LIMIT ?
+ """
+ txn.execute(sql, (server_name, now_ts, limit))
+ return cast(List[Tuple[str, int, int]], txn.fetchall())
+
+ return await self.db_pool.runInteraction(
+ "get_remote_users_to_refresh_on_server",
+ _get_remote_users_to_refresh_on_server_txn,
+ )
+
async def update_profile_in_user_dir(
self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
) -> None:
"""
Update or add a user's profile in the user directory.
+ If the user is remote, the profile will be marked as not stale.
"""
# If the display name or avatar URL are unexpected types, replace with None.
display_name = non_null_str_or_none(display_name)
@@ -491,6 +597,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
values={"display_name": display_name, "avatar_url": avatar_url},
)
+ if not self.hs.is_mine_id(user_id):
+ # Remote users: Make sure the profile is not marked as stale anymore.
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="user_directory_stale_remote_users",
+ keyvalues={"user_id": user_id},
+ )
+
# The display name that goes into the database index.
index_display_name = display_name
if index_display_name is not None:
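The new store methods above only record and query the stale markers; driving the actual refreshes is left to a caller outside this diff. A hedged sketch of what such a loop could look like follows; `profile_handler.get_remote_profile`, the `clock`/`store` parameters as wired here, and the backoff constants are assumptions for illustration, not part of this change.

from typing import Any

MAX_BACKOFF_MS = 24 * 60 * 60 * 1000  # cap retries at one day (illustrative)

async def refresh_stale_remote_profiles(
    store: Any, profile_handler: Any, clock: Any, limit: int = 100
) -> None:
    now_ms = clock.time_msec()
    servers = await store.get_remote_servers_with_profiles_to_refresh(now_ms, limit)
    for server_name in servers:
        users = await store.get_remote_users_to_refresh_on_server(
            server_name, now_ms, limit
        )
        for user_id, retry_counter, _next_try_at_ms in users:
            try:
                # Hypothetical helper that fetches the remote profile (a JSON
                # dict) over federation.
                profile = await profile_handler.get_remote_profile(user_id)
            except Exception:
                # Exponential backoff: double the delay for each failure, capped.
                delay_ms = min(60_000 * 2**retry_counter, MAX_BACKOFF_MS)
                await store.set_remote_user_profile_in_user_dir_stale(
                    user_id, now_ms + delay_ms, retry_counter + 1
                )
                continue
            # update_profile_in_user_dir also clears the stale marker (see the
            # simple_delete_txn added in the hunk above).
            await store.update_profile_in_user_dir(
                user_id, profile.get("displayname"), profile.get("avatar_url")
            )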