diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0a1a8cc1e5..0460fe8cc9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
from synapse.storage.data_stores import DataStores
from synapse.storage.data_stores.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage
+from synapse.storage.purge_events import PurgeEventsStorage
from synapse.storage.state import StateGroupStorage
__all__ = ["DataStores", "DataStore"]
@@ -46,6 +47,7 @@ class Storage(object):
self.main = stores.main
self.persistence = EventsPersistenceStorage(hs, stores)
+ self.purge_events = PurgeEventsStorage(hs, stores)
self.state = StateGroupStorage(hs, stores)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1a2b7ebe25..459901ac60 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -361,14 +361,11 @@ class SQLBaseStore(object):
expiration_ts,
)
- self._simple_insert_txn(
+ self._simple_upsert_txn(
txn,
"account_validity",
- values={
- "user_id": user_id,
- "expiration_ts_ms": expiration_ts,
- "email_sent": False,
- },
+ keyvalues={"user_id": user_id},
+ values={"expiration_ts_ms": expiration_ts, "email_sent": False},
)
def start_profiling(self):
@@ -412,16 +409,15 @@ class SQLBaseStore(object):
i = 0
N = 5
while True:
+ cursor = LoggingTransaction(
+ conn.cursor(),
+ name,
+ self.database_engine,
+ after_callbacks,
+ exception_callbacks,
+ )
try:
- txn = conn.cursor()
- txn = LoggingTransaction(
- txn,
- name,
- self.database_engine,
- after_callbacks,
- exception_callbacks,
- )
- r = func(txn, *args, **kwargs)
+ r = func(cursor, *args, **kwargs)
conn.commit()
return r
except self.database_engine.module.OperationalError as e:
@@ -459,6 +455,40 @@ class SQLBaseStore(object):
)
continue
raise
+ finally:
+ # we're either about to retry with a new cursor, or we're about to
+ # release the connection. Once we release the connection, it could
+ # get used for another query, which might do a conn.rollback().
+ #
+ # In the latter case, even though that probably wouldn't affect the
+ # results of this transaction, python's sqlite will reset all
+ # statements on the connection [1], which will make our cursor
+ # invalid [2].
+ #
+ # In any case, continuing to read rows after commit()ing seems
+ # dubious from the PoV of ACID transactional semantics
+ # (sqlite explicitly says that once you commit, you may see rows
+ # from subsequent updates.)
+ #
+ # In psycopg2, cursors are essentially a client-side fabrication -
+ # all the data is transferred to the client side when the statement
+ # finishes executing - so in theory we could go on streaming results
+ # from the cursor, but attempting to do so would make us
+ # incompatible with sqlite, so let's make sure we're not doing that
+ # by closing the cursor.
+ #
+ # (*named* cursors in psycopg2 are different and are proper server-
+ # side things, but (a) we don't use them and (b) they are implicitly
+ # closed by ending the transaction anyway.)
+ #
+ # In short, if we haven't finished with the cursor yet, that's a
+ # problem waiting to bite us.
+ #
+ # TL;DR: we're done with the cursor, so we can close it.
+ #
+ # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
+ # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
+ cursor.close()
except Exception as e:
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
@@ -854,7 +884,7 @@ class SQLBaseStore(object):
allvalues.update(values)
latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
- sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
+ sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),
diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py
index 6afbfc0d74..22093484ed 100644
--- a/synapse/storage/data_stores/main/account_data.py
+++ b/synapse/storage/data_stores/main/account_data.py
@@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore):
current_id(int): The position to fetch up to.
Returns:
A deferred pair of lists of tuples of stream_id int, user_id string,
- room_id string, type string, and content string.
+ room_id string, and type string.
"""
if last_room_id == current_id and last_global_id == current_id:
return defer.succeed(([], []))
def get_updated_account_data_txn(txn):
sql = (
- "SELECT stream_id, user_id, account_data_type, content"
+ "SELECT stream_id, user_id, account_data_type"
" FROM account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
@@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore):
global_results = txn.fetchall()
sql = (
- "SELECT stream_id, user_id, room_id, account_data_type, content"
+ "SELECT stream_id, user_id, room_id, account_data_type"
" FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index f04aad0743..a23744f11c 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -358,8 +358,21 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
def _add_messages_to_local_device_inbox_txn(
self, txn, stream_id, messages_by_user_then_device
):
- sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
- txn.execute(sql, (stream_id, stream_id))
+ # Compatible method of performing an upsert
+ sql = "SELECT stream_id FROM device_max_stream_id"
+
+ txn.execute(sql)
+ rows = txn.fetchone()
+ if rows:
+ db_stream_id = rows[0]
+ if db_stream_id < stream_id:
+                # Update the existing stream_id (NOTE(review): if db_stream_id >= stream_id, sql stays the SELECT yet is executed with a parameter below — confirm)
+ sql = "UPDATE device_max_stream_id SET stream_id = ?"
+ else:
+ # No rows, perform an insert
+ sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)"
+
+ txn.execute(sql, (stream_id,))
local_by_user_then_device = {}
for user_id, messages_by_device in messages_by_user_then_device.items():
@@ -367,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
- sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
+ sql = "SELECT device_id FROM devices WHERE user_id = ?"
txn.execute(sql, (user_id,))
message_json = json.dumps(messages_by_device["*"])
for row in txn:
diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py
index 1cbbae5b63..113224fd7c 100644
--- a/synapse/storage/data_stores/main/e2e_room_keys.py
+++ b/synapse/storage/data_stores/main/e2e_room_keys.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore
class EndToEndRoomKeyStore(SQLBaseStore):
@defer.inlineCallbacks
- def get_e2e_room_key(self, user_id, version, room_id, session_id):
- """Get the encrypted E2E room key for a given session from a given
- backup version of room_keys. We only store the 'best' room key for a given
- session at a given time, as determined by the handler.
-
- Args:
- user_id(str): the user whose backup we're querying
- version(str): the version ID of the backup for the set of keys we're querying
- room_id(str): the ID of the room whose keys we're querying.
- This is a bit redundant as it's implied by the session_id, but
- we include for consistency with the rest of the API.
- session_id(str): the session whose room_key we're querying.
-
- Returns:
- A deferred dict giving the session_data and message metadata for
- this room key.
- """
-
- row = yield self._simple_select_one(
- table="e2e_room_keys",
- keyvalues={
- "user_id": user_id,
- "version": version,
- "room_id": room_id,
- "session_id": session_id,
- },
- retcols=(
- "first_message_index",
- "forwarded_count",
- "is_verified",
- "session_data",
- ),
- desc="get_e2e_room_key",
- )
-
- row["session_data"] = json.loads(row["session_data"])
-
- return row
-
- @defer.inlineCallbacks
- def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
- """Replaces or inserts the encrypted E2E room key for a given session in
- a given backup
+ def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+ """Replaces the encrypted E2E room key for a given session in a given backup
Args:
user_id(str): the user whose backup we're setting
@@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
StoreError
"""
- yield self._simple_upsert(
+ yield self._simple_update_one(
table="e2e_room_keys",
keyvalues={
"user_id": user_id,
@@ -86,21 +46,51 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"room_id": room_id,
"session_id": session_id,
},
- values={
+ updatevalues={
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
"session_data": json.dumps(room_key["session_data"]),
},
- lock=False,
+ desc="update_e2e_room_key",
)
- log_kv(
- {
- "message": "Set room key",
- "room_id": room_id,
- "session_id": session_id,
- "room_key": room_key,
- }
+
+ @defer.inlineCallbacks
+ def add_e2e_room_keys(self, user_id, version, room_keys):
+ """Bulk add room keys to a given backup.
+
+ Args:
+ user_id (str): the user whose backup we're adding to
+ version (str): the version ID of the backup for the set of keys we're adding to
+ room_keys (iterable[(str, str, dict)]): the keys to add, in the form
+ (roomID, sessionID, keyData)
+ """
+
+ values = []
+ for (room_id, session_id, room_key) in room_keys:
+ values.append(
+ {
+ "user_id": user_id,
+ "version": version,
+ "room_id": room_id,
+ "session_id": session_id,
+ "first_message_index": room_key["first_message_index"],
+ "forwarded_count": room_key["forwarded_count"],
+ "is_verified": room_key["is_verified"],
+ "session_data": json.dumps(room_key["session_data"]),
+ }
+ )
+ log_kv(
+ {
+ "message": "Set room key",
+ "room_id": room_id,
+ "session_id": session_id,
+ "room_key": room_key,
+ }
+ )
+
+ yield self._simple_insert_many(
+ table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
)
@trace
@@ -110,11 +100,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
room, or a given session.
Args:
- user_id(str): the user whose backup we're querying
- version(str): the version ID of the backup for the set of keys we're querying
- room_id(str): Optional. the ID of the room whose keys we're querying, if any.
+ user_id (str): the user whose backup we're querying
+ version (str): the version ID of the backup for the set of keys we're querying
+ room_id (str): Optional. the ID of the room whose keys we're querying, if any.
If not specified, we return the keys for all the rooms in the backup.
- session_id(str): Optional. the session whose room_key we're querying, if any.
+ session_id (str): Optional. the session whose room_key we're querying, if any.
If specified, we also require the room_id to be specified.
If not specified, we return all the keys in this version of
the backup (or for the specified room)
@@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions
+ def get_e2e_room_keys_multi(self, user_id, version, room_keys):
+ """Get multiple room keys at a time. The difference between this function and
+ get_e2e_room_keys is that this function can be used to retrieve
+ multiple specific keys at a time, whereas get_e2e_room_keys is used for
+ getting all the keys in a backup version, all the keys for a room, or a
+ specific key.
+
+ Args:
+ user_id (str): the user whose backup we're querying
+ version (str): the version ID of the backup we're querying about
+ room_keys (dict[str, dict[str, iterable[str]]]): a map from
+            room ID -> {"sessions": [session ids]} indicating the session IDs
+ that we want to query
+
+ Returns:
+ Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
+ """
+
+ return self.runInteraction(
+ "get_e2e_room_keys_multi",
+ self._get_e2e_room_keys_multi_txn,
+ user_id,
+ version,
+ room_keys,
+ )
+
+ @staticmethod
+ def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
+ if not room_keys:
+ return {}
+
+ where_clauses = []
+ params = [user_id, version]
+ for room_id, room in room_keys.items():
+ sessions = list(room["sessions"])
+ if not sessions:
+ continue
+ params.append(room_id)
+ params.extend(sessions)
+ where_clauses.append(
+ "(room_id = ? AND session_id IN (%s))"
+ % (",".join(["?" for _ in sessions]),)
+ )
+
+ # check if we're actually querying something
+ if not where_clauses:
+ return {}
+
+ sql = """
+ SELECT room_id, session_id, first_message_index, forwarded_count,
+ is_verified, session_data
+ FROM e2e_room_keys
+ WHERE user_id = ? AND version = ? AND (%s)
+ """ % (
+ " OR ".join(where_clauses)
+ )
+
+ txn.execute(sql, params)
+
+ ret = {}
+
+ for row in txn:
+ room_id = row[0]
+ session_id = row[1]
+ ret.setdefault(room_id, {})
+ ret[room_id][session_id] = {
+ "first_message_index": row[2],
+ "forwarded_count": row[3],
+ "is_verified": row[4],
+ "session_data": json.loads(row[5]),
+ }
+
+ return ret
+
+ def count_e2e_room_keys(self, user_id, version):
+ """Get the number of keys in a backup version.
+
+ Args:
+ user_id (str): the user whose backup we're querying
+ version (str): the version ID of the backup we're querying about
+ """
+
+ return self._simple_select_one_onecol(
+ table="e2e_room_keys",
+ keyvalues={"user_id": user_id, "version": version},
+ retcol="COUNT(*)",
+ desc="count_e2e_room_keys",
+ )
+
@trace
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
@@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
version(str)
algorithm(str)
auth_data(object): opaque dict supplied by the client
+ etag(int): tag of the keys in the backup
"""
def _get_e2e_room_keys_version_info_txn(txn):
@@ -236,10 +316,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
txn,
table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
- retcols=("version", "algorithm", "auth_data"),
+ retcols=("version", "algorithm", "auth_data", "etag"),
)
result["auth_data"] = json.loads(result["auth_data"])
result["version"] = str(result["version"])
+ if result["etag"] is None:
+ result["etag"] = 0
return result
return self.runInteraction(
@@ -288,21 +370,33 @@ class EndToEndRoomKeyStore(SQLBaseStore):
)
@trace
- def update_e2e_room_keys_version(self, user_id, version, info):
+ def update_e2e_room_keys_version(
+ self, user_id, version, info=None, version_etag=None
+ ):
"""Update a given backup version
Args:
user_id(str): the user whose backup version we're updating
version(str): the version ID of the backup version we're updating
- info(dict): the new backup version info to store
+ info (dict): the new backup version info to store. If None, then
+ the backup version info is not updated
+ version_etag (Optional[int]): etag of the keys in the backup. If
+ None, then the etag is not updated
"""
+ updatevalues = {}
- return self._simple_update(
- table="e2e_room_keys_versions",
- keyvalues={"user_id": user_id, "version": version},
- updatevalues={"auth_data": json.dumps(info["auth_data"])},
- desc="update_e2e_room_keys_version",
- )
+ if info is not None and "auth_data" in info:
+ updatevalues["auth_data"] = json.dumps(info["auth_data"])
+ if version_etag is not None:
+ updatevalues["etag"] = version_etag
+
+ if updatevalues:
+ return self._simple_update(
+ table="e2e_room_keys_versions",
+ keyvalues={"user_id": user_id, "version": version},
+ updatevalues=updatevalues,
+ desc="update_e2e_room_keys_version",
+ )
@trace
def delete_e2e_room_keys_version(self, user_id, version=None):
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 073412a78d..d8ad59ad93 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
result.setdefault(user_id, {})[device_id] = None
# get signatures on the device
- signature_sql = (
- "SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s"
- ) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
+ signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % (
+ " OR ".join("(" + q + ")" for q in signature_query_clauses)
+ )
txn.execute(signature_sql, signature_query_params)
rows = self.cursor_to_dict(txn)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 301f8ea128..2737a1d3ae 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -713,9 +713,7 @@ class EventsStore(
metadata_json = encode_json(event.internal_metadata.get_dict())
- sql = (
- "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
- )
+ sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))
# Add an entry to the ex_outlier_stream table to replicate the
@@ -732,7 +730,7 @@ class EventsStore(
},
)
- sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
+ sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
txn.execute(sql, (False, event.event_id))
# Update the event_backward_extremities table now that this
@@ -929,6 +927,9 @@ class EventsStore(
elif event.type == EventTypes.Redaction:
# Insert into the redactions table.
self._store_redaction(txn, event)
+ elif event.type == EventTypes.Retention:
+ # Update the room_retention table.
+ self._store_retention_policy_for_room_txn(txn, event)
self._handle_event_relations(txn, event)
@@ -1375,6 +1376,10 @@ class EventsStore(
if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their
state groups).
+
+ Returns:
+ Deferred[set[int]]: The set of state groups that are referenced by
+ deleted events.
"""
return self.runInteraction(
@@ -1475,7 +1480,7 @@ class EventsStore(
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
- txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
+ txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
txn.execute("SELECT event_id, should_delete FROM events_to_purge")
event_rows = txn.fetchall()
@@ -1511,11 +1516,10 @@ class EventsStore(
[(room_id, event_id) for event_id, in new_backwards_extrems],
)
- logger.info("[purge] finding redundant state groups")
+ logger.info("[purge] finding state groups referenced by deleted events")
# Get all state groups that are referenced by events that are to be
- # deleted. We then go and check if they are referenced by other events
- # or state groups, and if not we delete them.
+ # deleted.
txn.execute(
"""
SELECT DISTINCT state_group FROM events_to_purge
@@ -1528,60 +1532,6 @@ class EventsStore(
"[purge] found %i referenced state groups", len(referenced_state_groups)
)
- logger.info("[purge] finding state groups that can be deleted")
-
- _ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
- state_groups_to_delete, remaining_state_groups = _
-
- logger.info(
- "[purge] found %i state groups to delete", len(state_groups_to_delete)
- )
-
- logger.info(
- "[purge] de-delta-ing %i remaining state groups",
- len(remaining_state_groups),
- )
-
- # Now we turn the state groups that reference to-be-deleted state
- # groups to non delta versions.
- for sg in remaining_state_groups:
- logger.info("[purge] de-delta-ing remaining state group %s", sg)
- curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
- curr_state = curr_state[sg]
-
- self._simple_delete_txn(
- txn, table="state_groups_state", keyvalues={"state_group": sg}
- )
-
- self._simple_delete_txn(
- txn, table="state_group_edges", keyvalues={"state_group": sg}
- )
-
- self._simple_insert_many_txn(
- txn,
- table="state_groups_state",
- values=[
- {
- "state_group": sg,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": state_id,
- }
- for key, state_id in iteritems(curr_state)
- ],
- )
-
- logger.info("[purge] removing redundant state groups")
- txn.executemany(
- "DELETE FROM state_groups_state WHERE state_group = ?",
- ((sg,) for sg in state_groups_to_delete),
- )
- txn.executemany(
- "DELETE FROM state_groups WHERE id = ?",
- ((sg,) for sg in state_groups_to_delete),
- )
-
logger.info("[purge] removing events from event_to_state_groups")
txn.execute(
"DELETE FROM event_to_state_groups "
@@ -1668,138 +1618,35 @@ class EventsStore(
logger.info("[purge] done")
- def _find_unreferenced_groups_during_purge(self, txn, state_groups):
- """Used when purging history to figure out which state groups can be
- deleted and which need to be de-delta'ed (due to one of its prev groups
- being scheduled for deletion).
-
- Args:
- txn
- state_groups (set[int]): Set of state groups referenced by events
- that are going to be deleted.
-
- Returns:
- tuple[set[int], set[int]]: The set of state groups that can be
- deleted and the set of state groups that need to be de-delta'ed
- """
- # Graph of state group -> previous group
- graph = {}
-
- # Set of events that we have found to be referenced by events
- referenced_groups = set()
-
- # Set of state groups we've already seen
- state_groups_seen = set(state_groups)
-
- # Set of state groups to handle next.
- next_to_search = set(state_groups)
- while next_to_search:
- # We bound size of groups we're looking up at once, to stop the
- # SQL query getting too big
- if len(next_to_search) < 100:
- current_search = next_to_search
- next_to_search = set()
- else:
- current_search = set(itertools.islice(next_to_search, 100))
- next_to_search -= current_search
-
- # Check if state groups are referenced
- sql = """
- SELECT DISTINCT state_group FROM event_to_state_groups
- LEFT JOIN events_to_purge AS ep USING (event_id)
- WHERE ep.event_id IS NULL AND
- """
- clause, args = make_in_list_sql_clause(
- txn.database_engine, "state_group", current_search
- )
- txn.execute(sql + clause, list(args))
-
- referenced = set(sg for sg, in txn)
- referenced_groups |= referenced
-
- # We don't continue iterating up the state group graphs for state
- # groups that are referenced.
- current_search -= referenced
-
- rows = self._simple_select_many_txn(
- txn,
- table="state_group_edges",
- column="prev_state_group",
- iterable=current_search,
- keyvalues={},
- retcols=("prev_state_group", "state_group"),
- )
-
- prevs = set(row["state_group"] for row in rows)
- # We don't bother re-handling groups we've already seen
- prevs -= state_groups_seen
- next_to_search |= prevs
- state_groups_seen |= prevs
-
- for row in rows:
- # Note: Each state group can have at most one prev group
- graph[row["state_group"]] = row["prev_state_group"]
-
- to_delete = state_groups_seen - referenced_groups
-
- to_dedelta = set()
- for sg in referenced_groups:
- prev_sg = graph.get(sg)
- if prev_sg and prev_sg in to_delete:
- to_dedelta.add(sg)
-
- return to_delete, to_dedelta
+ return referenced_state_groups
def purge_room(self, room_id):
"""Deletes all record of a room
Args:
- room_id (str):
+ room_id (str)
+
+ Returns:
+ Deferred[List[int]]: The list of state groups to delete.
"""
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id):
- # first we have to delete the state groups states
- logger.info("[purge] removing %s from state_groups_state", room_id)
-
+ # First we fetch all the state groups that should be deleted, before
+ # we delete that information.
txn.execute(
"""
- DELETE FROM state_groups_state WHERE state_group IN (
- SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
- WHERE events.room_id=?
- )
+ SELECT DISTINCT state_group FROM events
+ INNER JOIN event_to_state_groups USING(event_id)
+ WHERE events.room_id = ?
""",
(room_id,),
)
- # ... and the state group edges
- logger.info("[purge] removing %s from state_group_edges", room_id)
+ state_groups = [row[0] for row in txn]
- txn.execute(
- """
- DELETE FROM state_group_edges WHERE state_group IN (
- SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
- WHERE events.room_id=?
- )
- """,
- (room_id,),
- )
-
- # ... and the state groups
- logger.info("[purge] removing %s from state_groups", room_id)
-
- txn.execute(
- """
- DELETE FROM state_groups WHERE id IN (
- SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
- WHERE events.room_id=?
- )
- """,
- (room_id,),
- )
-
- # and then tables which lack an index on room_id but have one on event_id
+ # Now we delete tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
"event_edges",
@@ -1887,6 +1734,165 @@ class EventsStore(
logger.info("[purge] done")
+ return state_groups
+
+ def purge_unreferenced_state_groups(
+ self, room_id: str, state_groups_to_delete
+ ) -> defer.Deferred:
+ """Deletes no longer referenced state groups and de-deltas any state
+ groups that reference them.
+
+ Args:
+ room_id: The room the state groups belong to (must all be in the
+ same room).
+ state_groups_to_delete (Collection[int]): Set of all state groups
+ to delete.
+ """
+
+ return self.runInteraction(
+ "purge_unreferenced_state_groups",
+ self._purge_unreferenced_state_groups,
+ room_id,
+ state_groups_to_delete,
+ )
+
+ def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
+ logger.info(
+ "[purge] found %i state groups to delete", len(state_groups_to_delete)
+ )
+
+ rows = self._simple_select_many_txn(
+ txn,
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ retcols=("state_group",),
+ )
+
+ remaining_state_groups = set(
+ row["state_group"]
+ for row in rows
+ if row["state_group"] not in state_groups_to_delete
+ )
+
+ logger.info(
+ "[purge] de-delta-ing %i remaining state groups",
+ len(remaining_state_groups),
+ )
+
+ # Now we turn the state groups that reference to-be-deleted state
+ # groups to non delta versions.
+ for sg in remaining_state_groups:
+ logger.info("[purge] de-delta-ing remaining state group %s", sg)
+ curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
+ curr_state = curr_state[sg]
+
+ self._simple_delete_txn(
+ txn, table="state_groups_state", keyvalues={"state_group": sg}
+ )
+
+ self._simple_delete_txn(
+ txn, table="state_group_edges", keyvalues={"state_group": sg}
+ )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
+ "state_group": sg,
+ "room_id": room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": state_id,
+ }
+ for key, state_id in iteritems(curr_state)
+ ],
+ )
+
+ logger.info("[purge] removing redundant state groups")
+ txn.executemany(
+ "DELETE FROM state_groups_state WHERE state_group = ?",
+ ((sg,) for sg in state_groups_to_delete),
+ )
+ txn.executemany(
+ "DELETE FROM state_groups WHERE id = ?",
+ ((sg,) for sg in state_groups_to_delete),
+ )
+
+ @defer.inlineCallbacks
+ def get_previous_state_groups(self, state_groups):
+ """Fetch the previous groups of the given state groups.
+
+ Args:
+ state_groups (Iterable[int])
+
+ Returns:
+ Deferred[dict[int, int]]: mapping from state group to previous
+ state group.
+ """
+
+ rows = yield self._simple_select_many_batch(
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=state_groups,
+ keyvalues={},
+ retcols=("prev_state_group", "state_group"),
+ desc="get_previous_state_groups",
+ )
+
+ return {row["state_group"]: row["prev_state_group"] for row in rows}
+
+ def purge_room_state(self, room_id, state_groups_to_delete):
+ """Deletes all record of a room from state tables
+
+ Args:
+ room_id (str):
+ state_groups_to_delete (list[int]): State groups to delete
+ """
+
+ return self.runInteraction(
+ "purge_room_state",
+ self._purge_room_state_txn,
+ room_id,
+ state_groups_to_delete,
+ )
+
+ def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
+ # first we have to delete the state groups states
+ logger.info("[purge] removing %s from state_groups_state", room_id)
+
+ self._simple_delete_many_txn(
+ txn,
+ table="state_groups_state",
+ column="state_group",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ )
+
+ # ... and the state group edges
+ logger.info("[purge] removing %s from state_group_edges", room_id)
+
+ self._simple_delete_many_txn(
+ txn,
+ table="state_group_edges",
+ column="state_group",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ )
+
+ # ... and the state groups
+ logger.info("[purge] removing %s from state_groups", room_id)
+
+ self._simple_delete_many_txn(
+ txn,
+ table="state_groups",
+ column="id",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ )
+
async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
"""
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 51352b9966..aa87f9abc5 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -21,6 +21,7 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.api.constants import EventContentFields
from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -85,6 +86,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
"event_fix_redactions_bytes", self._event_fix_redactions_bytes
)
+ self.register_background_update_handler(
+ "event_store_labels", self._event_store_labels
+ )
+
@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
@@ -503,3 +508,68 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
yield self._end_background_update("event_fix_redactions_bytes")
return 1
+
+ @defer.inlineCallbacks
+ def _event_store_labels(self, progress, batch_size):
+ """Background update handler which will store labels for existing events."""
+ last_event_id = progress.get("last_event_id", "")
+
+ def _event_store_labels_txn(txn):
+ txn.execute(
+ """
+ SELECT event_id, json FROM event_json
+ LEFT JOIN event_labels USING (event_id)
+ WHERE event_id > ? AND label IS NULL
+ ORDER BY event_id LIMIT ?
+ """,
+ (last_event_id, batch_size),
+ )
+
+ results = list(txn)
+
+ nbrows = 0
+ last_row_event_id = ""
+ for (event_id, event_json_raw) in results:
+ try:
+ event_json = json.loads(event_json_raw)
+
+ self._simple_insert_many_txn(
+ txn=txn,
+ table="event_labels",
+ values=[
+ {
+ "event_id": event_id,
+ "label": label,
+ "room_id": event_json["room_id"],
+ "topological_ordering": event_json["depth"],
+ }
+ for label in event_json["content"].get(
+ EventContentFields.LABELS, []
+ )
+ if isinstance(label, str)
+ ],
+ )
+ except Exception as e:
+ logger.warning(
+ "Unable to load event %s (no labels will be imported): %s",
+ event_id,
+ e,
+ )
+
+ nbrows += 1
+ last_row_event_id = event_id
+
+ self._background_update_progress_txn(
+ txn, "event_store_labels", {"last_event_id": last_row_event_id}
+ )
+
+ return nbrows
+
+ num_rows = yield self.runInteraction(
+ desc="event_store_labels", func=_event_store_labels_txn
+ )
+
+ if not num_rows:
+ yield self._end_background_update("event_store_labels")
+
+ return num_rows
diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py
index a2a2a67927..f05ace299a 100644
--- a/synapse/storage/data_stores/main/filtering.py
+++ b/synapse/storage/data_stores/main/filtering.py
@@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
if filter_id_response is not None:
return filter_id_response[0]
- sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
+ sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
txn.execute(sql, (user_localpart,))
max_id = txn.fetchone()[0]
if max_id is None:
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index b3a2771f1b..5ded539af8 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -553,6 +553,21 @@ class GroupServerStore(SQLBaseStore):
desc="remove_user_from_summary",
)
+ def get_local_groups_for_room(self, room_id):
+ """Get all of the local group that contain a given room
+ Args:
+ room_id (str): The ID of a room
+ Returns:
+ Deferred[list[str]]: A twisted.Deferred containing a list of group ids
+ containing this room
+ """
+ return self._simple_select_onecol(
+ table="group_rooms",
+ keyvalues={"room_id": room_id},
+ retcol="group_id",
+ desc="get_local_groups_for_room",
+ )
+
def get_users_for_summary_by_role(self, group_id, include_private=False):
"""Get the users and roles that should be included in a summary request
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 84b5f3ad5e..0f2887bdce 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
if len(media_ids) == 0:
return
- sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
+ sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
def _delete_url_cache_txn(txn):
txn.executemany(sql, [(media_id,) for media_id in media_ids])
@@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
return
def _delete_url_cache_media_txn(txn):
- sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
+ sql = "DELETE FROM local_media_repository WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])
- sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
+ sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0c24430f28..8b17334ff4 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
args.append(limit)
txn.execute(sql, args)
- return (r[0:5] + (json.loads(r[5]),) for r in txn)
+ return list(r[0:5] + (json.loads(r[5]),) for r in txn)
return self.runInteraction(
"get_all_updated_receipts", get_all_updated_receipts_txn
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index f70d41ecab..98cf6427c3 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -19,7 +19,6 @@ import logging
import re
from six import iterkeys
-from six.moves import range
from twisted.internet import defer
from twisted.internet.defer import Deferred
@@ -377,9 +376,7 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
def f(txn):
- sql = (
- "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
- )
+ sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
txn.execute(sql, (user_id,))
return dict(txn)
@@ -484,30 +481,25 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
Gets the localpart of the next generated user ID.
- Generated user IDs are integers, and we aim for them to be as small as
- we can. Unfortunately, it's possible some of them are already taken by
- existing users, and there may be gaps in the already taken range. This
- function returns the start of the first allocatable gap. This is to
- avoid the case of ID 10000000 being pre-allocated, so us wasting the
- first (and shortest) many generated user IDs.
+ Generated user IDs are integers, so we find the largest integer user ID
+ already taken and return that plus one.
"""
def _find_next_generated_user_id(txn):
- # We bound between '@1' and '@a' to avoid pulling the entire table
+ # We bound between '@0' and '@a' to avoid pulling the entire table
# out.
- txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'")
+ txn.execute("SELECT name FROM users WHERE '@0' <= name AND name < '@a'")
regex = re.compile(r"^@(\d+):")
- found = set()
+ max_found = 0
for (user_id,) in txn:
match = regex.search(user_id)
if match:
- found.add(int(match.group(1)))
- for i in range(len(found) + 1):
- if i not in found:
- return i
+ max_found = max(int(match.group(1)), max_found)
+
+ return max_found + 1
return (
(
@@ -577,6 +569,19 @@ class RegistrationWorkerStore(SQLBaseStore):
return self._simple_delete(
"user_threepids",
keyvalues={"user_id": user_id, "medium": medium, "address": address},
+ desc="user_delete_threepid",
+ )
+
+ def user_delete_threepids(self, user_id: str):
+ """Delete all threepid this user has bound
+
+ Args:
+ user_id: The user id to delete all threepids of
+
+ """
+ return self._simple_delete(
+ "user_threepids",
+ keyvalues={"user_id": user_id},
desc="user_delete_threepids",
)
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 67bb1b6f60..b7f9024811 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -19,10 +19,13 @@ import logging
import re
from typing import Optional, Tuple
+from six import integer_types
+
from canonicaljson import json
from twisted.internet import defer
+from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.search import SearchStore
@@ -300,8 +303,141 @@ class RoomWorkerStore(SQLBaseStore):
else:
return None
+ @cachedInlineCallbacks()
+ def get_retention_policy_for_room(self, room_id):
+ """Get the retention policy for a given room.
+
+ If no retention policy has been found for this room, returns a policy defined
+ by the configured default policy (which has None as both the 'min_lifetime' and
+ the 'max_lifetime' if no default policy has been defined in the server's
+ configuration).
+
+ Args:
+ room_id (str): The ID of the room to get the retention policy of.
+
+ Returns:
+ dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
+ """
+
+ def get_retention_policy_for_room_txn(txn):
+ txn.execute(
+ """
+ SELECT min_lifetime, max_lifetime FROM room_retention
+ INNER JOIN current_state_events USING (event_id, room_id)
+ WHERE room_id = ?;
+ """,
+ (room_id,),
+ )
+
+ return self.cursor_to_dict(txn)
+
+ ret = yield self.runInteraction(
+ "get_retention_policy_for_room", get_retention_policy_for_room_txn,
+ )
+
+ # If we don't know this room ID, ret will be None, in this case return the default
+ # policy.
+ if not ret:
+ defer.returnValue(
+ {
+ "min_lifetime": self.config.retention_default_min_lifetime,
+ "max_lifetime": self.config.retention_default_max_lifetime,
+ }
+ )
+
+ row = ret[0]
+
+ # If one of the room's policy's attributes isn't defined, use the matching
+ # attribute from the default policy.
+ # The default values will be None if no default policy has been defined, or if one
+ # of the attributes is missing from the default policy.
+ if row["min_lifetime"] is None:
+ row["min_lifetime"] = self.config.retention_default_min_lifetime
+
+ if row["max_lifetime"] is None:
+ row["max_lifetime"] = self.config.retention_default_max_lifetime
+
+ defer.returnValue(row)
+
class RoomStore(RoomWorkerStore, SearchStore):
+ def __init__(self, db_conn, hs):
+ super(RoomStore, self).__init__(db_conn, hs)
+
+ self.config = hs.config
+
+ self.register_background_update_handler(
+ "insert_room_retention", self._background_insert_retention,
+ )
+
+ @defer.inlineCallbacks
+ def _background_insert_retention(self, progress, batch_size):
+ """Retrieves a list of all rooms within a range and inserts an entry for each of
+ them into the room_retention table.
+ NULLs the property's columns if missing from the retention event in the room's
+ state (or NULLs all of them if there's no retention event in the room's state),
+ so that we fall back to the server's retention policy.
+ """
+
+ last_room = progress.get("room_id", "")
+
+ def _background_insert_retention_txn(txn):
+ txn.execute(
+ """
+ SELECT state.room_id, state.event_id, events.json
+ FROM current_state_events as state
+ LEFT JOIN event_json AS events ON (state.event_id = events.event_id)
+ WHERE state.room_id > ? AND state.type = '%s'
+ ORDER BY state.room_id ASC
+ LIMIT ?;
+ """
+ % EventTypes.Retention,
+ (last_room, batch_size),
+ )
+
+ rows = self.cursor_to_dict(txn)
+
+ if not rows:
+ return True
+
+ for row in rows:
+ if not row["json"]:
+ retention_policy = {}
+ else:
+ ev = json.loads(row["json"])
+ retention_policy = json.dumps(ev["content"])
+
+ self._simple_insert_txn(
+ txn=txn,
+ table="room_retention",
+ values={
+ "room_id": row["room_id"],
+ "event_id": row["event_id"],
+ "min_lifetime": retention_policy.get("min_lifetime"),
+ "max_lifetime": retention_policy.get("max_lifetime"),
+ },
+ )
+
+ logger.info("Inserted %d rows into room_retention", len(rows))
+
+ self._background_update_progress_txn(
+ txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
+ )
+
+ if batch_size > len(rows):
+ return True
+ else:
+ return False
+
+ end = yield self.runInteraction(
+ "insert_room_retention", _background_insert_retention_txn,
+ )
+
+ if end:
+ yield self._end_background_update("insert_room_retention")
+
+ defer.returnValue(batch_size)
+
@defer.inlineCallbacks
def store_room(self, room_id, room_creator_user_id, is_public):
"""Stores a room.
@@ -502,6 +638,35 @@ class RoomStore(RoomWorkerStore, SearchStore):
txn, event, "content.body", event.content["body"]
)
+ def _store_retention_policy_for_room_txn(self, txn, event):
+ if hasattr(event, "content") and (
+ "min_lifetime" in event.content or "max_lifetime" in event.content
+ ):
+ if (
+ "min_lifetime" in event.content
+ and not isinstance(event.content.get("min_lifetime"), integer_types)
+ ) or (
+ "max_lifetime" in event.content
+ and not isinstance(event.content.get("max_lifetime"), integer_types)
+ ):
+ # Ignore the event if one of the value isn't an integer.
+ return
+
+ self._simple_insert_txn(
+ txn=txn,
+ table="room_retention",
+ values={
+ "room_id": event.room_id,
+ "event_id": event.event_id,
+ "min_lifetime": event.content.get("min_lifetime"),
+ "max_lifetime": event.content.get("max_lifetime"),
+ },
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_retention_policy_for_room, (event.room_id,)
+ )
+
def add_event_report(
self, room_id, event_id, user_id, reason, content, received_ts
):
@@ -683,3 +848,89 @@ class RoomStore(RoomWorkerStore, SearchStore):
remote_media_mxcs.append((hostname, media_id))
return local_media_mxcs, remote_media_mxcs
+
+ @defer.inlineCallbacks
+ def get_rooms_for_retention_period_in_range(
+ self, min_ms, max_ms, include_null=False
+ ):
+ """Retrieves all of the rooms within the given retention range.
+
+ Optionally includes the rooms which don't have a retention policy.
+
+ Args:
+ min_ms (int|None): Duration in milliseconds that define the lower limit of
+ the range to handle (exclusive). If None, doesn't set a lower limit.
+ max_ms (int|None): Duration in milliseconds that define the upper limit of
+ the range to handle (inclusive). If None, doesn't set an upper limit.
+ include_null (bool): Whether to include rooms which retention policy is NULL
+ in the returned set.
+
+ Returns:
+ dict[str, dict]: The rooms within this range, along with their retention
+ policy. The key is "room_id", and maps to a dict describing the retention
+ policy associated with this room ID. The keys for this nested dict are
+ "min_lifetime" (int|None), and "max_lifetime" (int|None).
+ """
+
+ def get_rooms_for_retention_period_in_range_txn(txn):
+ range_conditions = []
+ args = []
+
+ if min_ms is not None:
+ range_conditions.append("max_lifetime > ?")
+ args.append(min_ms)
+
+ if max_ms is not None:
+ range_conditions.append("max_lifetime <= ?")
+ args.append(max_ms)
+
+ # Do a first query which will retrieve the rooms that have a retention policy
+ # in their current state.
+ sql = """
+ SELECT room_id, min_lifetime, max_lifetime FROM room_retention
+ INNER JOIN current_state_events USING (event_id, room_id)
+ """
+
+ if len(range_conditions):
+ sql += " WHERE (" + " AND ".join(range_conditions) + ")"
+
+ if include_null:
+ sql += " OR max_lifetime IS NULL"
+
+ txn.execute(sql, args)
+
+ rows = self.cursor_to_dict(txn)
+ rooms_dict = {}
+
+ for row in rows:
+ rooms_dict[row["room_id"]] = {
+ "min_lifetime": row["min_lifetime"],
+ "max_lifetime": row["max_lifetime"],
+ }
+
+ if include_null:
+ # If required, do a second query that retrieves all of the rooms we know
+ # of so we can handle rooms with no retention policy.
+ sql = "SELECT DISTINCT room_id FROM current_state_events"
+
+ txn.execute(sql)
+
+ rows = self.cursor_to_dict(txn)
+
+ # If a room isn't already in the dict (i.e. it doesn't have a retention
+ # policy in its state), add it with a null policy.
+ for row in rows:
+ if row["room_id"] not in rooms_dict:
+ rooms_dict[row["room_id"]] = {
+ "min_lifetime": None,
+ "max_lifetime": None,
+ }
+
+ return rooms_dict
+
+ rooms = yield self.runInteraction(
+ "get_rooms_for_retention_period_in_range",
+ get_rooms_for_retention_period_in_range_txn,
+ )
+
+ defer.returnValue(rooms)
diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql b/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql
new file mode 100644
index 0000000000..5f5e0499ae
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('event_store_labels', '{}');
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
new file mode 100644
index 0000000000..7d70dd071e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- store the current etag of backup version
+ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT;
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql b/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql
new file mode 100644
index 0000000000..ee6cdf7a14
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql
@@ -0,0 +1,33 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Tracks the retention policy of a room.
+-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in
+-- the room's retention policy state event.
+-- If a room doesn't have a retention policy state event in its state, both max_lifetime
+-- and min_lifetime are NULL.
+CREATE TABLE IF NOT EXISTS room_retention(
+ room_id TEXT,
+ event_id TEXT,
+ min_lifetime BIGINT,
+ max_lifetime BIGINT,
+
+ PRIMARY KEY(room_id, event_id)
+);
+
+CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('insert_room_retention', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 3132848034..6a90daea31 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -285,7 +285,11 @@ class StateGroupWorkerStore(
room_id (str)
Returns:
- Deferred[unicode|None]: predecessor room id
+ Deferred[dict|None]: A dictionary containing the structure of the predecessor
+ field from the room's create event. The structure is subject to other servers,
+ but it is expected to be:
+ * room_id (str): The room ID of the predecessor room
+ * event_id (str): The ID of the tombstone event in the predecessor room
Raises:
NotFoundError if the room is unknown
@@ -991,6 +995,29 @@ class StateGroupWorkerStore(
return self.runInteraction("store_state_group", _store_state_group_txn)
+ @defer.inlineCallbacks
+ def get_referenced_state_groups(self, state_groups):
+ """Check if the state groups are referenced by events.
+
+ Args:
+ state_groups (Iterable[int])
+
+ Returns:
+ Deferred[set[int]]: The subset of state groups that are
+ referenced.
+ """
+
+ rows = yield self._simple_select_many_batch(
+ table="event_to_state_groups",
+ column="state_group",
+ iterable=state_groups,
+ keyvalues={},
+ retcols=("DISTINCT state_group",),
+ desc="get_referenced_state_groups",
+ )
+
+ return set(row["state_group"] for row in rows)
+
class StateBackgroundUpdateStore(
StateGroupBackgroundUpdateStore, BackgroundUpdateStore
@@ -1231,7 +1258,7 @@ class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
# if the event was rejected, just give it the same state as its
# predecessor.
if context.rejected:
- state_groups[event.event_id] = context.prev_group
+ state_groups[event.event_id] = context.state_group_before_event
continue
state_groups[event.event_id] = context.state_group
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 9cac664880..21a410afd0 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -619,7 +619,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def _get_max_topological_txn(self, txn, room_id):
txn.execute(
- "SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
+ "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
(room_id,),
)
@@ -874,14 +874,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
args.append(int(limit))
- sql = (
- "SELECT DISTINCT event_id, topological_ordering, stream_ordering"
- " FROM events"
- " LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)"
- " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
- " ORDER BY topological_ordering %(order)s,"
- " stream_ordering %(order)s LIMIT ?"
- ) % {"bounds": bounds, "order": order}
+ select_keywords = "SELECT"
+ join_clause = ""
+ if event_filter and event_filter.labels:
+ # If we're not filtering on a label, then joining on event_labels will
+ # return as many row for a single event as the number of labels it has. To
+ # avoid this, only join if we're filtering on at least one label.
+ join_clause = """
+ LEFT JOIN event_labels
+ USING (event_id, room_id, topological_ordering)
+ """
+                if len(event_filter.labels) > 1:
+                    # Using DISTINCT in this SELECT query is quite expensive, because it
+                    # requires the engine to sort on the entire (not limited) result set,
+                    # i.e. the entire events table. We only need to use it when we're
+                    # filtering on more than one label, because that's the only scenario
+                    # in which we can possibly get the same event ID multiple times in
+                    # the results.
+                    select_keywords += " DISTINCT"
+
+ sql = """
+ %(select_keywords)s event_id, topological_ordering, stream_ordering
+ FROM events
+ %(join_clause)s
+ WHERE outlier = ? AND room_id = ? AND %(bounds)s
+ ORDER BY topological_ordering %(order)s,
+ stream_ordering %(order)s LIMIT ?
+ """ % {
+ "select_keywords": select_keywords,
+ "join_clause": join_clause,
+ "bounds": bounds,
+ "order": order,
+ }
txn.execute(sql, args)
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 10d1887f75..aa24339717 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
)
def get_tag_content(txn, tag_ids):
- sql = (
- "SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
- )
+ sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
results = []
for stream_id, user_id, room_id in tag_ids:
txn.execute(sql, (user_id, room_id))
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 2e7753820e..731e1c9d9c 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
# Mark as done.
cur.execute(
database_engine.convert_param_style(
- "INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
+ "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
),
(modname, name),
)
diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py
new file mode 100644
index 0000000000..a368182034
--- /dev/null
+++ b/synapse/storage/purge_events.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+
+from twisted.internet import defer
+
+logger = logging.getLogger(__name__)
+
+
class PurgeEventsStorage(object):
    """High level interface for purging rooms and event history.
    """

    def __init__(self, hs, stores):
        self.stores = stores

    @defer.inlineCallbacks
    def purge_room(self, room_id: str):
        """Deletes all record of a room.

        Args:
            room_id: The ID of the room to purge.
        """

        state_groups_to_delete = yield self.stores.main.purge_room(room_id)
        yield self.stores.main.purge_room_state(room_id, state_groups_to_delete)

    @defer.inlineCallbacks
    def purge_history(self, room_id, token, delete_local_events):
        """Deletes room history before a certain point

        Args:
            room_id (str):

            token (str): A topological token to delete events before

            delete_local_events (bool):
                if True, we will delete local events as well as remote ones
                (instead of just marking them as outliers and deleting their
                state groups).
        """
        state_groups = yield self.stores.main.purge_history(
            room_id, token, delete_local_events
        )

        logger.info("[purge] finding state groups that can be deleted")

        sg_to_delete = yield self._find_unreferenced_groups(state_groups)

        yield self.stores.main.purge_unreferenced_state_groups(room_id, sg_to_delete)

    @defer.inlineCallbacks
    def _find_unreferenced_groups(self, state_groups):
        """Used when purging history to figure out which state groups can be
        deleted.

        Walks up the state-group "previous group" edges from the given groups,
        collecting every group seen; any group found to be referenced by an
        event (directly) stops the walk at that point.

        Args:
            state_groups (set[int]): Set of state groups referenced by events
                that are going to be deleted.

        Returns:
            Deferred[set[int]] The set of state groups that can be deleted.
        """
        # NOTE: the original version also accumulated a `graph` dict of
        # state_group -> previous group edges, but it was never read; that
        # dead bookkeeping has been removed.

        # Set of state groups that we have found to be referenced by events
        referenced_groups = set()

        # Set of state groups we've already seen
        state_groups_seen = set(state_groups)

        # Set of state groups to handle next.
        next_to_search = set(state_groups)
        while next_to_search:
            # We bound size of groups we're looking up at once, to stop the
            # SQL query getting too big
            if len(next_to_search) < 100:
                current_search = next_to_search
                next_to_search = set()
            else:
                current_search = set(itertools.islice(next_to_search, 100))
                next_to_search -= current_search

            referenced = yield self.stores.main.get_referenced_state_groups(
                current_search
            )
            referenced_groups |= referenced

            # We don't continue iterating up the state group graphs for state
            # groups that are referenced.
            current_search -= referenced

            edges = yield self.stores.main.get_previous_state_groups(current_search)

            prevs = set(edges.values())
            # We don't bother re-handling groups we've already seen
            prevs -= state_groups_seen
            next_to_search |= prevs
            state_groups_seen |= prevs

        to_delete = state_groups_seen - referenced_groups

        return to_delete
|