From 14fa9d4d92eaa242d44a2823bbd9908be2f02d81 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 2 Aug 2018 13:23:48 +0100 Subject: Avoid extra db lookups Since we're about to look up the events themselves anyway, we can skip the extra db queries here. --- synapse/storage/events_worker.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) (limited to 'synapse/storage/events_worker.py') diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 67433606c6..6b8a8e908f 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -19,7 +19,7 @@ from canonicaljson import json from twisted.internet import defer -from synapse.api.errors import SynapseError +from synapse.api.errors import NotFoundError # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 from synapse.events import FrozenEvent @@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, - allow_none=False): + allow_none=False, check_room_id=None): """Get an event from the database by event_id. Args: @@ -87,7 +87,9 @@ class EventsWorkerStore(SQLBaseStore): include the previous states content in the unsigned field. allow_rejected (bool): If True return rejected events. allow_none (bool): If True, return None if no event found, if - False throw an exception. + False throw a NotFoundError + check_room_id (str|None): if not None, check the room of the found event. + If there is a mismatch, behave as per allow_none. Returns: Deferred : A FrozenEvent. @@ -99,10 +101,16 @@ class EventsWorkerStore(SQLBaseStore): allow_rejected=allow_rejected, ) - if not events and not allow_none: - raise SynapseError(404, "Could not find event %s" % (event_id,)) + event = events[0] if events else None - defer.returnValue(events[0] if events else None) + if event is not None and check_room_id is not None: + if event.room_id != check_room_id: + event = None + + if event is None and not allow_none: + raise NotFoundError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) @defer.inlineCallbacks def get_events(self, event_ids, check_redacted=True, -- cgit 1.5.1 From 62ace05c4521caeed023e5b9dadd993afe5c154f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jul 2018 13:31:59 +0100 Subject: Move necessary storage functions to worker classes --- synapse/storage/events.py | 82 --------------------------------------- synapse/storage/events_worker.py | 84 ++++++++++++++++++++++++++++++++++++++++ synapse/storage/room.py | 32 +++++++-------- 3 files changed, 100 insertions(+), 98 deletions(-) (limited to 'synapse/storage/events_worker.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e8e5a0fe44..5374796fd7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1430,88 +1430,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore (event.event_id, event.redacts) ) - @defer.inlineCallbacks - def have_events_in_timeline(self, event_ids): - """Given a list of event ids, check if we have already processed and - stored them as non outliers. - """ - rows = yield self._simple_select_many_batch( - table="events", - retcols=("event_id",), - column="event_id", - iterable=list(event_ids), - keyvalues={"outlier": False}, - desc="have_events_in_timeline", - ) - - defer.returnValue(set(r["event_id"] for r in rows)) - - @defer.inlineCallbacks - def have_seen_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Args: - event_ids (iterable[str]): - - Returns: - Deferred[set[str]]: The events we have already seen. - """ - results = set() - - def have_seen_events_txn(txn, chunk): - sql = ( - "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" - % (",".join("?" * len(chunk)), ) - ) - txn.execute(sql, chunk) - for (event_id, ) in txn: - results.add(event_id) - - # break the input up into chunks of 100 - input_iterator = iter(event_ids) - for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), - []): - yield self.runInteraction( - "have_seen_events", - have_seen_events_txn, - chunk, - ) - defer.returnValue(results) - - def get_seen_events_with_rejections(self, event_ids): - """Given a list of event ids, check if we rejected them. - - Args: - event_ids (list[str]) - - Returns: - Deferred[dict[str, str|None): - Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps - to None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction("get_rejection_reasons", f) - @defer.inlineCallbacks def count_daily_messages(self): """ diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 9b4cfeb899..e1dc2c62fd 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging + from collections import namedtuple from canonicaljson import json @@ -442,3 +444,85 @@ class EventsWorkerStore(SQLBaseStore): self._get_event_cache.prefill((original_ev.event_id,), cache_entry) defer.returnValue(cache_entry) + + @defer.inlineCallbacks + def have_events_in_timeline(self, event_ids): + """Given a list of event ids, check if we have already processed and + stored them as non outliers. + """ + rows = yield self._simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ) + + defer.returnValue(set(r["event_id"] for r in rows)) + + @defer.inlineCallbacks + def have_seen_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Args: + event_ids (iterable[str]): + + Returns: + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + + Returns: + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction("get_rejection_reasons", f) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 3147fb6827..3378fc77d1 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple( class RoomWorkerStore(SQLBaseStore): + def get_room(self, room_id): + """Retrieve a room. + + Args: + room_id (str): The ID of the room to retrieve. + Returns: + A namedtuple containing the room information, or an empty list. + """ + return self._simple_select_one( + table="rooms", + keyvalues={"room_id": room_id}, + retcols=("room_id", "is_public", "creator"), + desc="get_room", + allow_none=True, + ) + def get_public_room_ids(self): return self._simple_select_onecol( table="rooms", @@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore): logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") - def get_room(self, room_id): - """Retrieve a room. - - Args: - room_id (str): The ID of the room to retrieve. - Returns: - A namedtuple containing the room information, or an empty list. - """ - return self._simple_select_one( - table="rooms", - keyvalues={"room_id": room_id}, - retcols=("room_id", "is_public", "creator"), - desc="get_room", - allow_none=True, - ) - @defer.inlineCallbacks def set_room_is_public(self, room_id, is_public): def set_room_is_public_txn(txn, next_id): -- cgit 1.5.1 From 96a9a296455e2c08be4e0375cf784e4113fa51e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jul 2018 13:35:36 +0100 Subject: Pull in necessary stores in federation_reader --- synapse/app/federation_reader.py | 2 ++ synapse/storage/events_worker.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage/events_worker.py') diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index c512b4be87..0c557a8242 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -36,6 +36,7 @@ from synapse.replication.slave.storage.appservice import SlavedApplicationServic from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore @@ -53,6 +54,7 @@ logger = logging.getLogger("synapse.app.federation_reader") class FederationReaderSlavedStore( + SlavedProfileStore, SlavedApplicationServiceStore, SlavedPusherStore, SlavedPushRuleStore, diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index e1dc2c62fd..59822178ff 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -14,7 +14,6 @@ # limitations under the License. import itertools import logging - from collections import namedtuple from canonicaljson import json -- cgit 1.5.1