summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/database.py11
-rw-r--r--synapse/storage/databases/main/presence.py60
2 files changed, 68 insertions, 3 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 5b0b9a20bf..94590e7b45 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1906,6 +1906,7 @@ class DatabasePool:
         retcols: Iterable[str],
         filters: Optional[Dict[str, Any]] = None,
         keyvalues: Optional[Dict[str, Any]] = None,
+        exclude_keyvalues: Optional[Dict[str, Any]] = None,
         order_direction: str = "ASC",
     ) -> List[Dict[str, Any]]:
         """
@@ -1929,7 +1930,10 @@ class DatabasePool:
                 apply a WHERE ? LIKE ? clause.
             keyvalues:
                 column names and values to select the rows with, or None to not
-                apply a WHERE clause.
+                apply a WHERE key = value clause.
+            exclude_keyvalues:
+                column names and values to exclude rows with, or None to not
+                apply a WHERE key != value clause.
             order_direction: Whether the results should be ordered "ASC" or "DESC".
 
         Returns:
@@ -1938,7 +1942,7 @@ class DatabasePool:
         if order_direction not in ["ASC", "DESC"]:
             raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
 
-        where_clause = "WHERE " if filters or keyvalues else ""
+        where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
         arg_list = []  # type: List[Any]
         if filters:
             where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
@@ -1947,6 +1951,9 @@ class DatabasePool:
         if keyvalues:
             where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
             arg_list += list(keyvalues.values())
+        if exclude_keyvalues:
+            where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
+            arg_list += list(exclude_keyvalues.values())
 
         sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
             ", ".join(retcols),
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 29edab34d4..0ff693a310 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import List, Tuple
+from typing import Dict, List, Tuple
 
 from synapse.api.presence import UserPresenceState
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@@ -157,5 +157,63 @@ class PresenceStore(SQLBaseStore):
 
         return {row["user_id"]: UserPresenceState(**row) for row in rows}
 
+    async def get_presence_for_all_users(
+        self,
+        include_offline: bool = True,
+    ) -> Dict[str, UserPresenceState]:
+        """Retrieve the current presence state for all users.
+
+        Note that the presence_stream table is culled frequently, so it should only
+        contain the latest presence state for each user.
+
+        Args:
+            include_offline: Whether to include offline presence states
+
+        Returns:
+            A dict of user IDs to their current UserPresenceState.
+        """
+        users_to_state = {}
+
+        exclude_keyvalues = None
+        if not include_offline:
+            # Exclude offline presence state
+            exclude_keyvalues = {"state": "offline"}
+
+        # This may be a very heavy database query.
+        # We paginate in order to not block a database connection.
+        limit = 100
+        offset = 0
+        while True:
+            rows = await self.db_pool.runInteraction(
+                "get_presence_for_all_users",
+                self.db_pool.simple_select_list_paginate_txn,
+                "presence_stream",
+                orderby="stream_id",
+                start=offset,
+                limit=limit,
+                exclude_keyvalues=exclude_keyvalues,
+                retcols=(
+                    "user_id",
+                    "state",
+                    "last_active_ts",
+                    "last_federation_update_ts",
+                    "last_user_sync_ts",
+                    "status_msg",
+                    "currently_active",
+                ),
+                order_direction="ASC",
+            )
+
+            for row in rows:
+                users_to_state[row["user_id"]] = UserPresenceState(**row)
+
+            # We've run out of updates to query
+            if len(rows) < limit:
+                break
+
+            offset += limit
+
+        return users_to_state
+
     def get_current_presence_token(self):
         return self._presence_id_gen.get_current_token()