diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2018-04-20 10:26:12 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-04-20 10:26:12 +0100 |
commit | bc381d5798ceccbd4c9bde99ef56a5cb47fb433d (patch) | |
tree | 2badc5cf943bbb88a3c6f704b66fc7daed9e743e /synapse/storage | |
parent | Merge pull request #3113 from matrix-org/rav/fix_huge_prev_events (diff) | |
parent | Refactor store.have_events (diff) | |
download | synapse-bc381d5798ceccbd4c9bde99ef56a5cb47fb433d.tar.xz |
Merge pull request #3117 from matrix-org/rav/refactor_have_events
Refactor store.have_events
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/events.py | 49 |
1 files changed, 42 insertions, 7 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index da44b52fd6..5fe4a0e56c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -16,6 +16,7 @@ from collections import OrderedDict, deque, namedtuple from functools import wraps +import itertools import logging import simplejson as json @@ -1320,13 +1321,49 @@ class EventsStore(EventsWorkerStore): defer.returnValue(set(r["event_id"] for r in rows)) - def have_events(self, event_ids): + @defer.inlineCallbacks + def have_seen_events(self, event_ids): """Given a list of event ids, check if we have already processed them. + Args: + event_ids (iterable[str]): + Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + + Returns: + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. """ if not event_ids: return defer.succeed({}) @@ -1348,9 +1385,7 @@ class EventsStore(EventsWorkerStore): return res - return self.runInteraction( - "have_events", f, - ) + return self.runInteraction("get_rejection_reasons", f) @defer.inlineCallbacks def count_daily_messages(self): |