diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 56c434d4e8..6b0ca80087 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -56,6 +56,7 @@ from .roommember import RoomMemberStore
from .search import SearchStore
from .signatures import SignatureStore
from .state import StateStore
+from .stats import StatsStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionStore
@@ -102,6 +103,7 @@ class DataStore(
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
+ StatsStore,
RelationsStore,
):
def __init__(self, db_conn, hs):
@@ -277,23 +279,35 @@ class DataStore(
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
+ yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+ return self.runInteraction("count_daily_users", self._count_users, yesterday)
- def _count_users(txn):
- yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-
- sql = """
- SELECT COALESCE(count(*), 0) FROM (
- SELECT user_id FROM user_ips
- WHERE last_seen > ?
- GROUP BY user_id
- ) u
- """
-
- txn.execute(sql, (yesterday,))
- count, = txn.fetchone()
- return count
+ def count_monthly_users(self):
+ """
+ Counts the number of users who used this homeserver in the last 30 days.
+ Note this method is intended for phone-home metrics only and differs from
+ the MAU figure in synapse.storage.monthly_active_users which, amongst
+ other things, includes a 3 day grace period before a user counts.
+ """
+ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+ return self.runInteraction(
+ "count_monthly_users", self._count_users, thirty_days_ago
+ )
- return self.runInteraction("count_users", _count_users)
+ def _count_users(self, txn, time_from):
+ """
+ Returns the number of users seen since the given time_from timestamp (ms)
+ """
+ sql = """
+ SELECT COALESCE(count(*), 0) FROM (
+ SELECT user_id FROM user_ips
+ WHERE last_seen > ?
+ GROUP BY user_id
+ ) u
+ """
+ txn.execute(sql, (time_from,))
+ count, = txn.fetchone()
+ return count
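# Illustrative note (not part of this patch): both count_daily_users and
# count_monthly_users now share _count_users with a different time_from
# cutoff. Assuming standard SQL support, the subquery above is equivalent to:
#
#   SELECT COUNT(DISTINCT user_id) FROM user_ips WHERE last_seen > ?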
def count_r30_users(self):
"""
@@ -345,7 +359,7 @@ class DataStore(
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
for row in txn:
- if row[0] == 'unknown':
+ if row[0] == "unknown":
pass
results[row[0]] = row[1]
@@ -372,7 +386,7 @@ class DataStore(
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
count, = txn.fetchone()
- results['all'] = count
+ results["all"] = count
return results
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 3fe827cd43..29589853c6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,6 +16,7 @@
# limitations under the License.
import itertools
import logging
+import random
import sys
import threading
import time
@@ -37,6 +38,14 @@ from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.stringutils import exception_to_unicode
+# import a function which will return a monotonic time, in seconds
+try:
+ # on python 3, use time.monotonic, since time.time can go backwards
+ from time import monotonic as monotonic_time
+except ImportError:
+ # ... but python 2 doesn't have it
+ from time import clock as monotonic_time
+
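# Illustrative sketch (not part of this patch); do_work is a placeholder:
#
#   start = monotonic_time()
#   do_work()
#   duration_secs = monotonic_time() - start  # elapsed seconds as a float
#
# The sections below use this pattern instead of time.time() so that measured
# durations can never be negative.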
logger = logging.getLogger(__name__)
try:
@@ -166,22 +175,22 @@ class PerformanceCounters(object):
self.current_counters = {}
self.previous_counters = {}
- def update(self, key, start_time, end_time=None):
- if end_time is None:
- end_time = time.time()
- duration = end_time - start_time
+ def update(self, key, duration_secs):
count, cum_time = self.current_counters.get(key, (0, 0))
count += 1
- cum_time += duration
+ cum_time += duration_secs
self.current_counters[key] = (count, cum_time)
- return end_time
- def interval(self, interval_duration, limit=3):
+ def interval(self, interval_duration_secs, limit=3):
counters = []
for name, (count, cum_time) in iteritems(self.current_counters):
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
counters.append(
- ((cum_time - prev_time) / interval_duration, count - prev_count, name)
+ (
+ (cum_time - prev_time) / interval_duration_secs,
+ count - prev_count,
+ name,
+ )
)
self.previous_counters = dict(self.current_counters)
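# Illustrative usage (not part of this patch); "some_txn" is a placeholder
# transaction description and 10.0 is the reporting interval in seconds:
#
#   counters = PerformanceCounters()
#   counters.update("some_txn", duration_secs=0.02)
#   counters.update("some_txn", duration_secs=0.03)
#   top = counters.interval(interval_duration_secs=10.0, limit=3)
#   # `top` summarises the busiest transaction types since the previous call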
@@ -212,7 +221,6 @@ class SQLBaseStore(object):
# is running in mainline, and we have some nice monitoring frontends
# to watch it
self._txn_perf_counters = PerformanceCounters()
- self._get_event_counters = PerformanceCounters()
self._get_event_cache = Cache(
"*getEvent*", keylen=3, max_entries=hs.config.event_cache_size
@@ -247,6 +255,8 @@ class SQLBaseStore(object):
self._check_safe_to_upsert,
)
+ self.rand = random.SystemRandom()
+
if self._account_validity.enabled:
self._clock.call_later(
0.0,
@@ -296,33 +306,46 @@ class SQLBaseStore(object):
def select_users_with_no_expiration_date_txn(txn):
"""Retrieves the list of registered users with no expiration date from the
- database.
+ database, filtering out deactivated users.
"""
sql = (
"SELECT users.name FROM users"
" LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
- " WHERE account_validity.user_id is NULL;"
+ " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
)
txn.execute(sql, [])
res = self.cursor_to_dict(txn)
if res:
for user in res:
- self.set_expiration_date_for_user_txn(txn, user["name"])
+ self.set_expiration_date_for_user_txn(
+ txn, user["name"], use_delta=True
+ )
yield self.runInteraction(
"get_users_with_no_expiration_date",
select_users_with_no_expiration_date_txn,
)
- def set_expiration_date_for_user_txn(self, txn, user_id):
+ def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
"""Sets an expiration date to the account with the given user ID.
Args:
user_id (str): User ID to set an expiration date for.
+ use_delta (bool): If set to False, the expiration date for the user will be
+ now + validity period. If set to True, this expiration date will be a
+ random value in the [now + period - d ; now + period] range, d being a
+ delta equal to 10% of the validity period.
"""
now_ms = self._clock.time_msec()
expiration_ts = now_ms + self._account_validity.period
+
+ if use_delta:
+ expiration_ts = self.rand.randrange(
+ expiration_ts - self._account_validity.startup_job_max_delta,
+ expiration_ts,
+ )
+
self._simple_insert_txn(
txn,
"account_validity",
@@ -334,32 +357,24 @@ class SQLBaseStore(object):
)
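# Worked example (not part of this patch): with a 30-day validity period,
# startup_job_max_delta is 3 days (10% of the period, per the docstring of
# set_expiration_date_for_user_txn above), so use_delta=True picks an
# expiration timestamp uniformly from the range [now + 27 days, now + 30 days),
# which spreads out the expiry of grandfathered accounts instead of expiring
# them all at the same instant.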
def start_profiling(self):
- self._previous_loop_ts = self._clock.time_msec()
+ self._previous_loop_ts = monotonic_time()
def loop():
curr = self._current_txn_total_time
prev = self._previous_txn_total_time
self._previous_txn_total_time = curr
- time_now = self._clock.time_msec()
+ time_now = monotonic_time()
time_then = self._previous_loop_ts
self._previous_loop_ts = time_now
- ratio = (curr - prev) / (time_now - time_then)
-
- top_three_counters = self._txn_perf_counters.interval(
- time_now - time_then, limit=3
- )
+ duration = time_now - time_then
+ ratio = (curr - prev) / duration
- top_3_event_counters = self._get_event_counters.interval(
- time_now - time_then, limit=3
- )
+ top_three_counters = self._txn_perf_counters.interval(duration, limit=3)
perf_logger.info(
- "Total database time: %.3f%% {%s} {%s}",
- ratio * 100,
- top_three_counters,
- top_3_event_counters,
+ "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters
)
self._clock.looping_call(loop, 10000)
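# Worked example (not part of this patch): if 2.5 seconds of transaction time
# accumulated over a 10-second loop interval (looping_call fires every 10000
# ms), ratio is 0.25 and the log line reads "Total database time: 25.000% {...}".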
@@ -367,7 +382,7 @@ class SQLBaseStore(object):
def _new_transaction(
self, conn, desc, after_callbacks, exception_callbacks, func, *args, **kwargs
):
- start = time.time()
+ start = monotonic_time()
txn_id = self._TXN_ID
# We don't really need these to be unique, so lets stop it from
@@ -433,7 +448,7 @@ class SQLBaseStore(object):
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
finally:
- end = time.time()
+ end = monotonic_time()
duration = end - start
LoggingContext.current_context().add_database_transaction(duration)
@@ -441,7 +456,7 @@ class SQLBaseStore(object):
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
self._current_txn_total_time += duration
- self._txn_perf_counters.update(desc, start, end)
+ self._txn_perf_counters.update(desc, duration)
sql_txn_timer.labels(desc).observe(duration)
@defer.inlineCallbacks
@@ -507,11 +522,11 @@ class SQLBaseStore(object):
)
parent_context = None
- start_time = time.time()
+ start_time = monotonic_time()
def inner_func(conn, *args, **kwargs):
with LoggingContext("runWithConnection", parent_context) as context:
- sched_duration_sec = time.time() - start_time
+ sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)
@@ -570,6 +585,10 @@ class SQLBaseStore(object):
Args:
table : string giving the table name
values : dict of new column names and values for them
+ or_ignore : bool stating whether to ignore a conflicting row rather
+ than raising an exception. If True, the function returns False
+ instead of raising when such a conflict occurs
+ desc : string giving a description of the transaction
Returns:
bool: Whether the row was inserted or not. Only useful when
@@ -1210,8 +1229,8 @@ class SQLBaseStore(object):
)
txn.execute(select_sql, list(keyvalues.values()))
-
row = txn.fetchone()
+
if not row:
if allow_none:
return None
@@ -1645,7 +1664,7 @@ def db_to_json(db_content):
# Decode it to a Unicode string before feeding it to json.loads, so we
# consistenty get a Unicode-containing object out.
if isinstance(db_content, (bytes, bytearray)):
- db_content = db_content.decode('utf8')
+ db_content = db_content.decode("utf8")
try:
return json.loads(db_content)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b8b8273f73..50f913a414 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -169,7 +169,7 @@ class BackgroundUpdateStore(SQLBaseStore):
in_flight = set(update["update_name"] for update in updates)
for update in updates:
if update["depends_on"] not in in_flight:
- self._background_update_queue.append(update['update_name'])
+ self._background_update_queue.append(update["update_name"])
if not self._background_update_queue:
# no work left to do
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 9b0a99cb49..4ea0deea4f 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -138,6 +138,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
if not has_changed or last_stream_id == current_stream_id:
return defer.succeed(([], current_stream_id))
+ if limit <= 0:
+ # This can happen if we run out of room for EDUs in the transaction.
+ return defer.succeed(([], last_stream_id))
+
def get_new_messages_for_remote_destination_txn(txn):
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index fd869b934c..3413a46675 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from six import iteritems, itervalues
+from six import iteritems
from canonicaljson import json
@@ -72,11 +72,14 @@ class DeviceWorkerStore(SQLBaseStore):
defer.returnValue({d["device_id"]: d for d in devices})
- def get_devices_by_remote(self, destination, from_stream_id):
+ @defer.inlineCallbacks
+ def get_devices_by_remote(self, destination, from_stream_id, limit):
"""Get stream of updates to send to remote servers
Returns:
- (int, list[dict]): current stream id and list of updates
+ Deferred[tuple[int, list[dict]]]:
+ current stream id (i.e., the stream id of the last update included in the
+ response), and the list of updates
"""
now_stream_id = self._device_list_id_gen.get_current_token()
@@ -84,55 +87,127 @@ class DeviceWorkerStore(SQLBaseStore):
destination, int(from_stream_id)
)
if not has_changed:
- return (now_stream_id, [])
-
- return self.runInteraction(
+ defer.returnValue((now_stream_id, []))
+
+ # We retrieve n+1 devices from the list of outbound pokes where n is
+ # our outbound device update limit. We then check if the very last
+ # device has the same stream_id as the second-to-last device. If so,
+ # then we ignore all devices with that stream_id and only send the
+ # devices with a lower stream_id.
+ #
+ # If when culling the list we end up with no devices afterwards, we
+ # consider the device update to be too large, and simply skip the
+ # stream_id; the rationale being that such a large device list update
+ # is likely an error.
+ updates = yield self.runInteraction(
"get_devices_by_remote",
self._get_devices_by_remote_txn,
destination,
from_stream_id,
now_stream_id,
+ limit + 1,
+ )
+
+ # Return an empty list if there are no updates
+ if not updates:
+ defer.returnValue((now_stream_id, []))
+
+ # if we have exceeded the limit, we need to exclude any results with the
+ # same stream_id as the last row.
+ if len(updates) > limit:
+ stream_id_cutoff = updates[-1][2]
+ now_stream_id = stream_id_cutoff - 1
+ else:
+ stream_id_cutoff = None
+
+ # Perform the equivalent of a GROUP BY
+ #
+ # Iterate through the updates list and copy non-duplicate
+ # (user_id, device_id) entries into a map, with the value being
+ # the max stream_id across each set of duplicate entries
+ #
+ # maps (user_id, device_id) -> stream_id
+ # as long as their stream_id does not match that of the last row
+ query_map = {}
+ for update in updates:
+ if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
+ # Stop processing updates
+ break
+
+ key = (update[0], update[1])
+ query_map[key] = max(query_map.get(key, 0), update[2])
+
+ # If we didn't find any updates with a stream_id lower than the cutoff, it
+ # means that there are more than `limit` updates, all of which have the
+ # same stream_id.
+
+ # That should only happen if a client is spamming the server with new
+ # devices, in which case E2E isn't going to work well anyway. We'll just
+ # skip that stream_id and return an empty list, and continue with the next
+ # stream_id next time.
+ if not query_map:
+ defer.returnValue((stream_id_cutoff, []))
+
+ results = yield self._get_device_update_edus_by_remote(
+ destination, from_stream_id, query_map
)
+ defer.returnValue((now_stream_id, results))
+
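# Worked example (not part of this patch), with limit = 3 and rows of the form
# (user_id, device_id, stream_id):
#
#   updates = [("@a:hs", "D1", 10), ("@a:hs", "D2", 11),
#              ("@b:hs", "D3", 12), ("@b:hs", "D4", 12)]
#
# len(updates) > limit, so stream_id_cutoff = 12 and now_stream_id = 11; the
# two rows at stream_id 12 are excluded and will be picked up again on the
# next call, while query_map ends up as
#
#   {("@a:hs", "D1"): 10, ("@a:hs", "D2"): 11}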
def _get_devices_by_remote_txn(
- self, txn, destination, from_stream_id, now_stream_id
+ self, txn, destination, from_stream_id, now_stream_id, limit
):
+ """Return device update information for a given remote destination
+
+ Args:
+ txn (LoggingTransaction): The transaction to execute
+ destination (str): The host the device updates are intended for
+ from_stream_id (int): The minimum stream_id to filter updates by, exclusive
+ now_stream_id (int): The maximum stream_id to filter updates by, inclusive
+ limit (int): Maximum number of device updates to return
+
+ Returns:
+ List: List of device updates
+ """
sql = """
- SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
+ SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
- GROUP BY user_id, device_id
- LIMIT 20
+ ORDER BY stream_id
+ LIMIT ?
"""
- txn.execute(sql, (destination, from_stream_id, now_stream_id, False))
+ txn.execute(sql, (destination, from_stream_id, now_stream_id, False, limit))
- # maps (user_id, device_id) -> stream_id
- query_map = {(r[0], r[1]): r[2] for r in txn}
- if not query_map:
- return (now_stream_id, [])
+ return list(txn)
- if len(query_map) >= 20:
- now_stream_id = max(stream_id for stream_id in itervalues(query_map))
+ @defer.inlineCallbacks
+ def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_map):
+ """Returns a list of device update EDUs as well as E2EE keys
- devices = self._get_e2e_device_keys_txn(
- txn,
+ Args:
+ destination (str): The host the device updates are intended for
+ from_stream_id (int): The minimum stream_id to filter updates by, exclusive
+ query_map (Dict[Tuple[str, str], int]): Dictionary mapping
+ user_id/device_id to update stream_id
+
+ Returns:
+ List[Dict]: List of objects representing a device update EDU
+
+ """
+ devices = yield self.runInteraction(
+ "_get_e2e_device_keys_txn",
+ self._get_e2e_device_keys_txn,
query_map.keys(),
include_all_devices=True,
include_deleted_devices=True,
)
- prev_sent_id_sql = """
- SELECT coalesce(max(stream_id), 0) as stream_id
- FROM device_lists_outbound_last_success
- WHERE destination = ? AND user_id = ? AND stream_id <= ?
- """
-
results = []
for user_id, user_devices in iteritems(devices):
# The prev_id for the first row is always the last row before
# `from_stream_id`
- txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
- rows = txn.fetchall()
- prev_id = rows[0][0]
+ prev_id = yield self._get_last_device_update_for_remote_user(
+ destination, user_id, from_stream_id
+ )
for device_id, device in iteritems(user_devices):
stream_id = query_map[(user_id, device_id)]
result = {
@@ -156,7 +231,22 @@ class DeviceWorkerStore(SQLBaseStore):
results.append(result)
- return (now_stream_id, results)
+ defer.returnValue(results)
+
+ def _get_last_device_update_for_remote_user(
+ self, destination, user_id, from_stream_id
+ ):
+ def f(txn):
+ prev_sent_id_sql = """
+ SELECT coalesce(max(stream_id), 0) as stream_id
+ FROM device_lists_outbound_last_success
+ WHERE destination = ? AND user_id = ? AND stream_id <= ?
+ """
+ txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
+ rows = txn.fetchall()
+ return rows[0][0]
+
+ return self.runInteraction("get_last_device_update_for_remote_user", f)
def mark_as_sent_devices_by_remote(self, destination, stream_id):
"""Mark that updates have successfully been sent to the destination.
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index 521936e3b0..f40ef2ab64 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -87,10 +87,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
},
values={
"version": version,
- "first_message_index": room_key['first_message_index'],
- "forwarded_count": room_key['forwarded_count'],
- "is_verified": room_key['is_verified'],
- "session_data": json.dumps(room_key['session_data']),
+ "first_message_index": room_key["first_message_index"],
+ "forwarded_count": room_key["forwarded_count"],
+ "is_verified": room_key["is_verified"],
+ "session_data": json.dumps(room_key["session_data"]),
},
lock=False,
)
@@ -118,13 +118,13 @@ class EndToEndRoomKeyStore(SQLBaseStore):
try:
version = int(version)
except ValueError:
- defer.returnValue({'rooms': {}})
+ defer.returnValue({"rooms": {}})
keyvalues = {"user_id": user_id, "version": version}
if room_id:
- keyvalues['room_id'] = room_id
+ keyvalues["room_id"] = room_id
if session_id:
- keyvalues['session_id'] = session_id
+ keyvalues["session_id"] = session_id
rows = yield self._simple_select_list(
table="e2e_room_keys",
@@ -141,10 +141,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="get_e2e_room_keys",
)
- sessions = {'rooms': {}}
+ sessions = {"rooms": {}}
for row in rows:
- room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}})
- room_entry['sessions'][row['session_id']] = {
+ room_entry = sessions["rooms"].setdefault(row["room_id"], {"sessions": {}})
+ room_entry["sessions"][row["session_id"]] = {
"first_message_index": row["first_message_index"],
"forwarded_count": row["forwarded_count"],
"is_verified": row["is_verified"],
@@ -174,9 +174,9 @@ class EndToEndRoomKeyStore(SQLBaseStore):
keyvalues = {"user_id": user_id, "version": int(version)}
if room_id:
- keyvalues['room_id'] = room_id
+ keyvalues["room_id"] = room_id
if session_id:
- keyvalues['session_id'] = session_id
+ keyvalues["session_id"] = session_id
yield self._simple_delete(
table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys"
@@ -191,7 +191,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
)
row = txn.fetchone()
if not row:
- raise StoreError(404, 'No current backup version')
+ raise StoreError(404, "No current backup version")
return row[0]
def get_e2e_room_keys_version_info(self, user_id, version=None):
@@ -255,7 +255,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
)
current_version = txn.fetchone()[0]
if current_version is None:
- current_version = '0'
+ current_version = "0"
new_version = str(int(current_version) + 1)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 1b97ee74e3..289b6bc281 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -45,6 +45,10 @@ class PostgresEngine(object):
# together. For example, version 8.1.5 will be returned as 80105
self._version = db_conn.server_version
+ # Are we on a supported PostgreSQL version?
+ if self._version < 90500:
+ raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.")
+
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
@@ -64,9 +68,9 @@ class PostgresEngine(object):
@property
def can_native_upsert(self):
"""
- Can we use native UPSERTs? This requires PostgreSQL 9.5+.
+ Can we use native UPSERTs?
"""
- return self._version >= 90500
+ return True
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 933bcf42c2..e9b9caa49a 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -85,7 +85,7 @@ class Sqlite3Engine(object):
def _parse_match_info(buf):
bufsize = len(buf)
- return [struct.unpack('@I', buf[i : i + 4])[0] for i in range(0, bufsize, 4)]
+ return [struct.unpack("@I", buf[i : i + 4])[0] for i in range(0, bufsize, 4)]
def _rank(raw_match_info):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 09e39c2c28..cb4478342f 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -190,6 +190,34 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id,
)
+ def get_rooms_with_many_extremities(self, min_count, limit):
+ """Get the top rooms with at least N extremities.
+
+ Args:
+ min_count (int): The minimum number of extremities
+ limit (int): The maximum number of rooms to return.
+
+ Returns:
+ Deferred[list]: At most `limit` room IDs that have more than
+ `min_count` extremities, sorted by extremity count (descending).
+ """
+
+ def _get_rooms_with_many_extremities_txn(txn):
+ sql = """
+ SELECT room_id FROM event_forward_extremities
+ GROUP BY room_id
+ HAVING count(*) > ?
+ ORDER BY count(*) DESC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (min_count, limit))
+ return [room_id for room_id, in txn]
+
+ return self.runInteraction(
+ "get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn
+ )
+
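# Illustrative usage (not part of this patch):
#
#   room_ids = yield self.get_rooms_with_many_extremities(min_count=100, limit=10)
#   # at most 10 room IDs, each with more than 100 forward extremities,
#   # busiest rooms first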
@cached(max_entries=5000, iterable=True)
def get_latest_event_ids_in_room(self, room_id):
return self._simple_select_onecol(
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index a729f3e067..eca77069fd 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -277,7 +277,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# contain results from the first query, correctly ordered, followed
# by results from the second query, but we want them all ordered
# by stream_ordering, oldest first.
- notifs.sort(key=lambda r: r['stream_ordering'])
+ notifs.sort(key=lambda r: r["stream_ordering"])
# Take only up to the limit. We have to stop at the limit because
# one of the subqueries may have hit the limit.
@@ -379,7 +379,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# contain results from the first query, correctly ordered, followed
# by results from the second query, but we want them all ordered
# by received_ts (most recent first)
- notifs.sort(key=lambda r: -(r['received_ts'] or 0))
+ notifs.sort(key=lambda r: -(r["received_ts"] or 0))
# Now return the first `limit`
defer.returnValue(notifs[:limit])
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..fefba39ea1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,14 +17,14 @@
import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, text_type
from six.moves import range
from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +74,21 @@ state_delta_reuse_delta_counter = Counter(
"synapse_storage_events_state_delta_reuse_delta", ""
)
+# The number of forward extremities for each new event.
+forward_extremities_counter = Histogram(
+ "synapse_storage_events_forward_extremities_persisted",
+ "Number of forward extremities for each new event",
+ buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
+# The number of stale forward extremities for each new event. Stale extremities
+# are those that were in the previous set of extremities as well as the new.
+stale_forward_extremities_counter = Histogram(
+ "synapse_storage_events_stale_forward_extremities_persisted",
+ "Number of unchanged forward extremities for each new event",
+ buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
def encode_json(json_object):
"""
@@ -220,13 +236,39 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
-
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
+ # Collect metrics on the number of forward extremities that exist.
+ # Counter of number of extremities to count
+ self._current_forward_extremities_amount = c_counter()
+
+ BucketCollector(
+ "synapse_forward_extremities",
+ lambda: self._current_forward_extremities_amount,
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
+ )
+
+ # Read the extrems every 60 minutes
+ hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
+ def _read_forward_extremities(self):
+ def fetch(txn):
+ txn.execute(
+ """
+ SELECT COUNT(*) c FROM event_forward_extremities
+ GROUP BY room_id
+ """
+ )
+ return txn.fetchall()
+
+ res = yield self.runInteraction("read_forward_extremities", fetch)
+ self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
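# Worked example (not part of this patch): if three rooms currently have 3, 3
# and 7 forward extremities, the query returns rows (3,), (3,) and (7,) and the
# Counter becomes {3: 2, 7: 1}, i.e. a histogram of extremity count to number
# of rooms, which BucketCollector then exposes to Prometheus.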
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
@@ -514,6 +556,8 @@ class EventsStore(
and not event.internal_metadata.is_soft_failed()
]
+ latest_event_ids = set(latest_event_ids)
+
# start with the existing forward extremities
result = set(latest_event_ids)
@@ -537,6 +581,13 @@ class EventsStore(
)
result.difference_update(existing_prevs)
+ # We only update metrics for events that change forward extremities
+ # (e.g. we ignore backfill/outliers/etc)
+ if result != latest_event_ids:
+ forward_extremities_counter.observe(len(result))
+ stale = latest_event_ids & result
+ stale_forward_extremities_counter.observe(len(stale))
+
defer.returnValue(result)
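# Worked example (not part of this patch): if the existing extremities were
# {"$A", "$B"} and the new set works out as {"$B", "$C"}, the sets differ, so
# forward_extremities_counter observes 2 and stale = {"$B"}, so
# stale_forward_extremities_counter observes 1.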
@defer.inlineCallbacks
@@ -568,17 +619,11 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(
- r[0]
- for r in txn
- if not json.loads(r[1]).get("soft_failed")
- )
+ results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_events_which_are_prevs",
- _get_events_which_are_prevs_txn,
- chunk,
+ "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
defer.returnValue(results)
@@ -640,9 +685,7 @@ class EventsStore(
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_prevs_before_rejected",
- _get_prevs_before_rejected_txn,
- chunk,
+ "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
defer.returnValue(existing_prevs)
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
index 75c1935bf3..1ce21d190c 100644
--- a/synapse/storage/events_bg_updates.py
+++ b/synapse/storage/events_bg_updates.py
@@ -64,8 +64,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
)
self.register_background_update_handler(
- self.DELETE_SOFT_FAILED_EXTREMITIES,
- self._cleanup_extremities_bg_update,
+ self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
)
@defer.inlineCallbacks
@@ -269,7 +268,8 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
LEFT JOIN events USING (event_id)
LEFT JOIN event_json USING (event_id)
LEFT JOIN rejections USING (event_id)
- """, (batch_size,)
+ """,
+ (batch_size,),
)
for prev_event_id, event_id, metadata, rejected, outlier in txn:
@@ -364,13 +364,12 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
column="event_id",
iterable=to_delete,
keyvalues={},
- retcols=("room_id",)
+ retcols=("room_id",),
)
room_ids = set(row["room_id"] for row in rows)
for room_id in room_ids:
txn.call_after(
- self.get_latest_event_ids_in_room.invalidate,
- (room_id,)
+ self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
self._simple_delete_many_txn(
@@ -384,7 +383,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
return len(original_set)
num_handled = yield self.runInteraction(
- "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
+ "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn
)
if not num_handled:
@@ -394,8 +393,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
txn.execute("DROP TABLE _extremities_to_check")
yield self.runInteraction(
- "_cleanup_extremities_bg_update_drop_table",
- _drop_table_txn,
+ "_cleanup_extremities_bg_update_drop_table", _drop_table_txn
)
defer.returnValue(num_handled)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index adc6cf26b5..6d680d405a 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import division
+
import itertools
import logging
from collections import namedtuple
@@ -25,7 +27,6 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import NotFoundError
from synapse.api.room_versions import EventFormatVersions
from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
-# these are only included to make the type annotations work
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -76,6 +77,42 @@ class EventsWorkerStore(SQLBaseStore):
desc="get_received_ts",
)
+ def get_received_ts_by_stream_pos(self, stream_ordering):
+ """Given a stream ordering get an approximate timestamp of when it
+ happened.
+
+ This is done by simply taking the received ts of the first event that
+ has a stream ordering greater than or equal to the given stream pos.
+ If none exists returns the current time, on the assumption that it must
+ have happened recently.
+
+ Args:
+ stream_ordering (int)
+
+ Returns:
+ Deferred[int]
+ """
+
+ def _get_approximate_received_ts_txn(txn):
+ sql = """
+ SELECT received_ts FROM events
+ WHERE stream_ordering >= ?
+ LIMIT 1
+ """
+
+ txn.execute(sql, (stream_ordering,))
+ row = txn.fetchone()
+ if row and row[0]:
+ ts = row[0]
+ else:
+ ts = self.clock.time_msec()
+
+ return ts
+
+ return self.runInteraction(
+ "get_approximate_received_ts", _get_approximate_received_ts_txn
+ )
+
@defer.inlineCallbacks
def get_event(
self,
@@ -610,4 +647,81 @@ class EventsWorkerStore(SQLBaseStore):
return res
- return self.runInteraction("get_rejection_reasons", f)
+ return self.runInteraction("get_seen_events_with_rejections", f)
+
+ def _get_total_state_event_counts_txn(self, txn, room_id):
+ """
+ See get_total_state_event_counts.
+ """
+ # We join against the events table as that has an index on room_id
+ sql = """
+ SELECT COUNT(*) FROM state_events
+ INNER JOIN events USING (room_id, event_id)
+ WHERE room_id=?
+ """
+ txn.execute(sql, (room_id,))
+ row = txn.fetchone()
+ return row[0] if row else 0
+
+ def get_total_state_event_counts(self, room_id):
+ """
+ Gets the total number of state events in a room.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[int]
+ """
+ return self.runInteraction(
+ "get_total_state_event_counts",
+ self._get_total_state_event_counts_txn,
+ room_id,
+ )
+
+ def _get_current_state_event_counts_txn(self, txn, room_id):
+ """
+ See get_current_state_event_counts.
+ """
+ sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
+ txn.execute(sql, (room_id,))
+ row = txn.fetchone()
+ return row[0] if row else 0
+
+ def get_current_state_event_counts(self, room_id):
+ """
+ Gets the current number of state events in a room.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[int]
+ """
+ return self.runInteraction(
+ "get_current_state_event_counts",
+ self._get_current_state_event_counts_txn,
+ room_id,
+ )
+
+ @defer.inlineCallbacks
+ def get_room_complexity(self, room_id):
+ """
+ Get a rough approximation of the complexity of the room. This is used by
+ remote servers to decide whether they wish to join the room or not.
+ A higher complexity value indicates that being in the room will consume
+ more resources.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[dict[str, int]]: map from complexity version to complexity value.
+ """
+ state_events = yield self.get_current_state_event_counts(room_id)
+
+ # Call this one "v1", so we can introduce new ones as we want to develop
+ # it.
+ complexity_v1 = round(state_events / 500, 2)
+
+ defer.returnValue({"v1": complexity_v1})
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index dce6a43ac1..73e6fc6de2 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -1179,11 +1179,7 @@ class GroupServerStore(SQLBaseStore):
for table in tables:
self._simple_delete_txn(
- txn,
- table=table,
- keyvalues={"group_id": group_id},
+ txn, table=table, keyvalues={"group_id": group_id}
)
- return self.runInteraction(
- "delete_group", _delete_group_txn
- )
+ return self.runInteraction("delete_group", _delete_group_txn)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 7036541792..e72f89e446 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -19,6 +19,7 @@ import logging
import six
+import attr
from signedjson.key import decode_verify_key_bytes
from synapse.util import batch_iter
@@ -36,6 +37,12 @@ else:
db_binary_type = memoryview
+@attr.s(slots=True, frozen=True)
+class FetchKeyResult(object):
+ verify_key = attr.ib() # VerifyKey: the key itself
+ valid_until_ts = attr.ib() # int: timestamp (in ms) until which we can use this key
+
+
class KeyStore(SQLBaseStore):
"""Persistence for signature verification keys
"""
@@ -54,8 +61,8 @@ class KeyStore(SQLBaseStore):
iterable of (server_name, key-id) tuples to fetch keys for
Returns:
- Deferred: resolves to dict[Tuple[str, str], VerifyKey|None]:
- map from (server_name, key_id) -> VerifyKey, or None if the key is
+ Deferred: resolves to dict[Tuple[str, str], FetchKeyResult|None]:
+ map from (server_name, key_id) -> FetchKeyResult, or None if the key is
unknown
"""
keys = {}
@@ -65,17 +72,27 @@ class KeyStore(SQLBaseStore):
# batch_iter always returns tuples so it's safe to do len(batch)
sql = (
- "SELECT server_name, key_id, verify_key FROM server_signature_keys "
- "WHERE 1=0"
+ "SELECT server_name, key_id, verify_key, ts_valid_until_ms "
+ "FROM server_signature_keys WHERE 1=0"
) + " OR (server_name=? AND key_id=?)" * len(batch)
txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
for row in txn:
- server_name, key_id, key_bytes = row
- keys[(server_name, key_id)] = decode_verify_key_bytes(
- key_id, bytes(key_bytes)
+ server_name, key_id, key_bytes, ts_valid_until_ms = row
+
+ if ts_valid_until_ms is None:
+ # Old keys may be stored with a ts_valid_until_ms of null,
+ # in which case we treat this as if it was set to `0`, i.e.
+ # it won't match key requests that define a minimum
+ # `ts_valid_until_ms`.
+ ts_valid_until_ms = 0
+
+ res = FetchKeyResult(
+ verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
+ valid_until_ts=ts_valid_until_ms,
)
+ keys[(server_name, key_id)] = res
def _txn(txn):
for batch in batch_iter(server_name_and_key_ids, 50):
@@ -84,38 +101,53 @@ class KeyStore(SQLBaseStore):
return self.runInteraction("get_server_verify_keys", _txn)
- def store_server_verify_key(
- self, server_name, from_server, time_now_ms, verify_key
- ):
- """Stores a NACL verification key for the given server.
+ def store_server_verify_keys(self, from_server, ts_added_ms, verify_keys):
+ """Stores NACL verification keys for remote servers.
Args:
- server_name (str): The name of the server.
- from_server (str): Where the verification key was looked up
- time_now_ms (int): The time now in milliseconds
- verify_key (nacl.signing.VerifyKey): The NACL verify key.
+ from_server (str): Where the verification keys were looked up
+ ts_added_ms (int): The time to record that the key was added
+ verify_keys (iterable[tuple[str, str, FetchKeyResult]]):
+ keys to be stored. Each entry is a triplet of
+ (server_name, key_id, key).
"""
- key_id = "%s:%s" % (verify_key.alg, verify_key.version)
-
- # XXX fix this to not need a lock (#3819)
- def _txn(txn):
- self._simple_upsert_txn(
- txn,
- table="server_signature_keys",
- keyvalues={"server_name": server_name, "key_id": key_id},
- values={
- "from_server": from_server,
- "ts_added_ms": time_now_ms,
- "verify_key": db_binary_type(verify_key.encode()),
- },
+ key_values = []
+ value_values = []
+ invalidations = []
+ for server_name, key_id, fetch_result in verify_keys:
+ key_values.append((server_name, key_id))
+ value_values.append(
+ (
+ from_server,
+ ts_added_ms,
+ fetch_result.valid_until_ts,
+ db_binary_type(fetch_result.verify_key.encode()),
+ )
)
# invalidate takes a tuple corresponding to the params of
# _get_server_verify_key. _get_server_verify_key only takes one
# param, which is itself the 2-tuple (server_name, key_id).
- txn.call_after(
- self._get_server_verify_key.invalidate, ((server_name, key_id),)
- )
-
- return self.runInteraction("store_server_verify_key", _txn)
+ invalidations.append((server_name, key_id))
+
+ def _invalidate(res):
+ f = self._get_server_verify_key.invalidate
+ for i in invalidations:
+ f((i,))
+ return res
+
+ return self.runInteraction(
+ "store_server_verify_keys",
+ self._simple_upsert_many_txn,
+ table="server_signature_keys",
+ key_names=("server_name", "key_id"),
+ key_values=key_values,
+ value_names=(
+ "from_server",
+ "ts_added_ms",
+ "ts_valid_until_ms",
+ "verify_key",
+ ),
+ value_values=value_values,
+ ).addCallback(_invalidate)
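# Illustrative usage (not part of this patch); `store`, `key` and `now_ms` are
# placeholders for a DataStore instance, a signedjson VerifyKey and a
# millisecond timestamp:
#
#   yield store.store_server_verify_keys(
#       "example.org",  # from_server: where the keys were fetched from
#       now_ms,         # ts_added_ms
#       [("example.org", "ed25519:abc", FetchKeyResult(key, now_ms + 86400000))],
#   )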
def store_server_keys_json(
self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 3ecf47e7a7..6b1238ce4a 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -22,11 +22,11 @@ class MediaRepositoryStore(BackgroundUpdateStore):
super(MediaRepositoryStore, self).__init__(db_conn, hs)
self.register_background_index_update(
- update_name='local_media_repository_url_idx',
- index_name='local_media_repository_url_idx',
- table='local_media_repository',
- columns=['created_ts'],
- where_clause='url_cache IS NOT NULL',
+ update_name="local_media_repository_url_idx",
+ index_name="local_media_repository_url_idx",
+ table="local_media_repository",
+ columns=["created_ts"],
+ where_clause="url_cache IS NOT NULL",
)
def get_local_media(self, media_id):
@@ -108,12 +108,12 @@ class MediaRepositoryStore(BackgroundUpdateStore):
return dict(
zip(
(
- 'response_code',
- 'etag',
- 'expires_ts',
- 'og',
- 'media_id',
- 'download_ts',
+ "response_code",
+ "etag",
+ "expires_ts",
+ "og",
+ "media_id",
+ "download_ts",
),
row,
)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 8aa8abc470..081564360f 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -86,11 +86,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
if len(self.reserved_users) > 0:
# questionmarks is a hack to overcome sqlite not supporting
# tuples in 'WHERE IN %s'
- questionmarks = '?' * len(self.reserved_users)
+ questionmarks = "?" * len(self.reserved_users)
query_args.extend(self.reserved_users)
sql = base_sql + """ AND user_id NOT IN ({})""".format(
- ','.join(questionmarks)
+ ",".join(questionmarks)
)
else:
sql = base_sql
@@ -124,7 +124,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
if len(self.reserved_users) > 0:
query_args.extend(self.reserved_users)
sql = base_sql + """ AND user_id NOT IN ({})""".format(
- ','.join(questionmarks)
+ ",".join(questionmarks)
)
else:
sql = base_sql
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c1711bc8bd..7c4e1dc7ec 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -20,12 +20,14 @@ import logging
import os
import re
+from synapse.storage.engines.postgres import PostgresEngine
+
logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 54
+SCHEMA_VERSION = 55
dir_path = os.path.abspath(os.path.dirname(__file__))
@@ -115,15 +117,23 @@ def _setup_new_database(cur, database_engine):
valid_dirs = []
pattern = re.compile(r"^\d+(\.sql)?$")
+
+ if isinstance(database_engine, PostgresEngine):
+ specific = "postgres"
+ else:
+ specific = "sqlite"
+
+ specific_pattern = re.compile(r"^\d+(\.sql." + specific + r")?$")
+
for filename in directory_entries:
- match = pattern.match(filename)
+ match = pattern.match(filename) or specific_pattern.match(filename)
abs_path = os.path.join(current_dir, filename)
if match and os.path.isdir(abs_path):
ver = int(match.group(0))
if ver <= SCHEMA_VERSION:
valid_dirs.append((ver, abs_path))
else:
- logger.warn("Unexpected entry in 'full_schemas': %s", filename)
+ logger.debug("Ignoring entry '%s' in 'full_schemas'", filename)
if not valid_dirs:
raise PrepareDatabaseException(
@@ -136,7 +146,10 @@ def _setup_new_database(cur, database_engine):
directory_entries = os.listdir(sql_dir)
- for filename in fnmatch.filter(directory_entries, "*.sql"):
+ for filename in sorted(
+ fnmatch.filter(directory_entries, "*.sql")
+ + fnmatch.filter(directory_entries, "*.sql." + specific)
+ ):
sql_loc = os.path.join(sql_dir, filename)
logger.debug("Applying schema %s", sql_loc)
executescript(cur, sql_loc)
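# Illustrative note (not part of this patch); the file names are hypothetical:
# a full_schemas version directory such as 54/ may now contain
#
#   full.sql            - applied on both engines
#   full.sql.postgres   - applied only when running on PostgreSQL
#   full.sql.sqlite     - applied only when running on SQLite
#
# and the sorted() call gives a deterministic application order.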
@@ -301,7 +314,7 @@ def _apply_module_schemas(txn, database_engine, config):
application config
"""
for (mod, _config) in config.password_providers:
- if not hasattr(mod, 'get_db_schema_files'):
+ if not hasattr(mod, "get_db_schema_files"):
continue
modname = ".".join((mod.__module__, mod.__name__))
_apply_module_schema_files(
@@ -331,7 +344,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
continue
root_name, ext = os.path.splitext(name)
- if ext != '.sql':
+ if ext != ".sql":
raise PrepareDatabaseException(
"only .sql files are currently supported for module schemas"
)
@@ -395,7 +408,7 @@ def get_statements(f):
def executescript(txn, schema_path):
- with open(schema_path, 'r') as f:
+ with open(schema_path, "r") as f:
for statement in get_statements(f):
txn.execute(statement)
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index aeec2f57c4..0ff392bdb4 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -41,7 +41,7 @@ class ProfileWorkerStore(SQLBaseStore):
defer.returnValue(
ProfileInfo(
- avatar_url=profile['avatar_url'], display_name=profile['displayname']
+ avatar_url=profile["avatar_url"], display_name=profile["displayname"]
)
)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9e406baafa..98cec8c82b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -46,12 +46,12 @@ def _load_rules(rawrules, enabled_map):
rules = list(list_with_base_rules(ruleslist))
for i, rule in enumerate(rules):
- rule_id = rule['rule_id']
+ rule_id = rule["rule_id"]
if rule_id in enabled_map:
- if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+ if rule.get("enabled", True) != bool(enabled_map[rule_id]):
# Rules are cached across users.
rule = dict(rule)
- rule['enabled'] = bool(enabled_map[rule_id])
+ rule["enabled"] = bool(enabled_map[rule_id])
rules[i] = rule
return rules
@@ -126,12 +126,12 @@ class PushRulesWorkerStore(
def get_push_rules_enabled_for_user(self, user_id):
results = yield self._simple_select_list(
table="push_rules_enable",
- keyvalues={'user_name': user_id},
+ keyvalues={"user_name": user_id},
retcols=("user_name", "rule_id", "enabled"),
desc="get_push_rules_enabled_for_user",
)
defer.returnValue(
- {r['rule_id']: False if r['enabled'] == 0 else True for r in results}
+ {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
)
def have_push_rules_changed_for_user(self, user_id, last_id):
@@ -175,7 +175,7 @@ class PushRulesWorkerStore(
rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
for row in rows:
- results.setdefault(row['user_name'], []).append(row)
+ results.setdefault(row["user_name"], []).append(row)
enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
@@ -194,7 +194,7 @@ class PushRulesWorkerStore(
rule (Dict): A push rule.
"""
# Create new rule id
- rule_id_scope = '/'.join(rule["rule_id"].split('/')[:-1])
+ rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1])
new_rule_id = rule_id_scope + "/" + new_room_id
# Change room id in each condition
@@ -334,8 +334,8 @@ class PushRulesWorkerStore(
desc="bulk_get_push_rules_enabled",
)
for row in rows:
- enabled = bool(row['enabled'])
- results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
+ enabled = bool(row["enabled"])
+ results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
defer.returnValue(results)
@@ -568,7 +568,7 @@ class PushRuleStore(PushRulesWorkerStore):
def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
self._simple_delete_one_txn(
- txn, "push_rules", {'user_name': user_id, 'rule_id': rule_id}
+ txn, "push_rules", {"user_name": user_id, "rule_id": rule_id}
)
self._insert_push_rules_update_txn(
@@ -605,9 +605,9 @@ class PushRuleStore(PushRulesWorkerStore):
self._simple_upsert_txn(
txn,
"push_rules_enable",
- {'user_name': user_id, 'rule_id': rule_id},
- {'enabled': 1 if enabled else 0},
- {'id': new_id},
+ {"user_name": user_id, "rule_id": rule_id},
+ {"enabled": 1 if enabled else 0},
+ {"id": new_id},
)
self._insert_push_rules_update_txn(
@@ -645,8 +645,8 @@ class PushRuleStore(PushRulesWorkerStore):
self._simple_update_one_txn(
txn,
"push_rules",
- {'user_name': user_id, 'rule_id': rule_id},
- {'actions': actions_json},
+ {"user_name": user_id, "rule_id": rule_id},
+ {"actions": actions_json},
)
self._insert_push_rules_update_txn(
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 1567e1df48..cfe0a94330 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -37,24 +37,24 @@ else:
class PusherWorkerStore(SQLBaseStore):
def _decode_pushers_rows(self, rows):
for r in rows:
- dataJson = r['data']
- r['data'] = None
+ dataJson = r["data"]
+ r["data"] = None
try:
if isinstance(dataJson, db_binary_type):
dataJson = str(dataJson).decode("UTF8")
- r['data'] = json.loads(dataJson)
+ r["data"] = json.loads(dataJson)
except Exception as e:
logger.warn(
"Invalid JSON in data for pusher %d: %s, %s",
- r['id'],
+ r["id"],
dataJson,
e.args[0],
)
pass
- if isinstance(r['pushkey'], db_binary_type):
- r['pushkey'] = str(r['pushkey']).decode("UTF8")
+ if isinstance(r["pushkey"], db_binary_type):
+ r["pushkey"] = str(r["pushkey"]).decode("UTF8")
return rows
@@ -195,15 +195,15 @@ class PusherWorkerStore(SQLBaseStore):
)
def get_if_users_have_pushers(self, user_ids):
rows = yield self._simple_select_many_batch(
- table='pushers',
- column='user_name',
+ table="pushers",
+ column="user_name",
iterable=user_ids,
- retcols=['user_name'],
- desc='get_if_users_have_pushers',
+ retcols=["user_name"],
+ desc="get_if_users_have_pushers",
)
result = {user_id: False for user_id in user_ids}
- result.update({r['user_name']: True for r in rows})
+ result.update({r["user_name"]: True for r in rows})
defer.returnValue(result)
@@ -299,8 +299,8 @@ class PusherStore(PusherWorkerStore):
):
yield self._simple_update_one(
"pushers",
- {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_stream_ordering': last_stream_ordering},
+ {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ {"last_stream_ordering": last_stream_ordering},
desc="update_pusher_last_stream_ordering",
)
@@ -310,10 +310,10 @@ class PusherStore(PusherWorkerStore):
):
yield self._simple_update_one(
"pushers",
- {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
+ {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{
- 'last_stream_ordering': last_stream_ordering,
- 'last_success': last_success,
+ "last_stream_ordering": last_stream_ordering,
+ "last_success": last_success,
},
desc="update_pusher_last_stream_ordering_and_success",
)
@@ -322,8 +322,8 @@ class PusherStore(PusherWorkerStore):
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
yield self._simple_update_one(
"pushers",
- {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'failing_since': failing_since},
+ {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ {"failing_since": failing_since},
desc="update_pusher_failing_since",
)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index a1647e50a1..b477da12b1 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
@cachedInlineCallbacks()
def get_users_with_read_receipts_in_room(self, room_id):
receipts = yield self.get_receipts_for_room(room_id, "m.read")
- defer.returnValue(set(r['user_id'] for r in receipts))
+ defer.returnValue(set(r["user_id"] for r in receipts))
@cached(num_args=2)
def get_receipts_for_room(self, room_id, receipt_type):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 4cf159ba81..983ce13291 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,19 +15,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import re
+from six import iterkeys
from six.moves import range
from twisted.internet import defer
from synapse.api.constants import UserTypes
-from synapse.api.errors import Codes, StoreError
+from synapse.api.errors import Codes, StoreError, ThreepidValidationError
from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.types import UserID
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
+
+logger = logging.getLogger(__name__)
+
class RegistrationWorkerStore(SQLBaseStore):
def __init__(self, db_conn, hs):
@@ -110,8 +116,9 @@ class RegistrationWorkerStore(SQLBaseStore):
defer.returnValue(res)
@defer.inlineCallbacks
- def set_account_validity_for_user(self, user_id, expiration_ts, email_sent,
- renewal_token=None):
+ def set_account_validity_for_user(
+ self, user_id, expiration_ts, email_sent, renewal_token=None
+ ):
"""Updates the account validity properties of the given account, with the
given values.
@@ -125,6 +132,7 @@ class RegistrationWorkerStore(SQLBaseStore):
renewal_token (str): Renewal token the user can use to extend the validity
of their account. Defaults to no token.
"""
+
def set_account_validity_for_user_txn(txn):
self._simple_update_txn(
txn=txn,
@@ -137,12 +145,11 @@ class RegistrationWorkerStore(SQLBaseStore):
},
)
self._invalidate_cache_and_stream(
- txn, self.get_expiration_ts_for_user, (user_id,),
+ txn, self.get_expiration_ts_for_user, (user_id,)
)
yield self.runInteraction(
- "set_account_validity_for_user",
- set_account_validity_for_user_txn,
+ "set_account_validity_for_user", set_account_validity_for_user_txn
)
@defer.inlineCallbacks
@@ -211,6 +218,7 @@ class RegistrationWorkerStore(SQLBaseStore):
Returns:
Deferred: Resolves to a list[dict[user_id (str), expiration_ts_ms (int)]]
"""
+
def select_users_txn(txn, now_ms, renew_at):
sql = (
"SELECT user_id, expiration_ts_ms FROM account_validity"
@@ -223,7 +231,8 @@ class RegistrationWorkerStore(SQLBaseStore):
res = yield self.runInteraction(
"get_users_expiring_soon",
select_users_txn,
- self.clock.time_msec(), self.config.account_validity.renew_at,
+ self.clock.time_msec(),
+ self.config.account_validity.renew_at,
)
defer.returnValue(res)
@@ -246,6 +255,20 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def delete_account_validity_for_user(self, user_id):
+ """Deletes the entry for the given user in the account validity table, removing
+ their expiration date and renewal token.
+
+ Args:
+ user_id (str): ID of the user to remove from the account validity table.
+ """
+ yield self._simple_delete_one(
+ table="account_validity",
+ keyvalues={"user_id": user_id},
+ desc="delete_account_validity_for_user",
+ )
+
+ @defer.inlineCallbacks
def is_server_admin(self, user):
res = yield self._simple_select_one_onecol(
table="users",
@@ -349,7 +372,7 @@ class RegistrationWorkerStore(SQLBaseStore):
WHERE creation_ts > ?
) AS t GROUP BY user_type
"""
- results = {'native': 0, 'guest': 0, 'bridged': 0}
+ results = {"native": 0, "guest": 0, "bridged": 0}
txn.execute(sql, (yesterday,))
for row in txn:
results[row[0]] = row[1]
@@ -415,14 +438,14 @@ class RegistrationWorkerStore(SQLBaseStore):
{"medium": medium, "address": address},
["guest_access_token"],
True,
- 'get_3pid_guest_access_token',
+ "get_3pid_guest_access_token",
)
if ret:
defer.returnValue(ret["guest_access_token"])
defer.returnValue(None)
@defer.inlineCallbacks
- def get_user_id_by_threepid(self, medium, address):
+ def get_user_id_by_threepid(self, medium, address, require_verified=False):
"""Returns user id from threepid
Args:
@@ -452,11 +475,11 @@ class RegistrationWorkerStore(SQLBaseStore):
txn,
"user_threepids",
{"medium": medium, "address": address},
- ['user_id'],
+ ["user_id"],
True,
)
if ret:
- return ret['user_id']
+ return ret["user_id"]
return None
@defer.inlineCallbacks
@@ -472,8 +495,8 @@ class RegistrationWorkerStore(SQLBaseStore):
ret = yield self._simple_select_list(
"user_threepids",
{"user_id": user_id},
- ['medium', 'address', 'validated_at', 'added_at'],
- 'user_get_threepids',
+ ["medium", "address", "validated_at", "added_at"],
+ "user_get_threepids",
)
defer.returnValue(ret)
@@ -552,11 +575,7 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
return self._simple_select_onecol(
table="user_threepid_id_server",
- keyvalues={
- "user_id": user_id,
- "medium": medium,
- "address": address,
- },
+ keyvalues={"user_id": user_id, "medium": medium, "address": address},
retcol="id_server",
desc="get_id_servers_user_bound",
)
@@ -592,9 +611,79 @@ class RegistrationStore(
self.register_noop_background_update("refresh_tokens_device_index")
self.register_background_update_handler(
- "user_threepids_grandfather", self._bg_user_threepids_grandfather,
+ "user_threepids_grandfather", self._bg_user_threepids_grandfather
+ )
+
+ self.register_background_update_handler(
+ "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag
+ )
+
+ # Create a background job for culling expired 3PID validity tokens
+ hs.get_clock().looping_call(
+ self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
+ )
+
+ @defer.inlineCallbacks
+ def _background_update_set_deactivated_flag(self, progress, batch_size):
+ """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
+ for each of them.
+ """
+
+ last_user = progress.get("user_id", "")
+
+        def _background_update_set_deactivated_flag_txn(txn):
+ txn.execute(
+ """
+ SELECT
+ users.name,
+ COUNT(access_tokens.token) AS count_tokens,
+ COUNT(user_threepids.address) AS count_threepids
+ FROM users
+ LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
+ LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
+ WHERE (users.password_hash IS NULL OR users.password_hash = '')
+ AND (users.appservice_id IS NULL OR users.appservice_id = '')
+ AND users.is_guest = 0
+ AND users.name > ?
+ GROUP BY users.name
+ ORDER BY users.name ASC
+ LIMIT ?;
+ """,
+ (last_user, batch_size),
+ )
+
+ rows = self.cursor_to_dict(txn)
+
+ if not rows:
+ return True
+
+ rows_processed_nb = 0
+
+ for user in rows:
+ if not user["count_tokens"] and not user["count_threepids"]:
+ self.set_user_deactivated_status_txn(txn, user["name"], True)
+ rows_processed_nb += 1
+
+ logger.info("Marked %d rows as deactivated", rows_processed_nb)
+
+ self._background_update_progress_txn(
+ txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+ )
+
+ if batch_size > len(rows):
+ return True
+ else:
+ return False
+
+ end = yield self.runInteraction(
+ "users_set_deactivated_flag", _backgroud_update_set_deactivated_flag_txn
)
+ if end:
+ yield self._end_background_update("users_set_deactivated_flag")
+
+ defer.returnValue(batch_size)
+
@defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id=None):
"""Adds an access token for the given user.
@@ -760,7 +849,7 @@ class RegistrationStore(
def user_set_password_hash_txn(txn):
self._simple_update_one_txn(
- txn, 'users', {'name': user_id}, {'password_hash': password_hash}
+ txn, "users", {"name": user_id}, {"password_hash": password_hash}
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@@ -781,9 +870,9 @@ class RegistrationStore(
def f(txn):
self._simple_update_one_txn(
txn,
- table='users',
- keyvalues={'name': user_id},
- updatevalues={'consent_version': consent_version},
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"consent_version": consent_version},
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@@ -805,9 +894,9 @@ class RegistrationStore(
def f(txn):
self._simple_update_one_txn(
txn,
- table='users',
- keyvalues={'name': user_id},
- updatevalues={'consent_server_notice_sent': consent_version},
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"consent_server_notice_sent": consent_version},
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@@ -963,7 +1052,6 @@ class RegistrationStore(
We do this by grandfathering in existing user threepids assuming that
they used one of the server configured trusted identity servers.
"""
-
id_servers = set(self.config.trusted_third_party_id_servers)
def _bg_user_threepids_grandfather_txn(txn):
@@ -978,9 +1066,322 @@ class RegistrationStore(
if id_servers:
yield self.runInteraction(
- "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn,
+ "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
)
yield self._end_background_update("user_threepids_grandfather")
defer.returnValue(1)
+
+ def get_threepid_validation_session(
+ self, medium, client_secret, address=None, sid=None, validated=True
+ ):
+ """Gets a session_id and last_send_attempt (if available) for a
+ client_secret/medium/(address|session_id) combo
+
+ Args:
+ medium (str|None): The medium of the 3PID
+ address (str|None): The address of the 3PID
+ sid (str|None): The ID of the validation session
+ client_secret (str|None): A unique string provided by the client to
+ help identify this validation attempt
+ validated (bool|None): Whether sessions should be filtered by
+ whether they have been validated already or not. None to
+ perform no filtering
+
+ Returns:
+            Deferred[dict|None]: A dict containing the latest session_id and
+            send_attempt count for this 3PID, or None if there hasn't been a
+            previous attempt
+ """
+ keyvalues = {"medium": medium, "client_secret": client_secret}
+ if address:
+ keyvalues["address"] = address
+ if sid:
+ keyvalues["session_id"] = sid
+
+ assert address or sid
+
+ def get_threepid_validation_session_txn(txn):
+ sql = """
+ SELECT address, session_id, medium, client_secret,
+ last_send_attempt, validated_at
+ FROM threepid_validation_session WHERE %s
+ """ % (
+ " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+ )
+
+ if validated is not None:
+ sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
+
+ sql += " LIMIT 1"
+
+ txn.execute(sql, list(keyvalues.values()))
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ return None
+
+ return rows[0]
+
+ return self.runInteraction(
+ "get_threepid_validation_session", get_threepid_validation_session_txn
+ )
+
+ def validate_threepid_session(self, session_id, client_secret, token, current_ts):
+ """Attempt to validate a threepid session using a token
+
+ Args:
+ session_id (str): The id of a validation session
+ client_secret (str): A unique string provided by the client to
+ help identify this validation attempt
+ token (str): A validation token
+ current_ts (int): The current unix time in milliseconds. Used for
+ checking token expiry status
+
+ Returns:
+            Deferred[str|None]: A link to redirect the user to, if there is
+            one; otherwise None.
+ """
+        # Run everything in a single transaction so the checks and update are atomic
+ def validate_threepid_session_txn(txn):
+ row = self._simple_select_one_txn(
+ txn,
+ table="threepid_validation_session",
+ keyvalues={"session_id": session_id},
+ retcols=["client_secret", "validated_at"],
+ allow_none=True,
+ )
+
+ if not row:
+ raise ThreepidValidationError(400, "Unknown session_id")
+ retrieved_client_secret = row["client_secret"]
+ validated_at = row["validated_at"]
+
+ if retrieved_client_secret != client_secret:
+ raise ThreepidValidationError(
+ 400, "This client_secret does not match the provided session_id"
+ )
+
+ row = self._simple_select_one_txn(
+ txn,
+ table="threepid_validation_token",
+ keyvalues={"session_id": session_id, "token": token},
+ retcols=["expires", "next_link"],
+ allow_none=True,
+ )
+
+ if not row:
+ raise ThreepidValidationError(
+ 400, "Validation token not found or has expired"
+ )
+ expires = row["expires"]
+ next_link = row["next_link"]
+
+ # If the session is already validated, no need to revalidate
+ if validated_at:
+ return next_link
+
+ if expires <= current_ts:
+ raise ThreepidValidationError(
+ 400, "This token has expired. Please request a new one"
+ )
+
+ # Looks good. Validate the session
+ self._simple_update_txn(
+ txn,
+ table="threepid_validation_session",
+ keyvalues={"session_id": session_id},
+ updatevalues={"validated_at": self.clock.time_msec()},
+ )
+
+ return next_link
+
+ # Return next_link if it exists
+ return self.runInteraction(
+ "validate_threepid_session_txn", validate_threepid_session_txn
+ )
+
+ def upsert_threepid_validation_session(
+ self,
+ medium,
+ address,
+ client_secret,
+ send_attempt,
+ session_id,
+ validated_at=None,
+ ):
+ """Upsert a threepid validation session
+ Args:
+ medium (str): The medium of the 3PID
+ address (str): The address of the 3PID
+ client_secret (str): A unique string provided by the client to
+ help identify this validation attempt
+ send_attempt (int): The latest send_attempt on this session
+ session_id (str): The id of this validation session
+ validated_at (int|None): The unix timestamp in milliseconds of
+ when the session was marked as valid
+ """
+ insertion_values = {
+ "medium": medium,
+ "address": address,
+ "client_secret": client_secret,
+ }
+
+ if validated_at:
+ insertion_values["validated_at"] = validated_at
+
+ return self._simple_upsert(
+ table="threepid_validation_session",
+ keyvalues={"session_id": session_id},
+ values={"last_send_attempt": send_attempt},
+ insertion_values=insertion_values,
+ desc="upsert_threepid_validation_session",
+ )
+
+ def start_or_continue_validation_session(
+ self,
+ medium,
+ address,
+ session_id,
+ client_secret,
+ send_attempt,
+ next_link,
+ token,
+ token_expires,
+ ):
+ """Creates a new threepid validation session if it does not already
+ exist and associates a new validation token with it
+
+ Args:
+ medium (str): The medium of the 3PID
+ address (str): The address of the 3PID
+ session_id (str): The id of this validation session
+ client_secret (str): A unique string provided by the client to
+ help identify this validation attempt
+ send_attempt (int): The latest send_attempt on this session
+ next_link (str|None): The link to redirect the user to upon
+ successful validation
+ token (str): The validation token
+            token_expires (int): The timestamp after which the token will no
+                longer be valid
+ """
+
+ def start_or_continue_validation_session_txn(txn):
+ # Create or update a validation session
+ self._simple_upsert_txn(
+ txn,
+ table="threepid_validation_session",
+ keyvalues={"session_id": session_id},
+ values={"last_send_attempt": send_attempt},
+ insertion_values={
+ "medium": medium,
+ "address": address,
+ "client_secret": client_secret,
+ },
+ )
+
+ # Create a new validation token with this session ID
+ self._simple_insert_txn(
+ txn,
+ table="threepid_validation_token",
+ values={
+ "session_id": session_id,
+ "token": token,
+ "next_link": next_link,
+ "expires": token_expires,
+ },
+ )
+
+ return self.runInteraction(
+ "start_or_continue_validation_session",
+ start_or_continue_validation_session_txn,
+ )
+
+ def cull_expired_threepid_validation_tokens(self):
+ """Remove threepid validation tokens with expiry dates that have passed"""
+
+ def cull_expired_threepid_validation_tokens_txn(txn, ts):
+ sql = """
+ DELETE FROM threepid_validation_token WHERE
+ expires < ?
+ """
+ return txn.execute(sql, (ts,))
+
+ return self.runInteraction(
+ "cull_expired_threepid_validation_tokens",
+ cull_expired_threepid_validation_tokens_txn,
+ self.clock.time_msec(),
+ )
+
+ def delete_threepid_session(self, session_id):
+ """Removes a threepid validation session from the database. This can
+ be done after validation has been performed and whatever action was
+ waiting on it has been carried out
+
+ Args:
+ session_id (str): The ID of the session to delete
+ """
+
+ def delete_threepid_session_txn(txn):
+ self._simple_delete_txn(
+ txn,
+ table="threepid_validation_token",
+ keyvalues={"session_id": session_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="threepid_validation_session",
+ keyvalues={"session_id": session_id},
+ )
+
+ return self.runInteraction(
+ "delete_threepid_session", delete_threepid_session_txn
+ )
+
+ def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+ self._simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,)
+ )
+
+ @defer.inlineCallbacks
+ def set_user_deactivated_status(self, user_id, deactivated):
+ """Set the `deactivated` property for the provided user to the provided value.
+
+ Args:
+ user_id (str): The ID of the user to set the status for.
+ deactivated (bool): The value to set for `deactivated`.
+ """
+
+ yield self.runInteraction(
+ "set_user_deactivated_status",
+ self.set_user_deactivated_status_txn,
+ user_id,
+ deactivated,
+ )
+
+ @cachedInlineCallbacks()
+ def get_user_deactivated_status(self, user_id):
+ """Retrieve the value for the `deactivated` property for the provided user.
+
+ Args:
+ user_id (str): The ID of the user to retrieve the status for.
+
+ Returns:
+ defer.Deferred(bool): The requested value.
+ """
+
+ res = yield self._simple_select_one_onecol(
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="deactivated",
+ desc="get_user_deactivated_status",
+ )
+
+ # Convert the integer into a boolean.
+ defer.returnValue(res == 1)
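
For reference, the new threepid validation methods above are meant to be driven in sequence: start_or_continue_validation_session creates a session (or bumps its last_send_attempt), validate_threepid_session later checks the token the user received, and delete_threepid_session tears the session down once the caller has acted on it. A minimal sketch of that flow, assuming the usual hs.get_datastore()/hs.get_clock() accessors and with the session id and token generation left as placeholders; only the store method signatures come from this diff.

from twisted.internet import defer

ONE_HOUR_MS = 60 * 60 * 1000


class ExampleThreepidHandler(object):
    """Illustrative caller only; not part of this change."""

    def __init__(self, hs):
        self.store = hs.get_datastore()
        self.clock = hs.get_clock()

    @defer.inlineCallbacks
    def send_validation(self, medium, address, client_secret, send_attempt, next_link):
        # Placeholder identifiers; a real caller would generate these securely.
        session_id = "hypothetical-session-id"
        token = "hypothetical-token"
        yield self.store.start_or_continue_validation_session(
            medium,
            address,
            session_id,
            client_secret,
            send_attempt,
            next_link,
            token,
            self.clock.time_msec() + ONE_HOUR_MS,
        )
        defer.returnValue(session_id)

    @defer.inlineCallbacks
    def check_token(self, session_id, client_secret, token):
        # Raises ThreepidValidationError on mismatch or expiry; otherwise
        # returns the session's next_link (or None).
        next_link = yield self.store.validate_threepid_session(
            session_id, client_secret, token, self.clock.time_msec()
        )
        # Once whatever was waiting on the validation has run, the session
        # can be removed.
        yield self.store.delete_threepid_session(session_id)
        defer.returnValue(next_link)
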
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 4c83800cca..1b01934c19 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -468,9 +468,5 @@ class RelationsStore(RelationsWorkerStore):
"""
self._simple_delete_txn(
- txn,
- table="event_relations",
- keyvalues={
- "event_id": redacted_event_id,
- }
+ txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 57df17bcc2..8004aeb909 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -142,6 +142,27 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return self.runInteraction("get_room_summary", _get_room_summary_txn)
+ def _get_user_counts_in_room_txn(self, txn, room_id):
+ """
+        Get the number of users in a room, grouped by membership.
+
+        Args:
+            txn
+            room_id (str)
+
+        Returns:
+            dict[str, int]: a mapping from each membership type (e.g. "join",
+            "invite") to the number of users in the room with that membership
+ """
+ sql = """
+ SELECT m.membership, count(*) FROM room_memberships as m
+ INNER JOIN current_state_events as c USING(event_id)
+ WHERE c.type = 'm.room.member' AND c.room_id = ?
+ GROUP BY m.membership
+ """
+
+ txn.execute(sql, (room_id,))
+ return {row[0]: row[1] for row in txn}
+
@cached()
def get_invited_rooms_for_user(self, user_id):
""" Get all the rooms the user is invited to
@@ -399,7 +420,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
table="room_memberships",
column="event_id",
iterable=missing_member_event_ids,
- retcols=('user_id', 'display_name', 'avatar_url'),
+ retcols=("user_id", "display_name", "avatar_url"),
keyvalues={"membership": Membership.JOIN},
batch_size=500,
desc="_get_joined_users_from_context",
@@ -427,7 +448,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cachedInlineCallbacks(max_entries=10000)
def is_host_joined(self, room_id, host):
- if '%' in host or '_' in host:
+ if "%" in host or "_" in host:
raise Exception("Invalid host name")
sql = """
@@ -469,7 +490,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
Deferred: Resolves to True if the host is/was in the room, otherwise
False.
"""
- if '%' in host or '_' in host:
+ if "%" in host or "_" in host:
raise Exception("Invalid host name")
sql = """
@@ -702,7 +723,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
room_id = row["room_id"]
try:
event_json = json.loads(row["json"])
- content = event_json['content']
+ content = event_json["content"]
except Exception:
continue
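
Note that _get_user_counts_in_room_txn above is a transaction function, so callers are expected to wrap it in runInteraction; the result is a plain dict keyed by membership. A small sketch, assuming a hypothetical public wrapper on RoomMemberWorkerStore (the wrapper name is not part of this diff):

    # Sketch only: the public method name is an assumption.
    def get_user_counts_in_room(self, room_id):
        return self.runInteraction(
            "get_user_counts_in_room", self._get_user_counts_in_room_txn, room_id
        )

    # Expected shape of the result, for example:
    #   {"join": 12, "invite": 2, "leave": 5, "ban": 1}
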
diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py
index 147496a38b..3edfcfd783 100644
--- a/synapse/storage/schema/delta/20/pushers.py
+++ b/synapse/storage/schema/delta/20/pushers.py
@@ -29,7 +29,8 @@ logger = logging.getLogger(__name__)
def run_create(cur, database_engine, *args, **kwargs):
logger.info("Porting pushers table...")
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE IF NOT EXISTS pushers2 (
id BIGINT PRIMARY KEY,
user_name TEXT NOT NULL,
@@ -48,27 +49,34 @@ def run_create(cur, database_engine, *args, **kwargs):
failing_since BIGINT,
UNIQUE (app_id, pushkey, user_name)
)
- """)
- cur.execute("""SELECT
+ """
+ )
+ cur.execute(
+ """SELECT
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
FROM pushers
- """)
+ """
+ )
count = 0
for row in cur.fetchall():
row = list(row)
row[8] = bytes(row[8]).decode("utf-8")
row[11] = bytes(row[11]).decode("utf-8")
- cur.execute(database_engine.convert_param_style("""
+ cur.execute(
+ database_engine.convert_param_style(
+ """
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
- ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
- row
+ ) values (%s)"""
+ % (",".join(["?" for _ in range(len(row))]))
+ ),
+ row,
)
count += 1
cur.execute("DROP TABLE pushers")
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index ef7ec34346..9b95411fb6 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -40,9 +40,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
logger.warning("Could not get app_service_config_files from config")
pass
- appservices = load_appservices(
- config.server_name, config_files
- )
+ appservices = load_appservices(config.server_name, config_files)
owned = {}
@@ -53,20 +51,19 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
if user_id in owned.keys():
logger.error(
"user_id %s was owned by more than one application"
- " service (IDs %s and %s); assigning arbitrarily to %s" %
- (user_id, owned[user_id], appservice.id, owned[user_id])
+ " service (IDs %s and %s); assigning arbitrarily to %s"
+ % (user_id, owned[user_id], appservice.id, owned[user_id])
)
owned.setdefault(appservice.id, []).append(user_id)
for as_id, user_ids in owned.items():
n = 100
- user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
+ user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n))
for chunk in user_chunks:
cur.execute(
database_engine.convert_param_style(
- "UPDATE users SET appservice_id = ? WHERE name IN (%s)" % (
- ",".join("?" for _ in chunk),
- )
+ "UPDATE users SET appservice_id = ? WHERE name IN (%s)"
+ % (",".join("?" for _ in chunk),)
),
- [as_id] + chunk
+ [as_id] + chunk,
)
diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py
index 93367fa09e..9bb504aad5 100644
--- a/synapse/storage/schema/delta/31/pushers.py
+++ b/synapse/storage/schema/delta/31/pushers.py
@@ -24,12 +24,13 @@ logger = logging.getLogger(__name__)
def token_to_stream_ordering(token):
- return int(token[1:].split('_')[0])
+ return int(token[1:].split("_")[0])
def run_create(cur, database_engine, *args, **kwargs):
logger.info("Porting pushers table, delta 31...")
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE IF NOT EXISTS pushers2 (
id BIGINT PRIMARY KEY,
user_name TEXT NOT NULL,
@@ -48,26 +49,33 @@ def run_create(cur, database_engine, *args, **kwargs):
failing_since BIGINT,
UNIQUE (app_id, pushkey, user_name)
)
- """)
- cur.execute("""SELECT
+ """
+ )
+ cur.execute(
+ """SELECT
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
FROM pushers
- """)
+ """
+ )
count = 0
for row in cur.fetchall():
row = list(row)
row[12] = token_to_stream_ordering(row[12])
- cur.execute(database_engine.convert_param_style("""
+ cur.execute(
+ database_engine.convert_param_style(
+ """
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_stream_ordering, last_success,
failing_since
- ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
- row
+ ) values (%s)"""
+ % (",".join(["?" for _ in range(len(row))]))
+ ),
+ row,
)
count += 1
cur.execute("DROP TABLE pushers")
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
index 9754d3ccfb..a26057dfb6 100644
--- a/synapse/storage/schema/delta/33/remote_media_ts.py
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -26,5 +26,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
database_engine.convert_param_style(
"UPDATE remote_media_cache SET last_access_ts = ?"
),
- (int(time.time() * 1000),)
+ (int(time.time() * 1000),),
)
diff --git a/synapse/storage/schema/delta/47/state_group_seq.py b/synapse/storage/schema/delta/47/state_group_seq.py
index f6766501d2..9fd1ccf6f7 100644
--- a/synapse/storage/schema/delta/47/state_group_seq.py
+++ b/synapse/storage/schema/delta/47/state_group_seq.py
@@ -27,10 +27,7 @@ def run_create(cur, database_engine, *args, **kwargs):
else:
start_val = row[0] + 1
- cur.execute(
- "CREATE SEQUENCE state_group_id_seq START WITH %s",
- (start_val, ),
- )
+ cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,))
def run_upgrade(*args, **kwargs):
diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py
index 2233af87d7..49f5f2c003 100644
--- a/synapse/storage/schema/delta/48/group_unique_indexes.py
+++ b/synapse/storage/schema/delta/48/group_unique_indexes.py
@@ -38,16 +38,22 @@ def run_create(cur, database_engine, *args, **kwargs):
rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
# remove duplicates from group_users & group_invites tables
- cur.execute("""
+ cur.execute(
+ """
DELETE FROM group_users WHERE %s NOT IN (
SELECT min(%s) FROM group_users GROUP BY group_id, user_id
);
- """ % (rowid, rowid))
- cur.execute("""
+ """
+ % (rowid, rowid)
+ )
+ cur.execute(
+ """
DELETE FROM group_invites WHERE %s NOT IN (
SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
);
- """ % (rowid, rowid))
+ """
+ % (rowid, rowid)
+ )
for statement in get_statements(FIX_INDEXES.splitlines()):
cur.execute(statement)
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
index 6dd467b6c5..b1684a8441 100644
--- a/synapse/storage/schema/delta/50/make_event_content_nullable.py
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -65,14 +65,18 @@ def run_create(cur, database_engine, *args, **kwargs):
def run_upgrade(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
- cur.execute("""
+ cur.execute(
+ """
ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
- """)
+ """
+ )
return
# sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
- cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+ cur.execute(
+ "SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'"
+ )
(oldsql,) = cur.fetchone()
sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
@@ -86,7 +90,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
cur.execute("PRAGMA writable_schema=ON")
cur.execute(
"UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
- (sql, ),
+ (sql,),
)
cur.execute("PRAGMA schema_version=%i" % (oldver + 1,))
cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/delta/54/account_validity.sql b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
index 2357626000..0adb2ad55e 100644
--- a/synapse/storage/schema/delta/54/account_validity.sql
+++ b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
@@ -13,6 +13,9 @@
* limitations under the License.
*/
+-- We previously changed the schema for this table without renaming the file, which means
+-- that some databases might still be using the old schema. This ensures Synapse uses the
+-- right schema for the table.
DROP TABLE IF EXISTS account_validity;
-- Track what users are in public rooms.
diff --git a/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql b/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql
new file mode 100644
index 0000000000..c01aa9d2d9
--- /dev/null
+++ b/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql
@@ -0,0 +1,23 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* When we can use this key until, before we have to refresh it. */
+ALTER TABLE server_signature_keys ADD COLUMN ts_valid_until_ms BIGINT;
+
+UPDATE server_signature_keys SET ts_valid_until_ms = (
+ SELECT MAX(ts_valid_until_ms) FROM server_keys_json skj WHERE
+ skj.server_name = server_signature_keys.server_name AND
+ skj.key_id = server_signature_keys.key_id
+);
diff --git a/synapse/storage/schema/delta/54/stats.sql b/synapse/storage/schema/delta/54/stats.sql
new file mode 100644
index 0000000000..652e58308e
--- /dev/null
+++ b/synapse/storage/schema/delta/54/stats.sql
@@ -0,0 +1,80 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE stats_stream_pos (
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT,
+ CHECK (Lock='X')
+);
+
+INSERT INTO stats_stream_pos (stream_id) VALUES (null);
+
+CREATE TABLE user_stats (
+ user_id TEXT NOT NULL,
+ ts BIGINT NOT NULL,
+ bucket_size INT NOT NULL,
+ public_rooms INT NOT NULL,
+ private_rooms INT NOT NULL
+);
+
+CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts);
+
+CREATE TABLE room_stats (
+ room_id TEXT NOT NULL,
+ ts BIGINT NOT NULL,
+ bucket_size INT NOT NULL,
+ current_state_events INT NOT NULL,
+ joined_members INT NOT NULL,
+ invited_members INT NOT NULL,
+ left_members INT NOT NULL,
+ banned_members INT NOT NULL,
+ state_events INT NOT NULL
+);
+
+CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts);
+
+-- cache of current room state; useful for the publicRooms list
+CREATE TABLE room_state (
+ room_id TEXT NOT NULL,
+ join_rules TEXT,
+ history_visibility TEXT,
+ encryption TEXT,
+ name TEXT,
+ topic TEXT,
+ avatar TEXT,
+ canonical_alias TEXT
+ -- get aliases straight from the right table
+);
+
+CREATE UNIQUE INDEX room_state_room ON room_state(room_id);
+
+CREATE TABLE room_stats_earliest_token (
+ room_id TEXT NOT NULL,
+ token BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id);
+
+-- Set up staging tables
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('populate_stats_createtables', '{}');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_rooms', '{}', 'populate_stats_createtables');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_cleanup', '{}', 'populate_stats_process_rooms');
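
The three rows inserted into background_updates above form a dependency chain: populate_stats_process_rooms only starts once populate_stats_createtables has completed, and populate_stats_cleanup only after the room processing. A sketch of how the new StatsStore would be expected to register handlers for those names, mirroring the register_background_update_handler pattern used in RegistrationStore earlier in this diff; the base class choice and the handler bodies here are assumptions, stubbed out for illustration only.

from twisted.internet import defer

from synapse.storage.background_updates import BackgroundUpdateStore


class StatsStore(BackgroundUpdateStore):
    def __init__(self, db_conn, hs):
        super(StatsStore, self).__init__(db_conn, hs)

        # One handler per background_updates row inserted by 54/stats.sql.
        self.register_background_update_handler(
            "populate_stats_createtables", self._populate_stats_createtables
        )
        self.register_background_update_handler(
            "populate_stats_process_rooms", self._populate_stats_process_rooms
        )
        self.register_background_update_handler(
            "populate_stats_cleanup", self._populate_stats_cleanup
        )

    @defer.inlineCallbacks
    def _populate_stats_createtables(self, progress, batch_size):
        # Stub: the real handler sets up the staging tables.
        yield self._end_background_update("populate_stats_createtables")
        defer.returnValue(1)

    @defer.inlineCallbacks
    def _populate_stats_process_rooms(self, progress, batch_size):
        # Stub: the real handler walks each room and fills room_stats/user_stats.
        yield self._end_background_update("populate_stats_process_rooms")
        defer.returnValue(1)

    @defer.inlineCallbacks
    def _populate_stats_cleanup(self, progress, batch_size):
        # Stub: the real handler drops the staging tables.
        yield self._end_background_update("populate_stats_cleanup")
        defer.returnValue(1)
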
diff --git a/synapse/storage/schema/delta/54/stats2.sql b/synapse/storage/schema/delta/54/stats2.sql
new file mode 100644
index 0000000000..3b2d48447f
--- /dev/null
+++ b/synapse/storage/schema/delta/54/stats2.sql
@@ -0,0 +1,28 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This delta file gets run after `54/stats.sql` delta.
+
+-- We want to add some indices to the temporary stats table, so we re-insert
+-- 'populate_stats_createtables' if we are still processing the rooms update.
+INSERT INTO background_updates (update_name, progress_json)
+ SELECT 'populate_stats_createtables', '{}'
+ WHERE
+ 'populate_stats_process_rooms' IN (
+ SELECT update_name FROM background_updates
+ )
+ AND 'populate_stats_createtables' NOT IN ( -- don't insert if already exists
+ SELECT update_name FROM background_updates
+ );
diff --git a/synapse/storage/schema/delta/55/track_threepid_validations.sql b/synapse/storage/schema/delta/55/track_threepid_validations.sql
new file mode 100644
index 0000000000..a8eced2e0a
--- /dev/null
+++ b/synapse/storage/schema/delta/55/track_threepid_validations.sql
@@ -0,0 +1,31 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS threepid_validation_session (
+ session_id TEXT PRIMARY KEY,
+ medium TEXT NOT NULL,
+ address TEXT NOT NULL,
+ client_secret TEXT NOT NULL,
+ last_send_attempt BIGINT NOT NULL,
+ validated_at BIGINT
+);
+
+CREATE TABLE IF NOT EXISTS threepid_validation_token (
+ token TEXT PRIMARY KEY,
+ session_id TEXT NOT NULL,
+ next_link TEXT,
+ expires BIGINT NOT NULL
+);
+
+CREATE INDEX threepid_validation_token_session_id ON threepid_validation_token(session_id);
diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
new file mode 100644
index 0000000000..dabdde489b
--- /dev/null
+++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('users_set_deactivated_flag', '{}');
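
This delta backs the new deactivated-flag plumbing in registration.py above: the column is added with a default of 0, and the 'users_set_deactivated_flag' background update then backfills it for accounts that look deactivated. Once that has run, the flag is read and written through the new store methods; a short usage sketch, assuming `store` is the homeserver's DataStore:

from twisted.internet import defer


@defer.inlineCallbacks
def deactivate_and_check(store, user_id):
    # Flip the flag, then read it back via the cached accessor.
    yield store.set_user_deactivated_status(user_id, True)
    is_deactivated = yield store.get_user_deactivated_status(user_id)
    defer.returnValue(is_deactivated)  # True for the user we just deactivated
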
diff --git a/synapse/storage/schema/full_schemas/54/full.sql.postgres b/synapse/storage/schema/full_schemas/54/full.sql.postgres
new file mode 100644
index 0000000000..098434356f
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/54/full.sql.postgres
@@ -0,0 +1,2052 @@
+
+
+
+
+
+CREATE TABLE access_tokens (
+ id bigint NOT NULL,
+ user_id text NOT NULL,
+ device_id text,
+ token text NOT NULL,
+ last_used bigint
+);
+
+
+
+CREATE TABLE account_data (
+ user_id text NOT NULL,
+ account_data_type text NOT NULL,
+ stream_id bigint NOT NULL,
+ content text NOT NULL
+);
+
+
+
+CREATE TABLE account_data_max_stream_id (
+ lock character(1) DEFAULT 'X'::bpchar NOT NULL,
+ stream_id bigint NOT NULL,
+ CONSTRAINT private_user_data_max_stream_id_lock_check CHECK ((lock = 'X'::bpchar))
+);
+
+
+
+CREATE TABLE account_validity (
+ user_id text NOT NULL,
+ expiration_ts_ms bigint NOT NULL,
+ email_sent boolean NOT NULL,
+ renewal_token text
+);
+
+
+
+CREATE TABLE application_services_state (
+ as_id text NOT NULL,
+ state character varying(5),
+ last_txn integer
+);
+
+
+
+CREATE TABLE application_services_txns (
+ as_id text NOT NULL,
+ txn_id integer NOT NULL,
+ event_ids text NOT NULL
+);
+
+
+
+CREATE TABLE appservice_room_list (
+ appservice_id text NOT NULL,
+ network_id text NOT NULL,
+ room_id text NOT NULL
+);
+
+
+
+CREATE TABLE appservice_stream_position (
+ lock character(1) DEFAULT 'X'::bpchar NOT NULL,
+ stream_ordering bigint,
+ CONSTRAINT appservice_stream_position_lock_check CHECK ((lock = 'X'::bpchar))
+);
+
+
+
+CREATE TABLE background_updates (
+ update_name text NOT NULL,
+ progress_json text NOT NULL,
+ depends_on text
+);
+
+
+
+CREATE TABLE blocked_rooms (
+ room_id text NOT NULL,
+ user_id text NOT NULL
+);
+
+
+
+CREATE TABLE cache_invalidation_stream (
+ stream_id bigint,
+ cache_func text,
+ keys text[],
+ invalidation_ts bigint
+);
+
+
+
+CREATE TABLE current_state_delta_stream (
+ stream_id bigint NOT NULL,
+ room_id text NOT NULL,
+ type text NOT NULL,
+ state_key text NOT NULL,
+ event_id text,
+ prev_event_id text
+);
+
+
+
+CREATE TABLE current_state_events (
+ event_id text NOT NULL,
+ room_id text NOT NULL,
+ type text NOT NULL,
+ state_key text NOT NULL
+);
+
+
+
+CREATE TABLE deleted_pushers (
+ stream_id bigint NOT NULL,
+ app_id text NOT NULL,
+ pushkey text NOT NULL,
+ user_id text NOT NULL
+);
+
+
+
+CREATE TABLE destinations (
+ destination text NOT NULL,
+ retry_last_ts bigint,
+ retry_interval integer
+);
+
+
+
+CREATE TABLE device_federation_inbox (
+ origin text NOT NULL,
+ message_id text NOT NULL,
+ received_ts bigint NOT NULL
+);
+
+
+
+CREATE TABLE device_federation_outbox (
+ destination text NOT NULL,
+ stream_id bigint NOT NULL,
+ queued_ts bigint NOT NULL,
+ messages_json text NOT NULL
+);
+
+
+
+CREATE TABLE device_inbox (
+ user_id text NOT NULL,
+ device_id text NOT NULL,
+ stream_id bigint NOT NULL,
+ message_json text NOT NULL
+);
+
+
+
+CREATE TABLE device_lists_outbound_last_success (
+ destination text NOT NULL,
+ user_id text NOT NULL,
+ stream_id bigint NOT NULL
+);
+
+
+
+CREATE TABLE device_lists_outbound_pokes (
+ destination text NOT NULL,
+ stream_id bigint NOT NULL,
+ user_id text NOT NULL,
+ device_id text NOT NULL,
+ sent boolean NOT NULL,
+ ts bigint NOT NULL
+);
+
+
+
+CREATE TABLE device_lists_remote_cache (
+ user_id text NOT NULL,
+ device_id text NOT NULL,
+ content text NOT NULL
+);
+
+
+
+CREATE TABLE device_lists_remote_extremeties (
+ user_id text NOT NULL,
+ stream_id text NOT NULL
+);
+
+
+
+CREATE TABLE device_lists_stream (
+ stream_id bigint NOT NULL,
+ user_id text NOT NULL,
+ device_id text NOT NULL
+);
+
+
+
+CREATE TABLE device_max_stream_id (
+ stream_id bigint NOT NULL
+);
+
+
+
+CREATE TABLE devices (
+ user_id text NOT NULL,
+ device_id text NOT NULL,
+ display_name text
+);
+
+
+
+CREATE TABLE e2e_device_keys_json (
+ user_id text NOT NULL,
+ device_id text NOT NULL,
+ ts_added_ms bigint NOT NULL,
+ key_json text NOT NULL
+);
+
+
+
+CREATE TABLE e2e_one_time_keys_json (
+ user_id text NOT NULL,
+ device_id text NOT NULL,
+ algorithm text NOT NULL,
+ key_id text NOT NULL,
+ ts_added_ms bigint NOT NULL,
+ key_json text NOT NULL
+);
+
+
+
+CREATE TABLE e2e_room_keys (
+ user_id text NOT NULL,
+ room_id text NOT NULL,
+ session_id text NOT NULL,
+ version bigint NOT NULL,
+ first_message_index integer,
+ forwarded_count integer,
+ is_verified boolean,
+ session_data text NOT NULL
+);
+
+
+
+CREATE TABLE e2e_room_keys_versions (
+ user_id text NOT NULL,
+ version bigint NOT NULL,
+ algorithm text NOT NULL,
+ auth_data text NOT NULL,
+ deleted smallint DEFAULT 0 NOT NULL
+);
+
+
+
+CREATE TABLE erased_users (
+ user_id text NOT NULL
+);
+
+
+
+CREATE TABLE event_auth (
+ event_id text NOT NULL,
+ auth_id text NOT NULL,
+ room_id text NOT NULL
+);
+
+
+
+CREATE TABLE event_backward_extremities (
+ event_id text NOT NULL,
+ room_id text NOT NULL
+);
+
+
+
+CREATE TABLE event_edges (
+ event_id text NOT NULL,
+ prev_event_id text NOT NULL,
+ room_id text NOT NULL,
+ is_state boolean NOT NULL
+);
+
+
+
+CREATE TABLE event_forward_extremities (
+ event_id text NOT NULL,
+ room_id text NOT NULL
+);
+
+
+
+CREATE TABLE event_json (
+ event_id text NOT NULL,
+ room_id text NOT NULL,
+ internal_metadata text NOT NULL,
+ json text NOT NULL,
+ format_version integer
+);
+
+
+
+CREATE TABLE event_push_actions (
+ room_id text NOT NULL,
+ event_id text NOT NULL,
+ user_id text NOT NULL,
+ profile_tag character varying(32),
+ actions text NOT NULL,
+ topological_ordering bigint,
+ stream_ordering bigint,
+ notif smallint,
+ highlight smallint
+);
+
+
+
+CREATE TABLE event_push_actions_staging (
+ event_id text NOT NULL,
+ user_id text NOT NULL,
+ actions text NOT NULL,
+ notif smallint NOT NULL,
+ highlight smallint NOT NULL
+);
+
+
+
+CREATE TABLE event_push_summary (
+ user_id text NOT NULL,
+ room_id text NOT NULL,
+ notif_count bigint NOT NULL,
+ stream_ordering bigint NOT NULL
+);
+
+
+
+CREATE TABLE event_push_summary_stream_ordering (
+ lock character(1) DEFAULT 'X'::bpchar NOT NULL,
+ stream_ordering bigint NOT NULL,
+ CONSTRAINT event_push_summary_stream_ordering_lock_check CHECK ((lock = 'X'::bpchar))
+);
+
+
+
+CREATE TABLE event_reference_hashes (
+ event_id text,
+ algorithm text,
+ hash bytea
+);
+
+
+
+CREATE TABLE event_relations (
+ event_id text NOT NULL,
+ relates_to_id text NOT NULL,
+ relation_type text NOT NULL,
+ aggregation_key text
+);
+
+
+
+CREATE TABLE event_reports (
+ id bigint NOT NULL,
+ received_ts bigint NOT NULL,
+ room_id text NOT NULL,
+ event_id text NOT NULL,
+ user_id text NOT NULL,
+ reason text,
+ content text
+);
+
+
+
+CREATE TABLE event_search (
+ event_id text,
+ room_id text,
+ sender text,
+ key text,
+ vector tsvector,
+ origin_server_ts bigint,
+ stream_ordering bigint
+);
+
+
+
+CREATE TABLE event_to_state_groups (
+ event_id text NOT NULL,
+ state_group bigint NOT NULL
+);
+
+
+
+CREATE TABLE events (
+ stream_ordering integer NOT NULL,
+ topological_ordering bigint NOT NULL,
+ event_id text NOT NULL,
+ type text NOT NULL,
+ room_id text NOT NULL,
+ content text,
+ unrecognized_keys text,
+ processed boolean NOT NULL,
+ outlier boolean NOT NULL,
+ depth bigint DEFAULT 0 NOT NULL,
+ origin_server_ts bigint,
+ received_ts bigint,
+ sender text,
+ contains_url boolean
+);
+
+
+
+CREATE TABLE ex_outlier_stream (
+ event_stream_ordering bigint NOT NULL,
+ event_id text NOT NULL,
+ state_group bigint NOT NULL
+);
+
+
+
+CREATE TABLE federation_stream_position (
+ type text NOT NULL,
+ stream_id integer NOT NULL
+);
+
+
+
+CREATE TABLE group_attestations_remote (
+ group_id text NOT NULL,
+ user_id text NOT NULL,
+ valid_until_ms bigint NOT NULL,
+ attestation_json text NOT NULL
+);
+
+
+
+CREATE TABLE group_attestations_renewals (
+ group_id text NOT NULL,
+ user_id text NOT NULL,
+ valid_until_ms bigint NOT NULL
+);
+
+
+
+CREATE TABLE group_invites (
+ group_id text NOT NULL,
+ user_id text NOT NULL
+);
+
+
+
+CREATE TABLE group_roles (
+ group_id text NOT NULL,
+ role_id text NOT NULL,
+ profile text NOT NULL,
+ is_public boolean NOT NULL
+);
+
+
+
+CREATE TABLE group_room_categories (
+ group_id text NOT NULL,
+ category_id text NOT NULL,
+ profile text NOT NULL,
+ is_public boolean NOT NULL
+);
+
+
+
+CREATE TABLE group_rooms (
+ group_id text NOT NULL,
+ room_id text NOT NULL,
+ is_public boolean NOT NULL
+);
+
+
+
+CREATE TABLE group_summary_roles (
+ group_id text NOT NULL,
+ role_id text NOT NULL,
+ role_order bigint NOT NULL,
+ CONSTRAINT group_summary_roles_role_order_check CHECK ((role_order > 0))
+);
+
+
+
+CREATE TABLE group_summary_room_categories (
+ group_id text NOT NULL,
+ category_id text NOT NULL,
+ cat_order bigint NOT NULL,
+ CONSTRAINT group_summary_room_categories_cat_order_check CHECK ((cat_order > 0))
+);
+
+
+
+CREATE TABLE group_summary_rooms (
+ group_id text NOT NULL,
+ room_id text NOT NULL,
+ category_id text NOT NULL,
+ room_order bigint NOT NULL,
+ is_public boolean NOT NULL,
+ CONSTRAINT group_summary_rooms_room_order_check CHECK ((room_order > 0))
+);
+
+
+
+CREATE TABLE group_summary_users (
+ group_id text NOT NULL,
+ user_id text NOT NULL,
+ role_id text NOT NULL,
+ user_order bigint NOT NULL,
+ is_public boolean NOT NULL
+);
+
+
+
+CREATE TABLE group_users (
+ group_id text NOT NULL,
+ user_id text NOT NULL,
+ is_admin boolean NOT NULL,
+ is_public boolean NOT NULL
+);
+
+
+
+CREATE TABLE groups (
+ group_id text NOT NULL,
+ name text,
+ avatar_url text,
+ short_description text,
+ long_description text,
+ is_public boolean NOT NULL,
+ join_policy text DEFAULT 'invite'::text NOT NULL
+);
+
+
+
+CREATE TABLE guest_access (
+ event_id text NOT NULL,
+ room_id text NOT NULL,
+ guest_access text NOT NULL
+);
+
+
+
+CREATE TABLE history_visibility (
+ event_id text NOT NULL,
+ room_id text NOT NULL,
+ history_visibility text NOT NULL
+);
+
+
+
+CREATE TABLE local_group_membership (
+ group_id text NOT NULL,
+ user_id text NOT NULL,
+ is_admin boolean NOT NULL,
+ membership text NOT NULL,
+ is_publicised boolean NOT NULL,
+ content text NOT NULL
+);
+
+
+
+CREATE TABLE local_group_updates (
+ stream_id bigint NOT NULL,
+ group_id text NOT NULL,
+ user_id text NOT NULL,
+ type text NOT NULL,
+ content text NOT NULL
+);
+
+
+
+CREATE TABLE local_invites (
+ stream_id bigint NOT NULL,
+ inviter text NOT NULL,
+ invitee text NOT NULL,
+ event_id text NOT NULL,
+ room_id text NOT NULL,
+ locally_rejected text,
+ replaced_by text
+);
+
+
+
+CREATE TABLE local_media_repository (
+ media_id text,
+ media_type text,
+ media_length integer,
+ created_ts bigint,
+ upload_name text,
+ user_id text,
+ quarantined_by text,
+ url_cache text,
+ last_access_ts bigint
+);
+
+
+
+CREATE TABLE local_media_repository_thumbnails (
+ media_id text,
+ thumbnail_width integer,
+ thumbnail_height integer,
+ thumbnail_type text,
+ thumbnail_method text,
+ thumbnail_length integer
+);
+
+
+
+CREATE TABLE local_media_repository_url_cache (
+ url text,
+ response_code integer,
+ etag text,
+ expires_ts bigint,
+ og text,
+ media_id text,
+ download_ts bigint
+);
+
+
+
+CREATE TABLE monthly_active_users (
+ user_id text NOT NULL,
+ "timestamp" bigint NOT NULL
+);
+
+
+
+CREATE TABLE open_id_tokens (
+ token text NOT NULL,
+ ts_valid_until_ms bigint NOT NULL,
+ user_id text NOT NULL
+);
+
+
+
+CREATE TABLE presence (
+ user_id text NOT NULL,
+ state character varying(20),
+ status_msg text,
+ mtime bigint
+);
+
+
+
+CREATE TABLE presence_allow_inbound (
+ observed_user_id text NOT NULL,
+ observer_user_id text NOT NULL
+);
+
+
+
+CREATE TABLE presence_stream (
+ stream_id bigint,
+ user_id text,
+ state text,
+ last_active_ts bigint,
+ last_federation_update_ts bigint,
+ last_user_sync_ts bigint,
+ status_msg text,
+ currently_active boolean
+);
+
+
+
+CREATE TABLE profiles (
+ user_id text NOT NULL,
+ displayname text,
+ avatar_url text
+);
+
+
+
+CREATE TABLE public_room_list_stream (
+ stream_id bigint NOT NULL,
+ room_id text NOT NULL,
+ visibility boolean NOT NULL,
+ appservice_id text,
+ network_id text
+);
+
+
+
+CREATE TABLE push_rules (
+ id bigint NOT NULL,
+ user_name text NOT NULL,
+ rule_id text NOT NULL,
+ priority_class smallint NOT NULL,
+ priority integer DEFAULT 0 NOT NULL,
+ conditions text NOT NULL,
+ actions text NOT NULL
+);
+
+
+
+CREATE TABLE push_rules_enable (
+ id bigint NOT NULL,
+ user_name text NOT NULL,
+ rule_id text NOT NULL,
+ enabled smallint
+);
+
+
+
+CREATE TABLE push_rules_stream (
+ stream_id bigint NOT NULL,
+ event_stream_ordering bigint NOT NULL,
+ user_id text NOT NULL,
+ rule_id text NOT NULL,
+ op text NOT NULL,
+ priority_class smallint,
+ priority integer,
+ conditions text,
+ actions text
+);
+
+
+
+CREATE TABLE pusher_throttle (
+ pusher bigint NOT NULL,
+ room_id text NOT NULL,
+ last_sent_ts bigint,
+ throttle_ms bigint
+);
+
+
+
+CREATE TABLE pushers (
+ id bigint NOT NULL,
+ user_name text NOT NULL,
+ access_token bigint,
+ profile_tag text NOT NULL,
+ kind text NOT NULL,
+ app_id text NOT NULL,
+ app_display_name text NOT NULL,
+ device_display_name text NOT NULL,
+ pushkey text NOT NULL,
+ ts bigint NOT NULL,
+ lang text,
+ data text,
+ last_stream_ordering integer,
+ last_success bigint,
+ failing_since bigint
+);
+
+
+
+CREATE TABLE ratelimit_override (
+ user_id text NOT NULL,
+ messages_per_second bigint,
+ burst_count bigint
+);
+
+
+
+CREATE TABLE receipts_graph (
+ room_id text NOT NULL,
+ receipt_type text NOT NULL,
+ user_id text NOT NULL,
+ event_ids text NOT NULL,
+ data text NOT NULL
+);
+
+
+
+CREATE TABLE receipts_linearized (
+ stream_id bigint NOT NULL,
+ room_id text NOT NULL,
+ receipt_type text NOT NULL,
+ user_id text NOT NULL,
+ event_id text NOT NULL,
+ data text NOT NULL
+);
+
+
+
+CREATE TABLE received_transactions (
+ transaction_id text,
+ origin text,
+ ts bigint,
+ response_code integer,
+ response_json bytea,
+ has_been_referenced smallint DEFAULT 0
+);
+
+
+
+CREATE TABLE redactions (
+ event_id text NOT NULL,
+ redacts text NOT NULL
+);
+
+
+
+CREATE TABLE rejections (
+ event_id text NOT NULL,
+ reason text NOT NULL,
+ last_check text NOT NULL
+);
+
+
+
+CREATE TABLE remote_media_cache (
+ media_origin text,
+ media_id text,
+ media_type text,
+ created_ts bigint,
+ upload_name text,
+ media_length integer,
+ filesystem_id text,
+ last_access_ts bigint,
+ quarantined_by text
+);
+
+
+
+CREATE TABLE remote_media_cache_thumbnails (
+ media_origin text,
+ media_id text,
+ thumbnail_width integer,
+ thumbnail_height integer,
+ thumbnail_method text,
+ thumbnail_type text,
+ thumbnail_length integer,
+ filesystem_id text
+);
+
+
+
+CREATE TABLE remote_profile_cache (
+ user_id text NOT NULL,
+ displayname text,
+ avatar_url text,
+ last_check bigint NOT NULL
+);
+
+
+
+CREATE TABLE room_account_data (
+ user_id text NOT NULL,
+ room_id text NOT NULL,
+ account_data_type text NOT NULL,
+ stream_id bigint NOT NULL,
+ content text NOT NULL
+);
+
+
+
+CREATE TABLE room_alias_servers (
+ room_alias text NOT NULL,
+ server text NOT NULL
+);
+
+
+
+CREATE TABLE room_aliases (
+ room_alias text NOT NULL,
+ room_id text NOT NULL,
+ creator text
+);
+
+
+
+CREATE TABLE room_depth (
+ room_id text NOT NULL,
+ min_depth integer NOT NULL
+);
+
+
+
+CREATE TABLE room_memberships (
+ event_id text NOT NULL,
+ user_id text NOT NULL,
+ sender text NOT NULL,
+ room_id text NOT NULL,
+ membership text NOT NULL,
+ forgotten integer DEFAULT 0,
+ display_name text,
+ avatar_url text
+);
+
+
+
+CREATE TABLE room_names (
+ event_id text NOT NULL,
+ room_id text NOT NULL,
+ name text NOT NULL
+);
+
+
+
+CREATE TABLE room_state (
+ room_id text NOT NULL,
+ join_rules text,
+ history_visibility text,
+ encryption text,
+ name text,
+ topic text,
+ avatar text,
+ canonical_alias text
+);
+
+
+
+CREATE TABLE room_stats (
+ room_id text NOT NULL,
+ ts bigint NOT NULL,
+ bucket_size integer NOT NULL,
+ current_state_events integer NOT NULL,
+ joined_members integer NOT NULL,
+ invited_members integer NOT NULL,
+ left_members integer NOT NULL,
+ banned_members integer NOT NULL,
+ state_events integer NOT NULL
+);
+
+
+
+CREATE TABLE room_stats_earliest_token (
+ room_id text NOT NULL,
+ token bigint NOT NULL
+);
+
+
+
+CREATE TABLE room_tags (
+ user_id text NOT NULL,
+ room_id text NOT NULL,
+ tag text NOT NULL,
+ content text NOT NULL
+);
+
+
+
+CREATE TABLE room_tags_revisions (
+ user_id text NOT NULL,
+ room_id text NOT NULL,
+ stream_id bigint NOT NULL
+);
+
+
+
+CREATE TABLE rooms (
+ room_id text NOT NULL,
+ is_public boolean,
+ creator text
+);
+
+
+
+CREATE TABLE server_keys_json (
+ server_name text NOT NULL,
+ key_id text NOT NULL,
+ from_server text NOT NULL,
+ ts_added_ms bigint NOT NULL,
+ ts_valid_until_ms bigint NOT NULL,
+ key_json bytea NOT NULL
+);
+
+
+
+CREATE TABLE server_signature_keys (
+ server_name text,
+ key_id text,
+ from_server text,
+ ts_added_ms bigint,
+ verify_key bytea,
+ ts_valid_until_ms bigint
+);
+
+
+
+CREATE TABLE state_events (
+ event_id text NOT NULL,
+ room_id text NOT NULL,
+ type text NOT NULL,
+ state_key text NOT NULL,
+ prev_state text
+);
+
+
+
+CREATE TABLE state_group_edges (
+ state_group bigint NOT NULL,
+ prev_state_group bigint NOT NULL
+);
+
+
+
+CREATE SEQUENCE state_group_id_seq
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+
+CREATE TABLE state_groups (
+ id bigint NOT NULL,
+ room_id text NOT NULL,
+ event_id text NOT NULL
+);
+
+
+
+CREATE TABLE state_groups_state (
+ state_group bigint NOT NULL,
+ room_id text NOT NULL,
+ type text NOT NULL,
+ state_key text NOT NULL,
+ event_id text NOT NULL
+);
+
+
+
+CREATE TABLE stats_stream_pos (
+ lock character(1) DEFAULT 'X'::bpchar NOT NULL,
+ stream_id bigint,
+ CONSTRAINT stats_stream_pos_lock_check CHECK ((lock = 'X'::bpchar))
+);
+
+
+
+CREATE TABLE stream_ordering_to_exterm (
+ stream_ordering bigint NOT NULL,
+ room_id text NOT NULL,
+ event_id text NOT NULL
+);
+
+
+
+CREATE TABLE threepid_guest_access_tokens (
+ medium text,
+ address text,
+ guest_access_token text,
+ first_inviter text
+);
+
+
+
+CREATE TABLE topics (
+ event_id text NOT NULL,
+ room_id text NOT NULL,
+ topic text NOT NULL
+);
+
+
+
+CREATE TABLE user_daily_visits (
+ user_id text NOT NULL,
+ device_id text,
+ "timestamp" bigint NOT NULL
+);
+
+
+
+CREATE TABLE user_directory (
+ user_id text NOT NULL,
+ room_id text,
+ display_name text,
+ avatar_url text
+);
+
+
+
+CREATE TABLE user_directory_search (
+ user_id text NOT NULL,
+ vector tsvector
+);
+
+
+
+CREATE TABLE user_directory_stream_pos (
+ lock character(1) DEFAULT 'X'::bpchar NOT NULL,
+ stream_id bigint,
+ CONSTRAINT user_directory_stream_pos_lock_check CHECK ((lock = 'X'::bpchar))
+);
+
+
+
+CREATE TABLE user_filters (
+ user_id text,
+ filter_id bigint,
+ filter_json bytea
+);
+
+
+
+CREATE TABLE user_ips (
+ user_id text NOT NULL,
+ access_token text NOT NULL,
+ device_id text,
+ ip text NOT NULL,
+ user_agent text NOT NULL,
+ last_seen bigint NOT NULL
+);
+
+
+
+CREATE TABLE user_stats (
+ user_id text NOT NULL,
+ ts bigint NOT NULL,
+ bucket_size integer NOT NULL,
+ public_rooms integer NOT NULL,
+ private_rooms integer NOT NULL
+);
+
+
+
+CREATE TABLE user_threepid_id_server (
+ user_id text NOT NULL,
+ medium text NOT NULL,
+ address text NOT NULL,
+ id_server text NOT NULL
+);
+
+
+
+CREATE TABLE user_threepids (
+ user_id text NOT NULL,
+ medium text NOT NULL,
+ address text NOT NULL,
+ validated_at bigint NOT NULL,
+ added_at bigint NOT NULL
+);
+
+
+
+CREATE TABLE users (
+ name text,
+ password_hash text,
+ creation_ts bigint,
+ admin smallint DEFAULT 0 NOT NULL,
+ upgrade_ts bigint,
+ is_guest smallint DEFAULT 0 NOT NULL,
+ appservice_id text,
+ consent_version text,
+ consent_server_notice_sent text,
+ user_type text
+);
+
+
+
+CREATE TABLE users_in_public_rooms (
+ user_id text NOT NULL,
+ room_id text NOT NULL
+);
+
+
+
+CREATE TABLE users_pending_deactivation (
+ user_id text NOT NULL
+);
+
+
+
+CREATE TABLE users_who_share_private_rooms (
+ user_id text NOT NULL,
+ other_user_id text NOT NULL,
+ room_id text NOT NULL
+);
+
+
+
+ALTER TABLE ONLY access_tokens
+ ADD CONSTRAINT access_tokens_pkey PRIMARY KEY (id);
+
+
+
+ALTER TABLE ONLY access_tokens
+ ADD CONSTRAINT access_tokens_token_key UNIQUE (token);
+
+
+
+ALTER TABLE ONLY account_data
+ ADD CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type);
+
+
+
+ALTER TABLE ONLY account_validity
+ ADD CONSTRAINT account_validity_pkey PRIMARY KEY (user_id);
+
+
+
+ALTER TABLE ONLY application_services_state
+ ADD CONSTRAINT application_services_state_pkey PRIMARY KEY (as_id);
+
+
+
+ALTER TABLE ONLY application_services_txns
+ ADD CONSTRAINT application_services_txns_as_id_txn_id_key UNIQUE (as_id, txn_id);
+
+
+
+ALTER TABLE ONLY appservice_stream_position
+ ADD CONSTRAINT appservice_stream_position_lock_key UNIQUE (lock);
+
+
+
+ALTER TABLE ONLY background_updates
+ ADD CONSTRAINT background_updates_uniqueness UNIQUE (update_name);
+
+
+
+ALTER TABLE ONLY current_state_events
+ ADD CONSTRAINT current_state_events_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY current_state_events
+ ADD CONSTRAINT current_state_events_room_id_type_state_key_key UNIQUE (room_id, type, state_key);
+
+
+
+ALTER TABLE ONLY destinations
+ ADD CONSTRAINT destinations_pkey PRIMARY KEY (destination);
+
+
+
+ALTER TABLE ONLY devices
+ ADD CONSTRAINT device_uniqueness UNIQUE (user_id, device_id);
+
+
+
+ALTER TABLE ONLY e2e_device_keys_json
+ ADD CONSTRAINT e2e_device_keys_json_uniqueness UNIQUE (user_id, device_id);
+
+
+
+ALTER TABLE ONLY e2e_one_time_keys_json
+ ADD CONSTRAINT e2e_one_time_keys_json_uniqueness UNIQUE (user_id, device_id, algorithm, key_id);
+
+
+
+ALTER TABLE ONLY event_backward_extremities
+ ADD CONSTRAINT event_backward_extremities_event_id_room_id_key UNIQUE (event_id, room_id);
+
+
+
+ALTER TABLE ONLY event_edges
+ ADD CONSTRAINT event_edges_event_id_prev_event_id_room_id_is_state_key UNIQUE (event_id, prev_event_id, room_id, is_state);
+
+
+
+ALTER TABLE ONLY event_forward_extremities
+ ADD CONSTRAINT event_forward_extremities_event_id_room_id_key UNIQUE (event_id, room_id);
+
+
+
+ALTER TABLE ONLY event_push_actions
+ ADD CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag);
+
+
+
+ALTER TABLE ONLY event_json
+ ADD CONSTRAINT event_json_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY event_push_summary_stream_ordering
+ ADD CONSTRAINT event_push_summary_stream_ordering_lock_key UNIQUE (lock);
+
+
+
+ALTER TABLE ONLY event_reference_hashes
+ ADD CONSTRAINT event_reference_hashes_event_id_algorithm_key UNIQUE (event_id, algorithm);
+
+
+
+ALTER TABLE ONLY event_reports
+ ADD CONSTRAINT event_reports_pkey PRIMARY KEY (id);
+
+
+
+ALTER TABLE ONLY event_to_state_groups
+ ADD CONSTRAINT event_to_state_groups_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY events
+ ADD CONSTRAINT events_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY events
+ ADD CONSTRAINT events_pkey PRIMARY KEY (stream_ordering);
+
+
+
+ALTER TABLE ONLY ex_outlier_stream
+ ADD CONSTRAINT ex_outlier_stream_pkey PRIMARY KEY (event_stream_ordering);
+
+
+
+ALTER TABLE ONLY group_roles
+ ADD CONSTRAINT group_roles_group_id_role_id_key UNIQUE (group_id, role_id);
+
+
+
+ALTER TABLE ONLY group_room_categories
+ ADD CONSTRAINT group_room_categories_group_id_category_id_key UNIQUE (group_id, category_id);
+
+
+
+ALTER TABLE ONLY group_summary_roles
+ ADD CONSTRAINT group_summary_roles_group_id_role_id_role_order_key UNIQUE (group_id, role_id, role_order);
+
+
+
+ALTER TABLE ONLY group_summary_room_categories
+ ADD CONSTRAINT group_summary_room_categories_group_id_category_id_cat_orde_key UNIQUE (group_id, category_id, cat_order);
+
+
+
+ALTER TABLE ONLY group_summary_rooms
+ ADD CONSTRAINT group_summary_rooms_group_id_category_id_room_id_room_order_key UNIQUE (group_id, category_id, room_id, room_order);
+
+
+
+ALTER TABLE ONLY guest_access
+ ADD CONSTRAINT guest_access_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY history_visibility
+ ADD CONSTRAINT history_visibility_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY local_media_repository
+ ADD CONSTRAINT local_media_repository_media_id_key UNIQUE (media_id);
+
+
+
+ALTER TABLE ONLY local_media_repository_thumbnails
+ ADD CONSTRAINT local_media_repository_thumbn_media_id_thumbnail_width_thum_key UNIQUE (media_id, thumbnail_width, thumbnail_height, thumbnail_type);
+
+
+
+ALTER TABLE ONLY user_threepids
+ ADD CONSTRAINT medium_address UNIQUE (medium, address);
+
+
+
+ALTER TABLE ONLY open_id_tokens
+ ADD CONSTRAINT open_id_tokens_pkey PRIMARY KEY (token);
+
+
+
+ALTER TABLE ONLY presence_allow_inbound
+ ADD CONSTRAINT presence_allow_inbound_observed_user_id_observer_user_id_key UNIQUE (observed_user_id, observer_user_id);
+
+
+
+ALTER TABLE ONLY presence
+ ADD CONSTRAINT presence_user_id_key UNIQUE (user_id);
+
+
+
+ALTER TABLE ONLY account_data_max_stream_id
+ ADD CONSTRAINT private_user_data_max_stream_id_lock_key UNIQUE (lock);
+
+
+
+ALTER TABLE ONLY profiles
+ ADD CONSTRAINT profiles_user_id_key UNIQUE (user_id);
+
+
+
+ALTER TABLE ONLY push_rules_enable
+ ADD CONSTRAINT push_rules_enable_pkey PRIMARY KEY (id);
+
+
+
+ALTER TABLE ONLY push_rules_enable
+ ADD CONSTRAINT push_rules_enable_user_name_rule_id_key UNIQUE (user_name, rule_id);
+
+
+
+ALTER TABLE ONLY push_rules
+ ADD CONSTRAINT push_rules_pkey PRIMARY KEY (id);
+
+
+
+ALTER TABLE ONLY push_rules
+ ADD CONSTRAINT push_rules_user_name_rule_id_key UNIQUE (user_name, rule_id);
+
+
+
+ALTER TABLE ONLY pusher_throttle
+ ADD CONSTRAINT pusher_throttle_pkey PRIMARY KEY (pusher, room_id);
+
+
+
+ALTER TABLE ONLY pushers
+ ADD CONSTRAINT pushers2_app_id_pushkey_user_name_key UNIQUE (app_id, pushkey, user_name);
+
+
+
+ALTER TABLE ONLY pushers
+ ADD CONSTRAINT pushers2_pkey PRIMARY KEY (id);
+
+
+
+ALTER TABLE ONLY receipts_graph
+ ADD CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id);
+
+
+
+ALTER TABLE ONLY receipts_linearized
+ ADD CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id);
+
+
+
+ALTER TABLE ONLY received_transactions
+ ADD CONSTRAINT received_transactions_transaction_id_origin_key UNIQUE (transaction_id, origin);
+
+
+
+ALTER TABLE ONLY redactions
+ ADD CONSTRAINT redactions_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY rejections
+ ADD CONSTRAINT rejections_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY remote_media_cache
+ ADD CONSTRAINT remote_media_cache_media_origin_media_id_key UNIQUE (media_origin, media_id);
+
+
+
+ALTER TABLE ONLY remote_media_cache_thumbnails
+ ADD CONSTRAINT remote_media_cache_thumbnails_media_origin_media_id_thumbna_key UNIQUE (media_origin, media_id, thumbnail_width, thumbnail_height, thumbnail_type);
+
+
+
+ALTER TABLE ONLY room_account_data
+ ADD CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type);
+
+
+
+ALTER TABLE ONLY room_aliases
+ ADD CONSTRAINT room_aliases_room_alias_key UNIQUE (room_alias);
+
+
+
+ALTER TABLE ONLY room_depth
+ ADD CONSTRAINT room_depth_room_id_key UNIQUE (room_id);
+
+
+
+ALTER TABLE ONLY room_memberships
+ ADD CONSTRAINT room_memberships_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY room_names
+ ADD CONSTRAINT room_names_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY room_tags_revisions
+ ADD CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id);
+
+
+
+ALTER TABLE ONLY room_tags
+ ADD CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag);
+
+
+
+ALTER TABLE ONLY rooms
+ ADD CONSTRAINT rooms_pkey PRIMARY KEY (room_id);
+
+
+
+ALTER TABLE ONLY server_keys_json
+ ADD CONSTRAINT server_keys_json_uniqueness UNIQUE (server_name, key_id, from_server);
+
+
+
+ALTER TABLE ONLY server_signature_keys
+ ADD CONSTRAINT server_signature_keys_server_name_key_id_key UNIQUE (server_name, key_id);
+
+
+
+ALTER TABLE ONLY state_events
+ ADD CONSTRAINT state_events_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY state_groups
+ ADD CONSTRAINT state_groups_pkey PRIMARY KEY (id);
+
+
+
+ALTER TABLE ONLY stats_stream_pos
+ ADD CONSTRAINT stats_stream_pos_lock_key UNIQUE (lock);
+
+
+
+ALTER TABLE ONLY topics
+ ADD CONSTRAINT topics_event_id_key UNIQUE (event_id);
+
+
+
+ALTER TABLE ONLY user_directory_stream_pos
+ ADD CONSTRAINT user_directory_stream_pos_lock_key UNIQUE (lock);
+
+
+
+ALTER TABLE ONLY users
+ ADD CONSTRAINT users_name_key UNIQUE (name);
+
+
+
+CREATE INDEX access_tokens_device_id ON access_tokens USING btree (user_id, device_id);
+
+
+
+CREATE INDEX account_data_stream_id ON account_data USING btree (user_id, stream_id);
+
+
+
+CREATE INDEX application_services_txns_id ON application_services_txns USING btree (as_id);
+
+
+
+CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list USING btree (appservice_id, network_id, room_id);
+
+
+
+CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms USING btree (room_id);
+
+
+
+CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream USING btree (stream_id);
+
+
+
+CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream USING btree (stream_id);
+
+
+
+CREATE INDEX current_state_events_member_index ON current_state_events USING btree (state_key) WHERE (type = 'm.room.member'::text);
+
+
+
+CREATE INDEX deleted_pushers_stream_id ON deleted_pushers USING btree (stream_id);
+
+
+
+CREATE INDEX device_federation_inbox_sender_id ON device_federation_inbox USING btree (origin, message_id);
+
+
+
+CREATE INDEX device_federation_outbox_destination_id ON device_federation_outbox USING btree (destination, stream_id);
+
+
+
+CREATE INDEX device_federation_outbox_id ON device_federation_outbox USING btree (stream_id);
+
+
+
+CREATE INDEX device_inbox_stream_id_user_id ON device_inbox USING btree (stream_id, user_id);
+
+
+
+CREATE INDEX device_inbox_user_stream_id ON device_inbox USING btree (user_id, device_id, stream_id);
+
+
+
+CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success USING btree (destination, user_id, stream_id);
+
+
+
+CREATE INDEX device_lists_outbound_pokes_id ON device_lists_outbound_pokes USING btree (destination, stream_id);
+
+
+
+CREATE INDEX device_lists_outbound_pokes_stream ON device_lists_outbound_pokes USING btree (stream_id);
+
+
+
+CREATE INDEX device_lists_outbound_pokes_user ON device_lists_outbound_pokes USING btree (destination, user_id);
+
+
+
+CREATE UNIQUE INDEX device_lists_remote_cache_unique_id ON device_lists_remote_cache USING btree (user_id, device_id);
+
+
+
+CREATE UNIQUE INDEX device_lists_remote_extremeties_unique_idx ON device_lists_remote_extremeties USING btree (user_id);
+
+
+
+CREATE INDEX device_lists_stream_id ON device_lists_stream USING btree (stream_id, user_id);
+
+
+
+CREATE INDEX device_lists_stream_user_id ON device_lists_stream USING btree (user_id, device_id);
+
+
+
+CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys USING btree (user_id, room_id, session_id);
+
+
+
+CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions USING btree (user_id, version);
+
+
+
+CREATE UNIQUE INDEX erased_users_user ON erased_users USING btree (user_id);
+
+
+
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities USING btree (event_id);
+
+
+
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities USING btree (room_id);
+
+
+
+CREATE INDEX ev_edges_id ON event_edges USING btree (event_id);
+
+
+
+CREATE INDEX ev_edges_prev_id ON event_edges USING btree (prev_event_id);
+
+
+
+CREATE INDEX ev_extrem_id ON event_forward_extremities USING btree (event_id);
+
+
+
+CREATE INDEX ev_extrem_room ON event_forward_extremities USING btree (room_id);
+
+
+
+CREATE INDEX evauth_edges_id ON event_auth USING btree (event_id);
+
+
+
+CREATE INDEX event_contains_url_index ON events USING btree (room_id, topological_ordering, stream_ordering) WHERE ((contains_url = true) AND (outlier = false));
+
+
+
+CREATE INDEX event_json_room_id ON event_json USING btree (room_id);
+
+
+
+CREATE INDEX event_push_actions_highlights_index ON event_push_actions USING btree (user_id, room_id, topological_ordering, stream_ordering) WHERE (highlight = 1);
+
+
+
+CREATE INDEX event_push_actions_rm_tokens ON event_push_actions USING btree (user_id, room_id, topological_ordering, stream_ordering);
+
+
+
+CREATE INDEX event_push_actions_room_id_user_id ON event_push_actions USING btree (room_id, user_id);
+
+
+
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging USING btree (event_id);
+
+
+
+CREATE INDEX event_push_actions_stream_ordering ON event_push_actions USING btree (stream_ordering, user_id);
+
+
+
+CREATE INDEX event_push_actions_u_highlight ON event_push_actions USING btree (user_id, stream_ordering);
+
+
+
+CREATE INDEX event_push_summary_user_rm ON event_push_summary USING btree (user_id, room_id);
+
+
+
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes USING btree (event_id);
+
+
+
+CREATE UNIQUE INDEX event_relations_id ON event_relations USING btree (event_id);
+
+
+
+CREATE INDEX event_relations_relates ON event_relations USING btree (relates_to_id, relation_type, aggregation_key);
+
+
+
+CREATE INDEX event_search_ev_ridx ON event_search USING btree (room_id);
+
+
+
+CREATE UNIQUE INDEX event_search_event_id_idx ON event_search USING btree (event_id);
+
+
+
+CREATE INDEX event_search_fts_idx ON event_search USING gin (vector);
+
+
+
+CREATE INDEX event_to_state_groups_sg_index ON event_to_state_groups USING btree (state_group);
+
+
+
+CREATE INDEX events_order_room ON events USING btree (room_id, topological_ordering, stream_ordering);
+
+
+
+CREATE INDEX events_room_stream ON events USING btree (room_id, stream_ordering);
+
+
+
+CREATE INDEX events_ts ON events USING btree (origin_server_ts, stream_ordering);
+
+
+
+CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote USING btree (group_id, user_id);
+
+
+
+CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote USING btree (user_id);
+
+
+
+CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote USING btree (valid_until_ms);
+
+
+
+CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals USING btree (group_id, user_id);
+
+
+
+CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals USING btree (user_id);
+
+
+
+CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals USING btree (valid_until_ms);
+
+
+
+CREATE UNIQUE INDEX group_invites_g_idx ON group_invites USING btree (group_id, user_id);
+
+
+
+CREATE INDEX group_invites_u_idx ON group_invites USING btree (user_id);
+
+
+
+CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms USING btree (group_id, room_id);
+
+
+
+CREATE INDEX group_rooms_r_idx ON group_rooms USING btree (room_id);
+
+
+
+CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms USING btree (group_id, room_id, category_id);
+
+
+
+CREATE INDEX group_summary_users_g_idx ON group_summary_users USING btree (group_id);
+
+
+
+CREATE UNIQUE INDEX group_users_g_idx ON group_users USING btree (group_id, user_id);
+
+
+
+CREATE INDEX group_users_u_idx ON group_users USING btree (user_id);
+
+
+
+CREATE UNIQUE INDEX groups_idx ON groups USING btree (group_id);
+
+
+
+CREATE INDEX local_group_membership_g_idx ON local_group_membership USING btree (group_id);
+
+
+
+CREATE INDEX local_group_membership_u_idx ON local_group_membership USING btree (user_id, group_id);
+
+
+
+CREATE INDEX local_invites_for_user_idx ON local_invites USING btree (invitee, locally_rejected, replaced_by, room_id);
+
+
+
+CREATE INDEX local_invites_id ON local_invites USING btree (stream_id);
+
+
+
+CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails USING btree (media_id);
+
+
+
+CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache USING btree (url, download_ts);
+
+
+
+CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache USING btree (expires_ts);
+
+
+
+CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache USING btree (media_id);
+
+
+
+CREATE INDEX local_media_repository_url_idx ON local_media_repository USING btree (created_ts) WHERE (url_cache IS NOT NULL);
+
+
+
+CREATE INDEX monthly_active_users_time_stamp ON monthly_active_users USING btree ("timestamp");
+
+
+
+CREATE UNIQUE INDEX monthly_active_users_users ON monthly_active_users USING btree (user_id);
+
+
+
+CREATE INDEX open_id_tokens_ts_valid_until_ms ON open_id_tokens USING btree (ts_valid_until_ms);
+
+
+
+CREATE INDEX presence_stream_id ON presence_stream USING btree (stream_id, user_id);
+
+
+
+CREATE INDEX presence_stream_user_id ON presence_stream USING btree (user_id);
+
+
+
+CREATE INDEX public_room_index ON rooms USING btree (is_public);
+
+
+
+CREATE INDEX public_room_list_stream_idx ON public_room_list_stream USING btree (stream_id);
+
+
+
+CREATE INDEX public_room_list_stream_rm_idx ON public_room_list_stream USING btree (room_id, stream_id);
+
+
+
+CREATE INDEX push_rules_enable_user_name ON push_rules_enable USING btree (user_name);
+
+
+
+CREATE INDEX push_rules_stream_id ON push_rules_stream USING btree (stream_id);
+
+
+
+CREATE INDEX push_rules_stream_user_stream_id ON push_rules_stream USING btree (user_id, stream_id);
+
+
+
+CREATE INDEX push_rules_user_name ON push_rules USING btree (user_name);
+
+
+
+CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override USING btree (user_id);
+
+
+
+CREATE INDEX receipts_linearized_id ON receipts_linearized USING btree (stream_id);
+
+
+
+CREATE INDEX receipts_linearized_room_stream ON receipts_linearized USING btree (room_id, stream_id);
+
+
+
+CREATE INDEX receipts_linearized_user ON receipts_linearized USING btree (user_id);
+
+
+
+CREATE INDEX received_transactions_ts ON received_transactions USING btree (ts);
+
+
+
+CREATE INDEX redactions_redacts ON redactions USING btree (redacts);
+
+
+
+CREATE INDEX remote_profile_cache_time ON remote_profile_cache USING btree (last_check);
+
+
+
+CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache USING btree (user_id);
+
+
+
+CREATE INDEX room_account_data_stream_id ON room_account_data USING btree (user_id, stream_id);
+
+
+
+CREATE INDEX room_alias_servers_alias ON room_alias_servers USING btree (room_alias);
+
+
+
+CREATE INDEX room_aliases_id ON room_aliases USING btree (room_id);
+
+
+
+CREATE INDEX room_depth_room ON room_depth USING btree (room_id);
+
+
+
+CREATE INDEX room_memberships_room_id ON room_memberships USING btree (room_id);
+
+
+
+CREATE INDEX room_memberships_user_id ON room_memberships USING btree (user_id);
+
+
+
+CREATE INDEX room_names_room_id ON room_names USING btree (room_id);
+
+
+
+CREATE UNIQUE INDEX room_state_room ON room_state USING btree (room_id);
+
+
+
+CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token USING btree (room_id);
+
+
+
+CREATE UNIQUE INDEX room_stats_room_ts ON room_stats USING btree (room_id, ts);
+
+
+
+CREATE INDEX state_group_edges_idx ON state_group_edges USING btree (state_group);
+
+
+
+CREATE INDEX state_group_edges_prev_idx ON state_group_edges USING btree (prev_state_group);
+
+
+
+CREATE INDEX state_groups_state_type_idx ON state_groups_state USING btree (state_group, type, state_key);
+
+
+
+CREATE INDEX stream_ordering_to_exterm_idx ON stream_ordering_to_exterm USING btree (stream_ordering);
+
+
+
+CREATE INDEX stream_ordering_to_exterm_rm_idx ON stream_ordering_to_exterm USING btree (room_id, stream_ordering);
+
+
+
+CREATE UNIQUE INDEX threepid_guest_access_tokens_index ON threepid_guest_access_tokens USING btree (medium, address);
+
+
+
+CREATE INDEX topics_room_id ON topics USING btree (room_id);
+
+
+
+CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits USING btree ("timestamp");
+
+
+
+CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits USING btree (user_id, "timestamp");
+
+
+
+CREATE INDEX user_directory_room_idx ON user_directory USING btree (room_id);
+
+
+
+CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin (vector);
+
+
+
+CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search USING btree (user_id);
+
+
+
+CREATE UNIQUE INDEX user_directory_user_idx ON user_directory USING btree (user_id);
+
+
+
+CREATE INDEX user_filters_by_user_id_filter_id ON user_filters USING btree (user_id, filter_id);
+
+
+
+CREATE INDEX user_ips_device_id ON user_ips USING btree (user_id, device_id, last_seen);
+
+
+
+CREATE INDEX user_ips_last_seen ON user_ips USING btree (user_id, last_seen);
+
+
+
+CREATE INDEX user_ips_last_seen_only ON user_ips USING btree (last_seen);
+
+
+
+CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index ON user_ips USING btree (user_id, access_token, ip);
+
+
+
+CREATE UNIQUE INDEX user_stats_user_ts ON user_stats USING btree (user_id, ts);
+
+
+
+CREATE UNIQUE INDEX user_threepid_id_server_idx ON user_threepid_id_server USING btree (user_id, medium, address, id_server);
+
+
+
+CREATE INDEX user_threepids_medium_address ON user_threepids USING btree (medium, address);
+
+
+
+CREATE INDEX user_threepids_user_id ON user_threepids USING btree (user_id);
+
+
+
+CREATE INDEX users_creation_ts ON users USING btree (creation_ts);
+
+
+
+CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms USING btree (user_id, room_id);
+
+
+
+CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms USING btree (other_user_id);
+
+
+
+CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms USING btree (room_id);
+
+
+
+CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms USING btree (user_id, other_user_id, room_id);
+
+
+
diff --git a/synapse/storage/schema/full_schemas/54/full.sql.sqlite b/synapse/storage/schema/full_schemas/54/full.sql.sqlite
new file mode 100644
index 0000000000..be9295e4c9
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/54/full.sql.sqlite
@@ -0,0 +1,260 @@
+CREATE TABLE application_services_state( as_id TEXT PRIMARY KEY, state VARCHAR(5), last_txn INTEGER );
+CREATE TABLE application_services_txns( as_id TEXT NOT NULL, txn_id INTEGER NOT NULL, event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) );
+CREATE INDEX application_services_txns_id ON application_services_txns ( as_id );
+CREATE TABLE presence( user_id TEXT NOT NULL, state VARCHAR(20), status_msg TEXT, mtime BIGINT, UNIQUE (user_id) );
+CREATE TABLE presence_allow_inbound( observed_user_id TEXT NOT NULL, observer_user_id TEXT NOT NULL, UNIQUE (observed_user_id, observer_user_id) );
+CREATE TABLE users( name TEXT, password_hash TEXT, creation_ts BIGINT, admin SMALLINT DEFAULT 0 NOT NULL, upgrade_ts BIGINT, is_guest SMALLINT DEFAULT 0 NOT NULL, appservice_id TEXT, consent_version TEXT, consent_server_notice_sent TEXT, user_type TEXT DEFAULT NULL, UNIQUE(name) );
+CREATE TABLE access_tokens( id BIGINT PRIMARY KEY, user_id TEXT NOT NULL, device_id TEXT, token TEXT NOT NULL, last_used BIGINT, UNIQUE(token) );
+CREATE TABLE user_ips ( user_id TEXT NOT NULL, access_token TEXT NOT NULL, device_id TEXT, ip TEXT NOT NULL, user_agent TEXT NOT NULL, last_seen BIGINT NOT NULL );
+CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, UNIQUE(user_id) );
+CREATE TABLE received_transactions( transaction_id TEXT, origin TEXT, ts BIGINT, response_code INTEGER, response_json bytea, has_been_referenced smallint default 0, UNIQUE (transaction_id, origin) );
+CREATE TABLE destinations( destination TEXT PRIMARY KEY, retry_last_ts BIGINT, retry_interval INTEGER );
+CREATE TABLE events( stream_ordering INTEGER PRIMARY KEY, topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, content TEXT, unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, origin_server_ts BIGINT, received_ts BIGINT, sender TEXT, contains_url BOOLEAN, UNIQUE (event_id) );
+CREATE INDEX events_order_room ON events ( room_id, topological_ordering, stream_ordering );
+CREATE TABLE event_json( event_id TEXT NOT NULL, room_id TEXT NOT NULL, internal_metadata TEXT NOT NULL, json TEXT NOT NULL, format_version INTEGER, UNIQUE (event_id) );
+CREATE INDEX event_json_room_id ON event_json(room_id);
+CREATE TABLE state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, prev_state TEXT, UNIQUE (event_id) );
+CREATE TABLE current_state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, UNIQUE (event_id), UNIQUE (room_id, type, state_key) );
+CREATE TABLE room_memberships( event_id TEXT NOT NULL, user_id TEXT NOT NULL, sender TEXT NOT NULL, room_id TEXT NOT NULL, membership TEXT NOT NULL, forgotten INTEGER DEFAULT 0, display_name TEXT, avatar_url TEXT, UNIQUE (event_id) );
+CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
+CREATE TABLE topics( event_id TEXT NOT NULL, room_id TEXT NOT NULL, topic TEXT NOT NULL, UNIQUE (event_id) );
+CREATE INDEX topics_room_id ON topics(room_id);
+CREATE TABLE room_names( event_id TEXT NOT NULL, room_id TEXT NOT NULL, name TEXT NOT NULL, UNIQUE (event_id) );
+CREATE INDEX room_names_room_id ON room_names(room_id);
+CREATE TABLE rooms( room_id TEXT PRIMARY KEY NOT NULL, is_public BOOL, creator TEXT );
+CREATE TABLE server_signature_keys( server_name TEXT, key_id TEXT, from_server TEXT, ts_added_ms BIGINT, verify_key bytea, ts_valid_until_ms BIGINT, UNIQUE (server_name, key_id) );
+CREATE TABLE rejections( event_id TEXT NOT NULL, reason TEXT NOT NULL, last_check TEXT NOT NULL, UNIQUE (event_id) );
+CREATE TABLE push_rules ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, rule_id TEXT NOT NULL, priority_class SMALLINT NOT NULL, priority INTEGER NOT NULL DEFAULT 0, conditions TEXT NOT NULL, actions TEXT NOT NULL, UNIQUE(user_name, rule_id) );
+CREATE INDEX push_rules_user_name on push_rules (user_name);
+CREATE TABLE user_filters( user_id TEXT, filter_id BIGINT, filter_json bytea );
+CREATE INDEX user_filters_by_user_id_filter_id ON user_filters( user_id, filter_id );
+CREATE TABLE push_rules_enable ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, rule_id TEXT NOT NULL, enabled SMALLINT, UNIQUE(user_name, rule_id) );
+CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name);
+CREATE TABLE event_forward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, UNIQUE (event_id, room_id) );
+CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
+CREATE TABLE event_backward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, UNIQUE (event_id, room_id) );
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
+CREATE TABLE event_edges( event_id TEXT NOT NULL, prev_event_id TEXT NOT NULL, room_id TEXT NOT NULL, is_state BOOL NOT NULL, UNIQUE (event_id, prev_event_id, room_id, is_state) );
+CREATE INDEX ev_edges_id ON event_edges(event_id);
+CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
+CREATE TABLE room_depth( room_id TEXT NOT NULL, min_depth INTEGER NOT NULL, UNIQUE (room_id) );
+CREATE INDEX room_depth_room ON room_depth(room_id);
+CREATE TABLE state_groups( id BIGINT PRIMARY KEY, room_id TEXT NOT NULL, event_id TEXT NOT NULL );
+CREATE TABLE state_groups_state( state_group BIGINT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT NOT NULL );
+CREATE TABLE event_to_state_groups( event_id TEXT NOT NULL, state_group BIGINT NOT NULL, UNIQUE (event_id) );
+CREATE TABLE local_media_repository ( media_id TEXT, media_type TEXT, media_length INTEGER, created_ts BIGINT, upload_name TEXT, user_id TEXT, quarantined_by TEXT, url_cache TEXT, last_access_ts BIGINT, UNIQUE (media_id) );
+CREATE TABLE local_media_repository_thumbnails ( media_id TEXT, thumbnail_width INTEGER, thumbnail_height INTEGER, thumbnail_type TEXT, thumbnail_method TEXT, thumbnail_length INTEGER, UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) );
+CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id);
+CREATE TABLE remote_media_cache ( media_origin TEXT, media_id TEXT, media_type TEXT, created_ts BIGINT, upload_name TEXT, media_length INTEGER, filesystem_id TEXT, last_access_ts BIGINT, quarantined_by TEXT, UNIQUE (media_origin, media_id) );
+CREATE TABLE remote_media_cache_thumbnails ( media_origin TEXT, media_id TEXT, thumbnail_width INTEGER, thumbnail_height INTEGER, thumbnail_method TEXT, thumbnail_type TEXT, thumbnail_length INTEGER, filesystem_id TEXT, UNIQUE ( media_origin, media_id, thumbnail_width, thumbnail_height, thumbnail_type ) );
+CREATE TABLE redactions ( event_id TEXT NOT NULL, redacts TEXT NOT NULL, UNIQUE (event_id) );
+CREATE INDEX redactions_redacts ON redactions (redacts);
+CREATE TABLE room_aliases( room_alias TEXT NOT NULL, room_id TEXT NOT NULL, creator TEXT, UNIQUE (room_alias) );
+CREATE INDEX room_aliases_id ON room_aliases(room_id);
+CREATE TABLE room_alias_servers( room_alias TEXT NOT NULL, server TEXT NOT NULL );
+CREATE INDEX room_alias_servers_alias ON room_alias_servers(room_alias);
+CREATE TABLE event_reference_hashes ( event_id TEXT, algorithm TEXT, hash bytea, UNIQUE (event_id, algorithm) );
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id);
+CREATE TABLE IF NOT EXISTS "server_keys_json" ( server_name TEXT NOT NULL, key_id TEXT NOT NULL, from_server TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, ts_valid_until_ms BIGINT NOT NULL, key_json bytea NOT NULL, CONSTRAINT server_keys_json_uniqueness UNIQUE (server_name, key_id, from_server) );
+CREATE TABLE e2e_device_keys_json ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, key_json TEXT NOT NULL, CONSTRAINT e2e_device_keys_json_uniqueness UNIQUE (user_id, device_id) );
+CREATE TABLE e2e_one_time_keys_json ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, algorithm TEXT NOT NULL, key_id TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, key_json TEXT NOT NULL, CONSTRAINT e2e_one_time_keys_json_uniqueness UNIQUE (user_id, device_id, algorithm, key_id) );
+CREATE TABLE receipts_graph( room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, event_ids TEXT NOT NULL, data TEXT NOT NULL, CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id) );
+CREATE TABLE receipts_linearized ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, event_id TEXT NOT NULL, data TEXT NOT NULL, CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id) );
+CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
+CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
+CREATE TABLE IF NOT EXISTS "user_threepids" ( user_id TEXT NOT NULL, medium TEXT NOT NULL, address TEXT NOT NULL, validated_at BIGINT NOT NULL, added_at BIGINT NOT NULL, CONSTRAINT medium_address UNIQUE (medium, address) );
+CREATE INDEX user_threepids_user_id ON user_threepids(user_id);
+CREATE TABLE background_updates( update_name TEXT NOT NULL, progress_json TEXT NOT NULL, depends_on TEXT, CONSTRAINT background_updates_uniqueness UNIQUE (update_name) );
+CREATE VIRTUAL TABLE event_search USING fts4 ( event_id, room_id, sender, key, value )
+/* event_search(event_id,room_id,sender,"key",value) */;
+CREATE TABLE IF NOT EXISTS 'event_search_content'(docid INTEGER PRIMARY KEY, 'c0event_id', 'c1room_id', 'c2sender', 'c3key', 'c4value');
+CREATE TABLE IF NOT EXISTS 'event_search_segments'(blockid INTEGER PRIMARY KEY, block BLOB);
+CREATE TABLE IF NOT EXISTS 'event_search_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));
+CREATE TABLE IF NOT EXISTS 'event_search_docsize'(docid INTEGER PRIMARY KEY, size BLOB);
+CREATE TABLE IF NOT EXISTS 'event_search_stat'(id INTEGER PRIMARY KEY, value BLOB);
+CREATE TABLE guest_access( event_id TEXT NOT NULL, room_id TEXT NOT NULL, guest_access TEXT NOT NULL, UNIQUE (event_id) );
+CREATE TABLE history_visibility( event_id TEXT NOT NULL, room_id TEXT NOT NULL, history_visibility TEXT NOT NULL, UNIQUE (event_id) );
+CREATE TABLE room_tags( user_id TEXT NOT NULL, room_id TEXT NOT NULL, tag TEXT NOT NULL, content TEXT NOT NULL, CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag) );
+CREATE TABLE room_tags_revisions ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, stream_id BIGINT NOT NULL, CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id) );
+CREATE TABLE IF NOT EXISTS "account_data_max_stream_id"( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_id BIGINT NOT NULL, CHECK (Lock='X') );
+CREATE TABLE account_data( user_id TEXT NOT NULL, account_data_type TEXT NOT NULL, stream_id BIGINT NOT NULL, content TEXT NOT NULL, CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) );
+CREATE TABLE room_account_data( user_id TEXT NOT NULL, room_id TEXT NOT NULL, account_data_type TEXT NOT NULL, stream_id BIGINT NOT NULL, content TEXT NOT NULL, CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) );
+CREATE INDEX account_data_stream_id on account_data(user_id, stream_id);
+CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id);
+CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
+CREATE TABLE event_push_actions( room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, profile_tag VARCHAR(32), actions TEXT NOT NULL, topological_ordering BIGINT, stream_ordering BIGINT, notif SMALLINT, highlight SMALLINT, CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) );
+CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id);
+CREATE INDEX events_room_stream on events(room_id, stream_ordering);
+CREATE INDEX public_room_index on rooms(is_public);
+CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );
+CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering );
+CREATE TABLE presence_stream( stream_id BIGINT, user_id TEXT, state TEXT, last_active_ts BIGINT, last_federation_update_ts BIGINT, last_user_sync_ts BIGINT, status_msg TEXT, currently_active BOOLEAN );
+CREATE INDEX presence_stream_id ON presence_stream(stream_id, user_id);
+CREATE INDEX presence_stream_user_id ON presence_stream(user_id);
+CREATE TABLE push_rules_stream( stream_id BIGINT NOT NULL, event_stream_ordering BIGINT NOT NULL, user_id TEXT NOT NULL, rule_id TEXT NOT NULL, op TEXT NOT NULL, priority_class SMALLINT, priority INTEGER, conditions TEXT, actions TEXT );
+CREATE INDEX push_rules_stream_id ON push_rules_stream(stream_id);
+CREATE INDEX push_rules_stream_user_stream_id on push_rules_stream(user_id, stream_id);
+CREATE TABLE ex_outlier_stream( event_stream_ordering BIGINT PRIMARY KEY NOT NULL, event_id TEXT NOT NULL, state_group BIGINT NOT NULL );
+CREATE TABLE threepid_guest_access_tokens( medium TEXT, address TEXT, guest_access_token TEXT, first_inviter TEXT );
+CREATE UNIQUE INDEX threepid_guest_access_tokens_index ON threepid_guest_access_tokens(medium, address);
+CREATE TABLE local_invites( stream_id BIGINT NOT NULL, inviter TEXT NOT NULL, invitee TEXT NOT NULL, event_id TEXT NOT NULL, room_id TEXT NOT NULL, locally_rejected TEXT, replaced_by TEXT );
+CREATE INDEX local_invites_id ON local_invites(stream_id);
+CREATE INDEX local_invites_for_user_idx ON local_invites(invitee, locally_rejected, replaced_by, room_id);
+CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id );
+CREATE TABLE open_id_tokens ( token TEXT NOT NULL PRIMARY KEY, ts_valid_until_ms bigint NOT NULL, user_id TEXT NOT NULL, UNIQUE (token) );
+CREATE INDEX open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms);
+CREATE TABLE pusher_throttle( pusher BIGINT NOT NULL, room_id TEXT NOT NULL, last_sent_ts BIGINT, throttle_ms BIGINT, PRIMARY KEY (pusher, room_id) );
+CREATE TABLE event_reports( id BIGINT NOT NULL PRIMARY KEY, received_ts BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, reason TEXT, content TEXT );
+CREATE TABLE devices ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, display_name TEXT, CONSTRAINT device_uniqueness UNIQUE (user_id, device_id) );
+CREATE TABLE appservice_stream_position( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_ordering BIGINT, CHECK (Lock='X') );
+CREATE TABLE device_inbox ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, stream_id BIGINT NOT NULL, message_json TEXT NOT NULL );
+CREATE INDEX device_inbox_user_stream_id ON device_inbox(user_id, device_id, stream_id);
+CREATE INDEX received_transactions_ts ON received_transactions(ts);
+CREATE TABLE device_federation_outbox ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, queued_ts BIGINT NOT NULL, messages_json TEXT NOT NULL );
+CREATE INDEX device_federation_outbox_destination_id ON device_federation_outbox(destination, stream_id);
+CREATE TABLE device_federation_inbox ( origin TEXT NOT NULL, message_id TEXT NOT NULL, received_ts BIGINT NOT NULL );
+CREATE INDEX device_federation_inbox_sender_id ON device_federation_inbox(origin, message_id);
+CREATE TABLE device_max_stream_id ( stream_id BIGINT NOT NULL );
+CREATE TABLE public_room_list_stream ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, visibility BOOLEAN NOT NULL , appservice_id TEXT, network_id TEXT);
+CREATE INDEX public_room_list_stream_idx on public_room_list_stream( stream_id );
+CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream( room_id, stream_id );
+CREATE TABLE state_group_edges( state_group BIGINT NOT NULL, prev_state_group BIGINT NOT NULL );
+CREATE INDEX state_group_edges_idx ON state_group_edges(state_group);
+CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group);
+CREATE TABLE stream_ordering_to_exterm ( stream_ordering BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL );
+CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm( stream_ordering );
+CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm( room_id, stream_ordering );
+CREATE TABLE IF NOT EXISTS "event_auth"( event_id TEXT NOT NULL, auth_id TEXT NOT NULL, room_id TEXT NOT NULL );
+CREATE INDEX evauth_edges_id ON event_auth(event_id);
+CREATE INDEX user_threepids_medium_address on user_threepids (medium, address);
+CREATE TABLE appservice_room_list( appservice_id TEXT NOT NULL, network_id TEXT NOT NULL, room_id TEXT NOT NULL );
+CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list( appservice_id, network_id, room_id );
+CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id);
+CREATE TABLE federation_stream_position( type TEXT NOT NULL, stream_id INTEGER NOT NULL );
+CREATE TABLE device_lists_remote_cache ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, content TEXT NOT NULL );
+CREATE TABLE device_lists_remote_extremeties ( user_id TEXT NOT NULL, stream_id TEXT NOT NULL );
+CREATE TABLE device_lists_stream ( stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL );
+CREATE INDEX device_lists_stream_id ON device_lists_stream(stream_id, user_id);
+CREATE TABLE device_lists_outbound_pokes ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL, sent BOOLEAN NOT NULL, ts BIGINT NOT NULL );
+CREATE INDEX device_lists_outbound_pokes_id ON device_lists_outbound_pokes(destination, stream_id);
+CREATE INDEX device_lists_outbound_pokes_user ON device_lists_outbound_pokes(destination, user_id);
+CREATE TABLE event_push_summary ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, notif_count BIGINT NOT NULL, stream_ordering BIGINT NOT NULL );
+CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);
+CREATE TABLE event_push_summary_stream_ordering ( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_ordering BIGINT NOT NULL, CHECK (Lock='X') );
+CREATE TABLE IF NOT EXISTS "pushers" ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, access_token BIGINT DEFAULT NULL, profile_tag TEXT NOT NULL, kind TEXT NOT NULL, app_id TEXT NOT NULL, app_display_name TEXT NOT NULL, device_display_name TEXT NOT NULL, pushkey TEXT NOT NULL, ts BIGINT NOT NULL, lang TEXT, data TEXT, last_stream_ordering INTEGER, last_success BIGINT, failing_since BIGINT, UNIQUE (app_id, pushkey, user_name) );
+CREATE INDEX device_lists_outbound_pokes_stream ON device_lists_outbound_pokes(stream_id);
+CREATE TABLE ratelimit_override ( user_id TEXT NOT NULL, messages_per_second BIGINT, burst_count BIGINT );
+CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override(user_id);
+CREATE TABLE current_state_delta_stream ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT, prev_event_id TEXT );
+CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id);
+CREATE TABLE device_lists_outbound_last_success ( destination TEXT NOT NULL, user_id TEXT NOT NULL, stream_id BIGINT NOT NULL );
+CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success( destination, user_id, stream_id );
+CREATE TABLE user_directory_stream_pos ( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_id BIGINT, CHECK (Lock='X') );
+CREATE VIRTUAL TABLE user_directory_search USING fts4 ( user_id, value )
+/* user_directory_search(user_id,value) */;
+CREATE TABLE IF NOT EXISTS 'user_directory_search_content'(docid INTEGER PRIMARY KEY, 'c0user_id', 'c1value');
+CREATE TABLE IF NOT EXISTS 'user_directory_search_segments'(blockid INTEGER PRIMARY KEY, block BLOB);
+CREATE TABLE IF NOT EXISTS 'user_directory_search_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));
+CREATE TABLE IF NOT EXISTS 'user_directory_search_docsize'(docid INTEGER PRIMARY KEY, size BLOB);
+CREATE TABLE IF NOT EXISTS 'user_directory_search_stat'(id INTEGER PRIMARY KEY, value BLOB);
+CREATE TABLE blocked_rooms ( room_id TEXT NOT NULL, user_id TEXT NOT NULL );
+CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms(room_id);
+CREATE TABLE IF NOT EXISTS "local_media_repository_url_cache"( url TEXT, response_code INTEGER, etag TEXT, expires_ts BIGINT, og TEXT, media_id TEXT, download_ts BIGINT );
+CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts);
+CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache(url, download_ts);
+CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache(media_id);
+CREATE TABLE group_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, is_admin BOOLEAN NOT NULL, is_public BOOLEAN NOT NULL );
+CREATE TABLE group_invites ( group_id TEXT NOT NULL, user_id TEXT NOT NULL );
+CREATE TABLE group_rooms ( group_id TEXT NOT NULL, room_id TEXT NOT NULL, is_public BOOLEAN NOT NULL );
+CREATE TABLE group_summary_rooms ( group_id TEXT NOT NULL, room_id TEXT NOT NULL, category_id TEXT NOT NULL, room_order BIGINT NOT NULL, is_public BOOLEAN NOT NULL, UNIQUE (group_id, category_id, room_id, room_order), CHECK (room_order > 0) );
+CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id);
+CREATE TABLE group_summary_room_categories ( group_id TEXT NOT NULL, category_id TEXT NOT NULL, cat_order BIGINT NOT NULL, UNIQUE (group_id, category_id, cat_order), CHECK (cat_order > 0) );
+CREATE TABLE group_room_categories ( group_id TEXT NOT NULL, category_id TEXT NOT NULL, profile TEXT NOT NULL, is_public BOOLEAN NOT NULL, UNIQUE (group_id, category_id) );
+CREATE TABLE group_summary_users ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, role_id TEXT NOT NULL, user_order BIGINT NOT NULL, is_public BOOLEAN NOT NULL );
+CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id);
+CREATE TABLE group_summary_roles ( group_id TEXT NOT NULL, role_id TEXT NOT NULL, role_order BIGINT NOT NULL, UNIQUE (group_id, role_id, role_order), CHECK (role_order > 0) );
+CREATE TABLE group_roles ( group_id TEXT NOT NULL, role_id TEXT NOT NULL, profile TEXT NOT NULL, is_public BOOLEAN NOT NULL, UNIQUE (group_id, role_id) );
+CREATE TABLE group_attestations_renewals ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, valid_until_ms BIGINT NOT NULL );
+CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(group_id, user_id);
+CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id);
+CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms);
+CREATE TABLE group_attestations_remote ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, valid_until_ms BIGINT NOT NULL, attestation_json TEXT NOT NULL );
+CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id);
+CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id);
+CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms);
+CREATE TABLE local_group_membership ( group_id TEXT NOT NULL, user_id TEXT NOT NULL, is_admin BOOLEAN NOT NULL, membership TEXT NOT NULL, is_publicised BOOLEAN NOT NULL, content TEXT NOT NULL );
+CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id);
+CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id);
+CREATE TABLE local_group_updates ( stream_id BIGINT NOT NULL, group_id TEXT NOT NULL, user_id TEXT NOT NULL, type TEXT NOT NULL, content TEXT NOT NULL );
+CREATE TABLE remote_profile_cache ( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, last_check BIGINT NOT NULL );
+CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id);
+CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check);
+CREATE TABLE IF NOT EXISTS "deleted_pushers" ( stream_id BIGINT NOT NULL, app_id TEXT NOT NULL, pushkey TEXT NOT NULL, user_id TEXT NOT NULL );
+CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id);
+CREATE TABLE IF NOT EXISTS "groups" ( group_id TEXT NOT NULL, name TEXT, avatar_url TEXT, short_description TEXT, long_description TEXT, is_public BOOL NOT NULL , join_policy TEXT NOT NULL DEFAULT 'invite');
+CREATE UNIQUE INDEX groups_idx ON groups(group_id);
+CREATE TABLE IF NOT EXISTS "user_directory" ( user_id TEXT NOT NULL, room_id TEXT, display_name TEXT, avatar_url TEXT );
+CREATE INDEX user_directory_room_idx ON user_directory(room_id);
+CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id);
+CREATE TABLE event_push_actions_staging ( event_id TEXT NOT NULL, user_id TEXT NOT NULL, actions TEXT NOT NULL, notif SMALLINT NOT NULL, highlight SMALLINT NOT NULL );
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
+CREATE TABLE users_pending_deactivation ( user_id TEXT NOT NULL );
+CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id);
+CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id);
+CREATE INDEX group_users_u_idx ON group_users(user_id);
+CREATE INDEX group_invites_u_idx ON group_invites(user_id);
+CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
+CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
+CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL, device_id TEXT, timestamp BIGINT NOT NULL );
+CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp);
+CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp);
+CREATE TABLE erased_users ( user_id TEXT NOT NULL );
+CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id);
+CREATE TABLE monthly_active_users ( user_id TEXT NOT NULL, timestamp BIGINT NOT NULL );
+CREATE UNIQUE INDEX monthly_active_users_users ON monthly_active_users(user_id);
+CREATE INDEX monthly_active_users_time_stamp ON monthly_active_users(timestamp);
+CREATE TABLE IF NOT EXISTS "e2e_room_keys_versions" ( user_id TEXT NOT NULL, version BIGINT NOT NULL, algorithm TEXT NOT NULL, auth_data TEXT NOT NULL, deleted SMALLINT DEFAULT 0 NOT NULL );
+CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version);
+CREATE TABLE IF NOT EXISTS "e2e_room_keys" ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, session_id TEXT NOT NULL, version BIGINT NOT NULL, first_message_index INT, forwarded_count INT, is_verified BOOLEAN, session_data TEXT NOT NULL );
+CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys(user_id, room_id, session_id);
+CREATE TABLE users_who_share_private_rooms ( user_id TEXT NOT NULL, other_user_id TEXT NOT NULL, room_id TEXT NOT NULL );
+CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms(user_id, other_user_id, room_id);
+CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms(room_id);
+CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms(other_user_id);
+CREATE TABLE user_threepid_id_server ( user_id TEXT NOT NULL, medium TEXT NOT NULL, address TEXT NOT NULL, id_server TEXT NOT NULL );
+CREATE UNIQUE INDEX user_threepid_id_server_idx ON user_threepid_id_server( user_id, medium, address, id_server );
+CREATE TABLE users_in_public_rooms ( user_id TEXT NOT NULL, room_id TEXT NOT NULL );
+CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms(user_id, room_id);
+CREATE TABLE account_validity ( user_id TEXT PRIMARY KEY, expiration_ts_ms BIGINT NOT NULL, email_sent BOOLEAN NOT NULL, renewal_token TEXT );
+CREATE TABLE event_relations ( event_id TEXT NOT NULL, relates_to_id TEXT NOT NULL, relation_type TEXT NOT NULL, aggregation_key TEXT );
+CREATE UNIQUE INDEX event_relations_id ON event_relations(event_id);
+CREATE INDEX event_relations_relates ON event_relations(relates_to_id, relation_type, aggregation_key);
+CREATE TABLE stats_stream_pos ( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_id BIGINT, CHECK (Lock='X') );
+CREATE TABLE user_stats ( user_id TEXT NOT NULL, ts BIGINT NOT NULL, bucket_size INT NOT NULL, public_rooms INT NOT NULL, private_rooms INT NOT NULL );
+CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts);
+CREATE TABLE room_stats ( room_id TEXT NOT NULL, ts BIGINT NOT NULL, bucket_size INT NOT NULL, current_state_events INT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, banned_members INT NOT NULL, state_events INT NOT NULL );
+CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts);
+CREATE TABLE room_state ( room_id TEXT NOT NULL, join_rules TEXT, history_visibility TEXT, encryption TEXT, name TEXT, topic TEXT, avatar TEXT, canonical_alias TEXT );
+CREATE UNIQUE INDEX room_state_room ON room_state(room_id);
+CREATE TABLE room_stats_earliest_token ( room_id TEXT NOT NULL, token BIGINT NOT NULL );
+CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id);
+CREATE INDEX access_tokens_device_id ON access_tokens (user_id, device_id);
+CREATE INDEX user_ips_device_id ON user_ips (user_id, device_id, last_seen);
+CREATE INDEX event_contains_url_index ON events (room_id, topological_ordering, stream_ordering);
+CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering);
+CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering);
+CREATE INDEX current_state_events_member_index ON current_state_events (state_key);
+CREATE INDEX device_inbox_stream_id_user_id ON device_inbox (stream_id, user_id);
+CREATE INDEX device_lists_stream_user_id ON device_lists_stream (user_id, device_id);
+CREATE INDEX local_media_repository_url_idx ON local_media_repository (created_ts);
+CREATE INDEX user_ips_last_seen ON user_ips (user_id, last_seen);
+CREATE INDEX user_ips_last_seen_only ON user_ips (last_seen);
+CREATE INDEX users_creation_ts ON users (creation_ts);
+CREATE INDEX event_to_state_groups_sg_index ON event_to_state_groups (state_group);
+CREATE UNIQUE INDEX device_lists_remote_cache_unique_id ON device_lists_remote_cache (user_id, device_id);
+CREATE INDEX state_groups_state_type_idx ON state_groups_state(state_group, type, state_key);
+CREATE UNIQUE INDEX device_lists_remote_extremeties_unique_idx ON device_lists_remote_extremeties (user_id);
+CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index ON user_ips (user_id, access_token, ip);
diff --git a/synapse/storage/schema/full_schemas/54/stream_positions.sql b/synapse/storage/schema/full_schemas/54/stream_positions.sql
new file mode 100644
index 0000000000..c265fd20e2
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/54/stream_positions.sql
@@ -0,0 +1,7 @@
+
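+-- Initialise the various stream-position bookkeeping tables for a database
+-- created from this full schema.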
+INSERT INTO appservice_stream_position (stream_ordering) SELECT COALESCE(MAX(stream_ordering), 0) FROM events;
+INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
+INSERT INTO federation_stream_position (type, stream_id) SELECT 'events', coalesce(max(stream_ordering), -1) FROM events;
+INSERT INTO user_directory_stream_pos (stream_id) VALUES (0);
+INSERT INTO stats_stream_pos (stream_id) VALUES (0);
+INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);
diff --git a/synapse/storage/schema/full_schemas/README.txt b/synapse/storage/schema/full_schemas/README.txt
new file mode 100644
index 0000000000..d3f6401344
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/README.txt
@@ -0,0 +1,19 @@
+Building full schema dumps
+==========================
+
+These schema dumps must be made from a database that has had all background updates run.
+
+Postgres
+--------
+
+$ pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner $DATABASE_NAME | sed -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > full.sql.postgres
+
+SQLite
+------
+
+$ sqlite3 $DATABASE_FILE ".schema" > full.sql.sqlite
+
+After
+-----
+
+Delete the CREATE statements for "sqlite_stat1", "schema_version", "applied_schema_deltas", and "applied_module_schemas".
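+
+For the SQLite dump each statement is on a single line, so (as a rough sketch,
+not part of any build tooling) something like the following works; adjust the
+pattern if the list of tables to remove ever changes:
+
+$ grep -vE 'sqlite_stat1|schema_version|applied_schema_deltas|applied_module_schemas' full.sql.sqlite > full.sql.sqlite.tmp && mv full.sql.sqlite.tmp full.sql.sqlite
+
+The Postgres dump spreads each CREATE TABLE over several lines, so remove those
+statements by hand or in an editor.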
\ No newline at end of file
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index ff49eaae02..f3b1cec933 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -31,8 +31,8 @@ from .background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
SearchEntry = namedtuple(
- 'SearchEntry',
- ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'],
+ "SearchEntry",
+ ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"],
)
@@ -216,7 +216,7 @@ class SearchStore(BackgroundUpdateStore):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
- have_added_index = progress['have_added_indexes']
+ have_added_index = progress["have_added_indexes"]
if not have_added_index:
@@ -341,29 +341,7 @@ class SearchStore(BackgroundUpdateStore):
for entry in entries
)
- # inserts to a GIN index are normally batched up into a pending
- # list, and then all committed together once the list gets to a
- # certain size. The trouble with that is that postgres (pre-9.5)
- # uses work_mem to determine the length of the list, and work_mem
- # is typically very large.
- #
- # We therefore reduce work_mem while we do the insert.
- #
- # (postgres 9.5 uses the separate gin_pending_list_limit setting,
- # so doesn't suffer the same problem, but changing work_mem will
- # be harmless)
- #
- # Note that we don't need to worry about restoring it on
- # exception, because exceptions will cause the transaction to be
- # rolled back, including the effects of the SET command.
- #
- # Also: we use SET rather than SET LOCAL because there's lots of
- # other stuff going on in this transaction, which want to have the
- # normal work_mem setting.
-
- txn.execute("SET work_mem='256kB'")
txn.executemany(sql, args)
- txn.execute("RESET work_mem")
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
index 31a0279b18..5fdb442104 100644
--- a/synapse/storage/state_deltas.py
+++ b/synapse/storage/state_deltas.py
@@ -84,10 +84,16 @@ class StateDeltasStore(SQLBaseStore):
"get_current_state_deltas", get_current_state_deltas_txn
)
- def get_max_stream_id_in_current_state_deltas(self):
- return self._simple_select_one_onecol(
+ def _get_max_stream_id_in_current_state_deltas_txn(self, txn):
+ return self._simple_select_one_onecol_txn(
+ txn,
table="current_state_delta_stream",
keyvalues={},
retcol="COALESCE(MAX(stream_id), -1)",
- desc="get_max_stream_id_in_current_state_deltas",
+ )
+
+ def get_max_stream_id_in_current_state_deltas(self):
+ return self.runInteraction(
+ "get_max_stream_id_in_current_state_deltas",
+ self._get_max_stream_id_in_current_state_deltas_txn,
)
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
new file mode 100644
index 0000000000..1cec84ee2e
--- /dev/null
+++ b/synapse/storage/stats.py
@@ -0,0 +1,482 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018, 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.state_deltas import StateDeltasStore
+from synapse.util.caches.descriptors import cached
+
+logger = logging.getLogger(__name__)
+
+# these fields track absolutes (e.g. total number of rooms on the server)
+ABSOLUTE_STATS_FIELDS = {
+ "room": (
+ "current_state_events",
+ "joined_members",
+ "invited_members",
+ "left_members",
+ "banned_members",
+ "state_events",
+ ),
+ "user": ("public_rooms", "private_rooms"),
+}
+
+TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+TEMP_TABLE = "_temp_populate_stats"
+
+
+class StatsStore(StateDeltasStore):
+ def __init__(self, db_conn, hs):
+ super(StatsStore, self).__init__(db_conn, hs)
+
+ self.server_name = hs.hostname
+ self.clock = self.hs.get_clock()
+ self.stats_enabled = hs.config.stats_enabled
+ self.stats_bucket_size = hs.config.stats_bucket_size
+
+ self.register_background_update_handler(
+ "populate_stats_createtables", self._populate_stats_createtables
+ )
+ self.register_background_update_handler(
+ "populate_stats_process_rooms", self._populate_stats_process_rooms
+ )
+ self.register_background_update_handler(
+ "populate_stats_cleanup", self._populate_stats_cleanup
+ )
+
+ @defer.inlineCallbacks
+ def _populate_stats_createtables(self, progress, batch_size):
+
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_createtables")
+ defer.returnValue(1)
+
+ # Get all the rooms that we want to process.
+ def _make_staging_area(txn):
+ # Create the temporary tables
+ stmts = get_statements(
+ """
+                    -- We just recreate the table. We'll be reinserting the
+                    -- correct entries again later anyway.
+ DROP TABLE IF EXISTS {temp}_rooms;
+
+ CREATE TABLE IF NOT EXISTS {temp}_rooms(
+ room_id TEXT NOT NULL,
+ events BIGINT NOT NULL
+ );
+
+ CREATE INDEX {temp}_rooms_events
+ ON {temp}_rooms(events);
+ CREATE INDEX {temp}_rooms_id
+ ON {temp}_rooms(room_id);
+ """.format(
+ temp=TEMP_TABLE
+ ).splitlines()
+ )
+
+ for statement in stmts:
+ txn.execute(statement)
+
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_position(position TEXT NOT NULL)"
+ )
+ txn.execute(sql)
+
+            # Get the rooms we want to process from the database, adding only
+            # those we haven't processed yet (i.e. those not already in
+            # room_stats_earliest_token)
+ sql = """
+ INSERT INTO %s_rooms (room_id, events)
+ SELECT c.room_id, count(*) FROM current_state_events AS c
+ LEFT JOIN room_stats_earliest_token AS t USING (room_id)
+ WHERE t.room_id IS NULL
+ GROUP BY c.room_id
+ """ % (
+ TEMP_TABLE,
+ )
+ txn.execute(sql)
+
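+        # Capture the current stream position before building the staging area;
+        # it is stored alongside the staging tables and later becomes the stats
+        # stream position once the initial population has finished.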
+ new_pos = yield self.get_max_stream_id_in_current_state_deltas()
+ yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
+ yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+ self.get_earliest_token_for_room_stats.invalidate_all()
+
+ yield self._end_background_update("populate_stats_createtables")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _populate_stats_cleanup(self, progress, batch_size):
+ """
+        Update the stats stream position, then clean up the temporary staging tables.
+ """
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_cleanup")
+ defer.returnValue(1)
+
+ position = yield self._simple_select_one_onecol(
+ TEMP_TABLE + "_position", None, "position"
+ )
+ yield self.update_stats_stream_pos(position)
+
+ def _delete_staging_area(txn):
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+
+ yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+
+ yield self._end_background_update("populate_stats_cleanup")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _populate_stats_process_rooms(self, progress, batch_size):
+
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_process_rooms")
+ defer.returnValue(1)
+
+        # If we have no progress recorded, delete all existing stats and start from scratch.
+ if not progress:
+ yield self.delete_all_stats()
+
+ def _get_next_batch(txn):
+            # Only fetch 250 rooms, so we don't fetch too many at once, even if
+            # those 250 rooms have fewer than batch_size state events between them.
+ sql = """
+ SELECT room_id, events FROM %s_rooms
+ ORDER BY events DESC
+ LIMIT 250
+ """ % (
+ TEMP_TABLE,
+ )
+ txn.execute(sql)
+ rooms_to_work_on = txn.fetchall()
+
+ if not rooms_to_work_on:
+ return None
+
+            # Get how many rooms are left to process, so we can report how
+            # far through the population we are.
+ txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+ progress["remaining"] = txn.fetchone()[0]
+
+ return rooms_to_work_on
+
+ rooms_to_work_on = yield self.runInteraction(
+ "populate_stats_temp_read", _get_next_batch
+ )
+
+        # No more rooms to process: mark this background update as complete.
+ if not rooms_to_work_on:
+ yield self._end_background_update("populate_stats_process_rooms")
+ defer.returnValue(1)
+
+ logger.info(
+ "Processing the next %d rooms of %d remaining",
+ len(rooms_to_work_on),
+ progress["remaining"],
+ )
+
+ # Number of state events we've processed by going through each room
+ processed_event_count = 0
+
+ for room_id, event_count in rooms_to_work_on:
+
+ current_state_ids = yield self.get_current_state_ids(room_id)
+
+ join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
+ history_visibility_id = current_state_ids.get(
+ (EventTypes.RoomHistoryVisibility, "")
+ )
+ encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
+ name_id = current_state_ids.get((EventTypes.Name, ""))
+ topic_id = current_state_ids.get((EventTypes.Topic, ""))
+ avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
+ canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
+
+ state_events = yield self.get_events(
+ [
+ join_rules_id,
+ history_visibility_id,
+ encryption_id,
+ name_id,
+ topic_id,
+ avatar_id,
+ canonical_alias_id,
+ ]
+ )
+
+ def _get_or_none(event_id, arg):
+ event = state_events.get(event_id)
+ if event:
+ return event.content.get(arg)
+ return None
+
+ yield self.update_room_state(
+ room_id,
+ {
+ "join_rules": _get_or_none(join_rules_id, "join_rule"),
+ "history_visibility": _get_or_none(
+ history_visibility_id, "history_visibility"
+ ),
+ "encryption": _get_or_none(encryption_id, "algorithm"),
+ "name": _get_or_none(name_id, "name"),
+ "topic": _get_or_none(topic_id, "topic"),
+ "avatar": _get_or_none(avatar_id, "url"),
+ "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
+ },
+ )
+
+ now = self.hs.get_reactor().seconds()
+
+            # quantise the timestamp down to the start of the current bucket
+ now = (now // self.stats_bucket_size) * self.stats_bucket_size
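+            # (e.g. with a one-day bucket size, every timestamp in the same
+            # day maps to the start of that day's bucket)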
+
+ def _fetch_data(txn):
+
+ # Get the current token of the room
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+ current_state_events = len(current_state_ids)
+
+ membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
+
+ total_state_events = self._get_total_state_event_counts_txn(
+ txn, room_id
+ )
+
+ self._update_stats_txn(
+ txn,
+ "room",
+ room_id,
+ now,
+ {
+ "bucket_size": self.stats_bucket_size,
+ "current_state_events": current_state_events,
+ "joined_members": membership_counts.get(Membership.JOIN, 0),
+ "invited_members": membership_counts.get(Membership.INVITE, 0),
+ "left_members": membership_counts.get(Membership.LEAVE, 0),
+ "banned_members": membership_counts.get(Membership.BAN, 0),
+ "state_events": total_state_events,
+ },
+ )
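+                # Record the stream position at which this room's stats were
+                # calculated, so the delta processor knows which deltas are
+                # already reflected in them.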
+ self._simple_insert_txn(
+ txn,
+ "room_stats_earliest_token",
+ {"room_id": room_id, "token": current_token},
+ )
+
+ # We've finished a room. Delete it from the table.
+ self._simple_delete_one_txn(
+ txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
+ )
+
+ yield self.runInteraction("update_room_stats", _fetch_data)
+
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+ yield self.runInteraction(
+ "populate_stats",
+ self._background_update_progress_txn,
+ "populate_stats_process_rooms",
+ progress,
+ )
+
+ processed_event_count += event_count
+
+ if processed_event_count > batch_size:
+                # Don't process any more rooms; we've hit our batch size.
+ defer.returnValue(processed_event_count)
+
+ defer.returnValue(processed_event_count)
+
+ def delete_all_stats(self):
+ """
+ Delete all statistics records.
+ """
+
+ def _delete_all_stats_txn(txn):
+ txn.execute("DELETE FROM room_state")
+ txn.execute("DELETE FROM room_stats")
+ txn.execute("DELETE FROM room_stats_earliest_token")
+ txn.execute("DELETE FROM user_stats")
+
+ return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
+
+ def get_stats_stream_pos(self):
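+        """Fetch the stream position up to which stats have been calculated.
+
+        Returns:
+            Deferred[int]
+        """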
+ return self._simple_select_one_onecol(
+ table="stats_stream_pos",
+ keyvalues={},
+ retcol="stream_id",
+ desc="stats_stream_pos",
+ )
+
+ def update_stats_stream_pos(self, stream_id):
+ return self._simple_update_one(
+ table="stats_stream_pos",
+ keyvalues={},
+ updatevalues={"stream_id": stream_id},
+ desc="update_stats_stream_pos",
+ )
+
+ def update_room_state(self, room_id, fields):
+ """
+ Args:
+ room_id (str)
+ fields (dict[str:Any])
+ """
+
+        # For whatever reason some of the fields may contain null bytes, which
+        # Postgres rejects, so we set those fields to NULL instead.
+ for col in (
+ "join_rules",
+ "history_visibility",
+ "encryption",
+ "name",
+ "topic",
+ "avatar",
+ "canonical_alias",
+ ):
+ field = fields.get(col)
+ if field and "\0" in field:
+ fields[col] = None
+
+ return self._simple_upsert(
+ table="room_state",
+ keyvalues={"room_id": room_id},
+ values=fields,
+ desc="update_room_state",
+ )
+
+ def get_deltas_for_room(self, room_id, start, size=100):
+ """
+ Get statistics deltas for a given room.
+
+ Args:
+ room_id (str)
+ start (int): Pagination start. Number of entries, not timestamp.
+ size (int): How many entries to return.
+
+ Returns:
+ Deferred[list[dict]], where the dict has the keys of
+ ABSOLUTE_STATS_FIELDS["room"] and "ts".
+ """
+ return self._simple_select_list_paginate(
+ "room_stats",
+ {"room_id": room_id},
+ "ts",
+ start,
+ size,
+ retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+ order_direction="DESC",
+ )
+
+ def get_all_room_state(self):
+ return self._simple_select_list(
+ "room_state", None, retcols=("name", "topic", "canonical_alias")
+ )
+
+ @cached()
+ def get_earliest_token_for_room_stats(self, room_id):
+ """
+ Fetch the "earliest token". This is used by the room stats delta
+ processor to ignore deltas that have been processed between the
+ start of the background task and any particular room's stats
+ being calculated.
+
+ Returns:
+ Deferred[int]
+ """
+ return self._simple_select_one_onecol(
+ "room_stats_earliest_token",
+ {"room_id": room_id},
+ retcol="token",
+ allow_none=True,
+ )
+
+ def update_stats(self, stats_type, stats_id, ts, fields):
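+        """Insert or update the stats row for the given room or user at the
+        given timestamp bucket, setting the supplied fields.
+        """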
+ table, id_col = TYPE_TO_ROOM[stats_type]
+ return self._simple_upsert(
+ table=table,
+ keyvalues={id_col: stats_id, "ts": ts},
+ values=fields,
+ desc="update_stats",
+ )
+
+ def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
+ table, id_col = TYPE_TO_ROOM[stats_type]
+ return self._simple_upsert_txn(
+ txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+ )
+
+ def update_stats_delta(self, ts, stats_type, stats_id, field, value):
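+        """Apply a single stat update for the given room or user: fields in
+        ABSOLUTE_STATS_FIELDS are overwritten with `value`, while any other
+        field is incremented by `value` in the relevant time bucket.
+        """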
+ def _update_stats_delta(txn):
+ table, id_col = TYPE_TO_ROOM[stats_type]
+
+ sql = (
+ "SELECT * FROM %s"
+ " WHERE %s=? and ts=("
+ " SELECT MAX(ts) FROM %s"
+ " WHERE %s=?"
+ ")"
+ ) % (table, id_col, table, id_col)
+ txn.execute(sql, (stats_id, stats_id))
+ rows = self.cursor_to_dict(txn)
+ if len(rows) == 0:
+                # Silently skip, as we don't have anything to apply a delta to
+                # yet. This tries to minimise any race between the initial
+                # background population and subsequent deltas arriving.
+ return
+
+ current_ts = ts
+ latest_ts = rows[0]["ts"]
+ if current_ts < latest_ts:
+ # This one is in the past, but we're just encountering it now.
+ # Mark it as part of the current bucket.
+ current_ts = latest_ts
+ elif ts != latest_ts:
+ # we have to copy our absolute counters over to the new entry.
+ values = {
+ key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
+ }
+ values[id_col] = stats_id
+ values["ts"] = ts
+ values["bucket_size"] = self.stats_bucket_size
+
+ self._simple_insert_txn(txn, table=table, values=values)
+
+ # actually update the new value
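+            # Fields in ABSOLUTE_STATS_FIELDS hold absolute totals and are
+            # overwritten; anything else is a per-bucket counter and is
+            # incremented by the delta.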
+            if field in ABSOLUTE_STATS_FIELDS[stats_type]:
+ self._simple_update_txn(
+ txn,
+ table=table,
+ keyvalues={id_col: stats_id, "ts": current_ts},
+ updatevalues={field: value},
+ )
+ else:
+ sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
+ table,
+ field,
+ field,
+ id_col,
+ )
+ txn.execute(sql, (value, stats_id, current_ts))
+
+ return self.runInteraction("update_stats_delta", _update_stats_delta)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 529ad4ea79..d9482a3843 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -65,7 +65,7 @@ _EventDictReturn = namedtuple(
def generate_pagination_where_clause(
- direction, column_names, from_token, to_token, engine,
+ direction, column_names, from_token, to_token, engine
):
"""Creates an SQL expression to bound the columns by the pagination
tokens.
@@ -153,7 +153,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine):
str
"""
- assert(bound in (">", "<", ">=", "<="))
+ assert bound in (">", "<", ">=", "<=")
name1, name2 = column_names
val1, val2 = values
@@ -169,11 +169,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine):
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
         # use the latter form when running against postgres.
- return "((%d,%d) %s (%s,%s))" % (
- val1, val2,
- bound,
- name1, name2,
- )
+ return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2)
# We want to generate queries of e.g. the form:
#
@@ -276,7 +272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(
- self, room_ids, from_key, to_key, limit=0, order='DESC'
+ self, room_ids, from_key, to_key, limit=0, order="DESC"
):
"""Get new room events in stream ordering since `from_key`.
@@ -346,7 +342,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_room(
- self, room_id, from_key, to_key, limit=0, order='DESC'
+ self, room_id, from_key, to_key, limit=0, order="DESC"
):
"""Get new room events in stream ordering since `from_key`.
@@ -395,8 +391,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows = yield self.runInteraction("get_room_events_stream_for_room", f)
- ret = yield self.get_events_as_list([
- r.event_id for r in rows], get_prev_content=True,
+ ret = yield self.get_events_as_list(
+ [r.event_id for r in rows], get_prev_content=True
)
self._set_before_and_after(ret, rows, topo_order=from_id is None)
@@ -446,7 +442,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows = yield self.runInteraction("get_membership_changes_for_user", f)
ret = yield self.get_events_as_list(
- [r.event_id for r in rows], get_prev_content=True,
+ [r.event_id for r in rows], get_prev_content=True
)
self._set_before_and_after(ret, rows, topo_order=False)
@@ -592,8 +588,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
def get_max_topological_token(self, room_id, stream_key):
+ """Get the max topological token in a room before the given stream
+ ordering.
+
+ Args:
+ room_id (str)
+ stream_key (int)
+
+ Returns:
+ Deferred[int]
+ """
sql = (
- "SELECT max(topological_ordering) FROM events"
+ "SELECT coalesce(max(topological_ordering), 0) FROM events"
" WHERE room_id = ? AND stream_ordering < ?"
)
return self._execute(
@@ -715,7 +721,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn,
room_id,
before_token,
- direction='b',
+ direction="b",
limit=before_limit,
event_filter=event_filter,
)
@@ -725,7 +731,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn,
room_id,
after_token,
- direction='f',
+ direction="f",
limit=after_limit,
event_filter=event_filter,
)
@@ -806,7 +812,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
room_id,
from_token,
to_token=None,
- direction='b',
+ direction="b",
limit=-1,
event_filter=None,
):
@@ -836,7 +842,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
- if direction == 'b':
+ if direction == "b":
order = "DESC"
else:
order = "ASC"
@@ -872,7 +878,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if rows:
topo = rows[-1].topological_ordering
toke = rows[-1].stream_ordering
- if direction == 'b':
+ if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
@@ -888,7 +894,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def paginate_room_events(
- self, room_id, from_key, to_key=None, direction='b', limit=-1, event_filter=None
+ self, room_id, from_key, to_key=None, direction="b", limit=-1, event_filter=None
):
"""Returns list of events before or after a given token.
|