summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorJeroen <vo.jeroen@gmail.com>2018-07-09 08:51:11 +0200
committerJeroen <vo.jeroen@gmail.com>2018-07-09 08:51:11 +0200
commitb5e157d895dcf182d7ffc58edf6442deeaebc3f0 (patch)
treea3f8a4549d0fe4947bef34632218b41c94202b19 /synapse/storage
parenttake idna implementation from twisted (diff)
parentAdd an isort configuration (#3463) (diff)
downloadsynapse-b5e157d895dcf182d7ffc58edf6442deeaebc3f0.tar.xz
Merge branch 'develop' into send_sni_for_federation_requests
# Conflicts:
#	synapse/http/endpoint.py
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/account_data.py3
-rw-r--r--synapse/storage/appservice.py2
-rw-r--r--synapse/storage/background_updates.py3
-rw-r--r--synapse/storage/deviceinbox.py13
-rw-r--r--synapse/storage/devices.py3
-rw-r--r--synapse/storage/end_to_end_keys.py3
-rw-r--r--synapse/storage/event_push_actions.py3
-rw-r--r--synapse/storage/events.py6
-rw-r--r--synapse/storage/events_worker.py3
-rw-r--r--synapse/storage/filtering.py3
-rw-r--r--synapse/storage/group_server.py2
-rw-r--r--synapse/storage/push_rule.py3
-rw-r--r--synapse/storage/pusher.py3
-rw-r--r--synapse/storage/receipts.py3
-rw-r--r--synapse/storage/registration.py4
-rw-r--r--synapse/storage/room.py3
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/storage/schema/delta/50/erasure_store.sql21
-rw-r--r--synapse/storage/search.py2
-rw-r--r--synapse/storage/tags.py3
-rw-r--r--synapse/storage/transactions.py3
-rw-r--r--synapse/storage/user_erasure_store.py103
23 files changed, 166 insertions, 30 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 979fa22438..e843b702b9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,6 +20,7 @@ import time
 import logging
 
 from synapse.storage.devices import DeviceStore
+from synapse.storage.user_erasure_store import UserErasureStore
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
@@ -88,6 +89,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 DeviceInboxStore,
                 UserDirectoryStore,
                 GroupServerStore,
+                UserErasureStore,
                 ):
 
     def __init__(self, db_conn, hs):
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 284ec3c970..7034a61399 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -22,8 +22,9 @@ from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+from canonicaljson import json
+
 import abc
-import simplejson as json
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 12ea8a158c..4d32d0bdf6 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -15,8 +15,8 @@
 # limitations under the License.
 import logging
 import re
-import simplejson as json
 from twisted.internet import defer
+from canonicaljson import json
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b7e9c716c8..af18964510 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -18,7 +18,8 @@ from . import engines
 
 from twisted.internet import defer
 
-import simplejson as json
+from canonicaljson import json
+
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index a879e5bfc1..38addbf9c0 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 
 import logging
-import simplejson
+
+from canonicaljson import json
 
 from twisted.internet import defer
 
@@ -85,7 +86,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             )
             rows = []
             for destination, edu in remote_messages_by_destination.items():
-                edu_json = simplejson.dumps(edu)
+                edu_json = json.dumps(edu)
                 rows.append((destination, stream_id, now_ms, edu_json))
             txn.executemany(sql, rows)
 
@@ -177,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                     " WHERE user_id = ?"
                 )
                 txn.execute(sql, (user_id,))
-                message_json = simplejson.dumps(messages_by_device["*"])
+                message_json = json.dumps(messages_by_device["*"])
                 for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
@@ -199,7 +200,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
-                    message_json = simplejson.dumps(messages_by_device[device])
+                    message_json = json.dumps(messages_by_device[device])
                     messages_json_for_user[device] = message_json
 
             if messages_json_for_user:
@@ -253,7 +254,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(simplejson.loads(row[1]))
+                messages.append(json.loads(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return (messages, stream_pos)
@@ -389,7 +390,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(simplejson.loads(row[1]))
+                messages.append(json.loads(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return (messages, stream_pos)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d149d8392e..2ed9ada783 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson as json
 
 from twisted.internet import defer
 
@@ -21,6 +20,8 @@ from synapse.api.errors import StoreError
 from ._base import SQLBaseStore, Cache
 from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 
+from canonicaljson import json
+
 from six import itervalues, iteritems
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index b146487943..181047c8b7 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -16,8 +16,7 @@ from twisted.internet import defer
 
 from synapse.util.caches.descriptors import cached
 
-from canonicaljson import encode_canonical_json
-import simplejson as json
+from canonicaljson import encode_canonical_json, json
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 8cb24b7d59..05cb3f61ce 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -19,7 +19,8 @@ from twisted.internet import defer
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
 import logging
-import simplejson as json
+
+from canonicaljson import json
 
 from six import iteritems
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7d0e59538a..a54abb9edd 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,8 @@ from functools import wraps
 import itertools
 import logging
 
-import simplejson as json
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.storage.events_worker import EventsWorkerStore
@@ -800,7 +801,8 @@ class EventsStore(EventsWorkerStore):
                     ]
                 )
 
-                self._curr_state_delta_stream_cache.entity_has_changed(
+                txn.call_after(
+                    self._curr_state_delta_stream_cache.entity_has_changed,
                     room_id, max_stream_order,
                 )
 
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index f6a6e46b43..896225aab9 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -29,7 +29,8 @@ from synapse.api.errors import SynapseError
 from collections import namedtuple
 
 import logging
-import simplejson as json
+
+from canonicaljson import json
 
 # these are only included to make the type annotations work
 from synapse.events import EventBase    # noqa: F401
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 2e2763126d..eae6027cee 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -19,8 +19,7 @@ from ._base import SQLBaseStore
 from synapse.api.errors import SynapseError, Codes
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
-from canonicaljson import encode_canonical_json
-import simplejson as json
+from canonicaljson import encode_canonical_json, json
 
 
 class FilteringStore(SQLBaseStore):
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index da05ccb027..b77402d295 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -20,7 +20,7 @@ from synapse.api.errors import SynapseError
 
 from ._base import SQLBaseStore
 
-import simplejson as json
+from canonicaljson import json
 
 
 # The category ID for the "default" category. We don't store as null in the
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 04a0b59a39..9e52e992b3 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -25,9 +25,10 @@ from synapse.push.baserules import list_with_base_rules
 from synapse.api.constants import EventTypes
 from twisted.internet import defer
 
+from canonicaljson import json
+
 import abc
 import logging
-import simplejson as json
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 307660b99a..c6def861cf 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -17,12 +17,11 @@
 from ._base import SQLBaseStore
 from twisted.internet import defer
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 
 import logging
-import simplejson as json
 import types
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c93c228f6e..f230a3bab7 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -21,9 +21,10 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from twisted.internet import defer
 
+from canonicaljson import json
+
 import abc
 import logging
-import simplejson as json
 
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9c9cf46e7f..0d18f6d869 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -623,7 +623,9 @@ class RegistrationStore(RegistrationWorkerStore,
         Removes the given user to the table of users who need to be parted from all the
         rooms they're in, effectively marking that user as fully deactivated.
         """
-        return self._simple_delete_one(
+        # XXX: This should be simple_delete_one but we failed to put a unique index on
+        # the table, so somehow duplicate entries have ended up in it.
+        return self._simple_delete(
             "users_pending_deactivation",
             keyvalues={
                 "user_id": user_id,
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index ea6a189185..ca0eb187e5 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -20,9 +20,10 @@ from synapse.storage._base import SQLBaseStore
 from synapse.storage.search import SearchStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+from canonicaljson import json
+
 import collections
 import logging
-import simplejson as json
 import re
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 829cc4a207..8fc9549a75 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -28,7 +28,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.types import get_domain_from_id
 
 import logging
-import simplejson as json
+from canonicaljson import json
 
 from six import itervalues, iteritems
 
diff --git a/synapse/storage/schema/delta/50/erasure_store.sql b/synapse/storage/schema/delta/50/erasure_store.sql
new file mode 100644
index 0000000000..5d8641a9ab
--- /dev/null
+++ b/synapse/storage/schema/delta/50/erasure_store.sql
@@ -0,0 +1,21 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- a table of users who have requested that their details be erased
+CREATE TABLE erased_users (
+    user_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id);
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f0fa5d7631..9b77c45318 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -16,7 +16,7 @@
 from collections import namedtuple
 import logging
 import re
-import simplejson as json
+from canonicaljson import json
 
 from six import string_types
 
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 6671d3cfca..04d123ed95 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -19,7 +19,8 @@ from synapse.storage.account_data import AccountDataWorkerStore
 from synapse.util.caches.descriptors import cached
 from twisted.internet import defer
 
-import simplejson as json
+from canonicaljson import json
+
 import logging
 
 from six.moves import range
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index e485d19b84..acbc03446e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -19,12 +19,11 @@ from synapse.util.caches.descriptors import cached
 from twisted.internet import defer
 import six
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 
 from collections import namedtuple
 
 import logging
-import simplejson as json
 
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py
new file mode 100644
index 0000000000..47bfc01e84
--- /dev/null
+++ b/synapse/storage/user_erasure_store.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import operator
+
+from twisted.internet import defer
+
+from synapse.storage._base import SQLBaseStore
+from synapse.util.caches.descriptors import cachedList, cached
+
+
+class UserErasureWorkerStore(SQLBaseStore):
+    @cached()
+    def is_user_erased(self, user_id):
+        """
+        Check if the given user id has requested erasure
+
+        Args:
+            user_id (str): full user id to check
+
+        Returns:
+            Deferred[bool]: True if the user has requested erasure
+        """
+        return self._simple_select_onecol(
+            table="erased_users",
+            keyvalues={"user_id": user_id},
+            retcol="1",
+            desc="is_user_erased",
+        ).addCallback(operator.truth)
+
+    @cachedList(
+        cached_method_name="is_user_erased",
+        list_name="user_ids",
+        inlineCallbacks=True,
+    )
+    def are_users_erased(self, user_ids):
+        """
+        Checks which users in a list have requested erasure
+
+        Args:
+            user_ids (iterable[str]): full user id to check
+
+        Returns:
+            Deferred[dict[str, bool]]:
+                for each user, whether the user has requested erasure.
+        """
+        # this serves the dual purpose of (a) making sure we can do len and
+        # iterate it multiple times, and (b) avoiding duplicates.
+        user_ids = tuple(set(user_ids))
+
+        def _get_erased_users(txn):
+            txn.execute(
+                "SELECT user_id FROM erased_users WHERE user_id IN (%s)" % (
+                    ",".join("?" * len(user_ids))
+                ),
+                user_ids,
+            )
+            return set(r[0] for r in txn)
+
+        erased_users = yield self.runInteraction(
+            "are_users_erased", _get_erased_users,
+        )
+        res = dict((u, u in erased_users) for u in user_ids)
+        defer.returnValue(res)
+
+
+class UserErasureStore(UserErasureWorkerStore):
+    def mark_user_erased(self, user_id):
+        """Indicate that user_id wishes their message history to be erased.
+
+        Args:
+            user_id (str): full user_id to be erased
+        """
+        def f(txn):
+            # first check if they are already in the list
+            txn.execute(
+                "SELECT 1 FROM erased_users WHERE user_id = ?",
+                (user_id, )
+            )
+            if txn.fetchone():
+                return
+
+            # they are not already there: do the insert.
+            txn.execute(
+                "INSERT INTO erased_users (user_id) VALUES (?)",
+                (user_id, )
+            )
+
+            self._invalidate_cache_and_stream(
+                txn, self.is_user_erased, (user_id,)
+            )
+        return self.runInteraction("mark_user_erased", f)