summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-15 10:21:25 +0100
committerErik Johnston <erik@matrix.org>2016-08-15 11:15:17 +0100
commit4d70d1f80ea688304abdcbbf3ee01f6ab932abc7 (patch)
treeb6389f1bf99d1266fd555c255caa34bed8aaae43
parentAlways run txn.after_callbacks (diff)
downloadsynapse-4d70d1f80ea688304abdcbbf3ee01f6ab932abc7.tar.xz
Add some invalidations to a cache_stream
-rw-r--r--synapse/storage/__init__.py3
-rw-r--r--synapse/storage/_base.py18
-rw-r--r--synapse/storage/directory.py37
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/presence.py32
-rw-r--r--synapse/storage/roommember.py12
-rw-r--r--synapse/storage/schema/delta/34/cache_stream.py44
7 files changed, 117 insertions, 31 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 73fb334dd6..a0c029a2fc 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -122,6 +122,9 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "pushers", "id",
             extra_tables=[("deleted_pushers", "stream_id")],
         )
+        self._cache_id_gen = StreamIdGenerator(
+            db_conn, "cache_stream", "stream_id",
+        )
 
         events_max = self._stream_id_gen.get_current_token()
         event_cache_prefill, min_event_val = self._get_cache_dict(
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e516b6de3e..02d9098ddd 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -861,6 +861,24 @@ class SQLBaseStore(object):
 
         return cache, min_val
 
+    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+        txn.call_after(cache_func.invalidate, keys)
+
+        ctx = self._cache_id_gen.get_next()
+        stream_id = ctx.__enter__()
+        txn.call_after(ctx.__exit__, None, None, None)
+
+        self._simple_insert_txn(
+            txn,
+            table="cache_stream",
+            values={
+                "stream_id": stream_id,
+                "cache_func": cache_func.__name__,
+                "keys": list(keys),
+                "invalidation_ts": self.clock.time_msec(),
+            }
+        )
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index ef231a04dc..9caaf81f2c 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -82,32 +82,39 @@ class DirectoryStore(SQLBaseStore):
         Returns:
             Deferred
         """
-        try:
-            yield self._simple_insert(
+        def alias_txn(txn):
+            self._simple_insert_txn(
+                txn,
                 "room_aliases",
                 {
                     "room_alias": room_alias.to_string(),
                     "room_id": room_id,
                     "creator": creator,
                 },
-                desc="create_room_alias_association",
-            )
-        except self.database_engine.module.IntegrityError:
-            raise SynapseError(
-                409, "Room alias %s already exists" % room_alias.to_string()
             )
 
-        for server in servers:
-            # TODO(erikj): Fix this to bulk insert
-            yield self._simple_insert(
-                "room_alias_servers",
-                {
+            self._simple_insert_many_txn(
+                txn,
+                table="room_alias_servers",
+                values=[{
                     "room_alias": room_alias.to_string(),
                     "server": server,
-                },
-                desc="create_room_alias_association",
+                } for server in servers],
             )
-        self.get_aliases_for_room.invalidate((room_id,))
+
+            self._invalidate_cache_and_stream(
+                txn, self.get_aliases_for_room, (room_id,)
+            )
+
+        try:
+            ret = yield self.runInteraction(
+                "create_room_alias_association", alias_txn
+            )
+        except self.database_engine.module.IntegrityError:
+            raise SynapseError(
+                409, "Room alias %s already exists" % room_alias.to_string()
+            )
+        defer.returnValue(ret)
 
     def get_room_alias_creator(self, room_alias):
         return self._simple_select_one_onecol(
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 8801669a6b..b94ce7bea1 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 33
+SCHEMA_VERSION = 34
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index d03f7c541e..21d0696640 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -189,18 +189,30 @@ class PresenceStore(SQLBaseStore):
             desc="add_presence_list_pending",
         )
 
-    @defer.inlineCallbacks
     def set_presence_list_accepted(self, observer_localpart, observed_userid):
-        result = yield self._simple_update_one(
-            table="presence_list",
-            keyvalues={"user_id": observer_localpart,
-                       "observed_user_id": observed_userid},
-            updatevalues={"accepted": True},
-            desc="set_presence_list_accepted",
+        def update_presence_list_txn(txn):
+            result = self._simple_update_one_txn(
+                txn,
+                table="presence_list",
+                keyvalues={
+                    "user_id": observer_localpart,
+                    "observed_user_id": observed_userid
+                },
+                updatevalues={"accepted": True},
+            )
+
+            self._invalidate_cache_and_stream(
+                txn, self.get_presence_list_accepted, (observer_localpart,)
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_presence_list_observers_accepted, (observed_userid,)
+            )
+
+            return result
+
+        return self.runInteraction(
+            "set_presence_list_accepted", update_presence_list_txn,
         )
-        self.get_presence_list_accepted.invalidate((observer_localpart,))
-        self.get_presence_list_observers_accepted.invalidate((observed_userid,))
-        defer.returnValue(result)
 
     def get_presence_list(self, observer_localpart, accepted=None):
         if accepted:
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8bd693be72..a422ddf633 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -277,7 +277,6 @@ class RoomMemberStore(SQLBaseStore):
             user_id, membership_list=[Membership.JOIN],
         )
 
-    @defer.inlineCallbacks
     def forget(self, user_id, room_id):
         """Indicate that user_id wishes to discard history for room_id."""
         def f(txn):
@@ -292,10 +291,13 @@ class RoomMemberStore(SQLBaseStore):
                 "  room_id = ?"
             )
             txn.execute(sql, (user_id, room_id))
-        yield self.runInteraction("forget_membership", f)
-        self.was_forgotten_at.invalidate_all()
-        self.who_forgot_in_room.invalidate_all()
-        self.did_forget.invalidate((user_id, room_id))
+
+            txn.call_after(self.was_forgotten_at.invalidate_all)
+            txn.call_after(self.did_forget.invalidate, (user_id, room_id))
+            self._invalidate_cache_and_stream(
+                txn, self.who_forgot_in_room, (room_id,)
+            )
+        return self.runInteraction("forget_membership", f)
 
     @cachedInlineCallbacks(num_args=2)
     def did_forget(self, user_id, room_id):
diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py
new file mode 100644
index 0000000000..4c350bfb11
--- /dev/null
+++ b/synapse/storage/schema/delta/34/cache_stream.py
@@ -0,0 +1,44 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+CREATE_TABLE = """
+CREATE TABLE cache_stream (
+    stream_id       BIGINT,
+    cache_func      TEXT,
+    keys            TEXT[],
+    invalidation_ts BIGINT
+);
+
+CREATE INDEX cache_stream_id ON cache_stream(stream_id);
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    if not isinstance(database_engine, PostgresEngine):
+        return
+
+    for statement in get_statements(CREATE_TABLE.splitlines()):
+        cur.execute(statement)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    pass