diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/events.py | 29 | ||||
-rw-r--r-- | synapse/storage/state.py | 16 | ||||
-rw-r--r-- | synapse/storage/stream.py | 33 |
3 files changed, 55 insertions, 23 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e444b64cee..5233430028 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -151,13 +151,38 @@ class EventsStore(SQLBaseStore): defer.returnValue(events[0] if events else None) + @defer.inlineCallbacks + def get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + """Get events from the database + + Args: + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred : Dict from event_id to event. + """ + events = yield self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + defer.returnValue({e.event_id: e for e in events}) + @log_function def _persist_event_txn(self, txn, event, context, is_new_state=True, current_state=None): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - txn.call_after(self.get_current_state_for_key.invalidate_all) + txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) @@ -441,7 +466,7 @@ class EventsStore(SQLBaseStore): for event, _ in state_events_and_contexts: if not context.rejected: txn.call_after( - self.get_current_state_for_key.invalidate, + self._get_current_state_for_key.invalidate, (event.room_id, event.type, event.state_key,) ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f06c734c4e..eab2c5a8ce 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,9 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import ( - cached, cachedInlineCallbacks, cachedList -) +from synapse.util.caches.descriptors import cached, cachedList from twisted.internet import defer @@ -155,8 +153,14 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - @cachedInlineCallbacks(num_args=3) + @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): + event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) + events = yield self._get_events(event_ids, get_prev_content=False) + defer.returnValue(events) + + @cached(num_args=3) + def _get_current_state_for_key(self, room_id, event_type, state_key): def f(txn): sql = ( "SELECT event_id FROM current_state_events" @@ -167,9 +171,7 @@ class StateStore(SQLBaseStore): txn.execute(sql, args) results = txn.fetchall() return [r[0] for r in results] - event_ids = yield self.runInteraction("get_current_state_for_key", f) - events = yield self._get_events(event_ids, get_prev_content=False) - defer.returnValue(events) + return self.runInteraction("get_current_state_for_key", f) def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 7f4a827528..cf84938be5 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,7 +36,7 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logcontext import preserve_fn @@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) - @cachedInlineCallbacks(num_args=4) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): + rows, token = yield self.get_recent_event_ids_for_room( + room_id, limit, end_token, from_token + ) + + logger.debug("stream before") + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + logger.debug("stream after") + + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) + @cached(num_args=4) + def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): end_token = RoomStreamToken.parse_stream_token(end_token) if from_token is None: @@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore): return rows, token - rows, token = yield self.runInteraction( + return self.runInteraction( "get_recent_events_for_room", get_recent_events_for_room_txn ) - logger.debug("stream before") - events = yield self._get_events( - [r["event_id"] for r in rows], - get_prev_content=True - ) - logger.debug("stream after") - - self._set_before_and_after(events, rows) - - defer.returnValue((events, token)) - @defer.inlineCallbacks def get_room_events_max_id(self, direction='f'): token = yield self._stream_id_gen.get_max_token() |