diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index a9843f6e17..8f7bdbc61a 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
writers=hs.config.worker.writers.account_data,
)
else:
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but it's useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._account_data_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index bd07d20171..46fa0a73f9 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -274,11 +274,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
- """Invalidates the cache and adds it to the cache stream so slaves
+ """Invalidates the cache and adds it to the cache stream so other workers
will know to invalidate their caches.
- This should only be used to invalidate caches where slaves won't
- otherwise know from other replication streams that the cache should
+ This should only be used to invalidate caches where other workers won't
+ otherwise know from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
@@ -297,11 +297,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
cache_func: CachedFunction,
keys: Tuple[Any, ...],
) -> None:
- """Invalidates the cache and adds it to the cache stream so slaves
+ """Invalidates the cache and adds it to the cache stream so other workers
will know to invalidate their caches.
- This should only be used to invalidate caches where slaves won't
- otherwise know from other replication streams that the cache should
+ This should only be used to invalidate caches where other workers won't
+ otherwise know from other replication streams that the cache should
be invalidated.
"""
txn.call_after(cache_func.invalidate, keys)
@@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
def _invalidate_all_cache_and_stream(
self, txn: LoggingTransaction, cache_func: CachedFunction
) -> None:
- """Invalidates the entire cache and adds it to the cache stream so slaves
+ """Invalidates the entire cache and adds it to the cache stream so other workers
will know to invalidate their caches.
"""
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 5503621ad6..a67fdb3c22 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
is_writer=hs.config.worker.worker_app is None,
)
- # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
- # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
device_list_max = self._device_list_id_gen.get_current_token()
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ac19de183c..2681917d0b 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -46,7 +46,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import JsonDict
+from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
@@ -1584,6 +1584,35 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
@trace
+ async def get_event_ids_with_failed_pull_attempts(
+ self, event_ids: StrCollection
+ ) -> Set[str]:
+ """
+ Filter the given collection of `event_ids` and return the IDs of those which
+ have any failed pull attempts recorded.
+
+ Args:
+ event_ids: A collection of event IDs to filter down.
+
+ Returns:
+ The subset of `event_ids` that have previous failed pull attempts.
+ """
+
+ rows = await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=("event_id",),
+ desc="get_event_ids_with_failed_pull_attempts",
+ )
+ event_ids_with_failed_pull_attempts: Set[str] = {
+ row["event_id"] for row in rows
+ }
+
+ return event_ids_with_failed_pull_attempts
+
+ @trace
async def get_event_ids_to_not_pull_from_backoff(
self,
room_id: str,
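
`get_event_ids_with_failed_pull_attempts` delegates to `simple_select_many_batch`, which amounts to a batched `SELECT ... WHERE event_id IN (...)` against `event_failed_pull_attempts`. A self-contained sqlite3 sketch of that query shape (the table is reduced to a single column for illustration):

```python
# Sketch of the query behind get_event_ids_with_failed_pull_attempts: keep
# only those of the supplied event IDs that appear in the failed-pull table.
import sqlite3
from typing import Iterable, Set


def event_ids_with_failed_pull_attempts(
    conn: sqlite3.Connection, event_ids: Iterable[str]
) -> Set[str]:
    event_ids = list(event_ids)
    if not event_ids:
        return set()
    placeholders = ", ".join("?" for _ in event_ids)
    rows = conn.execute(
        "SELECT event_id FROM event_failed_pull_attempts"
        f" WHERE event_id IN ({placeholders})",
        event_ids,
    ).fetchall()
    return {event_id for (event_id,) in rows}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_failed_pull_attempts (event_id TEXT PRIMARY KEY)")
conn.executemany(
    "INSERT INTO event_failed_pull_attempts VALUES (?)", [("$bad1",), ("$bad2",)]
)
print(event_ids_with_failed_pull_attempts(conn, ["$ok", "$bad1", "$bad2"]))
# -> {'$bad1', '$bad2'}
```
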
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 0ff3fc7369..a39bc90974 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore):
writers=hs.config.worker.writers.events,
)
else:
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but it's useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
@@ -1976,12 +1973,6 @@ class EventsWorkerStore(SQLBaseStore):
return rows, to_token, True
- async def is_event_after(self, event_id1: str, event_id2: str) -> bool:
- """Returns True if event_id1 is after event_id2 in the stream"""
- to_1, so_1 = await self.get_event_ordering(event_id1)
- to_2, so_2 = await self.get_event_ordering(event_id2)
- return (to_1, so_1) > (to_2, so_2)
-
@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 50516402f9..da31eb44dc 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -25,6 +25,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
+from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, UserID
from synapse.util.caches.descriptors import cached
@@ -40,6 +41,8 @@ class FilteringWorkerStore(SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
+ self.server_name: str = hs.hostname
+ self.database_engine = database.engine
self.db_pool.updates.register_background_index_update(
"full_users_filters_unique_idx",
index_name="full_users_unique_idx",
@@ -48,6 +51,98 @@ class FilteringWorkerStore(SQLBaseStore):
unique=True,
)
+ self.db_pool.updates.register_background_update_handler(
+ "populate_full_user_id_user_filters",
+ self.populate_full_user_id_user_filters,
+ )
+
+ async def populate_full_user_id_user_filters(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """
+ Background update to populate the column `full_user_id` of the table
+ `user_filters` from entries in the column `user_id` (the user's localpart)
+ of the same table.
+ """
+
+ lower_bound_id = progress.get("lower_bound_id", "")
+
+ def _get_last_id(txn: LoggingTransaction) -> Optional[str]:
+ sql = """
+ SELECT user_id FROM user_filters
+ WHERE user_id > ?
+ ORDER BY user_id
+ LIMIT 1 OFFSET 50
+ """
+ txn.execute(sql, (lower_bound_id,))
+ res = txn.fetchone()
+ if res:
+ upper_bound_id = res[0]
+ return upper_bound_id
+ else:
+ return None
+
+ def _process_batch(
+ txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str
+ ) -> None:
+ sql = """
+ UPDATE user_filters
+ SET full_user_id = '@' || user_id || ?
+ WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL
+ """
+ txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id))
+
+ def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None:
+ sql = """
+ UPDATE user_filters
+ SET full_user_id = '@' || user_id || ?
+ WHERE ? < user_id AND full_user_id IS NULL
+ """
+ txn.execute(
+ sql,
+ (
+ f":{self.server_name}",
+ lower_bound_id,
+ ),
+ )
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = """
+ ALTER TABLE user_filters VALIDATE CONSTRAINT full_user_id_not_null
+ """
+ txn.execute(sql)
+
+ upper_bound_id = await self.db_pool.runInteraction(
+ "populate_full_user_id_user_filters", _get_last_id
+ )
+
+ if upper_bound_id is None:
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_user_filters", _final_batch, lower_bound_id
+ )
+
+ await self.db_pool.updates._end_background_update(
+ "populate_full_user_id_user_filters"
+ )
+ return 1
+
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_user_filters",
+ _process_batch,
+ lower_bound_id,
+ upper_bound_id,
+ )
+
+ progress["lower_bound_id"] = upper_bound_id
+
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_user_filters",
+ self.db_pool.updates._background_update_progress_txn,
+ "populate_full_user_id_user_filters",
+ progress,
+ )
+
+ return 50
+
@cached(num_args=2)
async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]
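
The new background update walks `user_filters` in keyset batches: look 50 rows past the last processed `user_id` to find an upper bound, backfill `full_user_id` for the rows in between, and finish with an unbounded batch (on Postgres, the `full_user_id_not_null` constraint is validated at that point). A self-contained sqlite3 sketch of the batching, collapsed into one in-process loop rather than repeated background-update runs (server name and seed data are illustrative):

```python
# Sketch of the keyset batching used by populate_full_user_id_user_filters:
# find an upper bound 50 rows past the last processed user_id, backfill
# full_user_id between the bounds, then advance. The real background update
# does one batch per invocation; here it is a single loop.
import sqlite3

SERVER_NAME = "example.com"
BATCH = 50

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_filters (user_id TEXT, full_user_id TEXT)")
conn.executemany(
    "INSERT INTO user_filters (user_id) VALUES (?)",
    [(f"user{i:04d}",) for i in range(120)],
)

lower_bound_id = ""
while True:
    row = conn.execute(
        "SELECT user_id FROM user_filters WHERE user_id > ?"
        " ORDER BY user_id LIMIT 1 OFFSET ?",
        (lower_bound_id, BATCH),
    ).fetchone()
    if row is None:
        # Final, unbounded batch: everything above the last lower bound.
        # (On Postgres the real update also validates the NOT NULL constraint here.)
        conn.execute(
            "UPDATE user_filters SET full_user_id = '@' || user_id || ?"
            " WHERE ? < user_id AND full_user_id IS NULL",
            (f":{SERVER_NAME}", lower_bound_id),
        )
        break
    upper_bound_id = row[0]
    conn.execute(
        "UPDATE user_filters SET full_user_id = '@' || user_id || ?"
        " WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL",
        (f":{SERVER_NAME}", lower_bound_id, upper_bound_id),
    )
    lower_bound_id = upper_bound_id

assert conn.execute(
    "SELECT COUNT(*) FROM user_filters WHERE full_user_id IS NULL"
).fetchone() == (0,)
print(conn.execute("SELECT full_user_id FROM user_filters LIMIT 1").fetchone())
# -> ('@user0000:example.com',)
```
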
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 14294a0bb8..595e22982e 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -248,89 +248,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
(count,) = cast(Tuple[int], txn.fetchone())
return count
- async def count_r30_users(self) -> Dict[str, int]:
- """
- Counts the number of 30 day retained users, defined as:-
- * Users who have created their accounts more than 30 days ago
- * Where last seen at most 30 days ago
- * Where account creation and last_seen are > 30 days apart
-
- Returns:
- A mapping of counts globally as well as broken out by platform.
- """
-
- def _count_r30_users(txn: LoggingTransaction) -> Dict[str, int]:
- thirty_days_in_secs = 86400 * 30
- now = int(self._clock.time())
- thirty_days_ago_in_secs = now - thirty_days_in_secs
-
- sql = """
- SELECT platform, COUNT(*) FROM (
- SELECT
- users.name, platform, users.creation_ts * 1000,
- MAX(uip.last_seen)
- FROM users
- INNER JOIN (
- SELECT
- user_id,
- last_seen,
- CASE
- WHEN user_agent LIKE '%%Android%%' THEN 'android'
- WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
- WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
- WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
- WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
- ELSE 'unknown'
- END
- AS platform
- FROM user_ips
- ) uip
- ON users.name = uip.user_id
- AND users.appservice_id is NULL
- AND users.creation_ts < ?
- AND uip.last_seen/1000 > ?
- AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
- GROUP BY users.name, platform, users.creation_ts
- ) u GROUP BY platform
- """
-
- results = {}
- txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
-
- for row in txn:
- if row[0] == "unknown":
- pass
- results[row[0]] = row[1]
-
- sql = """
- SELECT COUNT(*) FROM (
- SELECT users.name, users.creation_ts * 1000,
- MAX(uip.last_seen)
- FROM users
- INNER JOIN (
- SELECT
- user_id,
- last_seen
- FROM user_ips
- ) uip
- ON users.name = uip.user_id
- AND appservice_id is NULL
- AND users.creation_ts < ?
- AND uip.last_seen/1000 > ?
- AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
- GROUP BY users.name, users.creation_ts
- ) u
- """
-
- txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
-
- (count,) = cast(Tuple[int], txn.fetchone())
- results["all"] = count
-
- return results
-
- return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
-
async def count_r30v2_users(self) -> Dict[str, int]:
"""
Counts the number of 30 day retained users, defined as users that:
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index c4022d2427..65c92bef51 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -15,9 +15,14 @@ from typing import TYPE_CHECKING, Optional
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
+from synapse.storage.database import (
+ DatabasePool,
+ LoggingDatabaseConnection,
+ LoggingTransaction,
+)
from synapse.storage.databases.main.roommember import ProfileInfo
-from synapse.types import UserID
+from synapse.storage.engines import PostgresEngine
+from synapse.types import JsonDict, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -31,6 +36,8 @@ class ProfileWorkerStore(SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
+ self.server_name: str = hs.hostname
+ self.database_engine = database.engine
self.db_pool.updates.register_background_index_update(
"profiles_full_user_id_key_idx",
index_name="profiles_full_user_id_key",
@@ -39,6 +46,97 @@ class ProfileWorkerStore(SQLBaseStore):
unique=True,
)
+ self.db_pool.updates.register_background_update_handler(
+ "populate_full_user_id_profiles", self.populate_full_user_id_profiles
+ )
+
+ async def populate_full_user_id_profiles(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """
+ Background update to populate the column `full_user_id` of the table
+ `profiles` from entries in the column `user_id` (the user's localpart)
+ of the same table.
+ """
+
+ lower_bound_id = progress.get("lower_bound_id", "")
+
+ def _get_last_id(txn: LoggingTransaction) -> Optional[str]:
+ sql = """
+ SELECT user_id FROM profiles
+ WHERE user_id > ?
+ ORDER BY user_id
+ LIMIT 1 OFFSET 50
+ """
+ txn.execute(sql, (lower_bound_id,))
+ res = txn.fetchone()
+ if res:
+ upper_bound_id = res[0]
+ return upper_bound_id
+ else:
+ return None
+
+ def _process_batch(
+ txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str
+ ) -> None:
+ sql = """
+ UPDATE profiles
+ SET full_user_id = '@' || user_id || ?
+ WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL
+ """
+ txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id))
+
+ def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None:
+ sql = """
+ UPDATE profiles
+ SET full_user_id = '@' || user_id || ?
+ WHERE ? < user_id AND full_user_id IS NULL
+ """
+ txn.execute(
+ sql,
+ (
+ f":{self.server_name}",
+ lower_bound_id,
+ ),
+ )
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = """
+ ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null
+ """
+ txn.execute(sql)
+
+ upper_bound_id = await self.db_pool.runInteraction(
+ "populate_full_user_id_profiles", _get_last_id
+ )
+
+ if upper_bound_id is None:
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_profiles", _final_batch, lower_bound_id
+ )
+
+ await self.db_pool.updates._end_background_update(
+ "populate_full_user_id_profiles"
+ )
+ return 1
+
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_profiles",
+ _process_batch,
+ lower_bound_id,
+ upper_bound_id,
+ )
+
+ progress["lower_bound_id"] = upper_bound_id
+
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_profiles",
+ self.db_pool.updates._background_update_progress_txn,
+ "populate_full_user_id_profiles",
+ progress,
+ )
+
+ return 50
+
async def get_profileinfo(self, user_localpart: str) -> ProfileInfo:
try:
profile = await self.db_pool.simple_select_one(
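
The profiles backfill mirrors the `user_filters` one above; the value written is just the Matrix user ID assembled from the localpart stored in `user_id` plus the homeserver name, with Postgres additionally validating the `full_user_id_not_null` constraint after the final batch. A Python equivalent of the SQL expression `'@' || user_id || ':<server>'` (server name illustrative):

```python
# Python equivalent of the SQL concatenation used by both backfills.
def full_user_id(localpart: str, server_name: str = "example.com") -> str:
    return f"@{localpart}:{server_name}"


assert full_user_id("alice") == "@alice:example.com"
```
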
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 074942b167..5ee5c7ad9f 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
else:
self._can_write_to_receipts = True
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but it's useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._receipts_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index e068f27a10..ae9c201b87 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -1099,7 +1099,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
# `get_joined_hosts` is called with the "current" state group for the
# room, and so consecutive calls will be for consecutive state groups
# which point to the previous state group.
- cache = await self._get_joined_hosts_cache(room_id) # type: ignore[misc]
+ cache = await self._get_joined_hosts_cache(room_id)
# If the state group in the cache matches, we already have the data we need.
if state_entry.state_group == cache.state_group:
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index b7d58978de..a0319575f0 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -17,6 +17,7 @@ import re
import unicodedata
from typing import (
TYPE_CHECKING,
+ Collection,
Iterable,
List,
Mapping,
@@ -45,7 +46,7 @@ from synapse.util.stringutils import non_null_str_or_none
if TYPE_CHECKING:
from synapse.server import HomeServer
-from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
+from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
@@ -356,13 +357,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
Add all local users to the user directory.
"""
- def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
- sql = "SELECT user_id FROM %s LIMIT %s" % (
- TEMP_TABLE + "_users",
- str(batch_size),
- )
- txn.execute(sql)
- user_result = cast(List[Tuple[str]], txn.fetchall())
+ def _populate_user_directory_process_users_txn(
+ txn: LoggingTransaction,
+ ) -> Optional[int]:
+ if self.database_engine.supports_returning:
+ # Note: we use an ORDER BY in the SELECT to force usage of an
+ # index. Otherwise, postgres does a sequential scan that is
+ # surprisingly slow (I think due to the fact it will read/skip
+ # over lots of already deleted rows).
+ sql = f"""
+ DELETE FROM {TEMP_TABLE + "_users"}
+ WHERE user_id IN (
+ SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
+ )
+ RETURNING user_id
+ """
+ txn.execute(sql, (batch_size,))
+ user_result = cast(List[Tuple[str]], txn.fetchall())
+ else:
+ sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
+ TEMP_TABLE + "_users",
+ str(batch_size),
+ )
+ txn.execute(sql)
+ user_result = cast(List[Tuple[str]], txn.fetchall())
if not user_result:
return None
@@ -378,85 +396,81 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
assert count_result is not None
progress["remaining"] = count_result[0]
- return users_to_work_on
-
- users_to_work_on = await self.db_pool.runInteraction(
- "populate_user_directory_temp_read", _get_next_batch
- )
+ if not users_to_work_on:
+ return None
- # No more users -- complete the transaction.
- if not users_to_work_on:
- await self.db_pool.updates._end_background_update(
- "populate_user_directory_process_users"
+ logger.debug(
+ "Processing the next %d users of %d remaining",
+ len(users_to_work_on),
+ progress["remaining"],
)
- return 1
-
- logger.debug(
- "Processing the next %d users of %d remaining"
- % (len(users_to_work_on), progress["remaining"])
- )
- # First filter down to users we want to insert into the user directory.
- users_to_insert = [
- user_id
- for user_id in users_to_work_on
- if await self.should_include_local_user_in_dir(user_id)
- ]
+ # First filter down to users we want to insert into the user directory.
+ users_to_insert = self._filter_local_users_for_dir_txn(
+ txn, users_to_work_on
+ )
- # Next fetch their profiles. Note that the `user_id` here is the
- # *localpart*, and that not all users have profiles.
- profile_rows = await self.db_pool.simple_select_many_batch(
- table="profiles",
- column="user_id",
- iterable=[get_localpart_from_id(u) for u in users_to_insert],
- retcols=(
- "user_id",
- "displayname",
- "avatar_url",
- ),
- keyvalues={},
- desc="populate_user_directory_process_users_get_profiles",
- )
- profiles = {
- f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
- f"@{row['user_id']}:{self.server_name}",
- row["displayname"],
- row["avatar_url"],
+ # Next fetch their profiles. Note that the `user_id` here is the
+ # *localpart*, and that not all users have profiles.
+ profile_rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="profiles",
+ column="user_id",
+ iterable=[get_localpart_from_id(u) for u in users_to_insert],
+ retcols=(
+ "user_id",
+ "displayname",
+ "avatar_url",
+ ),
+ keyvalues={},
)
- for row in profile_rows
- }
+ profiles = {
+ f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
+ f"@{row['user_id']}:{self.server_name}",
+ row["displayname"],
+ row["avatar_url"],
+ )
+ for row in profile_rows
+ }
- profiles_to_insert = [
- profiles.get(user_id) or _UserDirProfile(user_id)
- for user_id in users_to_insert
- ]
+ profiles_to_insert = [
+ profiles.get(user_id) or _UserDirProfile(user_id)
+ for user_id in users_to_insert
+ ]
+
+ # Actually insert the users with their profiles into the directory.
+ self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)
+
+ # We've finished processing the users. Delete them from the table, if
+ # we haven't already (the RETURNING path deleted them as it selected).
+ if not self.database_engine.supports_returning:
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table=TEMP_TABLE + "_users",
+ column="user_id",
+ values=users_to_work_on,
+ keyvalues={},
+ )
- # Actually insert the users with their profiles into the directory.
- await self.db_pool.runInteraction(
- "populate_user_directory_process_users_insertion",
- self._update_profiles_in_user_dir_txn,
- profiles_to_insert,
- )
+ # Update the remaining counter.
+ progress["remaining"] -= len(users_to_work_on)
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "populate_user_directory_process_users", progress
+ )
+ return len(users_to_work_on)
- # We've finished processing the users. Delete it from the table.
- await self.db_pool.simple_delete_many(
- table=TEMP_TABLE + "_users",
- column="user_id",
- iterable=users_to_work_on,
- keyvalues={},
- desc="populate_user_directory_process_users_delete",
+ processed_count = await self.db_pool.runInteraction(
+ "populate_user_directory_temp", _populate_user_directory_process_users_txn
)
- # Update the remaining counter.
- progress["remaining"] -= len(users_to_work_on)
- await self.db_pool.runInteraction(
- "populate_user_directory",
- self.db_pool.updates._background_update_progress_txn,
- "populate_user_directory_process_users",
- progress,
- )
+ # No more users -- complete the transaction.
+ if not processed_count:
+ await self.db_pool.updates._end_background_update(
+ "populate_user_directory_process_users"
+ )
+ return 1
- return len(users_to_work_on)
+ return processed_count
async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
@@ -494,6 +508,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return True
+ def _filter_local_users_for_dir_txn(
+ self, txn: LoggingTransaction, users: Collection[str]
+ ) -> Collection[str]:
+ """A batched version of `should_include_local_user_in_dir`"""
+ users = [
+ user
+ for user in users
+ if self.get_app_service_by_user_id(user) is None # type: ignore[attr-defined]
+ and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined]
+ ]
+
+ rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="users",
+ column="name",
+ iterable=users,
+ keyvalues={
+ "deactivated": 0,
+ },
+ retcols=("name", "user_type"),
+ )
+
+ return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT]
+
async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
"""Check if the room is either world_readable or publically joinable"""
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 097dea5182..86eb1a8a08 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -15,6 +15,7 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
+from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
updates.
"""
+ @trace
+ @tag_args
def _count_state_group_hops_txn(
self, txn: LoggingTransaction, state_group: int
) -> int:
@@ -83,6 +86,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
return count
+ @trace
+ @tag_args
def _get_state_groups_from_groups_txn(
self,
txn: LoggingTransaction,
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 29ff64e876..6984d11352 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -20,6 +20,7 @@ import attr
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase
+from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -159,6 +160,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"get_state_group_delta", _get_state_group_delta_txn
)
+ @trace
+ @tag_args
@cancellable
async def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
@@ -187,6 +190,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return results
+ @trace
+ @tag_args
def _get_state_for_group_using_cache(
self,
cache: DictionaryCache[int, StateKey, str],
@@ -239,6 +244,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return state_filter.filter_state(state_dict_ids), not missing_types
+ @trace
+ @tag_args
@cancellable
async def _get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
@@ -305,6 +312,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return state
+ @trace
+ @tag_args
def _get_state_for_groups_using_cache(
self,
groups: Iterable[int],
@@ -403,6 +412,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
fetched_keys=non_member_types,
)
+ @trace
+ @tag_args
async def store_state_deltas_for_batched(
self,
events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]],
@@ -520,6 +531,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
prev_group,
)
+ @trace
+ @tag_args
async def store_state_group(
self,
event_id: str,
@@ -772,6 +785,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
((sg,) for sg in state_groups_to_delete),
)
+ @trace
+ @tag_args
async def get_previous_state_groups(
self, state_groups: Iterable[int]
) -> Dict[int, int]:
|