diff options
-rw-r--r-- | synapse/storage/events.py | 80 |
1 files changed, 42 insertions, 38 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a04d45e1e..9751f024d7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -402,6 +402,10 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False, txn=None): + """Gets a collection of events. If `txn` is not None the we use the + current transaction to fetch events and we return a deferred that is + guarenteed to have resolved. + """ if not event_ids: defer.returnValue([]) @@ -490,16 +494,10 @@ class EventsStore(SQLBaseStore): return event_map - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - return unwrap_deferred(self._fetch_events( - txn, events, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - )) - def _do_fetch(self, conn): + """Takes a database connection and waits for requests for events from + the _event_fetch_list queue. + """ event_list = [] i = 0 while True: @@ -532,6 +530,7 @@ class EventsStore(SQLBaseStore): for r in rows } + # We only want to resolve deferreds from the main thread def fire(lst, res): for ids, d in lst: if not d.called: @@ -547,6 +546,7 @@ class EventsStore(SQLBaseStore): except Exception as e: logger.exception("do_fetch") + # We only want to resolve deferreds from the main thread def fire(evs): for _, d in evs: if not d.called: @@ -558,6 +558,10 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, get_prev_content=False, allow_rejected=False): + """Fetches events from the database using the _event_fetch_list. This + allows batch and bulk fetching of events - it allows us to fetch events + without having to create a new transaction for each request for events. + """ if not events: defer.returnValue({}) @@ -582,6 +586,9 @@ class EventsStore(SQLBaseStore): rows = yield preserve_context_over_deferred(events_d) + if not allow_rejected: + rows[:] = [r for r in rows if not r["rejects"]] + res = yield defer.gatherResults( [ self._get_event_from_row( @@ -627,49 +634,46 @@ class EventsStore(SQLBaseStore): return rows - @defer.inlineCallbacks - def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _fetch_events_txn(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): if not events: - defer.returnValue({}) + return {} - if txn: - rows = self._fetch_event_rows( - txn, events, - ) - else: - rows = yield self.runInteraction( - self._fetch_event_rows, - events, - ) + rows = self._fetch_event_rows( + txn, events, + ) if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( - [ - defer.maybeDeferred( - self._get_event_from_row, - txn, - row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row["rejects"], - ) - for row in rows - ], - consumeErrors=True, - ) + res = [ + unwrap_deferred(self._get_event_from_row( + txn, + row["internal_metadata"], row["json"], row["redacts"], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row["rejects"], + )) + for row in rows + ] - defer.returnValue({ + return { r.event_id: r for r in res - }) + } @defer.inlineCallbacks def _get_event_from_row(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): + """This is called when we have a row from the database that we want to + convert into an event. Depending on the given options it may do more + database ops to fill in extra information (e.g. previous content or + rejection reason.) + + `txn` may be None, and if so this creates new transactions for each + database op. + """ d = json.loads(js) internal_metadata = json.loads(internal_metadata) |