summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py29
-rw-r--r--synapse/storage/state.py16
-rw-r--r--synapse/storage/stream.py33
3 files changed, 55 insertions, 23 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e444b64cee..5233430028 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -151,13 +151,38 @@ class EventsStore(SQLBaseStore):
 
         defer.returnValue(events[0] if events else None)
 
+    @defer.inlineCallbacks
+    def get_events(self, event_ids, check_redacted=True,
+                   get_prev_content=False, allow_rejected=False):
+        """Get events from the database
+
+        Args:
+            event_ids (list): The event_ids of the events to fetch
+            check_redacted (bool): If True, check if event has been redacted
+                and redact it.
+            get_prev_content (bool): If True and event is a state event,
+                include the previous states content in the unsigned field.
+            allow_rejected (bool): If True return rejected events.
+
+        Returns:
+            Deferred : Dict from event_id to event.
+        """
+        events = yield self._get_events(
+            event_ids,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            allow_rejected=allow_rejected,
+        )
+
+        defer.returnValue({e.event_id: e for e in events})
+
     @log_function
     def _persist_event_txn(self, txn, event, context,
                            is_new_state=True, current_state=None):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.call_after(self.get_current_state_for_key.invalidate_all)
+            txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
@@ -441,7 +466,7 @@ class EventsStore(SQLBaseStore):
             for event, _ in state_events_and_contexts:
                 if not context.rejected:
                     txn.call_after(
-                        self.get_current_state_for_key.invalidate,
+                        self._get_current_state_for_key.invalidate,
                         (event.room_id, event.type, event.state_key,)
                     )
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index f06c734c4e..eab2c5a8ce 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,9 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import (
-    cached, cachedInlineCallbacks, cachedList
-)
+from synapse.util.caches.descriptors import cached, cachedList
 
 from twisted.internet import defer
 
@@ -155,8 +153,14 @@ class StateStore(SQLBaseStore):
         events = yield self._get_events(event_ids, get_prev_content=False)
         defer.returnValue(events)
 
-    @cachedInlineCallbacks(num_args=3)
+    @defer.inlineCallbacks
     def get_current_state_for_key(self, room_id, event_type, state_key):
+        event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key)
+        events = yield self._get_events(event_ids, get_prev_content=False)
+        defer.returnValue(events)
+
+    @cached(num_args=3)
+    def _get_current_state_for_key(self, room_id, event_type, state_key):
         def f(txn):
             sql = (
                 "SELECT event_id FROM current_state_events"
@@ -167,9 +171,7 @@ class StateStore(SQLBaseStore):
             txn.execute(sql, args)
             results = txn.fetchall()
             return [r[0] for r in results]
-        event_ids = yield self.runInteraction("get_current_state_for_key", f)
-        events = yield self._get_events(event_ids, get_prev_content=False)
-        defer.returnValue(events)
+        return self.runInteraction("get_current_state_for_key", f)
 
     def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 7f4a827528..cf84938be5 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -36,7 +36,7 @@ what sort order was used:
 from twisted.internet import defer
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
 from synapse.util.logcontext import preserve_fn
@@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue((events, token))
 
-    @cachedInlineCallbacks(num_args=4)
+    @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+        rows, token = yield self.get_recent_event_ids_for_room(
+            room_id, limit, end_token, from_token
+        )
+
+        logger.debug("stream before")
+        events = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
+        logger.debug("stream after")
+
+        self._set_before_and_after(events, rows)
+
+        defer.returnValue((events, token))
 
+    @cached(num_args=4)
+    def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
         end_token = RoomStreamToken.parse_stream_token(end_token)
 
         if from_token is None:
@@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore):
 
             return rows, token
 
-        rows, token = yield self.runInteraction(
+        return self.runInteraction(
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
-        logger.debug("stream before")
-        events = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
-        )
-        logger.debug("stream after")
-
-        self._set_before_and_after(events, rows)
-
-        defer.returnValue((events, token))
-
     @defer.inlineCallbacks
     def get_room_events_max_id(self, direction='f'):
         token = yield self._stream_id_gen.get_max_token()