From 5db56779699a3c9a95701e6373ac8bd5cace7860 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Aug 2015 16:58:23 +0100 Subject: Add metrics to the receipts cache --- synapse/storage/receipts.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index cac1a5657e..42f4b5f78d 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches import cache_counter, caches_by_name from twisted.internet import defer @@ -305,6 +306,8 @@ class _RoomStreamChangeCache(object): self._room_to_key = {} self._cache = sorteddict() self._earliest_key = None + self.name = "ReceiptsRoomChangeCache" + caches_by_name[self.name] = self._cache @defer.inlineCallbacks def get_rooms_changed(self, store, room_ids, key): @@ -321,6 +324,9 @@ class _RoomStreamChangeCache(object): else: result = room_ids + cache_counter.inc_hits_by(len(result), self.name) + cache_counter.inc_misses_by(len(room_ids) - len(result), self.name) + defer.returnValue(result) @defer.inlineCallbacks -- cgit 1.4.1 From 68b255c5a19cca4fce6a4225447e143b5ed7814b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Aug 2015 15:06:22 +0100 Subject: Batch _get_linearized_receipts_for_rooms --- synapse/storage/receipts.py | 79 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 15 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 42f4b5f78d..fb35472ad7 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.util.caches import cache_counter, caches_by_name from twisted.internet import defer @@ -55,19 +55,13 @@ class ReceiptsStore(SQLBaseStore): self, room_ids, from_key ) - results = yield defer.gatherResults( - [ - self.get_linearized_receipts_for_room( - room_id, to_key, from_key=from_key - ) - for room_id in room_ids - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + results = yield self._get_linearized_receipts_for_rooms( + room_ids, to_key, from_key=from_key + ) - defer.returnValue([ev for res in results for ev in res]) + defer.returnValue([ev for res in results.values() for ev in res]) - @defer.inlineCallbacks + @cachedInlineCallbacks(num_args=3, max_entries=5000) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. @@ -127,6 +121,61 @@ class ReceiptsStore(SQLBaseStore): "content": content, }]) + @cachedList(cache=get_linearized_receipts_for_room.cache, list_name="room_ids", + num_args=3, inlineCallbacks=True) + def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): + if not room_ids: + defer.returnValue({}) + + def f(txn): + if from_key: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id IN (%s) AND stream_id > ? AND stream_id <= ?" + ) % ( + ",".join(["?"] * len(room_ids)) + ) + args = list(room_ids) + args.extend([from_key, to_key]) + + txn.execute(sql, args) + else: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id IN (%s) AND stream_id <= ?" + ) % ( + ",".join(["?"] * len(room_ids)) + ) + + args = list(room_ids) + args.append(to_key) + + txn.execute(sql, args) + + return self.cursor_to_dict(txn) + + txn_results = yield self.runInteraction( + "_get_linearized_receipts_for_rooms", f + ) + + results = {} + for row in txn_results: + results.setdefault(row["room_id"], { + "type": "m.receipt", + "room_id": row["room_id"], + "content": {}, + })["content"].setdefault( + row["event_id"], {} + ).setdefault( + row["receipt_type"], {} + )[row["user_id"]] = json.loads(row["data"]) + + results = { + room_id: [results[room_id]] if room_id in results else [] + for room_id in room_ids + } + defer.returnValue(results) + def get_max_receipt_stream_id(self): return self._receipts_id_gen.get_max_token(self) @@ -321,11 +370,11 @@ class _RoomStreamChangeCache(object): result = set( self._cache[k] for k in keys[i:] ).intersection(room_ids) + + cache_counter.inc_hits(self.name) else: result = room_ids - - cache_counter.inc_hits_by(len(result), self.name) - cache_counter.inc_misses_by(len(room_ids) - len(result), self.name) + cache_counter.inc_misses(self.name) defer.returnValue(result) -- cgit 1.4.1 From d3d582bc1c1d6d497f72d6b248e4f15722f6e18d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 17 Aug 2015 13:38:09 +0100 Subject: Remove unused import --- synapse/storage/receipts.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index fb35472ad7..46c06e42b1 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -19,8 +19,6 @@ from synapse.util.caches import cache_counter, caches_by_name from twisted.internet import defer -from synapse.util import unwrapFirstError - from blist import sorteddict import logging import ujson as json -- cgit 1.4.1 From 8f4165628b7e8fa5510bd3cffe01bb93cb7e2df7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 17 Aug 2015 14:43:54 +0100 Subject: Add index receipts_linearized_room_stream --- synapse/storage/__init__.py | 2 +- synapse/storage/schema/delta/22/receipts_index.sql | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/22/receipts_index.sql (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c6ce65b4cc..f154b1c8ae 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -54,7 +54,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 21 +SCHEMA_VERSION = 22 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/22/receipts_index.sql b/synapse/storage/schema/delta/22/receipts_index.sql new file mode 100644 index 0000000000..b182b2b661 --- /dev/null +++ b/synapse/storage/schema/delta/22/receipts_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( + room_id, stream_id +); -- cgit 1.4.1 From cfc503681f3febbd80faeda0da4134b98bcbcbbc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Aug 2015 10:49:23 +0100 Subject: Comments --- synapse/storage/receipts.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 46c06e42b1..0ed9f45ecd 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -158,15 +158,25 @@ class ReceiptsStore(SQLBaseStore): results = {} for row in txn_results: - results.setdefault(row["room_id"], { + # We want a single event per room, since we want to batch the + # receipts by room, event and type. + room_event = results.setdefault(row["room_id"], { "type": "m.receipt", "room_id": row["room_id"], "content": {}, - })["content"].setdefault( + }) + + # The content is of the form: + # {"$foo:bar": { "read": { "@user:host": }, .. }, .. } + event_id = room_event["content"].setdefault( row["event_id"], {} - ).setdefault( + ) + + receipt_type = event_id.setdefault( row["receipt_type"], {} - )[row["user_id"]] = json.loads(row["data"]) + ) + + receipt_type[row["user_id"]] = json.loads(row["data"]) results = { room_id: [results[room_id]] if room_id in results else [] -- cgit 1.4.1 From f704c10f29ac1b013966cc8e43e74d38449cee6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 18 Aug 2015 11:54:03 +0100 Subject: Rename unhelpful variable name --- synapse/storage/receipts.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 0ed9f45ecd..a535063547 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -168,13 +168,8 @@ class ReceiptsStore(SQLBaseStore): # The content is of the form: # {"$foo:bar": { "read": { "@user:host": }, .. }, .. } - event_id = room_event["content"].setdefault( - row["event_id"], {} - ) - - receipt_type = event_id.setdefault( - row["receipt_type"], {} - ) + event_entry = room_event["content"].setdefault(row["event_id"], {}) + receipt_type = event_entry.setdefault(row["receipt_type"], {}) receipt_type[row["user_id"]] = json.loads(row["data"]) -- cgit 1.4.1