diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 53 | ||||
-rw-r--r-- | synapse/storage/_base.py | 31 | ||||
-rw-r--r-- | synapse/storage/directory.py | 30 | ||||
-rw-r--r-- | synapse/storage/registration.py | 35 | ||||
-rw-r--r-- | synapse/storage/room.py | 10 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 15 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v4.sql | 12 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v5.sql | 16 | ||||
-rw-r--r-- | synapse/storage/schema/im.sql | 3 | ||||
-rw-r--r-- | synapse/storage/schema/redactions.sql | 8 | ||||
-rw-r--r-- | synapse/storage/schema/users.sql | 14 | ||||
-rw-r--r-- | synapse/storage/stream.py | 33 |
12 files changed, 227 insertions, 33 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index ef98b6a444..6dadeb8cce 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -24,6 +24,7 @@ from synapse.api.events.room import ( RoomAddStateLevelEvent, RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, + RoomRedactionEvent, ) from synapse.util.logutils import log_function @@ -57,12 +58,13 @@ SCHEMAS = [ "im", "room_aliases", "keys", + "redactions", ] # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 3 +SCHEMA_VERSION = 5 class _RollbackButIsFineException(Exception): @@ -104,7 +106,7 @@ class DataStore(RoomMemberStore, RoomStore, stream_ordering=stream_ordering, is_new_state=is_new_state, ) - except _RollbackButIsFineException as e: + except _RollbackButIsFineException: pass @defer.inlineCallbacks @@ -183,6 +185,8 @@ class DataStore(RoomMemberStore, RoomStore, self._store_send_event_level(txn, event) elif event.type == RoomOpsPowerLevelsEvent.TYPE: self._store_ops_level(txn, event) + elif event.type == RoomRedactionEvent.TYPE: + self._store_redaction(txn, event) vals = { "topological_ordering": event.depth, @@ -204,7 +208,7 @@ class DataStore(RoomMemberStore, RoomStore, unrec = { k: v for k, v in event.get_full_dict().items() - if k not in vals.keys() + if k not in vals.keys() and k not in ["redacted", "redacted_because"] } vals["unrecognized_keys"] = json.dumps(unrec) @@ -218,7 +222,8 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") - if is_new_state and hasattr(event, "state_key"): + is_state = hasattr(event, "state_key") and event.state_key is not None + if is_new_state and is_state: vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -242,14 +247,28 @@ class DataStore(RoomMemberStore, RoomStore, } ) + def _store_redaction(self, txn, event): + txn.execute( + "INSERT OR IGNORE INTO redactions " + "(event_id, redacts) VALUES (?,?)", + (event.event_id, event.redacts) + ) + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + sql = ( - "SELECT e.* FROM events as e " + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " "INNER JOIN current_state_events as c ON e.event_id = c.event_id " "INNER JOIN state_events as s ON e.event_id = s.event_id " "WHERE c.room_id = ? " - ) + ) % { + "redacted": del_sql, + } if event_type: sql += " AND s.type = ? AND s.state_key = ? " @@ -276,6 +295,28 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(self.min_token) + def insert_client_ip(self, user, access_token, device_id, ip, user_agent): + return self._simple_insert( + "user_ips", + { + "user": user.to_string(), + "access_token": access_token, + "device_id": device_id, + "ip": ip, + "user_agent": user_agent, + "last_seen": int(self._clock.time_msec()), + } + ) + + def get_user_ip_and_agents(self, user): + return self._simple_select_list( + table="user_ips", + keyvalues={"user": user.to_string()}, + retcols=[ + "device_id", "access_token", "ip", "user_agent", "last_seen" + ], + ) + def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): """Snapshot the room for an update by a user Args: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 76ed7d06fb..889de2bedc 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,6 +17,7 @@ import logging from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function import collections @@ -345,7 +346,7 @@ class SQLBaseStore(object): return self.runInteraction(func) def _parse_event_from_row(self, row_dict): - d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) + d = copy.deepcopy({k: v for k, v in row_dict.items()}) d.pop("stream_ordering", None) d.pop("topological_ordering", None) @@ -373,8 +374,8 @@ class SQLBaseStore(object): sql = "SELECT * FROM events WHERE event_id = ?" for ev in events: - if hasattr(ev, "prev_state"): - # Load previous state_content. + if hasattr(ev, "prev_state"): + # Load previous state_content. # TODO: Should we be pulling this out above? cursor = txn.execute(sql, (ev.prev_state,)) prevs = self.cursor_to_dict(cursor) @@ -382,8 +383,32 @@ class SQLBaseStore(object): prev = self._parse_event_from_row(prevs[0]) ev.prev_content = prev.content + if not hasattr(ev, "redacted"): + logger.debug("Doesn't have redacted key: %s", ev) + ev.redacted = self._has_been_redacted_txn(txn, ev) + + if ev.redacted: + # Get the redaction event. + sql = "SELECT * FROM events WHERE event_id = ?" + txn.execute(sql, (ev.redacted,)) + + del_evs = self._parse_events_txn( + txn, self.cursor_to_dict(txn) + ) + + if del_evs: + prune_event(ev) + ev.redacted_because = del_evs[0] + return events + def _has_been_redacted_txn(self, txn, event): + sql = "SELECT event_id FROM redactions WHERE redacts = ?" + txn.execute(sql, (event.event_id,)) + result = txn.fetchone() + return result[0] if result else None + + class Table(object): """ A base class used to store information about a particular table. """ diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 540eb4c2c4..52373a28a6 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -93,6 +93,36 @@ class DirectoryStore(SQLBaseStore): } ) + def delete_room_alias(self, room_alias): + return self.runInteraction( + self._delete_room_alias_txn, + room_alias, + ) + + def _delete_room_alias_txn(self, txn, room_alias): + cursor = txn.execute( + "SELECT room_id FROM room_aliases WHERE room_alias = ?", + (room_alias.to_string(),) + ) + + res = cursor.fetchone() + if res: + room_id = res[0] + else: + return None + + txn.execute( + "DELETE FROM room_aliases WHERE room_alias = ?", + (room_alias.to_string(),) + ) + + txn.execute( + "DELETE FROM room_alias_servers WHERE room_alias = ?", + (room_alias.to_string(),) + ) + + return room_id + def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index db20b1daa0..719806f82b 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -88,27 +88,40 @@ class RegistrationStore(SQLBaseStore): query, user_id ) - @defer.inlineCallbacks def get_user_by_token(self, token): """Get a user from the given access token. Args: token (str): The access token of a user. Returns: - str: The user ID of the user. + dict: Including the name (user_id), device_id and whether they are + an admin. Raises: StoreError if no user was found. """ - user_id = yield self.runInteraction(self._query_for_auth, - token) - defer.returnValue(user_id) + return self.runInteraction( + self._query_for_auth, + token + ) + + def is_server_admin(self, user): + return self._simple_select_one_onecol( + table="users", + keyvalues={"name": user.to_string()}, + retcol="admin", + ) def _query_for_auth(self, txn, token): - txn.execute("SELECT users.name FROM access_tokens LEFT JOIN users" + - " ON users.id = access_tokens.user_id WHERE token = ?", - [token]) - row = txn.fetchone() - if row: - return row[0] + sql = ( + "SELECT users.name, users.admin, access_tokens.device_id " + "FROM users " + "INNER JOIN access_tokens on users.id = access_tokens.user_id " + "WHERE token = ?" + ) + + cursor = txn.execute(sql, (token,)) + rows = self.cursor_to_dict(cursor) + if rows: + return rows[0] raise StoreError(404, "Token not found.") diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 5adf8cdf1b..8cd46334cf 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -27,7 +27,7 @@ import logging logger = logging.getLogger(__name__) -OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level")) +OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level", "redact_level")) class RoomStore(SQLBaseStore): @@ -189,7 +189,8 @@ class RoomStore(SQLBaseStore): def _get_ops_levels(self, txn, room_id): sql = ( - "SELECT ban_level, kick_level FROM room_ops_levels as r " + "SELECT ban_level, kick_level, redact_level " + "FROM room_ops_levels as r " "INNER JOIN current_state_events as c " "ON r.event_id = c.event_id " "WHERE c.room_id = ? " @@ -198,7 +199,7 @@ class RoomStore(SQLBaseStore): rows = txn.execute(sql, (room_id,)).fetchall() if len(rows) == 1: - return OpsLevel(rows[0][0], rows[0][1]) + return OpsLevel(rows[0][0], rows[0][1], rows[0][2]) else: return OpsLevel(None, None) @@ -326,6 +327,9 @@ class RoomStore(SQLBaseStore): if "ban_level" in event.content: content["ban_level"] = event.content["ban_level"] + if "redact_level" in event.content: + content["redact_level"] = event.content["redact_level"] + self._simple_insert_txn( txn, "room_ops_levels", diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 04b4067d03..ceeef5880e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -18,7 +18,6 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.api.constants import Membership -from synapse.util.logutils import log_function import logging @@ -182,14 +181,22 @@ class RoomMemberStore(SQLBaseStore): ) def _get_members_query_txn(self, txn, where_clause, where_values): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + sql = ( - "SELECT e.* FROM events as e " + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " "INNER JOIN room_memberships as m " "ON e.event_id = m.event_id " "INNER JOIN current_state_events as c " "ON m.event_id = c.event_id " - "WHERE %s " - ) % (where_clause,) + "WHERE %(where)s " + ) % { + "redacted": del_sql, + "where": where_clause, + } txn.execute(sql, where_values) rows = self.cursor_to_dict(txn) diff --git a/synapse/storage/schema/delta/v4.sql b/synapse/storage/schema/delta/v4.sql new file mode 100644 index 0000000000..25d2ead450 --- /dev/null +++ b/synapse/storage/schema/delta/v4.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS redactions ( + event_id TEXT NOT NULL, + redacts TEXT NOT NULL, + CONSTRAINT ev_uniq UNIQUE (event_id) +); + +CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); +CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); + +ALTER TABLE room_ops_levels ADD COLUMN redact_level INTEGER; + +PRAGMA user_version = 4; diff --git a/synapse/storage/schema/delta/v5.sql b/synapse/storage/schema/delta/v5.sql new file mode 100644 index 0000000000..af9df11aa9 --- /dev/null +++ b/synapse/storage/schema/delta/v5.sql @@ -0,0 +1,16 @@ + +CREATE TABLE IF NOT EXISTS user_ips ( + user TEXT NOT NULL, + access_token TEXT NOT NULL, + device_id TEXT, + ip TEXT NOT NULL, + user_agent TEXT NOT NULL, + last_seen INTEGER NOT NULL, + CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); + +ALTER TABLE users ADD COLUMN admin BOOL DEFAULT 0 NOT NULL; + +PRAGMA user_version = 5; diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 6ffea51310..3aa83f5c8c 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -150,7 +150,8 @@ CREATE TABLE IF NOT EXISTS room_ops_levels( event_id TEXT NOT NULL, room_id TEXT NOT NULL, ban_level INTEGER, - kick_level INTEGER + kick_level INTEGER, + redact_level INTEGER ); CREATE INDEX IF NOT EXISTS room_ops_levels_event_id ON room_ops_levels(event_id); diff --git a/synapse/storage/schema/redactions.sql b/synapse/storage/schema/redactions.sql new file mode 100644 index 0000000000..4c2829d05d --- /dev/null +++ b/synapse/storage/schema/redactions.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS redactions ( + event_id TEXT NOT NULL, + redacts TEXT NOT NULL, + CONSTRAINT ev_uniq UNIQUE (event_id) +); + +CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); +CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/users.sql b/synapse/storage/schema/users.sql index 2519702971..8244f733bd 100644 --- a/synapse/storage/schema/users.sql +++ b/synapse/storage/schema/users.sql @@ -17,6 +17,7 @@ CREATE TABLE IF NOT EXISTS users( name TEXT, password_hash TEXT, creation_ts INTEGER, + admin BOOL DEFAULT 0 NOT NULL, UNIQUE(name) ON CONFLICT ROLLBACK ); @@ -29,3 +30,16 @@ CREATE TABLE IF NOT EXISTS access_tokens( FOREIGN KEY(user_id) REFERENCES users(id), UNIQUE(token) ON CONFLICT ROLLBACK ); + +CREATE TABLE IF NOT EXISTS user_ips ( + user TEXT NOT NULL, + access_token TEXT NOT NULL, + device_id TEXT, + ip TEXT NOT NULL, + user_agent TEXT NOT NULL, + last_seen INTEGER NOT NULL, + CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); + diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a76fecf24f..d61f909939 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -157,6 +157,11 @@ class StreamStore(SQLBaseStore): "WHERE m.user_id = ? " ) + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + if limit: limit = max(limit, MAX_STREAM_SIZE) else: @@ -171,13 +176,14 @@ class StreamStore(SQLBaseStore): return sql = ( - "SELECT * FROM events as e WHERE " + "SELECT *, (%(redacted)s) AS redacted FROM events AS e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "AND e.outlier = 0 " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { + "redacted": del_sql, "current": current_room_membership_sql, "invites": membership_sql, "limit": limit @@ -224,11 +230,21 @@ class StreamStore(SQLBaseStore): else: limit_str = "" + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = events.event_id " + "LIMIT 1" + ) + sql = ( - "SELECT * FROM events " + "SELECT *, (%(redacted)s) AS redacted FROM events " "WHERE outlier = 0 AND room_id = ? AND %(bounds)s " "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s " - ) % {"bounds": bounds, "order": order, "limit": limit_str} + ) % { + "redacted": del_sql, + "bounds": bounds, + "order": order, + "limit": limit_str + } rows = yield self._execute_and_decode( sql, @@ -257,11 +273,18 @@ class StreamStore(SQLBaseStore): with_feedback=False): # TODO (erikj): Handle compressed feedback + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = events.event_id " + "LIMIT 1" + ) + sql = ( - "SELECT * FROM events " + "SELECT *, (%(redacted)s) AS redacted FROM events " "WHERE room_id = ? AND stream_ordering <= ? " "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) + ) % { + "redacted": del_sql, + } rows = yield self._execute_and_decode( sql, |