summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-03-30 11:41:52 +0100
committerErik Johnston <erik@matrix.org>2021-03-30 11:41:52 +0100
commitf46b864748ca96d6c41f8124d066d329e71a88c3 (patch)
tree2399bad973fa658167b62a8d090187f726159f19 /synapse/storage
parentMerge tag 'v1.30.1' into matrix-org-hotfixes (diff)
parentUpdate changelog (diff)
downloadsynapse-f46b864748ca96d6c41f8124d066d329e71a88c3.tar.xz
Merge remote-tracking branch 'origin/release-v1.31.0' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/database.py11
-rw-r--r--synapse/storage/databases/main/presence.py60
-rw-r--r--synapse/storage/databases/state/store.py9
3 files changed, 73 insertions, 7 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py

index 5b0b9a20bf..94590e7b45 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py
@@ -1906,6 +1906,7 @@ class DatabasePool: retcols: Iterable[str], filters: Optional[Dict[str, Any]] = None, keyvalues: Optional[Dict[str, Any]] = None, + exclude_keyvalues: Optional[Dict[str, Any]] = None, order_direction: str = "ASC", ) -> List[Dict[str, Any]]: """ @@ -1929,7 +1930,10 @@ class DatabasePool: apply a WHERE ? LIKE ? clause. keyvalues: column names and values to select the rows with, or None to not - apply a WHERE clause. + apply a WHERE key = value clause. + exclude_keyvalues: + column names and values to exclude rows with, or None to not + apply a WHERE key != value clause. order_direction: Whether the results should be ordered "ASC" or "DESC". Returns: @@ -1938,7 +1942,7 @@ class DatabasePool: if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") - where_clause = "WHERE " if filters or keyvalues else "" + where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else "" arg_list = [] # type: List[Any] if filters: where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters) @@ -1947,6 +1951,9 @@ class DatabasePool: if keyvalues: where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues) arg_list += list(keyvalues.values()) + if exclude_keyvalues: + where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues) + arg_list += list(exclude_keyvalues.values()) sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % ( ", ".join(retcols), diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 29edab34d4..0ff693a310 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py
@@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Tuple +from typing import Dict, List, Tuple from synapse.api.presence import UserPresenceState from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause @@ -157,5 +157,63 @@ class PresenceStore(SQLBaseStore): return {row["user_id"]: UserPresenceState(**row) for row in rows} + async def get_presence_for_all_users( + self, + include_offline: bool = True, + ) -> Dict[str, UserPresenceState]: + """Retrieve the current presence state for all users. + + Note that the presence_stream table is culled frequently, so it should only + contain the latest presence state for each user. + + Args: + include_offline: Whether to include offline presence states + + Returns: + A dict of user IDs to their current UserPresenceState. + """ + users_to_state = {} + + exclude_keyvalues = None + if not include_offline: + # Exclude offline presence state + exclude_keyvalues = {"state": "offline"} + + # This may be a very heavy database query. + # We paginate in order to not block a database connection. + limit = 100 + offset = 0 + while True: + rows = await self.db_pool.runInteraction( + "get_presence_for_all_users", + self.db_pool.simple_select_list_paginate_txn, + "presence_stream", + orderby="stream_id", + start=offset, + limit=limit, + exclude_keyvalues=exclude_keyvalues, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + order_direction="ASC", + ) + + for row in rows: + users_to_state[row["user_id"]] = UserPresenceState(**row) + + # We've run out of updates to query + if len(rows) < limit: + break + + offset += limit + + return users_to_state + def get_current_presence_token(self): return self._presence_id_gen.get_current_token() diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index e2240703a7..97ec65f757 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -183,12 +183,13 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): requests state from the cache, if False we need to query the DB for the missing state. """ - is_all, known_absent, state_dict_ids = cache.get(group) + cache_entry = cache.get(group) + state_dict_ids = cache_entry.value - if is_all or state_filter.is_full(): + if cache_entry.full or state_filter.is_full(): # Either we have everything or want everything, either way # `is_all` tells us whether we've gotten everything. - return state_filter.filter_state(state_dict_ids), is_all + return state_filter.filter_state(state_dict_ids), cache_entry.full # tracks whether any of our requested types are missing from the cache missing_types = False @@ -202,7 +203,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): # There aren't any wild cards, so `concrete_types()` returns the # complete list of event types we're wanting. for key in state_filter.concrete_types(): - if key not in state_dict_ids and key not in known_absent: + if key not in state_dict_ids and key not in cache_entry.known_absent: missing_types = True break