summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py80
1 files changed, 42 insertions, 38 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5a04d45e1e..9751f024d7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -402,6 +402,10 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def _get_events(self, event_ids, check_redacted=True,
                     get_prev_content=False, allow_rejected=False, txn=None):
+        """Gets a collection of events. If `txn` is not None the we use the
+        current transaction to fetch events and we return a deferred that is
+        guarenteed to have resolved.
+        """
         if not event_ids:
             defer.returnValue([])
 
@@ -490,16 +494,10 @@ class EventsStore(SQLBaseStore):
 
         return event_map
 
-    def _fetch_events_txn(self, txn, events, check_redacted=True,
-                          get_prev_content=False, allow_rejected=False):
-        return unwrap_deferred(self._fetch_events(
-            txn, events,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        ))
-
     def _do_fetch(self, conn):
+        """Takes a database connection and waits for requests for events from
+        the _event_fetch_list queue.
+        """
         event_list = []
         i = 0
         while True:
@@ -532,6 +530,7 @@ class EventsStore(SQLBaseStore):
                     for r in rows
                 }
 
+                # We only want to resolve deferreds from the main thread
                 def fire(lst, res):
                     for ids, d in lst:
                         if not d.called:
@@ -547,6 +546,7 @@ class EventsStore(SQLBaseStore):
             except Exception as e:
                 logger.exception("do_fetch")
 
+                # We only want to resolve deferreds from the main thread
                 def fire(evs):
                     for _, d in evs:
                         if not d.called:
@@ -558,6 +558,10 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True,
                         get_prev_content=False, allow_rejected=False):
+        """Fetches events from the database using the _event_fetch_list. This
+        allows batch and bulk fetching of events - it allows us to fetch events
+        without having to create a new transaction for each request for events.
+        """
         if not events:
             defer.returnValue({})
 
@@ -582,6 +586,9 @@ class EventsStore(SQLBaseStore):
 
         rows = yield preserve_context_over_deferred(events_d)
 
+        if not allow_rejected:
+            rows[:] = [r for r in rows if not r["rejects"]]
+
         res = yield defer.gatherResults(
             [
                 self._get_event_from_row(
@@ -627,49 +634,46 @@ class EventsStore(SQLBaseStore):
 
         return rows
 
-    @defer.inlineCallbacks
-    def _fetch_events(self, txn, events, check_redacted=True,
-                      get_prev_content=False, allow_rejected=False):
+    def _fetch_events_txn(self, txn, events, check_redacted=True,
+                          get_prev_content=False, allow_rejected=False):
         if not events:
-            defer.returnValue({})
+            return {}
 
-        if txn:
-            rows = self._fetch_event_rows(
-                txn, events,
-            )
-        else:
-            rows = yield self.runInteraction(
-                self._fetch_event_rows,
-                events,
-            )
+        rows = self._fetch_event_rows(
+            txn, events,
+        )
 
         if not allow_rejected:
             rows[:] = [r for r in rows if not r["rejects"]]
 
-        res = yield defer.gatherResults(
-            [
-                defer.maybeDeferred(
-                    self._get_event_from_row,
-                    txn,
-                    row["internal_metadata"], row["json"], row["redacts"],
-                    check_redacted=check_redacted,
-                    get_prev_content=get_prev_content,
-                    rejected_reason=row["rejects"],
-                )
-                for row in rows
-            ],
-            consumeErrors=True,
-        )
+        res = [
+            unwrap_deferred(self._get_event_from_row(
+                txn,
+                row["internal_metadata"], row["json"], row["redacts"],
+                check_redacted=check_redacted,
+                get_prev_content=get_prev_content,
+                rejected_reason=row["rejects"],
+            ))
+            for row in rows
+        ]
 
-        defer.returnValue({
+        return {
             r.event_id: r
             for r in res
-        })
+        }
 
     @defer.inlineCallbacks
     def _get_event_from_row(self, txn, internal_metadata, js, redacted,
                             check_redacted=True, get_prev_content=False,
                             rejected_reason=None):
+        """This is called when we have a row from the database that we want to
+        convert into an event. Depending on the given options it may do more
+        database ops to fill in extra information (e.g. previous content or
+        rejection reason.)
+
+        `txn` may be None, and if so this creates new transactions for each
+        database op.
+        """
         d = json.loads(js)
         internal_metadata = json.loads(internal_metadata)