diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5a04d45e1e..9751f024d7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -402,6 +402,10 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_events(self, event_ids, check_redacted=True,
get_prev_content=False, allow_rejected=False, txn=None):
+ """Gets a collection of events. If `txn` is not None the we use the
+ current transaction to fetch events and we return a deferred that is
+ guarenteed to have resolved.
+ """
if not event_ids:
defer.returnValue([])
@@ -490,16 +494,10 @@ class EventsStore(SQLBaseStore):
return event_map
- def _fetch_events_txn(self, txn, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- return unwrap_deferred(self._fetch_events(
- txn, events,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- ))
-
def _do_fetch(self, conn):
+ """Takes a database connection and waits for requests for events from
+ the _event_fetch_list queue.
+ """
event_list = []
i = 0
while True:
@@ -532,6 +530,7 @@ class EventsStore(SQLBaseStore):
for r in rows
}
+ # We only want to resolve deferreds from the main thread
def fire(lst, res):
for ids, d in lst:
if not d.called:
@@ -547,6 +546,7 @@ class EventsStore(SQLBaseStore):
except Exception as e:
logger.exception("do_fetch")
+ # We only want to resolve deferreds from the main thread
def fire(evs):
for _, d in evs:
if not d.called:
@@ -558,6 +558,10 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True,
get_prev_content=False, allow_rejected=False):
+ """Fetches events from the database using the _event_fetch_list. This
+ allows batch and bulk fetching of events - it allows us to fetch events
+ without having to create a new transaction for each request for events.
+ """
if not events:
defer.returnValue({})
@@ -582,6 +586,9 @@ class EventsStore(SQLBaseStore):
rows = yield preserve_context_over_deferred(events_d)
+ if not allow_rejected:
+ rows[:] = [r for r in rows if not r["rejects"]]
+
res = yield defer.gatherResults(
[
self._get_event_from_row(
@@ -627,49 +634,46 @@ class EventsStore(SQLBaseStore):
return rows
- @defer.inlineCallbacks
- def _fetch_events(self, txn, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
+ def _fetch_events_txn(self, txn, events, check_redacted=True,
+ get_prev_content=False, allow_rejected=False):
if not events:
- defer.returnValue({})
+ return {}
- if txn:
- rows = self._fetch_event_rows(
- txn, events,
- )
- else:
- rows = yield self.runInteraction(
- self._fetch_event_rows,
- events,
- )
+ rows = self._fetch_event_rows(
+ txn, events,
+ )
if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]]
- res = yield defer.gatherResults(
- [
- defer.maybeDeferred(
- self._get_event_from_row,
- txn,
- row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- rejected_reason=row["rejects"],
- )
- for row in rows
- ],
- consumeErrors=True,
- )
+ res = [
+ unwrap_deferred(self._get_event_from_row(
+ txn,
+ row["internal_metadata"], row["json"], row["redacts"],
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ rejected_reason=row["rejects"],
+ ))
+ for row in rows
+ ]
- defer.returnValue({
+ return {
r.event_id: r
for r in res
- })
+ }
@defer.inlineCallbacks
def _get_event_from_row(self, txn, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False,
rejected_reason=None):
+ """This is called when we have a row from the database that we want to
+ convert into an event. Depending on the given options it may do more
+ database ops to fill in extra information (e.g. previous content or
+ rejection reason.)
+
+ `txn` may be None, and if so this creates new transactions for each
+ database op.
+ """
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
|