summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py76
-rw-r--r--synapse/storage/_base.py159
-rw-r--r--synapse/storage/end_to_end_keys.py2
-rw-r--r--synapse/storage/events.py15
-rw-r--r--synapse/storage/presence.py14
-rw-r--r--synapse/storage/roommember.py21
6 files changed, 272 insertions, 15 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b9968debe5..d604e7668f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -297,6 +297,82 @@ class DataStore(RoomMemberStore, RoomStore,
             desc="get_user_ip_and_agents",
         )
 
+    def get_users(self):
+        """Function to reterive a list of users in users table.
+
+        Args:
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+        """
+        return self._simple_select_list(
+            table="users",
+            keyvalues={},
+            retcols=[
+                "name",
+                "password_hash",
+                "is_guest",
+                "admin"
+            ],
+            desc="get_users",
+        )
+
+    def get_users_paginate(self, order, start, limit):
+        """Function to reterive a paginated list of users from
+        users list. This will return a json object, which contains
+        list of users and the total number of users in users table.
+
+        Args:
+            order (str): column name to order the select by this column
+            start (int): start number to begin the query from
+            limit (int): number of rows to reterive
+        Returns:
+            defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+        """
+        is_guest = 0
+        i_start = (int)(start)
+        i_limit = (int)(limit)
+        return self.get_user_list_paginate(
+            table="users",
+            keyvalues={
+                "is_guest": is_guest
+            },
+            pagevalues=[
+                order,
+                i_limit,
+                i_start
+            ],
+            retcols=[
+                "name",
+                "password_hash",
+                "is_guest",
+                "admin"
+            ],
+            desc="get_users_paginate",
+        )
+
+    def search_users(self, term):
+        """Function to search users list for one or more users with
+        the matched term.
+
+        Args:
+            term (str): search term
+            col (str): column to query term should be matched to
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+        """
+        return self._simple_search_list(
+            table="users",
+            term=term,
+            col="name",
+            retcols=[
+                "name",
+                "password_hash",
+                "is_guest",
+                "admin"
+            ],
+            desc="search_users",
+        )
+
 
 def are_all_users_on_domain(txn, database_engine, domain):
     sql = database_engine.convert_param_style(
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 05374682fd..b0dc391190 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -934,6 +934,165 @@ class SQLBaseStore(object):
         else:
             return 0
 
+    def _simple_select_list_paginate(self, table, keyvalues, pagevalues, retcols,
+                                     desc="_simple_select_list_paginate"):
+        """Executes a SELECT query on the named table with start and limit,
+        of row numbers, which may return zero or number of rows from start to limit,
+        returning the result as a list of dicts.
+
+        Args:
+            table (str): the table name
+            keyvalues (dict[str, Any] | None):
+                column names and values to select the rows with, or None to not
+                apply a WHERE clause.
+            retcols (iterable[str]): the names of the columns to return
+            order (str): order the select by this column
+            start (int): start number to begin the query from
+            limit (int): number of rows to reterive
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+        """
+        return self.runInteraction(
+            desc,
+            self._simple_select_list_paginate_txn,
+            table, keyvalues, pagevalues, retcols
+        )
+
+    @classmethod
+    def _simple_select_list_paginate_txn(cls, txn, table, keyvalues, pagevalues, retcols):
+        """Executes a SELECT query on the named table with start and limit,
+        of row numbers, which may return zero or number of rows from start to limit,
+        returning the result as a list of dicts.
+
+        Args:
+            txn : Transaction object
+            table (str): the table name
+            keyvalues (dict[str, T] | None):
+                column names and values to select the rows with, or None to not
+                apply a WHERE clause.
+            pagevalues ([]):
+                order (str): order the select by this column
+                start (int): start number to begin the query from
+                limit (int): number of rows to reterive
+            retcols (iterable[str]): the names of the columns to return
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]]
+
+        """
+        if keyvalues:
+            sql = "SELECT %s FROM %s WHERE %s ORDER BY %s" % (
+                ", ".join(retcols),
+                table,
+                " AND ".join("%s = ?" % (k,) for k in keyvalues),
+                " ? ASC LIMIT ? OFFSET ?"
+            )
+            txn.execute(sql, keyvalues.values() + pagevalues)
+        else:
+            sql = "SELECT %s FROM %s ORDER BY %s" % (
+                ", ".join(retcols),
+                table,
+                " ? ASC LIMIT ? OFFSET ?"
+            )
+            txn.execute(sql, pagevalues)
+
+        return cls.cursor_to_dict(txn)
+
+    @defer.inlineCallbacks
+    def get_user_list_paginate(self, table, keyvalues, pagevalues, retcols,
+                               desc="get_user_list_paginate"):
+        """Get a list of users from start row to a limit number of rows. This will
+        return a json object with users and total number of users in users list.
+
+        Args:
+            table (str): the table name
+            keyvalues (dict[str, Any] | None):
+                column names and values to select the rows with, or None to not
+                apply a WHERE clause.
+            pagevalues ([]):
+                order (str): order the select by this column
+                start (int): start number to begin the query from
+                limit (int): number of rows to reterive
+            retcols (iterable[str]): the names of the columns to return
+        Returns:
+            defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+        """
+        users = yield self.runInteraction(
+            desc,
+            self._simple_select_list_paginate_txn,
+            table, keyvalues, pagevalues, retcols
+        )
+        count = yield self.runInteraction(
+            desc,
+            self.get_user_count_txn
+        )
+        retval = {
+            "users": users,
+            "total": count
+        }
+        defer.returnValue(retval)
+
+    def get_user_count_txn(self, txn):
+        """Get a total number of registerd users in the users list.
+
+        Args:
+            txn : Transaction object
+        Returns:
+            defer.Deferred: resolves to int
+        """
+        sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;"
+        txn.execute(sql_count)
+        count = txn.fetchone()[0]
+        defer.returnValue(count)
+
+    def _simple_search_list(self, table, term, col, retcols,
+                            desc="_simple_search_list"):
+        """Executes a SELECT query on the named table, which may return zero or
+        more rows, returning the result as a list of dicts.
+
+        Args:
+            table (str): the table name
+            term (str | None):
+                term for searching the table matched to a column.
+            col (str): column to query term should be matched to
+            retcols (iterable[str]): the names of the columns to return
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]] or None
+        """
+
+        return self.runInteraction(
+            desc,
+            self._simple_search_list_txn,
+            table, term, col, retcols
+        )
+
+    @classmethod
+    def _simple_search_list_txn(cls, txn, table, term, col, retcols):
+        """Executes a SELECT query on the named table, which may return zero or
+        more rows, returning the result as a list of dicts.
+
+        Args:
+            txn : Transaction object
+            table (str): the table name
+            term (str | None):
+                term for searching the table matched to a column.
+            col (str): column to query term should be matched to
+            retcols (iterable[str]): the names of the columns to return
+        Returns:
+            defer.Deferred: resolves to list[dict[str, Any]] or None
+        """
+        if term:
+            sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (
+                ", ".join(retcols),
+                table,
+                col
+            )
+            termvalues = ["%%" + term + "%%"]
+            txn.execute(sql, termvalues)
+        else:
+            return 0
+
+        return cls.cursor_to_dict(txn)
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 2040e022fa..b9f1365f92 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -93,7 +93,7 @@ class EndToEndKeyStore(SQLBaseStore):
             query_clause = "user_id = ?"
             query_params.append(user_id)
 
-            if device_id:
+            if device_id is not None:
                 query_clause += " AND device_id = ?"
                 query_params.append(device_id)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8659f605a5..c88f689d3a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -302,7 +302,7 @@ class EventsStore(SQLBaseStore):
                                 room_id
                             )
                             new_latest_event_ids = yield self._calculate_new_extremeties(
-                                room_id, [ev for ev, _ in ev_ctx_rm]
+                                room_id, ev_ctx_rm, latest_event_ids
                             )
 
                             if new_latest_event_ids == set(latest_event_ids):
@@ -329,27 +329,24 @@ class EventsStore(SQLBaseStore):
                 persist_event_counter.inc_by(len(chunk))
 
     @defer.inlineCallbacks
-    def _calculate_new_extremeties(self, room_id, events):
+    def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
         """Calculates the new forward extremeties for a room given events to
         persist.
 
         Assumes that we are only persisting events for one room at a time.
         """
-        latest_event_ids = yield self.get_latest_event_ids_in_room(
-            room_id
-        )
         new_latest_event_ids = set(latest_event_ids)
         # First, add all the new events to the list
         new_latest_event_ids.update(
-            event.event_id for event in events
-            if not event.internal_metadata.is_outlier()
+            event.event_id for event, ctx in event_contexts
+            if not event.internal_metadata.is_outlier() and not ctx.rejected
         )
         # Now remove all events that are referenced by the to-be-added events
         new_latest_event_ids.difference_update(
             e_id
-            for event in events
+            for event, ctx in event_contexts
             for e_id, _ in event.prev_events
-            if not event.internal_metadata.is_outlier()
+            if not event.internal_metadata.is_outlier() and not ctx.rejected
         )
 
         # And finally remove any events that are referenced by previously added
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 7460f98a1f..4d1590d2b4 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -15,7 +15,7 @@
 
 from ._base import SQLBaseStore
 from synapse.api.constants import PresenceState
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
 from collections import namedtuple
 from twisted.internet import defer
@@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore):
                 self.presence_stream_cache.entity_has_changed,
                 state.user_id, stream_id,
             )
+            self._invalidate_cache_and_stream(
+                txn, self._get_presence_for_user, (state.user_id,)
+            )
 
         # Actually insert new rows
         self._simple_insert_many_txn(
@@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore):
             "get_all_presence_updates", get_all_presence_updates_txn
         )
 
-    @defer.inlineCallbacks
+    @cached()
+    def _get_presence_for_user(self, user_id):
+        raise NotImplementedError()
+
+    @cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids",
+                num_args=1, inlineCallbacks=True)
     def get_presence_for_users(self, user_ids):
         rows = yield self._simple_select_many_batch(
             table="presence_stream",
@@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore):
         for row in rows:
             row["currently_active"] = bool(row["currently_active"])
 
-        defer.returnValue([UserPresenceState(**row) for row in rows])
+        defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
 
     def get_current_presence_token(self):
         return self._presence_id_gen.get_current_token()
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ee800d074f..545d3d3a99 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -129,7 +129,7 @@ class RoomMemberStore(SQLBaseStore):
         with self._stream_id_gen.get_next() as stream_ordering:
             yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
-    @cached(max_entries=100000, iterable=True)
+    @cached(max_entries=500000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
 
@@ -274,12 +274,29 @@ class RoomMemberStore(SQLBaseStore):
 
         return rows
 
-    @cached(max_entries=5000)
+    @cached(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
         return self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
         )
 
+    @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
+    def get_users_who_share_room_with_user(self, user_id, cache_context):
+        """Returns the set of users who share a room with `user_id`
+        """
+        rooms = yield self.get_rooms_for_user(
+            user_id, on_invalidate=cache_context.invalidate,
+        )
+
+        user_who_share_room = set()
+        for room in rooms:
+            user_ids = yield self.get_users_in_room(
+                room.room_id, on_invalidate=cache_context.invalidate,
+            )
+            user_who_share_room.update(user_ids)
+
+        defer.returnValue(user_who_share_room)
+
     def forget(self, user_id, room_id):
         """Indicate that user_id wishes to discard history for room_id."""
         def f(txn):