diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/receipts.py | 54 | ||||
-rw-r--r-- | synapse/storage/schema/delta/28/receipts_user_id_index.sql | 18 |
2 files changed, 51 insertions, 21 deletions
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index c80e576620..c4232bdc65 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -45,6 +45,21 @@ class ReceiptsStore(SQLBaseStore): desc="get_receipts_for_room", ) + @cachedInlineCallbacks(num_args=2) + def get_receipts_for_user(self, user_id, receipt_type): + def f(txn): + sql = ( + "SELECT room_id,event_id " + "FROM receipts_linearized " + "WHERE user_id = ? AND receipt_type = ? " + ) + txn.execute(sql, (user_id, receipt_type)) + return txn.fetchall() + + defer.returnValue(dict( + (yield self.runInteraction("get_receipts_for_user", f)) + )) + @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): """Get receipts for multiple rooms for sending to clients. @@ -194,29 +209,16 @@ class ReceiptsStore(SQLBaseStore): def get_max_receipt_stream_id(self): return self._receipts_id_gen.get_max_token(self) - @cachedInlineCallbacks() - def get_graph_receipts_for_room(self, room_id): - """Get receipts for sending to remote servers. - """ - rows = yield self._simple_select_list( - table="receipts_graph", - keyvalues={"room_id": room_id}, - retcols=["receipt_type", "user_id", "event_id"], - desc="get_linearized_receipts_for_room", - ) - - result = {} - for row in rows: - result.setdefault( - row["user_id"], {} - ).setdefault( - row["receipt_type"], [] - ).append(row["event_id"]) - - defer.returnValue(result) - def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + txn.call_after( + self.get_receipts_for_room.invalidate, (room_id, receipt_type) + ) + txn.call_after( + self.get_receipts_for_user.invalidate, (user_id, receipt_type) + ) + # FIXME: This shouldn't invalidate the whole cache + txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -324,6 +326,7 @@ class ReceiptsStore(SQLBaseStore): ) max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_id, max_persisted_id)) def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, @@ -336,6 +339,15 @@ class ReceiptsStore(SQLBaseStore): def insert_graph_receipt_txn(self, txn, room_id, receipt_type, user_id, event_ids, data): + txn.call_after( + self.get_receipts_for_room.invalidate, (room_id, receipt_type) + ) + txn.call_after( + self.get_receipts_for_user.invalidate, (user_id, receipt_type) + ) + # FIXME: This shouldn't invalidate the whole cache + txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + self._simple_delete_txn( txn, table="receipts_graph", diff --git a/synapse/storage/schema/delta/28/receipts_user_id_index.sql b/synapse/storage/schema/delta/28/receipts_user_id_index.sql new file mode 100644 index 0000000000..452a1b3c6c --- /dev/null +++ b/synapse/storage/schema/delta/28/receipts_user_id_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2015, 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX receipts_linearized_user ON receipts_linearized( + user_id +); |