diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 53c685c173..42cd3c83ad 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,12 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import datetime
+import calendar
import logging
import time
-from dateutil import tz
-
from synapse.api.constants import PresenceState
from synapse.storage.devices import DeviceStore
from synapse.storage.user_erasure_store import UserErasureStore
@@ -119,7 +117,6 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "device_lists_stream", "stream_id",
)
- self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
@@ -320,7 +317,7 @@ class DataStore(RoomMemberStore, RoomStore,
thirty_days_ago_in_secs))
for row in txn:
- if row[0] is 'unknown':
+ if row[0] == 'unknown':
pass
results[row[0]] = row[1]
@@ -358,10 +355,11 @@ class DataStore(RoomMemberStore, RoomStore,
"""
Returns millisecond unixtime for start of UTC day.
"""
- now = datetime.datetime.utcnow()
- today_start = datetime.datetime(now.year, now.month,
- now.day, tzinfo=tz.tzutc())
- return int(time.mktime(today_start.timetuple())) * 1000
+ now = time.gmtime()
+ today_start = calendar.timegm((
+ now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0,
+ ))
+ return today_start * 1000
def generate_user_daily_visits(self):
"""
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d9d0255d0b..5a80eef211 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import itertools
import logging
import sys
import threading
@@ -26,9 +27,12 @@ from prometheus_client import Histogram
from twisted.internet import defer
from synapse.api.errors import StoreError
-from synapse.storage.engines import PostgresEngine
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.types import get_domain_from_id
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.stringutils import exception_to_unicode
logger = logging.getLogger(__name__)
@@ -48,6 +52,25 @@ sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
+# Unique indexes which have been added in background updates. Maps from table name
+# to the name of the background update which added the unique index to that table.
+#
+# This is used by the upsert logic to figure out which tables are safe to do a proper
+# UPSERT on: until the relevant background update has completed, we
+# have to emulate an upsert by locking the table.
+#
+UNIQUE_INDEX_BACKGROUND_UPDATES = {
+ "user_ips": "user_ips_device_unique_index",
+ "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
+ "device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
+ "event_search": "event_search_event_id_idx",
+}
+
+# This is a special cache name we use to batch multiple invalidations of caches
+# based on the current state when notifying workers over replication.
+_CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
+
+
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute()
@@ -83,6 +106,14 @@ class LoggingTransaction(object):
def __iter__(self):
return self.txn.__iter__()
+ def execute_batch(self, sql, args):
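+        # On PostgreSQL we use psycopg2's execute_batch, which groups many
+        # parameter sets into far fewer round trips; on other engines we fall
+        # back to executing the statement once per set of args.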
+ if isinstance(self.database_engine, PostgresEngine):
+ from psycopg2.extras import execute_batch
+ self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
+ else:
+ for val in args:
+ self.execute(sql, val)
+
def execute(self, sql, *args):
self._do_execute(self.txn.execute, sql, *args)
@@ -191,6 +222,57 @@ class SQLBaseStore(object):
self.database_engine = hs.database_engine
+ # A set of tables that are not safe to use native upserts in.
+ self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys())
+
+ # We add the user_directory_search table to the blacklist on SQLite
+ # because the existing search table does not have an index, making it
+ # unsafe to use native upserts.
+ if isinstance(self.database_engine, Sqlite3Engine):
+ self._unsafe_to_upsert_tables.add("user_directory_search")
+
+ if self.database_engine.can_native_upsert:
+            # Check ASAP (and then later, every 15s) to see if we have finished
+            # background updates of tables that aren't safe to update.
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert
+ )
+
+ @defer.inlineCallbacks
+ def _check_safe_to_upsert(self):
+ """
+ Is it safe to use native UPSERT?
+
+        If there are background updates still running, we need to wait, as they
+        may include the addition of indexes that create the UNIQUE constraints
+        we require.
+
+ If the background updates have not completed, wait 15 sec and check again.
+ """
+ updates = yield self._simple_select_list(
+ "background_updates",
+ keyvalues=None,
+ retcols=["update_name"],
+ desc="check_background_updates",
+ )
+ updates = [x["update_name"] for x in updates]
+
+ for table, update_name in UNIQUE_INDEX_BACKGROUND_UPDATES.items():
+ if update_name not in updates:
+ logger.debug("Now safe to upsert in %s", table)
+ self._unsafe_to_upsert_tables.discard(table)
+
+        # If there are any updates still running, reschedule another check.
+ if updates:
+ self._clock.call_later(
+ 15.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert
+ )
+
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
@@ -249,32 +331,32 @@ class SQLBaseStore(object):
except self.database_engine.module.OperationalError as e:
# This can happen if the database disappears mid
# transaction.
- logger.warn(
+ logger.warning(
"[TXN OPERROR] {%s} %s %d/%d",
- name, e, i, N
+ name, exception_to_unicode(e), i, N
)
if i < N:
i += 1
try:
conn.rollback()
except self.database_engine.module.Error as e1:
- logger.warn(
+ logger.warning(
"[TXN EROLL] {%s} %s",
- name, e1,
+ name, exception_to_unicode(e1),
)
continue
raise
except self.database_engine.module.DatabaseError as e:
if self.database_engine.is_deadlock(e):
- logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
+ logger.warning("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
if i < N:
i += 1
try:
conn.rollback()
except self.database_engine.module.Error as e1:
- logger.warn(
+ logger.warning(
"[TXN EROLL] {%s} %s",
- name, e1,
+ name, exception_to_unicode(e1),
)
continue
raise
@@ -493,8 +575,15 @@ class SQLBaseStore(object):
txn.executemany(sql, vals)
@defer.inlineCallbacks
- def _simple_upsert(self, table, keyvalues, values,
- insertion_values={}, desc="_simple_upsert", lock=True):
+ def _simple_upsert(
+ self,
+ table,
+ keyvalues,
+ values,
+ insertion_values={},
+ desc="_simple_upsert",
+ lock=True
+ ):
"""
`lock` should generally be set to True (the default), but can be set
@@ -515,16 +604,21 @@ class SQLBaseStore(object):
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
- Deferred(bool): True if a new entry was created, False if an
- existing one was updated.
+ Deferred(None or bool): Native upserts always return None. Emulated
+ upserts return True if a new entry was created, False if an existing
+ one was updated.
"""
attempts = 0
while True:
try:
result = yield self.runInteraction(
desc,
- self._simple_upsert_txn, table, keyvalues, values, insertion_values,
- lock=lock
+ self._simple_upsert_txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values,
+ lock=lock,
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
@@ -536,30 +630,111 @@ class SQLBaseStore(object):
# presumably we raced with another transaction: let's retry.
logger.warn(
- "IntegrityError when upserting into %s; retrying: %s",
- table, e
+ "%s when upserting into %s; retrying: %s", e.__name__, table, e
)
- def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
- lock=True):
+ def _simple_upsert_txn(
+ self,
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values={},
+ lock=True,
+ ):
+ """
+        Pick the UPSERT method which works best on the platform: either the
+        native one (PostgreSQL 9.5+, recent SQLite), or an emulated method.
+
+ Args:
+ txn: The transaction to use.
+ table (str): The table to upsert into
+            keyvalues (dict): The unique key columns and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ lock (bool): True to lock the table when doing the upsert.
+ Returns:
+ None or bool: Native upserts always return None. Emulated
+ upserts return True if a new entry was created, False if an existing
+ one was updated.
+ """
+ if (
+ self.database_engine.can_native_upsert
+ and table not in self._unsafe_to_upsert_tables
+ ):
+ return self._simple_upsert_txn_native_upsert(
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values=insertion_values,
+ )
+ else:
+ return self._simple_upsert_txn_emulated(
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values=insertion_values,
+ lock=lock,
+ )
+
+ def _simple_upsert_txn_emulated(
+ self, txn, table, keyvalues, values, insertion_values={}, lock=True
+ ):
+ """
+ Args:
+ table (str): The table to upsert into
+            keyvalues (dict): The unique key columns and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ lock (bool): True to lock the table when doing the upsert.
+ Returns:
+ bool: Return True if a new entry was created, False if an existing
+ one was updated.
+ """
# We need to lock the table :(, unless we're *really* careful
if lock:
self.database_engine.lock_table(txn, table)
- # First try to update.
- sql = "UPDATE %s SET %s WHERE %s" % (
- table,
- ", ".join("%s = ?" % (k,) for k in values),
- " AND ".join("%s = ?" % (k,) for k in keyvalues)
- )
- sqlargs = list(values.values()) + list(keyvalues.values())
+ def _getwhere(key):
+ # If the value we're passing in is None (aka NULL), we need to use
+            # IS, not =, since NULL = NULL evaluates to NULL rather than TRUE.
+ if keyvalues[key] is None:
+ return "%s IS ?" % (key,)
+ else:
+ return "%s = ?" % (key,)
- txn.execute(sql, sqlargs)
- if txn.rowcount > 0:
- # successfully updated at least one row.
- return False
+ if not values:
+ # If `values` is empty, then all of the values we care about are in
+ # the unique key, so there is nothing to UPDATE. We can just do a
+ # SELECT instead to see if it exists.
+ sql = "SELECT 1 FROM %s WHERE %s" % (
+ table,
+ " AND ".join(_getwhere(k) for k in keyvalues)
+ )
+ sqlargs = list(keyvalues.values())
+ txn.execute(sql, sqlargs)
+ if txn.fetchall():
+ # We have an existing record.
+ return False
+ else:
+ # First try to update.
+ sql = "UPDATE %s SET %s WHERE %s" % (
+ table,
+ ", ".join("%s = ?" % (k,) for k in values),
+ " AND ".join(_getwhere(k) for k in keyvalues)
+ )
+ sqlargs = list(values.values()) + list(keyvalues.values())
- # We didn't update any rows so insert a new one
+ txn.execute(sql, sqlargs)
+ if txn.rowcount > 0:
+ # successfully updated at least one row.
+ return False
+
+ # We didn't find any existing rows, so insert a new one
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
@@ -568,12 +743,144 @@ class SQLBaseStore(object):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
- ", ".join("?" for _ in allvalues)
+ ", ".join("?" for _ in allvalues),
)
txn.execute(sql, list(allvalues.values()))
# successfully inserted
return True
+ def _simple_upsert_txn_native_upsert(
+ self, txn, table, keyvalues, values, insertion_values={}
+ ):
+ """
+ Use the native UPSERT functionality in recent PostgreSQL versions.
+
+ Args:
+ table (str): The table to upsert into
+            keyvalues (dict): The unique key columns and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ Returns:
+ None
+ """
+ allvalues = {}
+ allvalues.update(keyvalues)
+ allvalues.update(values)
+ allvalues.update(insertion_values)
+
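+        # As a rough illustration, keyvalues={"a": 1} and values={"b": 2} build:
+        #   INSERT INTO <table> (a, b) VALUES (?, ?)
+        #   ON CONFLICT (a) DO UPDATE SET b=EXCLUDED.b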
+ sql = (
+ "INSERT INTO %s (%s) VALUES (%s) "
+ "ON CONFLICT (%s) DO UPDATE SET %s"
+ ) % (
+ table,
+ ", ".join(k for k in allvalues),
+ ", ".join("?" for _ in allvalues),
+ ", ".join(k for k in keyvalues),
+ ", ".join(k + "=EXCLUDED." + k for k in values),
+ )
+ txn.execute(sql, list(allvalues.values()))
+
+ def _simple_upsert_many_txn(
+ self, txn, table, key_names, key_values, value_names, value_values
+ ):
+ """
+ Upsert, many times.
+
+ Args:
+ table (str): The table to upsert into
+ key_names (list[str]): The key column names.
+ key_values (list[list]): A list of each row's key column values.
+ value_names (list[str]): The value column names. If empty, no
+ values will be used, even if value_values is provided.
+ value_values (list[list]): A list of each row's value column values.
+ Returns:
+ None
+ """
+ if (
+ self.database_engine.can_native_upsert
+ and table not in self._unsafe_to_upsert_tables
+ ):
+ return self._simple_upsert_many_txn_native_upsert(
+ txn, table, key_names, key_values, value_names, value_values
+ )
+ else:
+ return self._simple_upsert_many_txn_emulated(
+ txn, table, key_names, key_values, value_names, value_values
+ )
+
+ def _simple_upsert_many_txn_emulated(
+ self, txn, table, key_names, key_values, value_names, value_values
+ ):
+ """
+ Upsert, many times, but without native UPSERT support or batching.
+
+ Args:
+ table (str): The table to upsert into
+ key_names (list[str]): The key column names.
+ key_values (list[list]): A list of each row's key column values.
+ value_names (list[str]): The value column names. If empty, no
+ values will be used, even if value_values is provided.
+ value_values (list[list]): A list of each row's value column values.
+ Returns:
+ None
+ """
+ # No value columns, therefore make a blank list so that the following
+ # zip() works correctly.
+ if not value_names:
+ value_values = [() for x in range(len(key_values))]
+
+ for keyv, valv in zip(key_values, value_values):
+ _keys = {x: y for x, y in zip(key_names, keyv)}
+ _vals = {x: y for x, y in zip(value_names, valv)}
+
+ self._simple_upsert_txn_emulated(txn, table, _keys, _vals)
+
+ def _simple_upsert_many_txn_native_upsert(
+ self, txn, table, key_names, key_values, value_names, value_values
+ ):
+ """
+ Upsert, many times, using batching where possible.
+
+ Args:
+ table (str): The table to upsert into
+ key_names (list[str]): The key column names.
+ key_values (list[list]): A list of each row's key column values.
+ value_names (list[str]): The value column names. If empty, no
+ values will be used, even if value_values is provided.
+ value_values (list[list]): A list of each row's value column values.
+ Returns:
+ None
+ """
+ allnames = []
+ allnames.extend(key_names)
+ allnames.extend(value_names)
+
+ if not value_names:
+ # No value columns, therefore make a blank list so that the
+ # following zip() works correctly.
+ latter = "NOTHING"
+ value_values = [() for x in range(len(key_values))]
+ else:
+ latter = (
+ "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in value_names)
+ )
+
+ sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
+ table,
+ ", ".join(k for k in allnames),
+ ", ".join("?" for _ in allnames),
+ ", ".join(key_names),
+ latter,
+ )
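+        # With no value columns the statement ends in "DO NOTHING", i.e. rows
+        # that already exist are simply left untouched.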
+
+ args = []
+
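+        # Build one parameter tuple per row: key values first, then value
+        # values, matching the column order in allnames above.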
+ for x, y in zip(key_values, value_values):
+ args.append(tuple(x) + tuple(y))
+
+ return txn.execute_batch(sql, args)
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
@@ -849,9 +1156,9 @@ class SQLBaseStore(object):
rowcount = cls._simple_update_txn(txn, table, keyvalues, updatevalues)
if rowcount == 0:
- raise StoreError(404, "No row found")
+ raise StoreError(404, "No row found (%s)" % (table,))
if rowcount > 1:
- raise StoreError(500, "More than one row matched")
+ raise StoreError(500, "More than one row matched (%s)" % (table,))
@staticmethod
def _simple_select_one_txn(txn, table, keyvalues, retcols,
@@ -868,9 +1175,9 @@ class SQLBaseStore(object):
if not row:
if allow_none:
return None
- raise StoreError(404, "No row found")
+ raise StoreError(404, "No row found (%s)" % (table,))
if txn.rowcount > 1:
- raise StoreError(500, "More than one row matched")
+ raise StoreError(500, "More than one row matched (%s)" % (table,))
return dict(zip(retcols, row))
@@ -902,9 +1209,9 @@ class SQLBaseStore(object):
txn.execute(sql, list(keyvalues.values()))
if txn.rowcount == 0:
- raise StoreError(404, "No row found")
+ raise StoreError(404, "No row found (%s)" % (table,))
if txn.rowcount > 1:
- raise StoreError(500, "more than one row matched")
+ raise StoreError(500, "More than one row matched (%s)" % (table,))
def _simple_delete(self, table, keyvalues, desc):
return self.runInteraction(
@@ -1005,6 +1312,84 @@ class SQLBaseStore(object):
be invalidated.
"""
txn.call_after(cache_func.invalidate, keys)
+ self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
+
+ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
+ """Special case invalidation of caches based on current state.
+
+ We special case this so that we can batch the cache invalidations into a
+ single replication poke.
+
+ Args:
+ txn
+ room_id (str): Room where state changed
+ members_changed (iterable[str]): The user_ids of members that have changed
+ """
+ txn.call_after(self._invalidate_state_caches, room_id, members_changed)
+
+ keys = itertools.chain([room_id], members_changed)
+ self._send_invalidation_to_replication(
+ txn, _CURRENT_STATE_CACHE_NAME, keys,
+ )
+
+ def _invalidate_state_caches(self, room_id, members_changed):
+ """Invalidates caches that are based on the current state, but does
+ not stream invalidations down replication.
+
+ Args:
+ room_id (str): Room where state changed
+ members_changed (iterable[str]): The user_ids of members that have
+ changed
+ """
+ for member in members_changed:
+ self._attempt_to_invalidate_cache(
+ "get_rooms_for_user_with_stream_ordering", (member,),
+ )
+
+ for host in set(get_domain_from_id(u) for u in members_changed):
+ self._attempt_to_invalidate_cache(
+ "is_host_joined", (room_id, host,),
+ )
+ self._attempt_to_invalidate_cache(
+ "was_host_joined", (room_id, host,),
+ )
+
+ self._attempt_to_invalidate_cache(
+ "get_users_in_room", (room_id,),
+ )
+ self._attempt_to_invalidate_cache(
+ "get_room_summary", (room_id,),
+ )
+ self._attempt_to_invalidate_cache(
+ "get_current_state_ids", (room_id,),
+ )
+
+ def _attempt_to_invalidate_cache(self, cache_name, key):
+ """Attempts to invalidate the cache of the given name, ignoring if the
+ cache doesn't exist. Mainly used for invalidating caches on workers,
+ where they may not have the cache.
+
+ Args:
+ cache_name (str)
+ key (tuple)
+ """
+ try:
+ getattr(self, cache_name).invalidate(key)
+ except AttributeError:
+ # We probably haven't pulled in the cache in this worker,
+ # which is fine.
+ pass
+
+ def _send_invalidation_to_replication(self, txn, cache_name, keys):
+ """Notifies replication that given cache has been invalidated.
+
+ Note that this does *not* invalidate the cache locally.
+
+ Args:
+ txn
+ cache_name (str)
+ keys (iterable[str])
+ """
if isinstance(self.database_engine, PostgresEngine):
# get_next() returns a context manager which is designed to wrap
@@ -1022,7 +1407,7 @@ class SQLBaseStore(object):
table="cache_invalidation_stream",
values={
"stream_id": stream_id,
- "cache_func": cache_func.__name__,
+ "cache_func": cache_name,
"keys": list(keys),
"invalidation_ts": self.clock.time_msec(),
}
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 5fe1ca2de7..60cdc884e6 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -240,7 +240,7 @@ class BackgroundUpdateStore(SQLBaseStore):
* An integer count of the number of items to update in this batch.
The handler should return a deferred integer count of items updated.
- The hander is responsible for updating the progress of the update.
+ The handler is responsible for updating the progress of the update.
Args:
update_name(str): The name of the update that this code handles.
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 9ad17b7c25..9c21362226 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -65,7 +65,32 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
columns=["last_seen"],
)
- # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
+ self.register_background_update_handler(
+ "user_ips_analyze",
+ self._analyze_user_ip,
+ )
+
+ self.register_background_update_handler(
+ "user_ips_remove_dupes",
+ self._remove_user_ip_dupes,
+ )
+
+ # Register a unique index
+ self.register_background_index_update(
+ "user_ips_device_unique_index",
+ index_name="user_ips_user_token_ip_unique_index",
+ table="user_ips",
+ columns=["user_id", "access_token", "ip"],
+ unique=True,
+ )
+
+ # Drop the old non-unique index
+ self.register_background_update_handler(
+ "user_ips_drop_nonunique_index",
+ self._remove_user_ip_nonunique,
+ )
+
+ # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}
self._client_ip_looper = self._clock.looping_call(
@@ -76,6 +101,205 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
)
@defer.inlineCallbacks
+ def _remove_user_ip_nonunique(self, progress, batch_size):
+ def f(conn):
+ txn = conn.cursor()
+ txn.execute(
+ "DROP INDEX IF EXISTS user_ips_user_ip"
+ )
+ txn.close()
+
+ yield self.runWithConnection(f)
+ yield self._end_background_update("user_ips_drop_nonunique_index")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _analyze_user_ip(self, progress, batch_size):
+ # Background update to analyze user_ips table before we run the
+ # deduplication background update. The table may not have been analyzed
+ # for ages due to the table locks.
+ #
+ # This will lock out the naive upserts to user_ips while it happens, but
+ # the analyze should be quick (28GB table takes ~10s)
+ def user_ips_analyze(txn):
+ txn.execute("ANALYZE user_ips")
+
+ yield self.runInteraction(
+ "user_ips_analyze", user_ips_analyze
+ )
+
+ yield self._end_background_update("user_ips_analyze")
+
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _remove_user_ip_dupes(self, progress, batch_size):
+        # This function works by scanning the user_ips table in batches
+        # based on `last_seen`. For each row in a batch it searches the rest of
+        # the table to see if there are any duplicates; if there are, they
+        # are removed and replaced with a suitable row.
+
+ # Fetch the start of the batch
+ begin_last_seen = progress.get("last_seen", 0)
+
+ def get_last_seen(txn):
+ txn.execute(
+ """
+ SELECT last_seen FROM user_ips
+ WHERE last_seen > ?
+ ORDER BY last_seen
+ LIMIT 1
+ OFFSET ?
+ """,
+ (begin_last_seen, batch_size)
+ )
+ row = txn.fetchone()
+ if row:
+ return row[0]
+ else:
+ return None
+
+ # Get a last seen that has roughly `batch_size` since `begin_last_seen`
+ end_last_seen = yield self.runInteraction(
+ "user_ips_dups_get_last_seen", get_last_seen
+ )
+
+ # If it returns None, then we're processing the last batch
+ last = end_last_seen is None
+
+ logger.info(
+ "Scanning for duplicate 'user_ips' rows in range: %s <= last_seen < %s",
+ begin_last_seen, end_last_seen,
+ )
+
+ def remove(txn):
+ # This works by looking at all entries in the given time span, and
+ # then for each (user_id, access_token, ip) tuple in that range
+ # checking for any duplicates in the rest of the table (via a join).
+ # It then only returns entries which have duplicates, and the max
+            # last_seen across all duplicates, which can then be used to delete
+ # all other duplicates.
+ # It is efficient due to the existence of (user_id, access_token,
+ # ip) and (last_seen) indices.
+
+ # Define the search space, which requires handling the last batch in
+ # a different way
+ if last:
+ clause = "? <= last_seen"
+ args = (begin_last_seen,)
+ else:
+ clause = "? <= last_seen AND last_seen < ?"
+ args = (begin_last_seen, end_last_seen)
+
+ # (Note: The DISTINCT in the inner query is important to ensure that
+ # the COUNT(*) is accurate, otherwise double counting may happen due
+ # to the join effectively being a cross product)
+ txn.execute(
+ """
+ SELECT user_id, access_token, ip,
+ MAX(device_id), MAX(user_agent), MAX(last_seen),
+ COUNT(*)
+ FROM (
+ SELECT DISTINCT user_id, access_token, ip
+ FROM user_ips
+ WHERE {}
+ ) c
+ INNER JOIN user_ips USING (user_id, access_token, ip)
+ GROUP BY user_id, access_token, ip
+ HAVING count(*) > 1
+ """.format(clause),
+ args
+ )
+ res = txn.fetchall()
+
+ # We've got some duplicates
+ for i in res:
+ user_id, access_token, ip, device_id, user_agent, last_seen, count = i
+
+ # We want to delete the duplicates so we end up with only a
+ # single row.
+ #
+ # The naive way of doing this would be just to delete all rows
+ # and reinsert a constructed row. However, if there are a lot of
+ # duplicate rows this can cause the table to grow a lot, which
+ # can be problematic in two ways:
+ # 1. If user_ips is already large then this can cause the
+ # table to rapidly grow, potentially filling the disk.
+ # 2. Reinserting a lot of rows can confuse the table
+ # statistics for postgres, causing it to not use the
+ # correct indices for the query above, resulting in a full
+ # table scan. This is incredibly slow for large tables and
+ # can kill database performance. (This seems to mainly
+ # happen for the last query where the clause is simply `? <
+ # last_seen`)
+ #
+ # So instead we want to delete all but *one* of the duplicate
+ # rows. That is hard to do reliably, so we cheat and do a two
+ # step process:
+ # 1. Delete all rows with a last_seen strictly less than the
+ # max last_seen. This hopefully results in deleting all but
+                #      one row the majority of the time, but several rows may
+                #      share the same last_seen value
+ # 2. If multiple rows remain, we fall back to the naive method
+ # and simply delete all rows and reinsert.
+ #
+ # Note that this relies on no new duplicate rows being inserted,
+ # but if that is happening then this entire process is futile
+ # anyway.
+
+ # Do step 1:
+
+ txn.execute(
+ """
+ DELETE FROM user_ips
+ WHERE user_id = ? AND access_token = ? AND ip = ? AND last_seen < ?
+ """,
+ (user_id, access_token, ip, last_seen)
+ )
+ if txn.rowcount == count - 1:
+ # We deleted all but one of the duplicate rows, i.e. there
+ # is exactly one remaining and so there is nothing left to
+ # do.
+ continue
+ elif txn.rowcount >= count:
+ raise Exception(
+ "We deleted more duplicate rows from 'user_ips' than expected",
+ )
+
+ # The previous step didn't delete enough rows, so we fallback to
+ # step 2:
+
+ # Drop all the duplicates
+ txn.execute(
+ """
+ DELETE FROM user_ips
+ WHERE user_id = ? AND access_token = ? AND ip = ?
+ """,
+ (user_id, access_token, ip)
+ )
+
+ # Add in one to be the last_seen
+ txn.execute(
+ """
+ INSERT INTO user_ips
+ (user_id, access_token, ip, device_id, user_agent, last_seen)
+ VALUES (?, ?, ?, ?, ?, ?)
+ """,
+ (user_id, access_token, ip, device_id, user_agent, last_seen)
+ )
+
+ self._background_update_progress_txn(
+ txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
+ )
+
+ yield self.runInteraction("user_ips_dups_remove", remove)
+
+ if last:
+ yield self._end_background_update("user_ips_remove_dupes")
+
+ defer.returnValue(batch_size)
+
+ @defer.inlineCallbacks
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):
if not now:
@@ -114,7 +338,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
)
def _update_client_ips_batch_txn(self, txn, to_update):
- self.database_engine.lock_table(txn, "user_ips")
+ if "user_ips" in self._unsafe_to_upsert_tables or (
+ not self.database_engine.can_native_upsert
+ ):
+ self.database_engine.lock_table(txn, "user_ips")
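+            # (The native upsert path is atomic, so the explicit table lock is
+            # only needed while we still have to emulate the upsert.)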
for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
@@ -127,10 +354,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"user_id": user_id,
"access_token": access_token,
"ip": ip,
- "user_agent": user_agent,
- "device_id": device_id,
},
values={
+ "user_agent": user_agent,
+ "device_id": device_id,
"last_seen": last_seen,
},
lock=False,
@@ -227,7 +454,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
results = {}
for key in self._batch_row_update:
- uid, access_token, ip = key
+ uid, access_token, ip, = key
if uid == user_id:
user_agent, _, last_seen = self._batch_row_update[key]
results[(access_token, ip)] = (user_agent, last_seen)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d10ff9e4b9..ecdab34e7d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -22,14 +22,19 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from ._base import Cache, SQLBaseStore, db_to_json
+from ._base import Cache, db_to_json
logger = logging.getLogger(__name__)
+DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
+ "drop_device_list_streams_non_unique_indexes"
+)
-class DeviceStore(SQLBaseStore):
+
+class DeviceStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(DeviceStore, self).__init__(db_conn, hs)
@@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore):
columns=["user_id", "device_id"],
)
+ # create a unique index on device_lists_remote_cache
+ self.register_background_index_update(
+ "device_lists_remote_cache_unique_idx",
+ index_name="device_lists_remote_cache_unique_id",
+ table="device_lists_remote_cache",
+ columns=["user_id", "device_id"],
+ unique=True,
+ )
+
+ # And one on device_lists_remote_extremeties
+ self.register_background_index_update(
+ "device_lists_remote_extremeties_unique_idx",
+ index_name="device_lists_remote_extremeties_unique_idx",
+ table="device_lists_remote_extremeties",
+ columns=["user_id"],
+ unique=True,
+ )
+
+ # once they complete, we can remove the old non-unique indexes.
+ self.register_background_update_handler(
+ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
+ self._drop_device_list_streams_non_unique_indexes,
+ )
+
@defer.inlineCallbacks
def store_device(self, user_id, device_id,
initial_device_display_name):
@@ -239,7 +268,19 @@ class DeviceStore(SQLBaseStore):
def update_remote_device_list_cache_entry(self, user_id, device_id, content,
stream_id):
- """Updates a single user's device in the cache.
+ """Updates a single device in the cache of a remote user's devicelist.
+
+ Note: assumes that we are the only thread that can be updating this user's
+ device list.
+
+ Args:
+ user_id (str): User to update device list for
+            device_id (str): ID of the device being updated
+ content (dict): new data on this device
+ stream_id (int): the version of the device list
+
+ Returns:
+ Deferred[None]
"""
return self.runInteraction(
"update_remote_device_list_cache_entry",
@@ -272,7 +313,11 @@ class DeviceStore(SQLBaseStore):
},
values={
"content": json.dumps(content),
- }
+ },
+
+ # we don't need to lock, because we assume we are the only thread
+ # updating this user's devices.
+ lock=False,
)
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
@@ -289,11 +334,26 @@ class DeviceStore(SQLBaseStore):
},
values={
"stream_id": stream_id,
- }
+ },
+
+ # again, we can assume we are the only thread updating this user's
+ # extremity.
+ lock=False,
)
def update_remote_device_list_cache(self, user_id, devices, stream_id):
- """Replace the cache of the remote user's devices.
+ """Replace the entire cache of the remote user's devices.
+
+ Note: assumes that we are the only thread that can be updating this user's
+ device list.
+
+ Args:
+ user_id (str): User to update device list for
+ devices (list[dict]): list of device objects supplied over federation
+ stream_id (int): the version of the device list
+
+ Returns:
+ Deferred[None]
"""
return self.runInteraction(
"update_remote_device_list_cache",
@@ -338,7 +398,11 @@ class DeviceStore(SQLBaseStore):
},
values={
"stream_id": stream_id,
- }
+ },
+
+ # we don't need to lock, because we can assume we are the only thread
+ # updating this user's extremity.
+ lock=False,
)
def get_devices_by_remote(self, destination, from_stream_id):
@@ -589,10 +653,14 @@ class DeviceStore(SQLBaseStore):
combined list of changes to devices, and which destinations need to be
poked. `destination` may be None if no destinations need to be poked.
"""
+ # We do a group by here as there can be a large number of duplicate
+ # entries, since we throw away device IDs.
sql = """
- SELECT stream_id, user_id, destination FROM device_lists_stream
+ SELECT MAX(stream_id) AS stream_id, user_id, destination
+ FROM device_lists_stream
LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
WHERE ? < stream_id AND stream_id <= ?
+ GROUP BY user_id, destination
"""
return self._execute(
"get_all_device_list_changes_for_remotes", None,
@@ -718,3 +786,19 @@ class DeviceStore(SQLBaseStore):
"_prune_old_outbound_device_pokes",
_prune_txn,
)
+
+ @defer.inlineCallbacks
+ def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+ def f(conn):
+ txn = conn.cursor()
+ txn.execute(
+ "DROP INDEX IF EXISTS device_lists_remote_cache_id"
+ )
+ txn.execute(
+ "DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
+ )
+ txn.close()
+
+ yield self.runWithConnection(f)
+ yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+ defer.returnValue(1)
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index f25ded2295..9a3aec759e 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -118,6 +118,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
these room keys.
"""
+ try:
+ version = int(version)
+ except ValueError:
+ defer.returnValue({'rooms': {}})
+
keyvalues = {
"user_id": user_id,
"version": version,
@@ -177,7 +182,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
keyvalues = {
"user_id": user_id,
- "version": version,
+ "version": int(version),
}
if room_id:
keyvalues['room_id'] = room_id
@@ -212,14 +217,23 @@ class EndToEndRoomKeyStore(SQLBaseStore):
Raises:
StoreError: with code 404 if there are no e2e_room_keys_versions present
Returns:
- A deferred dict giving the info metadata for this backup version
+ A deferred dict giving the info metadata for this backup version, with
+ fields including:
+ version(str)
+ algorithm(str)
+ auth_data(object): opaque dict supplied by the client
"""
def _get_e2e_room_keys_version_info_txn(txn):
if version is None:
this_version = self._get_current_version(txn, user_id)
else:
- this_version = version
+ try:
+ this_version = int(version)
+ except ValueError:
+ # Our versions are all ints so if we can't convert it to an integer,
+ # it isn't there.
+ raise StoreError(404, "No row found")
result = self._simple_select_one_txn(
txn,
@@ -236,6 +250,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
),
)
result["auth_data"] = json.loads(result["auth_data"])
+ result["version"] = str(result["version"])
return result
return self.runInteraction(
@@ -283,6 +298,27 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)
+ def update_e2e_room_keys_version(self, user_id, version, info):
+ """Update a given backup version
+
+ Args:
+ user_id(str): the user whose backup version we're updating
+ version(str): the version ID of the backup version we're updating
+ info(dict): the new backup version info to store
+ """
+
+ return self._simple_update(
+ table="e2e_room_keys_versions",
+ keyvalues={
+ "user_id": user_id,
+ "version": version,
+ },
+ updatevalues={
+ "auth_data": json.dumps(info["auth_data"]),
+ },
+ desc="update_e2e_room_keys_version"
+ )
+
def delete_e2e_room_keys_version(self, user_id, version=None):
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1f1721e820..2a0f6cfca9 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -40,7 +40,10 @@ class EndToEndKeyStore(SQLBaseStore):
allow_none=True,
)
- new_key_json = encode_canonical_json(device_keys)
+ # In py3 we need old_key_json to match new_key_json type. The DB
+ # returns unicode while encode_canonical_json returns bytes.
+ new_key_json = encode_canonical_json(device_keys).decode("utf-8")
+
if old_key_json == new_key_json:
return False
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index e2f9de8451..ff5ef97ca8 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,7 +18,7 @@ import platform
from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
-from .sqlite3 import Sqlite3Engine
+from .sqlite import Sqlite3Engine
SUPPORTED_MODULE = {
"sqlite3": Sqlite3Engine,
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 42225f8a2a..4004427c7b 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -38,6 +38,13 @@ class PostgresEngine(object):
return sql.replace("?", "%s")
def on_new_connection(self, db_conn):
+
+ # Get the version of PostgreSQL that we're using. As per the psycopg2
+ # docs: The number is formed by converting the major, minor, and
+ # revision numbers into two-decimal-digit numbers and appending them
+ # together. For example, version 8.1.5 will be returned as 80105
+ self._version = db_conn.server_version
+
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
@@ -54,6 +61,13 @@ class PostgresEngine(object):
cursor.close()
+ @property
+ def can_native_upsert(self):
+ """
+ Can we use native UPSERTs? This requires PostgreSQL 9.5+.
+ """
+ return self._version >= 90500
+
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite.py
index 19949fc474..059ab81055 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite.py
@@ -30,6 +30,14 @@ class Sqlite3Engine(object):
self._current_state_group_id = None
self._current_state_group_id_lock = threading.Lock()
+ @property
+ def can_native_upsert(self):
+ """
+ Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
+ more work we haven't done yet to tell what was inserted vs updated.
+ """
+ return self.module.sqlite_version_info >= (3, 24, 0)
+
def check_database(self, txn):
pass
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3faca2a042..38809ed0fc 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -125,6 +125,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
return dict(txn)
+ @defer.inlineCallbacks
+ def get_max_depth_of(self, event_ids):
+ """Returns the max depth of a set of event IDs
+
+ Args:
+ event_ids (list[str])
+
+        Returns:
+ Deferred[int]
+ """
+ rows = yield self._simple_select_many_batch(
+ table="events",
+ column="event_id",
+ iterable=event_ids,
+ retcols=("depth",),
+ desc="get_max_depth_of",
+ )
+
+ if not rows:
+ defer.returnValue(0)
+ else:
+ defer.returnValue(max(row["depth"] for row in rows))
+
def _get_oldest_events_in_room_txn(self, txn, room_id):
return self._simple_select_onecol_txn(
txn,
@@ -477,7 +500,7 @@ class EventFederationStore(EventFederationWorkerStore):
"is_state": False,
}
for ev in events
- for e_id, _ in ev.prev_events
+ for e_id in ev.prev_event_ids()
],
)
@@ -510,7 +533,7 @@ class EventFederationStore(EventFederationWorkerStore):
txn.executemany(query, [
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
- for ev in events for e_id, _ in ev.prev_events
+ for ev in events for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
])
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8881b009df..06db9e56e6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,6 +38,7 @@ from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
@@ -205,7 +206,8 @@ def _retry_on_integrity_error(func):
# inherits from EventFederationStore so that we can call _update_backward_extremities
# and _handle_mult_prev_events (though arguably those could both be moved in here)
-class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
+class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
+ BackgroundUpdateStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -414,7 +416,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
)
if len_1:
all_single_prev_not_state = all(
- len(event.prev_events) == 1
+ len(event.prev_event_ids()) == 1
and not event.is_state()
for event, ctx in ev_ctx_rm
)
@@ -438,7 +440,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# guess this by looking at the prev_events and checking
# if they match the current forward extremities.
for ev, _ in ev_ctx_rm:
- prev_event_ids = set(e for e, _ in ev.prev_events)
+ prev_event_ids = set(ev.prev_event_ids())
if latest_event_ids == prev_event_ids:
state_delta_reuse_delta_counter.inc()
break
@@ -549,7 +551,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
result.difference_update(
e_id
for event in new_events
- for e_id, _ in event.prev_events
+ for e_id in event.prev_event_ids()
)
# Finally, remove any events which are prev_events of any existing events.
@@ -737,7 +739,18 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
}
events_map = {ev.event_id: ev for ev, _ in events_context}
- room_version = yield self.get_room_version(room_id)
+
+ # We need to get the room version, which is in the create event.
+        # Normally that'd be in the database, but it's also possible that we're
+ # currently trying to persist it.
+ room_version = None
+ for ev, _ in events_context:
+ if ev.type == EventTypes.Create and ev.state_key == "":
+ room_version = ev.content.get("room_version", "1")
+ break
+
+ if not room_version:
+ room_version = yield self.get_room_version(room_id)
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
@@ -867,7 +880,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
"auth_id": auth_id,
}
for event, _ in events_and_contexts
- for auth_id, _ in event.auth_events
+ for auth_id in event.auth_event_ids()
if event.is_state()
],
)
@@ -891,105 +904,82 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
- to_delete, to_insert = current_state_tuple
-
- # First we add entries to the current_state_delta_stream. We
- # do this before updating the current_state_events table so
- # that we can use it to calculate the `prev_event_id`. (This
- # allows us to not have to pull out the existing state
- # unnecessarily).
- sql = """
- INSERT INTO current_state_delta_stream
- (stream_id, room_id, type, state_key, event_id, prev_event_id)
- SELECT ?, ?, ?, ?, ?, (
- SELECT event_id FROM current_state_events
- WHERE room_id = ? AND type = ? AND state_key = ?
- )
- """
- txn.executemany(sql, (
- (
- max_stream_order, room_id, etype, state_key, None,
- room_id, etype, state_key,
- )
- for etype, state_key in to_delete
- # We sanity check that we're deleting rather than updating
- if (etype, state_key) not in to_insert
- ))
- txn.executemany(sql, (
- (
- max_stream_order, room_id, etype, state_key, ev_id,
- room_id, etype, state_key,
- )
- for (etype, state_key), ev_id in iteritems(to_insert)
- ))
-
- # Now we actually update the current_state_events table
+ to_delete, to_insert = current_state_tuple
- txn.executemany(
- "DELETE FROM current_state_events"
- " WHERE room_id = ? AND type = ? AND state_key = ?",
- (
- (room_id, etype, state_key)
- for etype, state_key in itertools.chain(to_delete, to_insert)
- ),
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # allows us to not have to pull out the existing state
+ # unnecessarily).
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, (
+ SELECT event_id FROM current_state_events
+ WHERE room_id = ? AND type = ? AND state_key = ?
)
-
- self._simple_insert_many_txn(
- txn,
- table="current_state_events",
- values=[
- {
- "event_id": ev_id,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- }
- for key, ev_id in iteritems(to_insert)
- ],
+ """
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, None,
+ room_id, etype, state_key,
)
-
- txn.call_after(
- self._curr_state_delta_stream_cache.entity_has_changed,
- room_id, max_stream_order,
+ for etype, state_key in to_delete
+ # We sanity check that we're deleting rather than updating
+ if (etype, state_key) not in to_insert
+ ))
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, ev_id,
+ room_id, etype, state_key,
)
+ for (etype, state_key), ev_id in iteritems(to_insert)
+ ))
- # Invalidate the various caches
-
- # Figure out the changes of membership to invalidate the
- # `get_rooms_for_user` cache.
- # We find out which membership events we may have deleted
- # and which we have added, then we invlidate the caches for all
- # those users.
- members_changed = set(
- state_key
- for ev_type, state_key in itertools.chain(to_delete, to_insert)
- if ev_type == EventTypes.Member
- )
+ # Now we actually update the current_state_events table
- for member in members_changed:
- self._invalidate_cache_and_stream(
- txn, self.get_rooms_for_user_with_stream_ordering, (member,)
- )
+ txn.executemany(
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
+ )
- for host in set(get_domain_from_id(u) for u in members_changed):
- self._invalidate_cache_and_stream(
- txn, self.is_host_joined, (room_id, host)
- )
- self._invalidate_cache_and_stream(
- txn, self.was_host_joined, (room_id, host)
- )
+ self._simple_insert_many_txn(
+ txn,
+ table="current_state_events",
+ values=[
+ {
+ "event_id": ev_id,
+ "room_id": room_id,
+ "type": key[0],
+ "state_key": key[1],
+ }
+ for key, ev_id in iteritems(to_insert)
+ ],
+ )
- self._invalidate_cache_and_stream(
- txn, self.get_users_in_room, (room_id,)
- )
+ txn.call_after(
+ self._curr_state_delta_stream_cache.entity_has_changed,
+ room_id, max_stream_order,
+ )
- self._invalidate_cache_and_stream(
- txn, self.get_room_summary, (room_id,)
- )
+ # Invalidate the various caches
+
+ # Figure out the changes of membership to invalidate the
+ # `get_rooms_for_user` cache.
+ # We find out which membership events we may have deleted
+            # and which we have added, then we invalidate the caches for all
+ # those users.
+ members_changed = set(
+ state_key
+ for ev_type, state_key in itertools.chain(to_delete, to_insert)
+ if ev_type == EventTypes.Member
+ )
- self._invalidate_cache_and_stream(
- txn, self.get_current_state_ids, (room_id,)
- )
+ self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order):
@@ -1255,6 +1245,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
event.internal_metadata.get_dict()
),
"json": encode_json(event_dict(event)),
+ "format_version": event.format_version,
}
for event, _ in events_and_contexts
],
@@ -2034,55 +2025,37 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.info("[purge] finding redundant state groups")
- # Get all state groups that are only referenced by events that are
- # to be deleted.
- # This works by first getting state groups that we may want to delete,
- # joining against event_to_state_groups to get events that use that
- # state group, then left joining against events_to_purge again. Any
- # state group where the left join produce *no nulls* are referenced
- # only by events that are going to be purged.
+ # Get all state groups that are referenced by events that are to be
+ # deleted. We then go and check if they are referenced by other events
+ # or state groups, and if not we delete them.
txn.execute("""
- SELECT state_group FROM
- (
- SELECT DISTINCT state_group FROM events_to_purge
- INNER JOIN event_to_state_groups USING (event_id)
- ) AS sp
- INNER JOIN event_to_state_groups USING (state_group)
- LEFT JOIN events_to_purge AS ep USING (event_id)
- GROUP BY state_group
- HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+ SELECT DISTINCT state_group FROM events_to_purge
+ INNER JOIN event_to_state_groups USING (event_id)
""")
- state_rows = txn.fetchall()
- logger.info("[purge] found %i redundant state groups", len(state_rows))
-
- # make a set of the redundant state groups, so that we can look them up
- # efficiently
- state_groups_to_delete = set([sg for sg, in state_rows])
-
- # Now we get all the state groups that rely on these state groups
- logger.info("[purge] finding state groups which depend on redundant"
- " state groups")
- remaining_state_groups = []
- for i in range(0, len(state_rows), 100):
- chunk = [sg for sg, in state_rows[i:i + 100]]
- # look for state groups whose prev_state_group is one we are about
- # to delete
- rows = self._simple_select_many_txn(
- txn,
- table="state_group_edges",
- column="prev_state_group",
- iterable=chunk,
- retcols=["state_group"],
- keyvalues={},
- )
- remaining_state_groups.extend(
- row["state_group"] for row in rows
+ referenced_state_groups = set(sg for sg, in txn)
+ logger.info(
+ "[purge] found %i referenced state groups",
+ len(referenced_state_groups),
+ )
+
+ logger.info("[purge] finding state groups that can be deleted")
- # exclude state groups we are about to delete: no point in
- # updating them
- if row["state_group"] not in state_groups_to_delete
+ state_groups_to_delete, remaining_state_groups = (
+ self._find_unreferenced_groups_during_purge(
+ txn, referenced_state_groups,
)
+ )
+
+ logger.info(
+ "[purge] found %i state groups to delete",
+ len(state_groups_to_delete),
+ )
+
+ logger.info(
+ "[purge] de-delta-ing %i remaining state groups",
+ len(remaining_state_groups),
+ )
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
@@ -2127,11 +2100,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
- state_rows
+ ((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
- state_rows
+ ((sg,) for sg in state_groups_to_delete),
)
logger.info("[purge] removing events from event_to_state_groups")
@@ -2227,6 +2200,85 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.info("[purge] done")
+ def _find_unreferenced_groups_during_purge(self, txn, state_groups):
+ """Used when purging history to figure out which state groups can be
+        deleted and which need to be de-delta'ed (because their prev group is
+        scheduled for deletion).
+
+ Args:
+ txn
+ state_groups (set[int]): Set of state groups referenced by events
+ that are going to be deleted.
+
+ Returns:
+ tuple[set[int], set[int]]: The set of state groups that can be
+ deleted and the set of state groups that need to be de-delta'ed
+ """
+ # Graph of state group -> previous group
+ graph = {}
+
+        # Set of state groups that we have found to be referenced by events
+ referenced_groups = set()
+
+ # Set of state groups we've already seen
+ state_groups_seen = set(state_groups)
+
+ # Set of state groups to handle next.
+ next_to_search = set(state_groups)
+ while next_to_search:
+            # We bound the number of groups we look up at once, to stop the
+            # SQL query getting too big
+ if len(next_to_search) < 100:
+ current_search = next_to_search
+ next_to_search = set()
+ else:
+ current_search = set(itertools.islice(next_to_search, 100))
+ next_to_search -= current_search
+
+ # Check if state groups are referenced
+ sql = """
+ SELECT DISTINCT state_group FROM event_to_state_groups
+ LEFT JOIN events_to_purge AS ep USING (event_id)
+ WHERE state_group IN (%s) AND ep.event_id IS NULL
+ """ % (",".join("?" for _ in current_search),)
+ txn.execute(sql, list(current_search))
+
+ referenced = set(sg for sg, in txn)
+ referenced_groups |= referenced
+
+ # We don't continue iterating up the state group graphs for state
+ # groups that are referenced.
+ current_search -= referenced
+
+ rows = self._simple_select_many_txn(
+ txn,
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=current_search,
+ keyvalues={},
+ retcols=("prev_state_group", "state_group",),
+ )
+
+ prevs = set(row["state_group"] for row in rows)
+ # We don't bother re-handling groups we've already seen
+ prevs -= state_groups_seen
+ next_to_search |= prevs
+ state_groups_seen |= prevs
+
+ for row in rows:
+ # Note: Each state group can have at most one prev group
+ graph[row["state_group"]] = row["prev_state_group"]
+
+ to_delete = state_groups_seen - referenced_groups
+
+ to_dedelta = set()
+ for sg in referenced_groups:
+ prev_sg = graph.get(sg)
+ if prev_sg and prev_sg in to_delete:
+ to_dedelta.add(sg)
+
+ return to_delete, to_dedelta
+
@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a8326f5296..1716be529a 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -21,13 +21,14 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.api.constants import EventFormatVersions, EventTypes
from synapse.api.errors import NotFoundError
+from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import get_domain_from_id
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
@@ -160,9 +161,14 @@ class EventsWorkerStore(SQLBaseStore):
log_ctx = LoggingContext.current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
+ # Note that _enqueue_events is also responsible for turning db rows
+ # into FrozenEvents (via _get_event_from_row), which involves seeing if
+ # the events have been redacted, and if so pulling the redaction event out
+ # of the database to check it.
+ #
+ # _enqueue_events is a bit of a rubbish name but naming is hard.
missing_events = yield self._enqueue_events(
missing_events_ids,
- check_redacted=check_redacted,
allow_rejected=allow_rejected,
)
@@ -174,6 +180,50 @@ class EventsWorkerStore(SQLBaseStore):
if not entry:
continue
+ # Starting in room version v3, some redactions need to be rechecked if we
+ # didn't have the redacted event at the time, so we recheck on read
+ # instead.
+ if not allow_rejected and entry.event.type == EventTypes.Redaction:
+ if entry.event.internal_metadata.need_to_check_redaction():
+ # XXX: we need to avoid calling get_event here.
+ #
+ # The problem is that we end up at this point when an event
+ # which has been redacted is pulled out of the database by
+ # _enqueue_events, because _enqueue_events needs to check the
+ # redaction before it can cache the redacted event. So obviously,
+ # calling get_event to get the redacted event out of the database
+ # gives us an infinite loop.
+ #
+ # For now (quick hack to fix during 0.99 release cycle), we just
+ # go and fetch the relevant row from the db, but it would be nice
+ # to think about how we can cache this rather than hit the db
+ # every time we access a redaction event.
+ #
+ # One thought on how to do this:
+ # 1. split _get_events up so that it is divided into (a) get the
+ # rawish event from the db/cache, (b) do the redaction/rejection
+ # filtering
+ # 2. have _get_event_from_row just call the first half of that
+
+ orig_sender = yield self._simple_select_one_onecol(
+ table="events",
+ keyvalues={"event_id": entry.event.redacts},
+ retcol="sender",
+ allow_none=True,
+ )
+
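+                    # The redaction is treated as valid here only if its sender
+                    # is on the same domain as the sender of the event it
+                    # redacts.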
+ expected_domain = get_domain_from_id(entry.event.sender)
+ if orig_sender and get_domain_from_id(orig_sender) == expected_domain:
+ # This redaction event is allowed. Mark as not needing a
+ # recheck.
+ entry.event.internal_metadata.recheck_redaction = False
+ else:
+ # We don't have the event that is being redacted, so we
+ # assume that the event isn't authorized for now. (If we
+ # later receive the event, then we will always redact
+ # it anyway, since we have this redaction)
+ continue
+
if allow_rejected or not entry.event.rejected_reason:
if check_redacted and entry.redacted_event:
event = entry.redacted_event
@@ -197,7 +247,7 @@ class EventsWorkerStore(SQLBaseStore):
defer.returnValue(events)
def _invalidate_get_event_cache(self, event_id):
- self._get_event_cache.invalidate((event_id,))
+ self._get_event_cache.invalidate((event_id,))
def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
"""Fetch events from the caches
@@ -310,7 +360,7 @@ class EventsWorkerStore(SQLBaseStore):
self.hs.get_reactor().callFromThread(fire, event_list, e)
@defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
+ def _enqueue_events(self, events, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
@@ -353,6 +403,7 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
+ format_version=row["format_version"],
)
for row in rows
],
@@ -377,6 +428,7 @@ class EventsWorkerStore(SQLBaseStore):
" e.event_id as event_id, "
" e.internal_metadata,"
" e.json,"
+ " e.format_version, "
" r.redacts as redacts,"
" rej.event_id as rejects "
" FROM event_json as e"
@@ -392,7 +444,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- rejected_reason=None):
+ format_version, rejected_reason=None):
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -405,8 +457,13 @@ class EventsWorkerStore(SQLBaseStore):
desc="_get_event_from_row_rejected_reason",
)
- original_ev = FrozenEvent(
- d,
+ if format_version is None:
+ # This means that we stored the event before we had the concept
+                # of an event format version, so it must be a V1 event.
+ format_version = EventFormatVersions.V1
+
+ original_ev = event_type_from_format_version(format_version)(
+ event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
@@ -436,6 +493,19 @@ class EventsWorkerStore(SQLBaseStore):
# will serialise this field correctly
redacted_event.unsigned["redacted_because"] = because
+ # Starting in room version v3, some redactions need to be
+ # rechecked if we didn't have the redacted event at the
+ # time, so we recheck on read instead.
+ if because.internal_metadata.need_to_check_redaction():
+ expected_domain = get_domain_from_id(original_ev.sender)
+ if get_domain_from_id(because.sender) == expected_domain:
+ # This redaction event is allowed. Mark as not needing a
+ # recheck.
+ because.internal_metadata.recheck_redaction = False
+ else:
+ # Senders don't match, so the event isn't actually redacted
+ redacted_event = None
+
cache_entry = _EventCacheEntry(
event=original_ev,
redacted_event=redacted_event,
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index cf4104dc2e..9e7e09b8c1 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -34,8 +34,9 @@ class MonthlyActiveUsersStore(SQLBaseStore):
self.hs = hs
self.reserved_users = ()
# Do not add more reserved users than the total allowable number
- self._initialise_reserved_users(
- dbconn.cursor(),
+ self._new_transaction(
+ dbconn, "initialise_mau_threepids", [], [],
+ self._initialise_reserved_users,
hs.config.mau_limits_reserved_threepids[:self.hs.config.max_mau_value],
)
@@ -54,9 +55,12 @@ class MonthlyActiveUsersStore(SQLBaseStore):
txn,
tp["medium"], tp["address"]
)
+
if user_id:
- self.upsert_monthly_active_user_txn(txn, user_id)
- reserved_user_list.append(user_id)
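+                # Support users should never count towards the MAU limit, so
+                # don't add them to the MAU table or the reserved list.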
+ is_support = self.is_support_user_txn(txn, user_id)
+ if not is_support:
+ self.upsert_monthly_active_user_txn(txn, user_id)
+ reserved_user_list.append(user_id)
else:
logger.warning(
"mau limit reserved threepid %s not found in db" % tp
@@ -96,37 +100,38 @@ class MonthlyActiveUsersStore(SQLBaseStore):
txn.execute(sql, query_args)
- # If MAU user count still exceeds the MAU threshold, then delete on
- # a least recently active basis.
- # Note it is not possible to write this query using OFFSET due to
- # incompatibilities in how sqlite and postgres support the feature.
- # sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present
- # While Postgres does not require 'LIMIT', but also does not support
- # negative LIMIT values. So there is no way to write it that both can
- # support
- safe_guard = self.hs.config.max_mau_value - len(self.reserved_users)
- # Must be greater than zero for postgres
- safe_guard = safe_guard if safe_guard > 0 else 0
- query_args = [safe_guard]
-
- base_sql = """
- DELETE FROM monthly_active_users
- WHERE user_id NOT IN (
- SELECT user_id FROM monthly_active_users
- ORDER BY timestamp DESC
- LIMIT ?
+ if self.hs.config.limit_usage_by_mau:
+ # If MAU user count still exceeds the MAU threshold, then delete on
+ # a least recently active basis.
+ # Note it is not possible to write this query using OFFSET due to
+ # incompatibilities in how sqlite and postgres support the feature.
+            # sqlite requires 'LIMIT -1 OFFSET ?' (the LIMIT must be present),
+            # while Postgres does not require 'LIMIT' but also does not support
+            # negative LIMIT values. So there is no way to write the query such
+            # that both can support it.
+ safe_guard = self.hs.config.max_mau_value - len(self.reserved_users)
+ # Must be greater than zero for postgres
+ safe_guard = safe_guard if safe_guard > 0 else 0
+ query_args = [safe_guard]
+
+ base_sql = """
+ DELETE FROM monthly_active_users
+ WHERE user_id NOT IN (
+ SELECT user_id FROM monthly_active_users
+ ORDER BY timestamp DESC
+ LIMIT ?
+ )
+ """
+ # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
+ # when len(reserved_users) == 0. Works fine on sqlite.
+ if len(self.reserved_users) > 0:
+ query_args.extend(self.reserved_users)
+ sql = base_sql + """ AND user_id NOT IN ({})""".format(
+ ','.join(questionmarks)
)
- """
- # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
- # when len(reserved_users) == 0. Works fine on sqlite.
- if len(self.reserved_users) > 0:
- query_args.extend(self.reserved_users)
- sql = base_sql + """ AND user_id NOT IN ({})""".format(
- ','.join(questionmarks)
- )
- else:
- sql = base_sql
- txn.execute(sql, query_args)
+ else:
+ sql = base_sql
+ txn.execute(sql, query_args)
yield self.runInteraction("reap_monthly_active_users", _reap_users)
# It seems poor to invalidate the whole cache, Postgres supports
@@ -180,15 +185,33 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Args:
user_id (str): user to add/update
"""
- is_insert = yield self.runInteraction(
+        # Support users are never included in MAU stats. Note that I can't easily
+        # call this from upsert_monthly_active_user_txn because then I need a _txn
+        # form of is_support_user which is complicated because I want to cache the
+        # result. Therefore I call it here and ignore the case where
+        # upsert_monthly_active_user_txn is called directly from
+        # _initialise_reserved_users, reasoning that it would be very strange to
+        # include a support user in this context.
+
+ is_support = yield self.is_support_user(user_id)
+ if is_support:
+ return
+
+ yield self.runInteraction(
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
user_id
)
- if is_insert:
- self.user_last_seen_monthly_active.invalidate((user_id,))
+ user_in_mau = self.user_last_seen_monthly_active.cache.get(
+ (user_id,),
+ None,
+ update_metrics=False
+ )
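+        # If the cache had no entry for this user, the interaction above may
+        # have inserted a brand new row, so the count and per-user caches need
+        # invalidating.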
+ if user_in_mau is None:
self.get_monthly_active_count.invalidate(())
+ self.user_last_seen_monthly_active.invalidate((user_id,))
+
def upsert_monthly_active_user_txn(self, txn, user_id):
"""Updates or inserts monthly active user member
@@ -198,6 +221,16 @@ class MonthlyActiveUsersStore(SQLBaseStore):
in a database thread rather than the main thread, and we can't call
txn.call_after because txn may not be a LoggingTransaction.
+        We consciously do not call is_support_user_txn from this method because
+        it is not possible to cache the response. is_support_user_txn will be
+        false in almost all cases, so it seems reasonable to call it only for
+        upsert_monthly_active_user and to call is_support_user_txn manually
+        for cases where upsert_monthly_active_user_txn is called directly,
+        like _initialise_reserved_users.
+
+ In short, don't call this method with support users. (Support users
+ should not appear in the MAU stats).
+
Args:
txn (cursor):
user_id (str): user to add/update
@@ -206,6 +239,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
bool: True if a new entry was created, False if an
existing one was updated.
"""
+
         # Am consciously deciding to lock the table on the basis that it ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
@@ -252,8 +286,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Args:
user_id(str): the user_id to query
"""
-
- if self.hs.config.limit_usage_by_mau:
+ if self.hs.config.limit_usage_by_mau or self.hs.config.mau_stats_only:
# Trial users and guests should not be included as part of MAU group
is_guest = yield self.is_guest(user_id)
if is_guest:
@@ -271,8 +304,14 @@ class MonthlyActiveUsersStore(SQLBaseStore):
# but only update if we have not previously seen the user for
# LAST_SEEN_GRANULARITY ms
if last_seen_timestamp is None:
- count = yield self.get_monthly_active_count()
- if count < self.hs.config.max_mau_value:
+ # In the case where mau_stats_only is True and limit_usage_by_mau is
+ # False, there is no point in checking get_monthly_active_count - it
+ # adds no value and will break the logic if max_mau_value is exceeded.
+ if not self.hs.config.limit_usage_by_mau:
yield self.upsert_monthly_active_user(user_id)
+ else:
+ count = yield self.get_monthly_active_count()
+ if count < self.hs.config.max_mau_value:
+ yield self.upsert_monthly_active_user(user_id)
elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
yield self.upsert_monthly_active_user(user_id)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b364719312..fa36daac52 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 51
+SCHEMA_VERSION = 53
dir_path = os.path.abspath(os.path.dirname(__file__))
@@ -257,7 +257,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
module.run_create(cur, database_engine)
if not is_empty:
module.run_upgrade(cur, database_engine, config=config)
- elif ext == ".pyc":
+ elif ext == ".pyc" or file_name == "__pycache__":
# Sometimes .pyc files turn up anyway even though we've
# disabled their generation; e.g. from distribution package
# installers. Silently skip it
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 2743b52bad..134297e284 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -215,7 +215,7 @@ class PusherStore(PusherWorkerStore):
with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so _simple_upsert will retry
- newly_inserted = yield self._simple_upsert(
+ yield self._simple_upsert(
table="pushers",
keyvalues={
"app_id": app_id,
@@ -238,7 +238,12 @@ class PusherStore(PusherWorkerStore):
lock=False,
)
- if newly_inserted:
+ user_has_pusher = self.get_if_user_has_pusher.cache.get(
+ (user_id,), None, update_metrics=False
+ )
+
+ if user_has_pusher is not True:
+            # invalidate, since the user might not have had a pusher before
yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 80d76bf9d7..9b9572890b 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -19,9 +19,11 @@ from six.moves import range
from twisted.internet import defer
+from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError
from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
+from synapse.types import UserID
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -112,6 +114,187 @@ class RegistrationWorkerStore(SQLBaseStore):
return None
+ @cachedInlineCallbacks()
+ def is_support_user(self, user_id):
+ """Determines if the user is of type UserTypes.SUPPORT
+
+ Args:
+ user_id (str): user id to test
+
+ Returns:
+ Deferred[bool]: True if user is of type UserTypes.SUPPORT
+ """
+ res = yield self.runInteraction(
+ "is_support_user", self.is_support_user_txn, user_id
+ )
+ defer.returnValue(res)
+
+ def is_support_user_txn(self, txn, user_id):
+ res = self._simple_select_one_onecol_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="user_type",
+ allow_none=True,
+ )
+        return res == UserTypes.SUPPORT
+
+ def get_users_by_id_case_insensitive(self, user_id):
+ """Gets users that match user_id case insensitively.
+ Returns a mapping of user_id -> password_hash.
+ """
+ def f(txn):
+ sql = (
+ "SELECT name, password_hash FROM users"
+ " WHERE lower(name) = lower(?)"
+ )
+ txn.execute(sql, (user_id,))
+ return dict(txn)
+
+ return self.runInteraction("get_users_by_id_case_insensitive", f)
+
+ @defer.inlineCallbacks
+ def count_all_users(self):
+ """Counts all users registered on the homeserver."""
+ def _count_users(txn):
+ txn.execute("SELECT COUNT(*) AS users FROM users")
+ rows = self.cursor_to_dict(txn)
+ if rows:
+ return rows[0]["users"]
+ return 0
+
+ ret = yield self.runInteraction("count_users", _count_users)
+ defer.returnValue(ret)
+
+ def count_daily_user_type(self):
+ """
+        Counts 1) native non-guest users
+               2) native guest users
+ 3) bridged users
+ who registered on the homeserver in the past 24 hours
+ """
+ def _count_daily_user_type(txn):
+ yesterday = int(self._clock.time()) - (60 * 60 * 24)
+
+ sql = """
+ SELECT user_type, COALESCE(count(*), 0) AS count FROM (
+ SELECT
+ CASE
+ WHEN is_guest=0 AND appservice_id IS NULL THEN 'native'
+ WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest'
+ WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged'
+ END AS user_type
+ FROM users
+ WHERE creation_ts > ?
+ ) AS t GROUP BY user_type
+ """
+ results = {'native': 0, 'guest': 0, 'bridged': 0}
+ txn.execute(sql, (yesterday,))
+ for row in txn:
+ results[row[0]] = row[1]
+ return results
+ return self.runInteraction("count_daily_user_type", _count_daily_user_type)
+
+ @defer.inlineCallbacks
+ def count_nonbridged_users(self):
+ def _count_users(txn):
+ txn.execute("""
+ SELECT COALESCE(COUNT(*), 0) FROM users
+ WHERE appservice_id IS NULL
+ """)
+ count, = txn.fetchone()
+ return count
+
+ ret = yield self.runInteraction("count_users", _count_users)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def find_next_generated_user_id_localpart(self):
+ """
+ Gets the localpart of the next generated user ID.
+
+ Generated user IDs are integers, and we aim for them to be as small as
+ we can. Unfortunately, it's possible some of them are already taken by
+ existing users, and there may be gaps in the already taken range. This
+        function returns the start of the first allocatable gap. This is to
+        avoid the case of ID 10000000 being pre-allocated, which would waste
+        many of the first (and shortest) generated user IDs.
+ """
+ def _find_next_generated_user_id(txn):
+ txn.execute("SELECT name FROM users")
+
+ regex = re.compile(r"^@(\d+):")
+
+ found = set()
+
+ for user_id, in txn:
+ match = regex.search(user_id)
+ if match:
+ found.add(int(match.group(1)))
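+            # return the smallest non-negative integer that isn't already taken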
+ for i in range(len(found) + 1):
+ if i not in found:
+ return i
+
+ defer.returnValue((yield self.runInteraction(
+ "find_next_generated_user_id",
+ _find_next_generated_user_id
+ )))
+
+ @defer.inlineCallbacks
+ def get_3pid_guest_access_token(self, medium, address):
+ ret = yield self._simple_select_one(
+ "threepid_guest_access_tokens",
+ {
+ "medium": medium,
+ "address": address
+ },
+ ["guest_access_token"], True, 'get_3pid_guest_access_token'
+ )
+ if ret:
+ defer.returnValue(ret["guest_access_token"])
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def get_user_id_by_threepid(self, medium, address):
+ """Returns user id from threepid
+
+ Args:
+ medium (str): threepid medium e.g. email
+ address (str): threepid address e.g. me@example.com
+
+ Returns:
+ Deferred[str|None]: user id or None if no user id/threepid mapping exists
+ """
+ user_id = yield self.runInteraction(
+ "get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
+ medium, address
+ )
+ defer.returnValue(user_id)
+
+ def get_user_id_by_threepid_txn(self, txn, medium, address):
+ """Returns user id from threepid
+
+ Args:
+ txn (cursor):
+ medium (str): threepid medium e.g. email
+ address (str): threepid address e.g. me@example.com
+
+ Returns:
+ str|None: user id or None if no user id/threepid mapping exists
+ """
+ ret = self._simple_select_one_txn(
+ txn,
+ "user_threepids",
+ {
+ "medium": medium,
+ "address": address
+ },
+ ['user_id'], True
+ )
+ if ret:
+ return ret['user_id']
+ return None
+
class RegistrationStore(RegistrationWorkerStore,
background_updates.BackgroundUpdateStore):
@@ -167,7 +350,7 @@ class RegistrationStore(RegistrationWorkerStore,
def register(self, user_id, token=None, password_hash=None,
was_guest=False, make_guest=False, appservice_id=None,
- create_profile_with_localpart=None, admin=False):
+ create_profile_with_displayname=None, admin=False, user_type=None):
"""Attempts to register an account.
Args:
@@ -181,8 +364,12 @@ class RegistrationStore(RegistrationWorkerStore,
             make_guest (boolean): True if the new user should be a guest,
false to add a regular user account.
appservice_id (str): The ID of the appservice registering the user.
- create_profile_with_localpart (str): Optionally create a profile for
- the given localpart.
+ create_profile_with_displayname (unicode): Optionally create a profile for
+ the user, setting their displayname to the given value
+ admin (boolean): is an admin user?
+ user_type (str|None): type of user. One of the values from
+ api.constants.UserTypes, or None for a normal user.
+
Raises:
StoreError if the user_id could not be registered.
"""
@@ -195,8 +382,9 @@ class RegistrationStore(RegistrationWorkerStore,
was_guest,
make_guest,
appservice_id,
- create_profile_with_localpart,
- admin
+ create_profile_with_displayname,
+ admin,
+ user_type
)
def _register(
@@ -208,9 +396,12 @@ class RegistrationStore(RegistrationWorkerStore,
was_guest,
make_guest,
appservice_id,
- create_profile_with_localpart,
+ create_profile_with_displayname,
admin,
+ user_type,
):
+ user_id_obj = UserID.from_string(user_id)
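+        # (parsed so that we can pull out the localpart for the profiles table below)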
+
now = int(self.clock.time())
next_id = self._access_tokens_id_gen.get_next()
@@ -244,6 +435,7 @@ class RegistrationStore(RegistrationWorkerStore,
"is_guest": 1 if make_guest else 0,
"appservice_id": appservice_id,
"admin": 1 if admin else 0,
+ "user_type": user_type,
}
)
else:
@@ -257,6 +449,7 @@ class RegistrationStore(RegistrationWorkerStore,
"is_guest": 1 if make_guest else 0,
"appservice_id": appservice_id,
"admin": 1 if admin else 0,
+ "user_type": user_type,
}
)
except self.database_engine.module.IntegrityError:
@@ -273,12 +466,15 @@ class RegistrationStore(RegistrationWorkerStore,
(next_id, user_id, token,)
)
- if create_profile_with_localpart:
+ if create_profile_with_displayname:
# set a default displayname serverside to avoid ugly race
# between auto-joins and clients trying to set displaynames
+ #
+ # *obviously* the 'profiles' table uses localpart for user_id
+ # while everything else uses the full mxid.
txn.execute(
"INSERT INTO profiles(user_id, displayname) VALUES (?,?)",
- (create_profile_with_localpart, create_profile_with_localpart)
+ (user_id_obj.localpart, create_profile_with_displayname)
)
self._invalidate_cache_and_stream(
@@ -286,20 +482,6 @@ class RegistrationStore(RegistrationWorkerStore,
)
txn.call_after(self.is_guest.invalidate, (user_id,))
- def get_users_by_id_case_insensitive(self, user_id):
- """Gets users that match user_id case insensitively.
- Returns a mapping of user_id -> password_hash.
- """
- def f(txn):
- sql = (
- "SELECT name, password_hash FROM users"
- " WHERE lower(name) = lower(?)"
- )
- txn.execute(sql, (user_id,))
- return dict(txn)
-
- return self.runInteraction("get_users_by_id_case_insensitive", f)
-
def user_set_password_hash(self, user_id, password_hash):
"""
NB. This does *not* evict any cache because the one use for this
@@ -472,47 +654,6 @@ class RegistrationStore(RegistrationWorkerStore,
)
defer.returnValue(ret)
- @defer.inlineCallbacks
- def get_user_id_by_threepid(self, medium, address):
- """Returns user id from threepid
-
- Args:
- medium (str): threepid medium e.g. email
- address (str): threepid address e.g. me@example.com
-
- Returns:
- Deferred[str|None]: user id or None if no user id/threepid mapping exists
- """
- user_id = yield self.runInteraction(
- "get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
- medium, address
- )
- defer.returnValue(user_id)
-
- def get_user_id_by_threepid_txn(self, txn, medium, address):
- """Returns user id from threepid
-
- Args:
- txn (cursor):
- medium (str): threepid medium e.g. email
- address (str): threepid address e.g. me@example.com
-
- Returns:
- str|None: user id or None if no user id/threepid mapping exists
- """
- ret = self._simple_select_one_txn(
- txn,
- "user_threepids",
- {
- "medium": medium,
- "address": address
- },
- ['user_id'], True
- )
- if ret:
- return ret['user_id']
- return None
-
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
"user_threepids",
@@ -525,107 +666,6 @@ class RegistrationStore(RegistrationWorkerStore,
)
@defer.inlineCallbacks
- def count_all_users(self):
- """Counts all users registered on the homeserver."""
- def _count_users(txn):
- txn.execute("SELECT COUNT(*) AS users FROM users")
- rows = self.cursor_to_dict(txn)
- if rows:
- return rows[0]["users"]
- return 0
-
- ret = yield self.runInteraction("count_users", _count_users)
- defer.returnValue(ret)
-
- def count_daily_user_type(self):
- """
- Counts 1) native non guest users
- 2) native guests users
- 3) bridged users
- who registered on the homeserver in the past 24 hours
- """
- def _count_daily_user_type(txn):
- yesterday = int(self._clock.time()) - (60 * 60 * 24)
-
- sql = """
- SELECT user_type, COALESCE(count(*), 0) AS count FROM (
- SELECT
- CASE
- WHEN is_guest=0 AND appservice_id IS NULL THEN 'native'
- WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest'
- WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged'
- END AS user_type
- FROM users
- WHERE creation_ts > ?
- ) AS t GROUP BY user_type
- """
- results = {'native': 0, 'guest': 0, 'bridged': 0}
- txn.execute(sql, (yesterday,))
- for row in txn:
- results[row[0]] = row[1]
- return results
- return self.runInteraction("count_daily_user_type", _count_daily_user_type)
-
- @defer.inlineCallbacks
- def count_nonbridged_users(self):
- def _count_users(txn):
- txn.execute("""
- SELECT COALESCE(COUNT(*), 0) FROM users
- WHERE appservice_id IS NULL
- """)
- count, = txn.fetchone()
- return count
-
- ret = yield self.runInteraction("count_users", _count_users)
- defer.returnValue(ret)
-
- @defer.inlineCallbacks
- def find_next_generated_user_id_localpart(self):
- """
- Gets the localpart of the next generated user ID.
-
- Generated user IDs are integers, and we aim for them to be as small as
- we can. Unfortunately, it's possible some of them are already taken by
- existing users, and there may be gaps in the already taken range. This
- function returns the start of the first allocatable gap. This is to
- avoid the case of ID 10000000 being pre-allocated, so us wasting the
- first (and shortest) many generated user IDs.
- """
- def _find_next_generated_user_id(txn):
- txn.execute("SELECT name FROM users")
-
- regex = re.compile(r"^@(\d+):")
-
- found = set()
-
- for user_id, in txn:
- match = regex.search(user_id)
- if match:
- found.add(int(match.group(1)))
- for i in range(len(found) + 1):
- if i not in found:
- return i
-
- defer.returnValue((yield self.runInteraction(
- "find_next_generated_user_id",
- _find_next_generated_user_id
- )))
-
- @defer.inlineCallbacks
- def get_3pid_guest_access_token(self, medium, address):
- ret = yield self._simple_select_one(
- "threepid_guest_access_tokens",
- {
- "medium": medium,
- "address": address
- },
- ["guest_access_token"], True, 'get_3pid_guest_access_token'
- )
- if ret:
- defer.returnValue(ret["guest_access_token"])
- defer.returnValue(None)
-
- @defer.inlineCallbacks
def save_or_get_3pid_guest_access_token(
self, medium, address, access_token, inviter_user_id
):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 61013b8919..41c65e112a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -47,7 +47,7 @@ class RoomWorkerStore(SQLBaseStore):
Args:
room_id (str): The ID of the room to retrieve.
Returns:
- A namedtuple containing the room information, or an empty list.
+ A dict containing the room information, or None if the room is unknown.
"""
return self._simple_select_one(
table="rooms",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0707f9a86a..592c1bcd33 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -588,12 +588,12 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
# We update the local_invites table only if the event is "current",
- # i.e., its something that has just happened.
- # The only current event that can also be an outlier is if its an
- # invite that has come in across federation.
+    # i.e., it's something that has just happened. If the event is an
+    # outlier it is only current if it's an "out of band membership",
+ # like a remote invite or a rejection of a remote invite.
is_new_state = not backfilled and (
not event.internal_metadata.is_outlier()
- or event.internal_metadata.is_invite_from_remote()
+ or event.internal_metadata.is_out_of_band_membership()
)
is_mine = self.hs.is_mine_id(event.state_key)
if is_new_state and is_mine:
diff --git a/synapse/storage/schema/delta/34/sent_txn_purge.py b/synapse/storage/schema/delta/34/sent_txn_purge.py
deleted file mode 100644
index 0ffab10b6f..0000000000
--- a/synapse/storage/schema/delta/34/sent_txn_purge.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-
-from synapse.storage.engines import PostgresEngine
-
-logger = logging.getLogger(__name__)
-
-
-def run_create(cur, database_engine, *args, **kwargs):
- if isinstance(database_engine, PostgresEngine):
- cur.execute("TRUNCATE sent_transactions")
- else:
- cur.execute("DELETE FROM sent_transactions")
-
- cur.execute("CREATE INDEX sent_transactions_ts ON sent_transactions(ts)")
-
-
-def run_upgrade(cur, database_engine, *args, **kwargs):
- pass
diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql
index 54841b3843..dd6dcb65f1 100644
--- a/synapse/storage/schema/delta/40/device_list_streams.sql
+++ b/synapse/storage/schema/delta/40/device_list_streams.sql
@@ -20,9 +20,6 @@ CREATE TABLE device_lists_remote_cache (
content TEXT NOT NULL
);
-CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
-
-
-- The last update we got for a user. Empty if we're not receiving updates for
-- that user.
CREATE TABLE device_lists_remote_extremeties (
@@ -30,7 +27,11 @@ CREATE TABLE device_lists_remote_extremeties (
stream_id TEXT NOT NULL
);
-CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
+-- we used to create non-unique indexes on these tables, but as of update 52 we create
+-- unique indexes concurrently:
+--
+-- CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
+-- CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
-- Stream of device lists updates. Includes both local and remotes
diff --git a/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql b/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql
new file mode 100644
index 0000000000..91e03d13e1
--- /dev/null
+++ b/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql
@@ -0,0 +1,19 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is needed to efficiently check for unreferenced state groups during
+-- purge. Adds an event_to_state_groups(state_group) index.
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('event_to_state_groups_sg_index', '{}');
diff --git a/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql
new file mode 100644
index 0000000000..bfa49e6f92
--- /dev/null
+++ b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql
@@ -0,0 +1,36 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- register a background update which will create a unique index on
+-- device_lists_remote_cache
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('device_lists_remote_cache_unique_idx', '{}');
+
+-- and one on device_lists_remote_extremeties
+INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES (
+ 'device_lists_remote_extremeties_unique_idx', '{}',
+
+ -- doesn't really depend on this, but we need to make sure both happen
+ -- before we drop the old indexes.
+ 'device_lists_remote_cache_unique_idx'
+ );
+
+-- once they complete, we can drop the old indexes.
+INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES (
+ 'drop_device_list_streams_non_unique_indexes', '{}',
+ 'device_lists_remote_extremeties_unique_idx'
+ );
diff --git a/synapse/storage/schema/delta/52/e2e_room_keys.sql b/synapse/storage/schema/delta/52/e2e_room_keys.sql
new file mode 100644
index 0000000000..db687cccae
--- /dev/null
+++ b/synapse/storage/schema/delta/52/e2e_room_keys.sql
@@ -0,0 +1,53 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Change version column to an integer so we can do MAX() sensibly
+ */
+CREATE TABLE e2e_room_keys_versions_new (
+ user_id TEXT NOT NULL,
+ version BIGINT NOT NULL,
+ algorithm TEXT NOT NULL,
+ auth_data TEXT NOT NULL,
+ deleted SMALLINT DEFAULT 0 NOT NULL
+);
+
+INSERT INTO e2e_room_keys_versions_new
+ SELECT user_id, CAST(version as BIGINT), algorithm, auth_data, deleted FROM e2e_room_keys_versions;
+
+DROP TABLE e2e_room_keys_versions;
+ALTER TABLE e2e_room_keys_versions_new RENAME TO e2e_room_keys_versions;
+
+CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version);
+
+/* Change e2e_room_keys to match
+ */
+CREATE TABLE e2e_room_keys_new (
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ session_id TEXT NOT NULL,
+ version BIGINT NOT NULL,
+ first_message_index INT,
+ forwarded_count INT,
+ is_verified BOOLEAN,
+ session_data TEXT NOT NULL
+);
+
+INSERT INTO e2e_room_keys_new
+ SELECT user_id, room_id, session_id, CAST(version as BIGINT), first_message_index, forwarded_count, is_verified, session_data FROM e2e_room_keys;
+
+DROP TABLE e2e_room_keys;
+ALTER TABLE e2e_room_keys_new RENAME TO e2e_room_keys;
+
+CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys(user_id, room_id, session_id);
diff --git a/synapse/storage/schema/delta/53/add_user_type_to_users.sql b/synapse/storage/schema/delta/53/add_user_type_to_users.sql
new file mode 100644
index 0000000000..88ec2f83e5
--- /dev/null
+++ b/synapse/storage/schema/delta/53/add_user_type_to_users.sql
@@ -0,0 +1,19 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* The type of the user: NULL for a regular user, or one of the constants in
+ * synapse.api.constants.UserTypes
+ */
+ALTER TABLE users ADD COLUMN user_type TEXT DEFAULT NULL;
diff --git a/synapse/storage/schema/delta/11/v11.sql b/synapse/storage/schema/delta/53/drop_sent_transactions.sql
index e7b4f90127..e372f5a44a 100644
--- a/synapse/storage/schema/delta/11/v11.sql
+++ b/synapse/storage/schema/delta/53/drop_sent_transactions.sql
@@ -1,4 +1,4 @@
-/* Copyright 2015, 2016 OpenMarket Ltd
+/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,4 +13,4 @@
* limitations under the License.
*/
-CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
\ No newline at end of file
+DROP TABLE IF EXISTS sent_transactions;
diff --git a/synapse/storage/schema/delta/53/event_format_version.sql b/synapse/storage/schema/delta/53/event_format_version.sql
new file mode 100644
index 0000000000..1d977c2834
--- /dev/null
+++ b/synapse/storage/schema/delta/53/event_format_version.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
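+-- NULL for rows written before this column existed; such events are treated as
+-- format version 1 when they are read back.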
+ALTER TABLE event_json ADD COLUMN format_version INTEGER;
diff --git a/synapse/storage/schema/delta/53/user_ips_index.sql b/synapse/storage/schema/delta/53/user_ips_index.sql
new file mode 100644
index 0000000000..b812c5794f
--- /dev/null
+++ b/synapse/storage/schema/delta/53/user_ips_index.sql
@@ -0,0 +1,30 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- analyze user_ips, to help ensure the correct indices are used
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('user_ips_analyze', '{}');
+
+-- delete duplicates
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('user_ips_remove_dupes', '{}', 'user_ips_analyze');
+
+-- add a new unique index to user_ips table
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('user_ips_device_unique_index', '{}', 'user_ips_remove_dupes');
+
+-- drop the old original index
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('user_ips_drop_nonunique_index', '{}', 'user_ips_device_unique_index');
diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index a3f4a0a790..f6a058832e 100644
--- a/synapse/storage/schema/full_schemas/11/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
@@ -25,25 +25,6 @@ CREATE TABLE IF NOT EXISTS received_transactions(
CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
-
--- Stores what transactions we've sent, what their response was (if we got one) and whether we have
--- since referenced the transaction in another outgoing transaction
-CREATE TABLE IF NOT EXISTS sent_transactions(
- id INTEGER PRIMARY KEY AUTOINCREMENT, -- This is used to apply insertion ordering
- transaction_id TEXT,
- destination TEXT,
- response_code INTEGER DEFAULT 0,
- response_json TEXT,
- ts BIGINT
-);
-
-CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
-CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
--- So that we can do an efficient look up of all transactions that have yet to be successfully
--- sent.
-CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
-
-
-- For sent transactions only.
CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
transaction_id INTEGER,
diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql
index 14b67cce25..17e67bedac 100644
--- a/synapse/storage/schema/full_schemas/16/transactions.sql
+++ b/synapse/storage/schema/full_schemas/16/transactions.sql
@@ -25,25 +25,6 @@ CREATE TABLE IF NOT EXISTS received_transactions(
CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
-
--- Stores what transactions we've sent, what their response was (if we got one) and whether we have
--- since referenced the transaction in another outgoing transaction
-CREATE TABLE IF NOT EXISTS sent_transactions(
- id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering
- transaction_id TEXT,
- destination TEXT,
- response_code INTEGER DEFAULT 0,
- response_json TEXT,
- ts BIGINT
-);
-
-CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
-CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
--- So that we can do an efficient look up of all transactions that have yet to be successfully
--- sent.
-CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
-
-
-- For sent transactions only.
CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
transaction_id INTEGER,
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index d5b5df93e6..c6420b2374 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -45,6 +45,10 @@ class SearchStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(SearchStore, self).__init__(db_conn, hs)
+
+ if not hs.config.enable_search:
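+            # Search is disabled; don't register the search background update
+            # handlers.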
+ return
+
self.register_background_update_handler(
self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
)
@@ -316,6 +320,8 @@ class SearchStore(BackgroundUpdateStore):
entries (iterable[SearchEntry]):
entries to be added to the table
"""
+ if not self.hs.config.enable_search:
+ return
if isinstance(self.database_engine, PostgresEngine):
sql = (
"INSERT INTO event_search"
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ef65929bb2..6ddc4055d2 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -428,14 +428,54 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
# for now we do this by looking at the create event. We may want to cache this
# more intelligently in future.
+
+ # Retrieve the room's create event
+ create_event = yield self.get_create_event_for_room(room_id)
+ defer.returnValue(create_event.content.get("room_version", "1"))
+
+ @defer.inlineCallbacks
+ def get_room_predecessor(self, room_id):
+ """Get the predecessor room of an upgraded room if one exists.
+ Otherwise return None.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[unicode|None]: predecessor room id
+
+ Raises:
+ NotFoundError if the room is unknown
+ """
+ # Retrieve the room's create event
+ create_event = yield self.get_create_event_for_room(room_id)
+
+ # Return predecessor if present
+ defer.returnValue(create_event.content.get("predecessor", None))
+
+ @defer.inlineCallbacks
+ def get_create_event_for_room(self, room_id):
+ """Get the create state event for a room.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[EventBase]: The room creation event.
+
+ Raises:
+ NotFoundError if the room is unknown
+ """
state_ids = yield self.get_current_state_ids(room_id)
create_id = state_ids.get((EventTypes.Create, ""))
+ # If we can't find the create event, assume we've hit a dead end
if not create_id:
- raise NotFoundError("Unknown room")
+ raise NotFoundError("Unknown room %s" % (room_id))
+ # Retrieve the room's create event and return
create_event = yield self.get_event(create_id)
- defer.returnValue(create_event.content.get("room_version", "1"))
+ defer.returnValue(create_event)
@cached(max_entries=100000, iterable=True)
def get_current_state_ids(self, room_id):
@@ -508,6 +548,31 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
_get_filtered_current_state_ids_txn,
)
+ @defer.inlineCallbacks
+ def get_canonical_alias_for_room(self, room_id):
+ """Get canonical alias for room, if any
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[str|None]: The canonical alias, if any
+ """
+
+ state = yield self.get_filtered_current_state_ids(room_id, StateFilter.from_types(
+ [(EventTypes.CanonicalAlias, "")]
+ ))
+
+ event_id = state.get((EventTypes.CanonicalAlias, ""))
+ if not event_id:
+ return
+
+ event = yield self.get_event(event_id, allow_none=True)
+ if not event:
+ return
+
+ defer.returnValue(event.content.get("canonical_alias"))
+
@cached(max_entries=10000, iterable=True)
def get_state_group_delta(self, state_group):
"""Given a state group try to return a previous group and a delta between
@@ -1257,6 +1322,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
+ EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
def __init__(self, db_conn, hs):
super(StateStore, self).__init__(db_conn, hs)
@@ -1275,6 +1341,12 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
columns=["state_key"],
where_clause="type='m.room.member'",
)
+ self.register_background_index_update(
+ self.EVENT_STATE_GROUP_INDEX_UPDATE_NAME,
+ index_name="event_to_state_groups_sg_index",
+ table="event_to_state_groups",
+ columns=["state_group"],
+ )
def _store_event_state_mappings_txn(self, txn, events_and_contexts):
state_groups = {}
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index a8781b0e5d..fea866c043 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -22,6 +22,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.state import StateFilter
from synapse.types import get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -31,12 +32,19 @@ logger = logging.getLogger(__name__)
class UserDirectoryStore(SQLBaseStore):
- @cachedInlineCallbacks(cache_context=True)
- def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context):
+ @defer.inlineCallbacks
+ def is_room_world_readable_or_publicly_joinable(self, room_id):
"""Check if the room is either world_readable or publically joinable
"""
- current_state_ids = yield self.get_current_state_ids(
- room_id, on_invalidate=cache_context.invalidate
+
+        # Create a state filter that only queries the join rules and history visibility state events
+ types_to_filter = (
+ (EventTypes.JoinRules, ""),
+ (EventTypes.RoomHistoryVisibility, ""),
+ )
+
+ current_state_ids = yield self.get_filtered_current_state_ids(
+ room_id, StateFilter.from_types(types_to_filter)
)
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
@@ -66,14 +74,8 @@ class UserDirectoryStore(SQLBaseStore):
"""
yield self._simple_insert_many(
table="users_in_public_rooms",
- values=[
- {
- "user_id": user_id,
- "room_id": room_id,
- }
- for user_id in user_ids
- ],
- desc="add_users_to_public_room"
+ values=[{"user_id": user_id, "room_id": room_id} for user_id in user_ids],
+ desc="add_users_to_public_room",
)
for user_id in user_ids:
self.get_user_in_public_room.invalidate((user_id,))
@@ -99,7 +101,9 @@ class UserDirectoryStore(SQLBaseStore):
"""
args = (
(
- user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
+ user_id,
+ get_localpart_from_id(user_id),
+ get_domain_from_id(user_id),
profile.display_name,
)
for user_id, profile in iteritems(users_with_profile)
@@ -112,7 +116,7 @@ class UserDirectoryStore(SQLBaseStore):
args = (
(
user_id,
- "%s %s" % (user_id, p.display_name,) if p.display_name else user_id
+ "%s %s" % (user_id, p.display_name) if p.display_name else user_id,
)
for user_id, p in iteritems(users_with_profile)
)
@@ -133,12 +137,10 @@ class UserDirectoryStore(SQLBaseStore):
"avatar_url": profile.avatar_url,
}
for user_id, profile in iteritems(users_with_profile)
- ]
+ ],
)
for user_id in users_with_profile:
- txn.call_after(
- self.get_user_in_directory.invalidate, (user_id,)
- )
+ txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
return self.runInteraction(
"add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
@@ -168,39 +170,69 @@ class UserDirectoryStore(SQLBaseStore):
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
- if new_entry:
+ if self.database_engine.can_native_upsert:
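+                # Postgres with native upsert support lets us do this in a
+                # single INSERT ... ON CONFLICT statement.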
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('english', ?), 'A')
|| setweight(to_tsvector('english', ?), 'D')
|| setweight(to_tsvector('english', COALESCE(?, '')), 'B')
- )
+ ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
(
- user_id, get_localpart_from_id(user_id),
- get_domain_from_id(user_id), display_name,
- )
+ user_id,
+ get_localpart_from_id(user_id),
+ get_domain_from_id(user_id),
+ display_name,
+ ),
)
else:
- sql = """
- UPDATE user_directory_search
- SET vector = setweight(to_tsvector('english', ?), 'A')
- || setweight(to_tsvector('english', ?), 'D')
- || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
- WHERE user_id = ?
- """
- txn.execute(
- sql,
- (
- get_localpart_from_id(user_id), get_domain_from_id(user_id),
- display_name, user_id,
+ # TODO: Remove this code after we've bumped the minimum version
+ # of postgres to always support upserts, so we can get rid of
+ # `new_entry` usage
+ if new_entry is True:
+ sql = """
+ INSERT INTO user_directory_search(user_id, vector)
+ VALUES (?,
+ setweight(to_tsvector('english', ?), 'A')
+ || setweight(to_tsvector('english', ?), 'D')
+ || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+ )
+ """
+ txn.execute(
+ sql,
+ (
+ user_id,
+ get_localpart_from_id(user_id),
+ get_domain_from_id(user_id),
+ display_name,
+ ),
+ )
+ elif new_entry is False:
+ sql = """
+ UPDATE user_directory_search
+ SET vector = setweight(to_tsvector('english', ?), 'A')
+ || setweight(to_tsvector('english', ?), 'D')
+ || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+ WHERE user_id = ?
+ """
+ txn.execute(
+ sql,
+ (
+ get_localpart_from_id(user_id),
+ get_domain_from_id(user_id),
+ display_name,
+ user_id,
+ ),
+ )
+ else:
+ raise RuntimeError(
+ "upsert returned None when 'can_native_upsert' is False"
)
- )
elif isinstance(self.database_engine, Sqlite3Engine):
- value = "%s %s" % (user_id, display_name,) if display_name else user_id
+ value = "%s %s" % (user_id, display_name) if display_name else user_id
self._simple_upsert_txn(
txn,
table="user_directory_search",
@@ -231,29 +263,18 @@ class UserDirectoryStore(SQLBaseStore):
def remove_from_user_dir(self, user_id):
def _remove_from_user_dir_txn(txn):
self._simple_delete_txn(
- txn,
- table="user_directory",
- keyvalues={"user_id": user_id},
+ txn, table="user_directory", keyvalues={"user_id": user_id}
)
self._simple_delete_txn(
- txn,
- table="user_directory_search",
- keyvalues={"user_id": user_id},
+ txn, table="user_directory_search", keyvalues={"user_id": user_id}
)
self._simple_delete_txn(
- txn,
- table="users_in_public_rooms",
- keyvalues={"user_id": user_id},
- )
- txn.call_after(
- self.get_user_in_directory.invalidate, (user_id,)
+ txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
)
- txn.call_after(
- self.get_user_in_public_room.invalidate, (user_id,)
- )
- return self.runInteraction(
- "remove_from_user_dir", _remove_from_user_dir_txn,
- )
+ txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
+ txn.call_after(self.get_user_in_public_room.invalidate, (user_id,))
+
+ return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
@defer.inlineCallbacks
def remove_from_user_in_public_room(self, user_id):
@@ -338,6 +359,7 @@ class UserDirectoryStore(SQLBaseStore):
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
+
def _add_users_who_share_room_txn(txn):
self._simple_insert_many_txn(
txn,
@@ -354,13 +376,12 @@ class UserDirectoryStore(SQLBaseStore):
)
for user_id, other_user_id in user_id_tuples:
txn.call_after(
- self.get_users_who_share_room_from_dir.invalidate,
- (user_id,),
+ self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
txn.call_after(
- self.get_if_users_share_a_room.invalidate,
- (user_id, other_user_id),
+ self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
+
return self.runInteraction(
"add_users_who_share_room", _add_users_who_share_room_txn
)
@@ -374,6 +395,7 @@ class UserDirectoryStore(SQLBaseStore):
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
+
def _update_users_who_share_room_txn(txn):
sql = """
UPDATE users_who_share_rooms
@@ -381,21 +403,16 @@ class UserDirectoryStore(SQLBaseStore):
WHERE user_id = ? AND other_user_id = ?
"""
txn.executemany(
- sql,
- (
- (room_id, share_private, uid, oid)
- for uid, oid in user_id_sets
- )
+ sql, ((room_id, share_private, uid, oid) for uid, oid in user_id_sets)
)
for user_id, other_user_id in user_id_sets:
txn.call_after(
- self.get_users_who_share_room_from_dir.invalidate,
- (user_id,),
+ self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
txn.call_after(
- self.get_if_users_share_a_room.invalidate,
- (user_id, other_user_id),
+ self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
+
return self.runInteraction(
"update_users_who_share_room", _update_users_who_share_room_txn
)
@@ -409,22 +426,18 @@ class UserDirectoryStore(SQLBaseStore):
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
+
def _remove_user_who_share_room_txn(txn):
self._simple_delete_txn(
txn,
table="users_who_share_rooms",
- keyvalues={
- "user_id": user_id,
- "other_user_id": other_user_id,
- },
+ keyvalues={"user_id": user_id, "other_user_id": other_user_id},
)
txn.call_after(
- self.get_users_who_share_room_from_dir.invalidate,
- (user_id,),
+ self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
txn.call_after(
- self.get_if_users_share_a_room.invalidate,
- (user_id, other_user_id),
+ self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
return self.runInteraction(
@@ -445,10 +458,7 @@ class UserDirectoryStore(SQLBaseStore):
"""
return self._simple_select_one_onecol(
table="users_who_share_rooms",
- keyvalues={
- "user_id": user_id,
- "other_user_id": other_user_id,
- },
+ keyvalues={"user_id": user_id, "other_user_id": other_user_id},
retcol="share_private",
allow_none=True,
desc="get_if_users_share_a_room",
@@ -466,17 +476,12 @@ class UserDirectoryStore(SQLBaseStore):
"""
rows = yield self._simple_select_list(
table="users_who_share_rooms",
- keyvalues={
- "user_id": user_id,
- },
- retcols=("other_user_id", "share_private",),
+ keyvalues={"user_id": user_id},
+ retcols=("other_user_id", "share_private"),
desc="get_users_who_share_room_with_user",
)
- defer.returnValue({
- row["other_user_id"]: row["share_private"]
- for row in rows
- })
+ defer.returnValue({row["other_user_id"]: row["share_private"] for row in rows})
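The returnValue above collapses the selected rows into a plain mapping of other_user_id to share_private. A standalone version of that transform, with fabricated rows shaped like what _simple_select_list hands back for the requested retcols:

rows = [
    {"other_user_id": "@bob:example.com", "share_private": True},
    {"other_user_id": "@carol:example.com", "share_private": False},
]

# One dict comprehension, keyed by the other user in each shared room.
shared = {row["other_user_id"]: row["share_private"] for row in rows}
print(shared)
# {'@bob:example.com': True, '@carol:example.com': False}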
def get_users_in_share_dir_with_room_id(self, user_id, room_id):
"""Get all user tuples that are in the users_who_share_rooms due to the
@@ -523,6 +528,7 @@ class UserDirectoryStore(SQLBaseStore):
def delete_all_from_user_dir(self):
"""Delete the entire user directory
"""
+
def _delete_all_from_user_dir_txn(txn):
txn.execute("DELETE FROM user_directory")
txn.execute("DELETE FROM user_directory_search")
@@ -532,6 +538,7 @@ class UserDirectoryStore(SQLBaseStore):
txn.call_after(self.get_user_in_public_room.invalidate_all)
txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
txn.call_after(self.get_if_users_share_a_room.invalidate_all)
+
return self.runInteraction(
"delete_all_from_user_dir", _delete_all_from_user_dir_txn
)
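The per-key invalidations used elsewhere in this file need a cache key to evict; an unconditional DELETE leaves nothing to enumerate, which is presumably why every cache is dropped wholesale here with invalidate_all. A toy dict-backed cache (contents invented) makes the difference concrete:

cache = {
    ("@alice:example.com",): {"room_id": "!a:example.com"},
    ("@bob:example.com",): {"room_id": "!b:example.com"},
}


def invalidate(key):
    # Targeted eviction: needs to know exactly which entry to drop.
    cache.pop(key, None)


def invalidate_all():
    # Pairs with an unqualified DELETE: everything goes.
    cache.clear()


invalidate(("@alice:example.com",))
print(len(cache))  # 1
invalidate_all()
print(cache)       # {}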
@@ -541,7 +548,7 @@ class UserDirectoryStore(SQLBaseStore):
return self._simple_select_one(
table="user_directory",
keyvalues={"user_id": user_id},
- retcols=("room_id", "display_name", "avatar_url",),
+ retcols=("room_id", "display_name", "avatar_url"),
allow_none=True,
desc="get_user_in_directory",
)
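get_user_in_directory is a straight single-row lookup: match on user_id, return the three retcols as a dict, or None when allow_none is set and nothing matches. The helper below is a deliberately simplified stand-in for _simple_select_one, run against an in-memory SQLite table with made-up data, just to show the shape of the result.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_directory "
    "(user_id TEXT, room_id TEXT, display_name TEXT, avatar_url TEXT)"
)
conn.execute(
    "INSERT INTO user_directory VALUES (?, ?, ?, ?)",
    ("@alice:example.com", "!room:example.com", "Alice", None),
)


def simple_select_one(conn, table, keyvalues, retcols, allow_none=False):
    # Simplified: build "col = ?" clauses from keyvalues and zip the selected
    # columns back into a dict, roughly what the real helper returns.
    where = " AND ".join("%s = ?" % k for k in keyvalues)
    sql = "SELECT %s FROM %s WHERE %s" % (", ".join(retcols), table, where)
    row = conn.execute(sql, list(keyvalues.values())).fetchone()
    if row is None:
        if allow_none:
            return None
        raise RuntimeError("expected exactly one row")
    return dict(zip(retcols, row))


print(
    simple_select_one(
        conn,
        "user_directory",
        {"user_id": "@alice:example.com"},
        ("room_id", "display_name", "avatar_url"),
        allow_none=True,
    )
)
# {'room_id': '!room:example.com', 'display_name': 'Alice', 'avatar_url': None}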
@@ -574,7 +581,9 @@ class UserDirectoryStore(SQLBaseStore):
def get_current_state_deltas(self, prev_stream_id):
prev_stream_id = int(prev_stream_id)
- if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
+ if not self._curr_state_delta_stream_cache.has_any_entity_changed(
+ prev_stream_id
+ ):
return []
def get_current_state_deltas_txn(txn):
@@ -608,7 +617,7 @@ class UserDirectoryStore(SQLBaseStore):
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""
- txn.execute(sql, (prev_stream_id, max_stream_id,))
+ txn.execute(sql, (prev_stream_id, max_stream_id))
return self.cursor_to_dict(txn)
return self.runInteraction(
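The query above pulls deltas in the half-open window (prev_stream_id, max_stream_id], oldest first, so a caller can hand back the last stream_id it processed and resume from there; the stream-change cache check earlier in the method simply short-circuits when nothing can be in that window. A tiny in-Python rendering of the same filter, with invented delta rows:

deltas = [
    {"stream_id": 10, "type": "m.room.member"},
    {"stream_id": 11, "type": "m.room.name"},
    {"stream_id": 12, "type": "m.room.member"},
]

prev_stream_id, max_stream_id = 10, 12

# prev_stream_id is exclusive, max_stream_id inclusive, ordered ascending,
# matching "? < stream_id AND stream_id <= ? ORDER BY stream_id ASC".
window = sorted(
    (d for d in deltas if prev_stream_id < d["stream_id"] <= max_stream_id),
    key=lambda d: d["stream_id"],
)
print([d["stream_id"] for d in window])  # [11, 12]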
@@ -698,8 +707,11 @@ class UserDirectoryStore(SQLBaseStore):
display_name IS NULL,
avatar_url IS NULL
LIMIT ?
- """ % (join_clause, where_clause)
- args = join_args + (full_query, exact_query, prefix_query, limit + 1,)
+ """ % (
+ join_clause,
+ where_clause,
+ )
+ args = join_args + (full_query, exact_query, prefix_query, limit + 1)
elif isinstance(self.database_engine, Sqlite3Engine):
search_query = _parse_query_sqlite(search_term)
@@ -716,7 +728,10 @@ class UserDirectoryStore(SQLBaseStore):
display_name IS NULL,
avatar_url IS NULL
LIMIT ?
- """ % (join_clause, where_clause)
+ """ % (
+ join_clause,
+ where_clause,
+ )
args = join_args + (search_query, limit + 1)
else:
# This should be unreachable.
@@ -728,10 +743,7 @@ class UserDirectoryStore(SQLBaseStore):
limited = len(results) > limit
- defer.returnValue({
- "limited": limited,
- "results": results,
- })
+ defer.returnValue({"limited": limited, "results": results})
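The LIMIT parameter in both branches above is bound to limit + 1: asking for one extra row is a cheap way to learn whether the result set was truncated, which is all the limited flag reports. In miniature, with made-up results:

limit = 2
# Pretend the LIMIT ? query (bound to limit + 1 = 3) returned three rows.
results = ["@alice:example.com", "@bob:example.com", "@carol:example.com"]

limited = len(results) > limit  # the extra row means there were more matches
print({"limited": limited, "results": results})  # limited is True here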
def _parse_query_sqlite(search_term):
@@ -746,7 +758,7 @@ def _parse_query_sqlite(search_term):
# Pull out the individual words, discarding any non-word characters.
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
- return " & ".join("(%s* OR %s)" % (result, result,) for result in results)
+ return " & ".join("(%s* OR %s)" % (result, result) for result in results)
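To make the SQLite branch concrete, here is the same query-building step as a standalone snippet; "alice bob" is just a sample search term and the function name is local to this example.

import re


def parse_query_sqlite(search_term):
    # Same logic as the function above: keep word characters and hyphens,
    # OR each term with its prefix-match form, then AND the terms together.
    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
    return " & ".join("(%s* OR %s)" % (result, result) for result in results)


print(parse_query_sqlite("alice bob"))
# (alice* OR alice) & (bob* OR bob)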
def _parse_query_postgres(search_term):
@@ -759,7 +771,7 @@ def _parse_query_postgres(search_term):
# Pull out the individual words, discarding any non-word characters.
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
- both = " & ".join("(%s:* | %s)" % (result, result,) for result in results)
+ both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
exact = " & ".join("%s" % (result,) for result in results)
prefix = " & ".join("%s:*" % (result,) for result in results)
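And the Postgres flavour, again as a standalone snippet with a sample term. Only the three intermediate strings are shown; how the real function combines and returns them falls outside this hunk, so parse_query_postgres_parts and its tuple return are inventions of the example.

import re


def parse_query_postgres_parts(search_term):
    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
    both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
    exact = " & ".join("%s" % (result,) for result in results)
    prefix = " & ".join("%s:*" % (result,) for result in results)
    return both, exact, prefix


both, exact, prefix = parse_query_postgres_parts("alice bob")
print(both)    # (alice:* | alice) & (bob:* | bob)
print(exact)   # alice & bob
print(prefix)  # alice:* & bob:*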