summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2014-09-30 17:55:06 +0100
committerMark Haines <mark.haines@matrix.org>2014-09-30 17:55:06 +0100
commit9605593d11b67199a98ed25f121a2af2e1c9587a (patch)
treedc57897bfcbd1591fa0dfb219e376353584654e9 /synapse/storage
parentSYN-75 Verify signatures on server to server transactions (diff)
parentpyflakes cleanup (diff)
downloadsynapse-9605593d11b67199a98ed25f121a2af2e1c9587a.tar.xz
Merge branch 'develop' into server2server_signing
Conflicts:
	synapse/storage/__init__.py
	tests/rest/test_presence.py
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py53
-rw-r--r--synapse/storage/_base.py31
-rw-r--r--synapse/storage/directory.py30
-rw-r--r--synapse/storage/registration.py35
-rw-r--r--synapse/storage/room.py10
-rw-r--r--synapse/storage/roommember.py15
-rw-r--r--synapse/storage/schema/delta/v4.sql12
-rw-r--r--synapse/storage/schema/delta/v5.sql16
-rw-r--r--synapse/storage/schema/im.sql3
-rw-r--r--synapse/storage/schema/redactions.sql8
-rw-r--r--synapse/storage/schema/users.sql14
-rw-r--r--synapse/storage/stream.py33
12 files changed, 227 insertions, 33 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ef98b6a444..6dadeb8cce 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -24,6 +24,7 @@ from synapse.api.events.room import (
     RoomAddStateLevelEvent,
     RoomSendEventLevelEvent,
     RoomOpsPowerLevelsEvent,
+    RoomRedactionEvent,
 )
 
 from synapse.util.logutils import log_function
@@ -57,12 +58,13 @@ SCHEMAS = [
     "im",
     "room_aliases",
     "keys",
+    "redactions",
 ]
 
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 3
+SCHEMA_VERSION = 5
 
 
 class _RollbackButIsFineException(Exception):
@@ -104,7 +106,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 stream_ordering=stream_ordering,
                 is_new_state=is_new_state,
             )
-        except _RollbackButIsFineException as e:
+        except _RollbackButIsFineException:
             pass
 
     @defer.inlineCallbacks
@@ -183,6 +185,8 @@ class DataStore(RoomMemberStore, RoomStore,
             self._store_send_event_level(txn, event)
         elif event.type == RoomOpsPowerLevelsEvent.TYPE:
             self._store_ops_level(txn, event)
+        elif event.type == RoomRedactionEvent.TYPE:
+            self._store_redaction(txn, event)
 
         vals = {
             "topological_ordering": event.depth,
@@ -204,7 +208,7 @@ class DataStore(RoomMemberStore, RoomStore,
         unrec = {
             k: v
             for k, v in event.get_full_dict().items()
-            if k not in vals.keys()
+            if k not in vals.keys() and k not in ["redacted", "redacted_because"]
         }
         vals["unrecognized_keys"] = json.dumps(unrec)
 
@@ -218,7 +222,8 @@ class DataStore(RoomMemberStore, RoomStore,
             )
             raise _RollbackButIsFineException("_persist_event")
 
-        if is_new_state and hasattr(event, "state_key"):
+        is_state = hasattr(event, "state_key") and event.state_key is not None
+        if is_new_state and is_state:
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -242,14 +247,28 @@ class DataStore(RoomMemberStore, RoomStore,
                 }
             )
 
+    def _store_redaction(self, txn, event):
+        txn.execute(
+            "INSERT OR IGNORE INTO redactions "
+            "(event_id, redacts) VALUES (?,?)",
+            (event.event_id, event.redacts)
+        )
+
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
         sql = (
-            "SELECT e.* FROM events as e "
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
             "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
             "INNER JOIN state_events as s ON e.event_id = s.event_id "
             "WHERE c.room_id = ? "
-        )
+        ) % {
+            "redacted": del_sql,
+        }
 
         if event_type:
             sql += " AND s.type = ? AND s.state_key = ? "
@@ -276,6 +295,28 @@ class DataStore(RoomMemberStore, RoomStore,
 
         defer.returnValue(self.min_token)
 
+    def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
+        return self._simple_insert(
+            "user_ips",
+            {
+                "user": user.to_string(),
+                "access_token": access_token,
+                "device_id": device_id,
+                "ip": ip,
+                "user_agent": user_agent,
+                "last_seen": int(self._clock.time_msec()),
+            }
+        )
+
+    def get_user_ip_and_agents(self, user):
+        return self._simple_select_list(
+            table="user_ips",
+            keyvalues={"user": user.to_string()},
+            retcols=[
+                "device_id", "access_token", "ip", "user_agent", "last_seen"
+            ],
+        )
+
     def snapshot_room(self, room_id, user_id, state_type=None, state_key=None):
         """Snapshot the room for an update by a user
         Args:
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 76ed7d06fb..889de2bedc 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -17,6 +17,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.api.events.utils import prune_event
 from synapse.util.logutils import log_function
 
 import collections
@@ -345,7 +346,7 @@ class SQLBaseStore(object):
         return self.runInteraction(func)
 
     def _parse_event_from_row(self, row_dict):
-        d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+        d = copy.deepcopy({k: v for k, v in row_dict.items()})
 
         d.pop("stream_ordering", None)
         d.pop("topological_ordering", None)
@@ -373,8 +374,8 @@ class SQLBaseStore(object):
         sql = "SELECT * FROM events WHERE event_id = ?"
 
         for ev in events:
-           if hasattr(ev, "prev_state"):
-                # Load previous state_content. 
+            if hasattr(ev, "prev_state"):
+                # Load previous state_content.
                 # TODO: Should we be pulling this out above?
                 cursor = txn.execute(sql, (ev.prev_state,))
                 prevs = self.cursor_to_dict(cursor)
@@ -382,8 +383,32 @@ class SQLBaseStore(object):
                     prev = self._parse_event_from_row(prevs[0])
                     ev.prev_content = prev.content
 
+            if not hasattr(ev, "redacted"):
+                logger.debug("Doesn't have redacted key: %s", ev)
+                ev.redacted = self._has_been_redacted_txn(txn, ev)
+
+            if ev.redacted:
+                # Get the redaction event.
+                sql = "SELECT * FROM events WHERE event_id = ?"
+                txn.execute(sql, (ev.redacted,))
+
+                del_evs = self._parse_events_txn(
+                    txn, self.cursor_to_dict(txn)
+                )
+
+                if del_evs:
+                    prune_event(ev)
+                    ev.redacted_because = del_evs[0]
+
         return events
 
+    def _has_been_redacted_txn(self, txn, event):
+        sql = "SELECT event_id FROM redactions WHERE redacts = ?"
+        txn.execute(sql, (event.event_id,))
+        result = txn.fetchone()
+        return result[0] if result else None
+
+
 class Table(object):
     """ A base class used to store information about a particular table.
     """
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 540eb4c2c4..52373a28a6 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -93,6 +93,36 @@ class DirectoryStore(SQLBaseStore):
                 }
             )
 
+    def delete_room_alias(self, room_alias):
+        return self.runInteraction(
+            self._delete_room_alias_txn,
+            room_alias,
+        )
+
+    def _delete_room_alias_txn(self, txn, room_alias):
+        cursor = txn.execute(
+            "SELECT room_id FROM room_aliases WHERE room_alias = ?",
+            (room_alias.to_string(),)
+        )
+
+        res = cursor.fetchone()
+        if res:
+            room_id = res[0]
+        else:
+            return None
+
+        txn.execute(
+            "DELETE FROM room_aliases WHERE room_alias = ?",
+            (room_alias.to_string(),)
+        )
+
+        txn.execute(
+            "DELETE FROM room_alias_servers WHERE room_alias = ?",
+            (room_alias.to_string(),)
+        )
+
+        return room_id
+
     def get_aliases_for_room(self, room_id):
         return self._simple_select_onecol(
             "room_aliases",
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index db20b1daa0..719806f82b 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -88,27 +88,40 @@ class RegistrationStore(SQLBaseStore):
             query, user_id
         )
 
-    @defer.inlineCallbacks
     def get_user_by_token(self, token):
         """Get a user from the given access token.
 
         Args:
             token (str): The access token of a user.
         Returns:
-            str: The user ID of the user.
+            dict: Including the name (user_id), device_id and whether they are
+                an admin.
         Raises:
             StoreError if no user was found.
         """
-        user_id = yield self.runInteraction(self._query_for_auth,
-                                                     token)
-        defer.returnValue(user_id)
+        return self.runInteraction(
+            self._query_for_auth,
+            token
+        )
+
+    def is_server_admin(self, user):
+        return self._simple_select_one_onecol(
+            table="users",
+            keyvalues={"name": user.to_string()},
+            retcol="admin",
+        )
 
     def _query_for_auth(self, txn, token):
-        txn.execute("SELECT users.name FROM access_tokens LEFT JOIN users" +
-                    " ON users.id = access_tokens.user_id WHERE token = ?",
-                    [token])
-        row = txn.fetchone()
-        if row:
-            return row[0]
+        sql = (
+            "SELECT users.name, users.admin, access_tokens.device_id "
+            "FROM users "
+            "INNER JOIN access_tokens on users.id = access_tokens.user_id "
+            "WHERE token = ?"
+        )
+
+        cursor = txn.execute(sql, (token,))
+        rows = self.cursor_to_dict(cursor)
+        if rows:
+            return rows[0]
 
         raise StoreError(404, "Token not found.")
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5adf8cdf1b..8cd46334cf 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -27,7 +27,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level"))
+OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level", "redact_level"))
 
 
 class RoomStore(SQLBaseStore):
@@ -189,7 +189,8 @@ class RoomStore(SQLBaseStore):
 
     def _get_ops_levels(self, txn, room_id):
         sql = (
-            "SELECT ban_level, kick_level FROM room_ops_levels as r "
+            "SELECT ban_level, kick_level, redact_level "
+            "FROM room_ops_levels as r "
             "INNER JOIN current_state_events as c "
             "ON r.event_id = c.event_id "
             "WHERE c.room_id = ? "
@@ -198,7 +199,7 @@ class RoomStore(SQLBaseStore):
         rows = txn.execute(sql, (room_id,)).fetchall()
 
         if len(rows) == 1:
-            return OpsLevel(rows[0][0], rows[0][1])
+            return OpsLevel(rows[0][0], rows[0][1], rows[0][2])
         else:
             return OpsLevel(None, None)
 
@@ -326,6 +327,9 @@ class RoomStore(SQLBaseStore):
         if "ban_level" in event.content:
             content["ban_level"] = event.content["ban_level"]
 
+        if "redact_level" in event.content:
+            content["redact_level"] = event.content["redact_level"]
+
         self._simple_insert_txn(
             txn,
             "room_ops_levels",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 04b4067d03..ceeef5880e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
 from ._base import SQLBaseStore
 
 from synapse.api.constants import Membership
-from synapse.util.logutils import log_function
 
 import logging
 
@@ -182,14 +181,22 @@ class RoomMemberStore(SQLBaseStore):
         )
 
     def _get_members_query_txn(self, txn, where_clause, where_values):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
         sql = (
-            "SELECT e.* FROM events as e "
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
             "INNER JOIN room_memberships as m "
             "ON e.event_id = m.event_id "
             "INNER JOIN current_state_events as c "
             "ON m.event_id = c.event_id "
-            "WHERE %s "
-        ) % (where_clause,)
+            "WHERE %(where)s "
+        ) % {
+            "redacted": del_sql,
+            "where": where_clause,
+        }
 
         txn.execute(sql, where_values)
         rows = self.cursor_to_dict(txn)
diff --git a/synapse/storage/schema/delta/v4.sql b/synapse/storage/schema/delta/v4.sql
new file mode 100644
index 0000000000..25d2ead450
--- /dev/null
+++ b/synapse/storage/schema/delta/v4.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS redactions (
+    event_id TEXT NOT NULL,
+    redacts TEXT NOT NULL,
+    CONSTRAINT ev_uniq UNIQUE (event_id)
+);
+
+CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
+CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
+
+ALTER TABLE room_ops_levels ADD COLUMN redact_level INTEGER;
+
+PRAGMA user_version = 4;
diff --git a/synapse/storage/schema/delta/v5.sql b/synapse/storage/schema/delta/v5.sql
new file mode 100644
index 0000000000..af9df11aa9
--- /dev/null
+++ b/synapse/storage/schema/delta/v5.sql
@@ -0,0 +1,16 @@
+
+CREATE TABLE IF NOT EXISTS user_ips (
+    user TEXT NOT NULL,
+    access_token TEXT NOT NULL,
+    device_id TEXT,
+    ip TEXT NOT NULL,
+    user_agent TEXT NOT NULL,
+    last_seen INTEGER NOT NULL,
+    CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
+);
+
+CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
+
+ALTER TABLE users ADD COLUMN admin BOOL DEFAULT 0 NOT NULL;
+
+PRAGMA user_version = 5;
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 6ffea51310..3aa83f5c8c 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -150,7 +150,8 @@ CREATE TABLE IF NOT EXISTS room_ops_levels(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
     ban_level INTEGER,
-    kick_level INTEGER
+    kick_level INTEGER,
+    redact_level INTEGER
 );
 
 CREATE INDEX IF NOT EXISTS room_ops_levels_event_id ON room_ops_levels(event_id);
diff --git a/synapse/storage/schema/redactions.sql b/synapse/storage/schema/redactions.sql
new file mode 100644
index 0000000000..4c2829d05d
--- /dev/null
+++ b/synapse/storage/schema/redactions.sql
@@ -0,0 +1,8 @@
+CREATE TABLE IF NOT EXISTS redactions (
+    event_id TEXT NOT NULL,
+    redacts TEXT NOT NULL,
+    CONSTRAINT ev_uniq UNIQUE (event_id)
+);
+
+CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
+CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/users.sql b/synapse/storage/schema/users.sql
index 2519702971..8244f733bd 100644
--- a/synapse/storage/schema/users.sql
+++ b/synapse/storage/schema/users.sql
@@ -17,6 +17,7 @@ CREATE TABLE IF NOT EXISTS users(
     name TEXT,
     password_hash TEXT,
     creation_ts INTEGER,
+    admin BOOL DEFAULT 0 NOT NULL,
     UNIQUE(name) ON CONFLICT ROLLBACK
 );
 
@@ -29,3 +30,16 @@ CREATE TABLE IF NOT EXISTS access_tokens(
     FOREIGN KEY(user_id) REFERENCES users(id),
     UNIQUE(token) ON CONFLICT ROLLBACK
 );
+
+CREATE TABLE IF NOT EXISTS user_ips (
+    user TEXT NOT NULL,
+    access_token TEXT NOT NULL,
+    device_id TEXT,
+    ip TEXT NOT NULL,
+    user_agent TEXT NOT NULL,
+    last_seen INTEGER NOT NULL,
+    CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
+);
+
+CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
+
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a76fecf24f..d61f909939 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -157,6 +157,11 @@ class StreamStore(SQLBaseStore):
             "WHERE m.user_id = ? "
         )
 
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
         if limit:
             limit = max(limit, MAX_STREAM_SIZE)
         else:
@@ -171,13 +176,14 @@ class StreamStore(SQLBaseStore):
             return
 
         sql = (
-            "SELECT * FROM events as e WHERE "
+            "SELECT *, (%(redacted)s) AS redacted FROM events AS e WHERE "
             "((room_id IN (%(current)s)) OR "
             "(event_id IN (%(invites)s))) "
             "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
             "AND e.outlier = 0 "
             "ORDER BY stream_ordering ASC LIMIT %(limit)d "
         ) % {
+            "redacted": del_sql,
             "current": current_room_membership_sql,
             "invites": membership_sql,
             "limit": limit
@@ -224,11 +230,21 @@ class StreamStore(SQLBaseStore):
         else:
             limit_str = ""
 
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = events.event_id "
+            "LIMIT 1"
+        )
+
         sql = (
-            "SELECT * FROM events "
+            "SELECT *, (%(redacted)s) AS redacted FROM events "
             "WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
             "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
-        ) % {"bounds": bounds, "order": order, "limit": limit_str}
+        ) % {
+            "redacted": del_sql,
+            "bounds": bounds,
+            "order": order,
+            "limit": limit_str
+        }
 
         rows = yield self._execute_and_decode(
             sql,
@@ -257,11 +273,18 @@ class StreamStore(SQLBaseStore):
                                    with_feedback=False):
         # TODO (erikj): Handle compressed feedback
 
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = events.event_id "
+            "LIMIT 1"
+        )
+
         sql = (
-            "SELECT * FROM events "
+            "SELECT *, (%(redacted)s) AS redacted FROM events "
             "WHERE room_id = ? AND stream_ordering <= ? "
             "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        )
+        ) % {
+            "redacted": del_sql,
+        }
 
         rows = yield self._execute_and_decode(
             sql,