diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index a9843f6e17..8f7bdbc61a 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
writers=hs.config.worker.writers.account_data,
)
else:
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but it's useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._account_data_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index bd07d20171..46fa0a73f9 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -274,11 +274,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
- """Invalidates the cache and adds it to the cache stream so slaves
+ """Invalidates the cache and adds it to the cache stream so other workers
will know to invalidate their caches.
- This should only be used to invalidate caches where slaves won't
- otherwise know from other replication streams that the cache should
+ This should only be used to invalidate caches where other workers won't
+ otherwise know from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
@@ -297,11 +297,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
cache_func: CachedFunction,
keys: Tuple[Any, ...],
) -> None:
- """Invalidates the cache and adds it to the cache stream so slaves
+ """Invalidates the cache and adds it to the cache stream so other workers
will know to invalidate their caches.
- This should only be used to invalidate caches where slaves won't
- otherwise know from other replication streams that the cache should
+ This should only be used to invalidate caches where other workers won't
+ otherwise know from other replication streams that the cache should
be invalidated.
"""
txn.call_after(cache_func.invalidate, keys)
@@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
def _invalidate_all_cache_and_stream(
self, txn: LoggingTransaction, cache_func: CachedFunction
) -> None:
- """Invalidates the entire cache and adds it to the cache stream so slaves
+ """Invalidates the entire cache and adds it to the cache stream so other workers
will know to invalidate their caches.
"""
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 5503621ad6..a67fdb3c22 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
is_writer=hs.config.worker.worker_app is None,
)
- # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
- # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
device_list_max = self._device_list_id_gen.get_current_token()
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 0ff3fc7369..a39bc90974 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore):
writers=hs.config.worker.writers.events,
)
else:
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but it's useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
@@ -1976,12 +1973,6 @@ class EventsWorkerStore(SQLBaseStore):
return rows, to_token, True
- async def is_event_after(self, event_id1: str, event_id2: str) -> bool:
- """Returns True if event_id1 is after event_id2 in the stream"""
- to_1, so_1 = await self.get_event_ordering(event_id1)
- to_2, so_2 = await self.get_event_ordering(event_id2)
- return (to_1, so_1) > (to_2, so_2)
-
@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 50516402f9..da31eb44dc 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -25,6 +25,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
+from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, UserID
from synapse.util.caches.descriptors import cached
@@ -40,6 +41,8 @@ class FilteringWorkerStore(SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
+ self.server_name: str = hs.hostname
+ self.database_engine = database.engine
self.db_pool.updates.register_background_index_update(
"full_users_filters_unique_idx",
index_name="full_users_unique_idx",
@@ -48,6 +51,98 @@ class FilteringWorkerStore(SQLBaseStore):
unique=True,
)
+ self.db_pool.updates.register_background_update_handler(
+ "populate_full_user_id_user_filters",
+ self.populate_full_user_id_user_filters,
+ )
+
+ async def populate_full_user_id_user_filters(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """
+ Background update to populate the column `full_user_id` of the table
+ `user_filters` from entries in the column `user_id` of the same table.
+ """
+
+ lower_bound_id = progress.get("lower_bound_id", "")
+
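+ # Look up the user_id 50 rows past the current lower bound; it becomes this
+ # batch's inclusive upper bound, or None when no such row exists (final batch).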
+ def _get_last_id(txn: LoggingTransaction) -> Optional[str]:
+ sql = """
+ SELECT user_id FROM user_filters
+ WHERE user_id > ?
+ ORDER BY user_id
+ LIMIT 1 OFFSET 50
+ """
+ txn.execute(sql, (lower_bound_id,))
+ res = txn.fetchone()
+ if res:
+ upper_bound_id = res[0]
+ return upper_bound_id
+ else:
+ return None
+
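+ # Build full_user_id as '@' || user_id || ':<server name>' for every row in
+ # the (lower_bound_id, upper_bound_id] range that is still unpopulated.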
+ def _process_batch(
+ txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str
+ ) -> None:
+ sql = """
+ UPDATE user_filters
+ SET full_user_id = '@' || user_id || ?
+ WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL
+ """
+ txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id))
+
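+ # Final batch: populate every remaining row above the lower bound, then (on
+ # Postgres) validate the full_user_id_not_null constraint on user_filters.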
+ def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None:
+ sql = """
+ UPDATE user_filters
+ SET full_user_id = '@' || user_id || ?
+ WHERE ? < user_id AND full_user_id IS NULL
+ """
+ txn.execute(
+ sql,
+ (
+ f":{self.server_name}",
+ lower_bound_id,
+ ),
+ )
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = """
+ ALTER TABLE user_filters VALIDATE CONSTRAINT full_user_id_not_null
+ """
+ txn.execute(sql)
+
+ upper_bound_id = await self.db_pool.runInteraction(
+ "populate_full_user_id_user_filters", _get_last_id
+ )
+
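+ # No row 50 past the lower bound means this is the last chunk: run the final
+ # batch and mark the background update as finished.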
+ if upper_bound_id is None:
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_user_filters", _final_batch, lower_bound_id
+ )
+
+ await self.db_pool.updates._end_background_update(
+ "populate_full_user_id_user_filters"
+ )
+ return 1
+
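+ # Otherwise process this batch and persist the new lower bound so the next
+ # round resumes where this one stopped.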
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_user_filters",
+ _process_batch,
+ lower_bound_id,
+ upper_bound_id,
+ )
+
+ progress["lower_bound_id"] = upper_bound_id
+
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_user_filters",
+ self.db_pool.updates._background_update_progress_txn,
+ "populate_full_user_id_user_filters",
+ progress,
+ )
+
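+ # Roughly 50 rows are handled per round; the supplied batch_size is not used.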
+ return 50
+
@cached(num_args=2)
async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 14294a0bb8..595e22982e 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -248,89 +248,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
(count,) = cast(Tuple[int], txn.fetchone())
return count
- async def count_r30_users(self) -> Dict[str, int]:
- """
- Counts the number of 30 day retained users, defined as:-
- * Users who have created their accounts more than 30 days ago
- * Where last seen at most 30 days ago
- * Where account creation and last_seen are > 30 days apart
-
- Returns:
- A mapping of counts globally as well as broken out by platform.
- """
-
- def _count_r30_users(txn: LoggingTransaction) -> Dict[str, int]:
- thirty_days_in_secs = 86400 * 30
- now = int(self._clock.time())
- thirty_days_ago_in_secs = now - thirty_days_in_secs
-
- sql = """
- SELECT platform, COUNT(*) FROM (
- SELECT
- users.name, platform, users.creation_ts * 1000,
- MAX(uip.last_seen)
- FROM users
- INNER JOIN (
- SELECT
- user_id,
- last_seen,
- CASE
- WHEN user_agent LIKE '%%Android%%' THEN 'android'
- WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
- WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
- WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
- WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
- ELSE 'unknown'
- END
- AS platform
- FROM user_ips
- ) uip
- ON users.name = uip.user_id
- AND users.appservice_id is NULL
- AND users.creation_ts < ?
- AND uip.last_seen/1000 > ?
- AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
- GROUP BY users.name, platform, users.creation_ts
- ) u GROUP BY platform
- """
-
- results = {}
- txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
-
- for row in txn:
- if row[0] == "unknown":
- pass
- results[row[0]] = row[1]
-
- sql = """
- SELECT COUNT(*) FROM (
- SELECT users.name, users.creation_ts * 1000,
- MAX(uip.last_seen)
- FROM users
- INNER JOIN (
- SELECT
- user_id,
- last_seen
- FROM user_ips
- ) uip
- ON users.name = uip.user_id
- AND appservice_id is NULL
- AND users.creation_ts < ?
- AND uip.last_seen/1000 > ?
- AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
- GROUP BY users.name, users.creation_ts
- ) u
- """
-
- txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
-
- (count,) = cast(Tuple[int], txn.fetchone())
- results["all"] = count
-
- return results
-
- return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
-
async def count_r30v2_users(self) -> Dict[str, int]:
"""
Counts the number of 30 day retained users, defined as users that:
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index c4022d2427..65c92bef51 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -15,9 +15,14 @@ from typing import TYPE_CHECKING, Optional
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
+from synapse.storage.database import (
+ DatabasePool,
+ LoggingDatabaseConnection,
+ LoggingTransaction,
+)
from synapse.storage.databases.main.roommember import ProfileInfo
-from synapse.types import UserID
+from synapse.storage.engines import PostgresEngine
+from synapse.types import JsonDict, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -31,6 +36,8 @@ class ProfileWorkerStore(SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
+ self.server_name: str = hs.hostname
+ self.database_engine = database.engine
self.db_pool.updates.register_background_index_update(
"profiles_full_user_id_key_idx",
index_name="profiles_full_user_id_key",
@@ -39,6 +46,97 @@ class ProfileWorkerStore(SQLBaseStore):
unique=True,
)
+ self.db_pool.updates.register_background_update_handler(
+ "populate_full_user_id_profiles", self.populate_full_user_id_profiles
+ )
+
+ async def populate_full_user_id_profiles(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """
+ Background update to populate the column `full_user_id` of the table
+ `profiles` from entries in the column `user_id` of the same table.
+ """
+
+ lower_bound_id = progress.get("lower_bound_id", "")
+
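+ # Look up the user_id 50 rows past the current lower bound; it becomes this
+ # batch's inclusive upper bound, or None when no such row exists (final batch).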
+ def _get_last_id(txn: LoggingTransaction) -> Optional[str]:
+ sql = """
+ SELECT user_id FROM profiles
+ WHERE user_id > ?
+ ORDER BY user_id
+ LIMIT 1 OFFSET 50
+ """
+ txn.execute(sql, (lower_bound_id,))
+ res = txn.fetchone()
+ if res:
+ upper_bound_id = res[0]
+ return upper_bound_id
+ else:
+ return None
+
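+ # Build full_user_id as '@' || user_id || ':<server name>' for every row in
+ # the (lower_bound_id, upper_bound_id] range that is still unpopulated.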
+ def _process_batch(
+ txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str
+ ) -> None:
+ sql = """
+ UPDATE profiles
+ SET full_user_id = '@' || user_id || ?
+ WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL
+ """
+ txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id))
+
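+ # Final batch: populate every remaining row above the lower bound, then (on
+ # Postgres) validate the full_user_id_not_null constraint on profiles.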
+ def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None:
+ sql = """
+ UPDATE profiles
+ SET full_user_id = '@' || user_id || ?
+ WHERE ? < user_id AND full_user_id IS NULL
+ """
+ txn.execute(
+ sql,
+ (
+ f":{self.server_name}",
+ lower_bound_id,
+ ),
+ )
+
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = """
+ ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null
+ """
+ txn.execute(sql)
+
+ upper_bound_id = await self.db_pool.runInteraction(
+ "populate_full_user_id_profiles", _get_last_id
+ )
+
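+ # No row 50 past the lower bound means this is the last chunk: run the final
+ # batch and mark the background update as finished.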
+ if upper_bound_id is None:
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_profiles", _final_batch, lower_bound_id
+ )
+
+ await self.db_pool.updates._end_background_update(
+ "populate_full_user_id_profiles"
+ )
+ return 1
+
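+ # Otherwise process this batch and persist the new lower bound so the next
+ # round resumes where this one stopped.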
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_profiles",
+ _process_batch,
+ lower_bound_id,
+ upper_bound_id,
+ )
+
+ progress["lower_bound_id"] = upper_bound_id
+
+ await self.db_pool.runInteraction(
+ "populate_full_user_id_profiles",
+ self.db_pool.updates._background_update_progress_txn,
+ "populate_full_user_id_profiles",
+ progress,
+ )
+
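+ # Roughly 50 rows are handled per round; the supplied batch_size is not used.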
+ return 50
+
async def get_profileinfo(self, user_localpart: str) -> ProfileInfo:
try:
profile = await self.db_pool.simple_select_one(
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 074942b167..5ee5c7ad9f 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
else:
self._can_write_to_receipts = True
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but it's useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._receipts_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 097dea5182..86eb1a8a08 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -15,6 +15,7 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
+from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
updates.
"""
+ @trace
+ @tag_args
def _count_state_group_hops_txn(
self, txn: LoggingTransaction, state_group: int
) -> int:
@@ -83,6 +86,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
return count
+ @trace
+ @tag_args
def _get_state_groups_from_groups_txn(
self,
txn: LoggingTransaction,
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 29ff64e876..6984d11352 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -20,6 +20,7 @@ import attr
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase
+from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -159,6 +160,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"get_state_group_delta", _get_state_group_delta_txn
)
+ @trace
+ @tag_args
@cancellable
async def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
@@ -187,6 +190,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return results
+ @trace
+ @tag_args
def _get_state_for_group_using_cache(
self,
cache: DictionaryCache[int, StateKey, str],
@@ -239,6 +244,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return state_filter.filter_state(state_dict_ids), not missing_types
+ @trace
+ @tag_args
@cancellable
async def _get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
@@ -305,6 +312,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return state
+ @trace
+ @tag_args
def _get_state_for_groups_using_cache(
self,
groups: Iterable[int],
@@ -403,6 +412,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
fetched_keys=non_member_types,
)
+ @trace
+ @tag_args
async def store_state_deltas_for_batched(
self,
events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]],
@@ -520,6 +531,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
prev_group,
)
+ @trace
+ @tag_args
async def store_state_group(
self,
event_id: str,
@@ -772,6 +785,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
((sg,) for sg in state_groups_to_delete),
)
+ @trace
+ @tag_args
async def get_previous_state_groups(
self, state_groups: Iterable[int]
) -> Dict[int, int]:
|