diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b9968debe5..d604e7668f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -297,6 +297,82 @@ class DataStore(RoomMemberStore, RoomStore,
desc="get_user_ip_and_agents",
)
+ def get_users(self):
+ """Function to reterive a list of users in users table.
+
+ Args:
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]]
+ """
+ return self._simple_select_list(
+ table="users",
+ keyvalues={},
+ retcols=[
+ "name",
+ "password_hash",
+ "is_guest",
+ "admin"
+ ],
+ desc="get_users",
+ )
+
+ def get_users_paginate(self, order, start, limit):
+ """Function to reterive a paginated list of users from
+ users list. This will return a json object, which contains
+ list of users and the total number of users in users table.
+
+ Args:
+ order (str): column name to order the select by this column
+ start (int): start number to begin the query from
+ limit (int): number of rows to reterive
+ Returns:
+ defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+ """
+ is_guest = 0
+ i_start = (int)(start)
+ i_limit = (int)(limit)
+ return self.get_user_list_paginate(
+ table="users",
+ keyvalues={
+ "is_guest": is_guest
+ },
+ pagevalues=[
+ order,
+ i_limit,
+ i_start
+ ],
+ retcols=[
+ "name",
+ "password_hash",
+ "is_guest",
+ "admin"
+ ],
+ desc="get_users_paginate",
+ )
+
+ def search_users(self, term):
+ """Function to search users list for one or more users with
+ the matched term.
+
+ Args:
+ term (str): search term
+ col (str): column to query term should be matched to
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]]
+ """
+ return self._simple_search_list(
+ table="users",
+ term=term,
+ col="name",
+ retcols=[
+ "name",
+ "password_hash",
+ "is_guest",
+ "admin"
+ ],
+ desc="search_users",
+ )
+
def are_all_users_on_domain(txn, database_engine, domain):
sql = database_engine.convert_param_style(
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 05374682fd..b0dc391190 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -934,6 +934,165 @@ class SQLBaseStore(object):
else:
return 0
+ def _simple_select_list_paginate(self, table, keyvalues, pagevalues, retcols,
+ desc="_simple_select_list_paginate"):
+ """Executes a SELECT query on the named table with start and limit,
+ of row numbers, which may return zero or number of rows from start to limit,
+ returning the result as a list of dicts.
+
+ Args:
+ table (str): the table name
+ keyvalues (dict[str, Any] | None):
+ column names and values to select the rows with, or None to not
+ apply a WHERE clause.
+ retcols (iterable[str]): the names of the columns to return
+ order (str): order the select by this column
+ start (int): start number to begin the query from
+ limit (int): number of rows to reterive
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]]
+ """
+ return self.runInteraction(
+ desc,
+ self._simple_select_list_paginate_txn,
+ table, keyvalues, pagevalues, retcols
+ )
+
+ @classmethod
+ def _simple_select_list_paginate_txn(cls, txn, table, keyvalues, pagevalues, retcols):
+ """Executes a SELECT query on the named table with start and limit,
+ of row numbers, which may return zero or number of rows from start to limit,
+ returning the result as a list of dicts.
+
+ Args:
+ txn : Transaction object
+ table (str): the table name
+ keyvalues (dict[str, T] | None):
+ column names and values to select the rows with, or None to not
+ apply a WHERE clause.
+ pagevalues ([]):
+ order (str): order the select by this column
+ start (int): start number to begin the query from
+ limit (int): number of rows to reterive
+ retcols (iterable[str]): the names of the columns to return
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]]
+
+ """
+ if keyvalues:
+ sql = "SELECT %s FROM %s WHERE %s ORDER BY %s" % (
+ ", ".join(retcols),
+ table,
+ " AND ".join("%s = ?" % (k,) for k in keyvalues),
+ " ? ASC LIMIT ? OFFSET ?"
+ )
+ txn.execute(sql, keyvalues.values() + pagevalues)
+ else:
+ sql = "SELECT %s FROM %s ORDER BY %s" % (
+ ", ".join(retcols),
+ table,
+ " ? ASC LIMIT ? OFFSET ?"
+ )
+ txn.execute(sql, pagevalues)
+
+ return cls.cursor_to_dict(txn)
+
+ @defer.inlineCallbacks
+ def get_user_list_paginate(self, table, keyvalues, pagevalues, retcols,
+ desc="get_user_list_paginate"):
+ """Get a list of users from start row to a limit number of rows. This will
+ return a json object with users and total number of users in users list.
+
+ Args:
+ table (str): the table name
+ keyvalues (dict[str, Any] | None):
+ column names and values to select the rows with, or None to not
+ apply a WHERE clause.
+ pagevalues ([]):
+ order (str): order the select by this column
+ start (int): start number to begin the query from
+ limit (int): number of rows to reterive
+ retcols (iterable[str]): the names of the columns to return
+ Returns:
+ defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+ """
+ users = yield self.runInteraction(
+ desc,
+ self._simple_select_list_paginate_txn,
+ table, keyvalues, pagevalues, retcols
+ )
+ count = yield self.runInteraction(
+ desc,
+ self.get_user_count_txn
+ )
+ retval = {
+ "users": users,
+ "total": count
+ }
+ defer.returnValue(retval)
+
+ def get_user_count_txn(self, txn):
+ """Get a total number of registerd users in the users list.
+
+ Args:
+ txn : Transaction object
+ Returns:
+ defer.Deferred: resolves to int
+ """
+ sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;"
+ txn.execute(sql_count)
+ count = txn.fetchone()[0]
+ defer.returnValue(count)
+
+ def _simple_search_list(self, table, term, col, retcols,
+ desc="_simple_search_list"):
+ """Executes a SELECT query on the named table, which may return zero or
+ more rows, returning the result as a list of dicts.
+
+ Args:
+ table (str): the table name
+ term (str | None):
+ term for searching the table matched to a column.
+ col (str): column to query term should be matched to
+ retcols (iterable[str]): the names of the columns to return
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]] or None
+ """
+
+ return self.runInteraction(
+ desc,
+ self._simple_search_list_txn,
+ table, term, col, retcols
+ )
+
+ @classmethod
+ def _simple_search_list_txn(cls, txn, table, term, col, retcols):
+ """Executes a SELECT query on the named table, which may return zero or
+ more rows, returning the result as a list of dicts.
+
+ Args:
+ txn : Transaction object
+ table (str): the table name
+ term (str | None):
+ term for searching the table matched to a column.
+ col (str): column to query term should be matched to
+ retcols (iterable[str]): the names of the columns to return
+ Returns:
+ defer.Deferred: resolves to list[dict[str, Any]] or None
+ """
+ if term:
+ sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (
+ ", ".join(retcols),
+ table,
+ col
+ )
+ termvalues = ["%%" + term + "%%"]
+ txn.execute(sql, termvalues)
+ else:
+ return 0
+
+ return cls.cursor_to_dict(txn)
+
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 2040e022fa..b9f1365f92 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -93,7 +93,7 @@ class EndToEndKeyStore(SQLBaseStore):
query_clause = "user_id = ?"
query_params.append(user_id)
- if device_id:
+ if device_id is not None:
query_clause += " AND device_id = ?"
query_params.append(device_id)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8659f605a5..c88f689d3a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -302,7 +302,7 @@ class EventsStore(SQLBaseStore):
room_id
)
new_latest_event_ids = yield self._calculate_new_extremeties(
- room_id, [ev for ev, _ in ev_ctx_rm]
+ room_id, ev_ctx_rm, latest_event_ids
)
if new_latest_event_ids == set(latest_event_ids):
@@ -329,27 +329,24 @@ class EventsStore(SQLBaseStore):
persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks
- def _calculate_new_extremeties(self, room_id, events):
+ def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
persist.
Assumes that we are only persisting events for one room at a time.
"""
- latest_event_ids = yield self.get_latest_event_ids_in_room(
- room_id
- )
new_latest_event_ids = set(latest_event_ids)
# First, add all the new events to the list
new_latest_event_ids.update(
- event.event_id for event in events
- if not event.internal_metadata.is_outlier()
+ event.event_id for event, ctx in event_contexts
+ if not event.internal_metadata.is_outlier() and not ctx.rejected
)
# Now remove all events that are referenced by the to-be-added events
new_latest_event_ids.difference_update(
e_id
- for event in events
+ for event, ctx in event_contexts
for e_id, _ in event.prev_events
- if not event.internal_metadata.is_outlier()
+ if not event.internal_metadata.is_outlier() and not ctx.rejected
)
# And finally remove any events that are referenced by previously added
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 7460f98a1f..4d1590d2b4 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -15,7 +15,7 @@
from ._base import SQLBaseStore
from synapse.api.constants import PresenceState
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from collections import namedtuple
from twisted.internet import defer
@@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore):
self.presence_stream_cache.entity_has_changed,
state.user_id, stream_id,
)
+ self._invalidate_cache_and_stream(
+ txn, self._get_presence_for_user, (state.user_id,)
+ )
# Actually insert new rows
self._simple_insert_many_txn(
@@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore):
"get_all_presence_updates", get_all_presence_updates_txn
)
- @defer.inlineCallbacks
+ @cached()
+ def _get_presence_for_user(self, user_id):
+ raise NotImplementedError()
+
+ @cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids",
+ num_args=1, inlineCallbacks=True)
def get_presence_for_users(self, user_ids):
rows = yield self._simple_select_many_batch(
table="presence_stream",
@@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore):
for row in rows:
row["currently_active"] = bool(row["currently_active"])
- defer.returnValue([UserPresenceState(**row) for row in rows])
+ defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ee800d074f..545d3d3a99 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -129,7 +129,7 @@ class RoomMemberStore(SQLBaseStore):
with self._stream_id_gen.get_next() as stream_ordering:
yield self.runInteraction("locally_reject_invite", f, stream_ordering)
- @cached(max_entries=100000, iterable=True)
+ @cached(max_entries=500000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
@@ -274,12 +274,29 @@ class RoomMemberStore(SQLBaseStore):
return rows
- @cached(max_entries=5000)
+ @cached(max_entries=500000, iterable=True)
def get_rooms_for_user(self, user_id):
return self.get_rooms_for_user_where_membership_is(
user_id, membership_list=[Membership.JOIN],
)
+ @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
+ def get_users_who_share_room_with_user(self, user_id, cache_context):
+ """Returns the set of users who share a room with `user_id`
+ """
+ rooms = yield self.get_rooms_for_user(
+ user_id, on_invalidate=cache_context.invalidate,
+ )
+
+ user_who_share_room = set()
+ for room in rooms:
+ user_ids = yield self.get_users_in_room(
+ room.room_id, on_invalidate=cache_context.invalidate,
+ )
+ user_who_share_room.update(user_ids)
+
+ defer.returnValue(user_who_share_room)
+
def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""
def f(txn):
|