summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py8
-rw-r--r--synapse/storage/event_push_actions.py12
-rw-r--r--synapse/storage/pusher.py13
-rw-r--r--synapse/storage/receipts.py36
-rw-r--r--synapse/storage/schema/delta/32/remove_indices.sql38
5 files changed, 90 insertions, 17 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1e27c2c0ce..e0d7098692 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -453,7 +453,9 @@ class SQLBaseStore(object):
             keyvalues (dict): The unique key tables and their new values
             values (dict): The nonunique columns and their new values
             insertion_values (dict): key/values to use when inserting
-        Returns: A deferred
+        Returns:
+            Deferred(bool): True if a new entry was created, False if an
+                existing one was updated.
         """
         return self.runInteraction(
             desc,
@@ -498,6 +500,10 @@ class SQLBaseStore(object):
             )
             txn.execute(sql, allvalues.values())
 
+            return True
+        else:
+            return False
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False, desc="_simple_select_one"):
         """Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 6f316f7d24..9705db5c47 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -224,6 +224,18 @@ class EventPushActionsStore(SQLBaseStore):
             (room_id, event_id)
         )
 
+    def _remove_push_actions_before_txn(self, txn, room_id, user_id,
+                                        topological_ordering):
+        txn.call_after(
+            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            (room_id, user_id, )
+        )
+        txn.execute(
+            "DELETE FROM event_push_actions"
+            " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
+            (room_id, user_id, topological_ordering,)
+        )
+
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index d9afd7ec87..9e8e2e2964 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -156,8 +156,7 @@ class PusherStore(SQLBaseStore):
                    profile_tag=""):
         with self._pushers_id_gen.get_next() as stream_id:
             def f(txn):
-                txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
-                return self._simple_upsert_txn(
+                newly_inserted = self._simple_upsert_txn(
                     txn,
                     "pushers",
                     {
@@ -178,11 +177,18 @@ class PusherStore(SQLBaseStore):
                         "id": stream_id,
                     },
                 )
-        defer.returnValue((yield self.runInteraction("add_pusher", f)))
+                if newly_inserted:
+                    # get_users_with_pushers_in_room only cares if the user has
+                    # at least *one* pusher.
+                    txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+
+            yield self.runInteraction("add_pusher", f)
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
+            txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+
             self._simple_delete_one_txn(
                 txn,
                 "pushers",
@@ -194,6 +200,7 @@ class PusherStore(SQLBaseStore):
                 {"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
                 {"stream_id": stream_id},
             )
+
         with self._pushers_id_gen.get_next() as stream_id:
             yield self.runInteraction(
                 "delete_pusher", delete_pusher_txn, stream_id
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 935fc503d9..fdcf28f3e1 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore):
 
         defer.returnValue([ev for res in results.values() for ev in res])
 
-    @cachedInlineCallbacks(num_args=3, max_entries=5000)
+    @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
     def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
         """Get receipts for a single room for sending to clients.
 
@@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         txn.call_after(
             self._receipts_stream_cache.entity_has_changed,
@@ -244,6 +244,17 @@ class ReceiptsStore(SQLBaseStore):
             (user_id, room_id, receipt_type)
         )
 
+        res = self._simple_select_one_txn(
+            txn,
+            table="events",
+            retcols=["topological_ordering", "stream_ordering"],
+            keyvalues={"event_id": event_id},
+            allow_none=True
+        )
+
+        topological_ordering = int(res["topological_ordering"]) if res else None
+        stream_ordering = int(res["stream_ordering"]) if res else None
+
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
         sql = (
@@ -255,16 +266,7 @@ class ReceiptsStore(SQLBaseStore):
         txn.execute(sql, (room_id, receipt_type, user_id))
         results = txn.fetchall()
 
-        if results:
-            res = self._simple_select_one_txn(
-                txn,
-                table="events",
-                retcols=["topological_ordering", "stream_ordering"],
-                keyvalues={"event_id": event_id},
-            )
-            topological_ordering = int(res["topological_ordering"])
-            stream_ordering = int(res["stream_ordering"])
-
+        if results and topological_ordering:
             for to, so, _ in results:
                 if int(to) > topological_ordering:
                     return False
@@ -294,6 +296,14 @@ class ReceiptsStore(SQLBaseStore):
             }
         )
 
+        if receipt_type == "m.read" and topological_ordering:
+            self._remove_push_actions_before_txn(
+                txn,
+                room_id=room_id,
+                user_id=user_id,
+                topological_ordering=topological_ordering,
+            )
+
         return True
 
     @defer.inlineCallbacks
@@ -367,7 +377,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         self._simple_delete_txn(
             txn,
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;