Diffstat (limited to 'synapse')
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py                           12
-rw-r--r--  synapse/storage/databases/main/account_data.py                    121
-rw-r--r--  synapse/storage/databases/main/schema/delta/59/01ignored_user.py   82
-rw-r--r--  synapse/storage/prepare_database.py                                 2
4 files changed, 183 insertions(+), 34 deletions(-)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 10f27e4378..9018f9e20b 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -203,14 +203,18 @@ class BulkPushRuleEvaluator:
 
         condition_cache = {}  # type: Dict[str, bool]
 
+        # If the event is not a state event, check if any users ignore the sender.
+        if not event.is_state():
+            ignorers = await self.store.ignored_by(event.sender)
+        else:
+            ignorers = set()
+
         for uid, rules in rules_by_user.items():
             if event.sender == uid:
                 continue
 
-            if not event.is_state():
-                is_ignored = await self.store.is_ignored_by(event.sender, uid)
-                if is_ignored:
-                    continue
+            if uid in ignorers:
+                continue
 
             display_name = None
             profile_info = room_members.get(uid)
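
The change above hoists the ignore check out of the per-recipient loop: rather than a cached is_ignored_by(sender, uid) call for every local user in the room, the evaluator fetches the set of users who ignore the sender once per event. A minimal sketch of the resulting access pattern (a plain dict stands in for Synapse's store and cache; all names are illustrative):

    from typing import Dict, List, Set

    # Hypothetical denormalised mapping: ignored user -> users who ignore them.
    IGNORED_BY: Dict[str, Set[str]] = {
        "@spammer:example.org": {"@alice:example.org", "@bob:example.org"},
    }

    def users_to_evaluate(sender: str, rules_by_user: Dict[str, List]) -> Set[str]:
        # One set lookup per event replaces one is_ignored_by() call per user.
        ignorers = IGNORED_BY.get(sender, set())
        return {uid for uid in rules_by_user if uid != sender and uid not in ignorers}

    assert users_to_evaluate(
        "@spammer:example.org",
        {"@alice:example.org": [], "@carol:example.org": []},
    ) == {"@carol:example.org"}

This also shrinks the cache keyspace from one boolean entry per (ignored, ignorer) pair to one set per potentially ignored sender.
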
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 49ee23470d..bff51e92b9 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -16,7 +16,7 @@
 
 import abc
 import logging
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional, Set, Tuple
 
 from synapse.api.constants import AccountDataTypes
 from synapse.storage._base import SQLBaseStore, db_to_json
@@ -24,7 +24,7 @@ from synapse.storage.database import DatabasePool
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import JsonDict
 from synapse.util import json_encoder
-from synapse.util.caches.descriptors import _CacheContext, cached
+from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 logger = logging.getLogger(__name__)
@@ -287,23 +287,25 @@ class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
             "get_updated_account_data_for_user", get_updated_account_data_for_user_txn
         )
 
-    @cached(num_args=2, cache_context=True, max_entries=5000)
-    async def is_ignored_by(
-        self, ignored_user_id: str, ignorer_user_id: str, cache_context: _CacheContext
-    ) -> bool:
-        ignored_account_data = await self.get_global_account_data_by_type_for_user(
-            AccountDataTypes.IGNORED_USER_LIST,
-            ignorer_user_id,
-            on_invalidate=cache_context.invalidate,
-        )
-        if not ignored_account_data:
-            return False
+    @cached(max_entries=5000, iterable=True)
+    async def ignored_by(self, user_id: str) -> Set[str]:
+        """
+        Get users which ignore the given user.
 
-        try:
-            return ignored_user_id in ignored_account_data.get("ignored_users", {})
-        except TypeError:
-            # The type of the ignored_users field is invalid.
-            return False
+        Args:
+            user_id: The user ID which might be ignored.
+
+        Returns:
+            The user IDs which ignore the given user.
+        """
+        return set(
+            await self.db_pool.simple_select_onecol(
+                table="ignored_users",
+                keyvalues={"ignored_user_id": user_id},
+                retcol="ignorer_user_id",
+                desc="ignored_by",
+            )
+        )
 
 
 class AccountDataStore(AccountDataWorkerStore):
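
ignored_by is now a single reverse lookup on the denormalised ignored_users table, rather than a fetch-and-parse of the ignorer's account data (iterable=True presumably lets the cache weigh each entry by the size of the returned set). A standalone sqlite3 sketch of what the simple_select_onecol call boils down to (not Synapse's DatabasePool API):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ignored_users"
        " (ignorer_user_id TEXT NOT NULL, ignored_user_id TEXT NOT NULL)"
    )
    conn.execute(
        "INSERT INTO ignored_users VALUES ('@alice:example.org', '@spammer:example.org')"
    )

    def ignored_by(user_id: str) -> set:
        # Equivalent of simple_select_onecol(retcol="ignorer_user_id", ...):
        # collect every ignorer whose row points at the given user.
        rows = conn.execute(
            "SELECT ignorer_user_id FROM ignored_users WHERE ignored_user_id = ?",
            (user_id,),
        )
        return {row[0] for row in rows}

    assert ignored_by("@spammer:example.org") == {"@alice:example.org"}
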
@@ -390,18 +392,14 @@ class AccountDataStore(AccountDataWorkerStore):
         Returns:
             The maximum stream ID.
         """
-        content_json = json_encoder.encode(content)
-
         async with self._account_data_id_gen.get_next() as next_id:
-            # no need to lock here as account_data has a unique constraint on
-            # (user_id, account_data_type) so simple_upsert will retry if
-            # there is a conflict.
-            await self.db_pool.simple_upsert(
-                desc="add_user_account_data",
-                table="account_data",
-                keyvalues={"user_id": user_id, "account_data_type": account_data_type},
-                values={"stream_id": next_id, "content": content_json},
-                lock=False,
+            await self.db_pool.runInteraction(
+                "add_user_account_data",
+                self._add_account_data_for_user,
+                next_id,
+                user_id,
+                account_data_type,
+                content,
             )
 
             # it's theoretically possible for the above to succeed and the
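
Moving the upsert into runInteraction means the account_data write and the ignored_users maintenance done by _add_account_data_for_user (next hunk) share one transaction, so a failure can't leave the denormalised table out of step with the account data. A simplified sqlite3 analogue of that grouping (schema and helper names are illustrative, not Synapse's API):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE account_data (user_id TEXT, account_data_type TEXT, content TEXT)")
    conn.execute("CREATE TABLE ignored_users (ignorer_user_id TEXT, ignored_user_id TEXT)")

    def add_user_account_data(user_id: str, content_json: str, ignored_id: str) -> None:
        # Both writes commit together or roll back together, which is what
        # wrapping them in a single database interaction buys here.
        with conn:
            conn.execute(
                "INSERT INTO account_data VALUES (?, 'm.ignored_user_list', ?)",
                (user_id, content_json),
            )
            conn.execute(
                "INSERT INTO ignored_users VALUES (?, ?)", (user_id, ignored_id)
            )

    add_user_account_data(
        "@alice:example.org", '{"ignored_users": {"@x:example.org": {}}}', "@x:example.org"
    )
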
@@ -424,6 +422,71 @@ class AccountDataStore(AccountDataWorkerStore):
 
         return self._account_data_id_gen.get_current_token()
 
+    def _add_account_data_for_user(
+        self,
+        txn,
+        next_id: int,
+        user_id: str,
+        account_data_type: str,
+        content: JsonDict,
+    ) -> None:
+        content_json = json_encoder.encode(content)
+
+        # no need to lock here as account_data has a unique constraint on
+        # (user_id, account_data_type) so simple_upsert will retry if
+        # there is a conflict.
+        self.db_pool.simple_upsert_txn(
+            txn,
+            table="account_data",
+            keyvalues={"user_id": user_id, "account_data_type": account_data_type},
+            values={"stream_id": next_id, "content": content_json},
+            lock=False,
+        )
+
+        # Ignored users get denormalized into a separate table as an optimisation.
+        if account_data_type != AccountDataTypes.IGNORED_USER_LIST:
+            return
+
+        # Insert / delete to sync the list of ignored users.
+        previously_ignored_users = set(
+            self.db_pool.simple_select_onecol_txn(
+                txn,
+                table="ignored_users",
+                keyvalues={"ignorer_user_id": user_id},
+                retcol="ignored_user_id",
+            )
+        )
+
+        # If the data is invalid, no one is ignored.
+        ignored_users_content = content.get("ignored_users", {})
+        if isinstance(ignored_users_content, dict):
+            currently_ignored_users = set(ignored_users_content)
+        else:
+            currently_ignored_users = set()
+
+        # Delete entries which are no longer ignored.
+        self.db_pool.simple_delete_many_txn(
+            txn,
+            table="ignored_users",
+            column="ignored_user_id",
+            iterable=previously_ignored_users - currently_ignored_users,
+            keyvalues={"ignorer_user_id": user_id},
+        )
+
+        # Add entries which are newly ignored.
+        self.db_pool.simple_insert_many_txn(
+            txn,
+            table="ignored_users",
+            values=[
+                {"ignorer_user_id": user_id, "ignored_user_id": u}
+                for u in currently_ignored_users - previously_ignored_users
+            ],
+        )
+
+        # Invalidate the cache for any ignored users which were added or removed.
+        for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
+            self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+
     async def _update_max_stream_id(self, next_id: int) -> None:
         """Update the max stream_id
 
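
The reconciliation in _add_account_data_for_user is plain set algebra: delete the old set minus the new, insert the new minus the old, and invalidate the symmetric difference, which is exactly the union of the two. A tiny worked example (values are illustrative):

    previously_ignored = {"@a:example.org", "@b:example.org"}
    currently_ignored = {"@b:example.org", "@c:example.org"}

    to_delete = previously_ignored - currently_ignored      # {"@a:example.org"}
    to_insert = currently_ignored - previously_ignored      # {"@c:example.org"}
    to_invalidate = previously_ignored ^ currently_ignored  # both of the above

    assert to_invalidate == to_delete | to_insert
    assert "@b:example.org" not in to_invalidate  # unchanged users keep their cache
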
diff --git a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py
new file mode 100644
index 0000000000..f35c70b699
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py
@@ -0,0 +1,82 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This migration denormalises the account_data table into an ignored users table.
+"""
+
+import logging
+from io import StringIO
+
+from synapse.storage._base import db_to_json
+from synapse.storage.engines import BaseDatabaseEngine
+from synapse.storage.prepare_database import execute_statements_from_stream
+from synapse.storage.types import Cursor
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    pass
+
+
+def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    logger.info("Creating ignored_users table")
+    execute_statements_from_stream(cur, StringIO(_create_commands))
+
+    # We now upgrade existing data, if any. We don't do this in `run_upgrade` as
+    # a) we want to run these inserts before adding constraints and b)
+    # `run_upgrade` is not run on empty databases.
+    insert_sql = """
+    INSERT INTO ignored_users (ignorer_user_id, ignored_user_id) VALUES (?, ?)
+    """
+
+    logger.info("Converting existing ignore lists")
+    cur.execute(
+        "SELECT user_id, content FROM account_data WHERE account_data_type = 'm.ignored_user_list'"
+    )
+    for user_id, content_json in cur.fetchall():
+        content = db_to_json(content_json)
+
+        # The content should be in the form of a dictionary with a key
+        # "ignored_users" pointing to a dictionary whose keys are ignored users:
+        #
+        # { "ignored_users": { "@someone:example.org": {} } }
+        ignored_users = content.get("ignored_users", {})
+        if isinstance(ignored_users, dict) and ignored_users:
+            cur.executemany(insert_sql, [(user_id, u) for u in ignored_users])
+
+    # Add indexes after inserting data for efficiency.
+    logger.info("Adding constraints to ignored_users table")
+    execute_statements_from_stream(cur, StringIO(_constraints_commands))
+
+
+# The statements are split into two batches so that the data conversion in
+# run_create can run after the table exists but before the constraints are added.
+
+_create_commands = """
+-- Users which are ignored when calculating push notifications. This data is
+-- denormalized from account data.
+CREATE TABLE IF NOT EXISTS ignored_users(
+    ignorer_user_id TEXT NOT NULL,  -- The user ID of the user who is ignoring another user. (This is a local user.)
+    ignored_user_id TEXT NOT NULL  -- The user ID of the user who is being ignored. (This is a local or remote user.)
+);
+"""
+
+_constraints_commands = """
+CREATE UNIQUE INDEX ignored_users_uniqueness ON ignored_users (ignorer_user_id, ignored_user_id);
+
+-- Add an index on ignored_users since look-ups are done to get all ignorers of an ignored user.
+CREATE INDEX ignored_users_ignored_user_id ON ignored_users (ignored_user_id);
+"""
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6684403a0a..01efb2cabb 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -38,7 +38,7 @@ logger = logging.getLogger(__name__)
 # XXX: If you're about to bump this to 59 (or higher) please create an update
 # that drops the unused `cache_invalidation_stream` table, as per #7436!
 # XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
-SCHEMA_VERSION = 58
+SCHEMA_VERSION = 59
 
 dir_path = os.path.abspath(os.path.dirname(__file__))