summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_push_actions.py45
-rw-r--r--synapse/storage/events.py26
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/receipts.py19
4 files changed, 52 insertions, 40 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py

index d0a969f50b..d77a817682 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py
@@ -24,8 +24,7 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): - @defer.inlineCallbacks - def set_push_actions_for_event_and_users(self, event, tuples): + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ :param event: the event set actions for :param tuples: list of tuples of (user_id, profile_tag, actions) @@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore): 'highlight': 1 if _action_has_highlight(actions) else 0, }) - def f(txn): - for uid, _, __ in tuples: - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid) - ) - return self._simple_insert_many_txn(txn, "event_push_actions", values) - - yield self.runInteraction( - "set_actions_for_event_and_users", - f, - ) + for uid, _, __ in tuples: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid) + ) + self._simple_insert_many_txn(txn, "event_push_actions", values) @cachedInlineCallbacks(num_args=3, lru=True, tree=True) def get_unread_event_push_actions_by_room_for_user( @@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) - @defer.inlineCallbacks - def remove_push_actions_for_event_id(self, room_id, event_id): - def f(txn): - # Sad that we have to blow away the cache for the whole room here - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (room_id,) - ) - txn.execute( - "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", - (room_id, event_id) - ) - yield self.runInteraction( - "remove_push_actions_for_event_id", - f + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): + # Sad that we have to blow away the cache for the whole room here + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id,) + ) + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c6ed54721c..3a5c6ee4b1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py
@@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, is_new_state=True): - - # Remove the any existing cache entries for the event_ids - for event, _ in events_and_contexts: + depth_updates = {} + for event, context in events_and_contexts: + # Remove the any existing cache entries for the event_ids txn.call_after(self._invalidate_get_event_cache, event.event_id) - if not backfilled: txn.call_after( self._events_stream_cache.entity_has_changed, event.room_id, event.internal_metadata.stream_ordering, ) - depth_updates = {} - for event, _ in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - depth_updates[event.room_id] = max( - event.depth, depth_updates.get(event.room_id, event.depth) + if not event.internal_metadata.is_outlier(): + depth_updates[event.room_id] = max( + event.depth, depth_updates.get(event.room_id, event.depth) + ) + + if context.push_actions: + self._set_push_actions_for_event_and_users_txn( + txn, event, context.push_actions + ) + + if event.type == EventTypes.Redaction and event.redacts is not None: + self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts ) for room_id, depth in depth_updates.items(): diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index d782b8e25b..850736c85e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py
@@ -211,7 +211,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, logger.debug("applied_delta_files: %s", applied_delta_files) for v in range(start_ver, SCHEMA_VERSION + 1): - logger.debug("Upgrading schema to v%d", v) + logger.info("Upgrading schema to v%d", v) delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 8068c73740..4202a6b3dc 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py
@@ -46,6 +46,20 @@ class ReceiptsStore(SQLBaseStore): desc="get_receipts_for_room", ) + @cached(num_args=3) + def get_last_receipt_event_id_for_user(self, user_id, room_id, receipt_type): + return self._simple_select_one_onecol( + table="receipts_linearized", + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id + }, + retcol="event_id", + desc="get_own_receipt_for_user", + allow_none=True, + ) + @cachedInlineCallbacks(num_args=2) def get_receipts_for_user(self, user_id, receipt_type): def f(txn): @@ -226,6 +240,11 @@ class ReceiptsStore(SQLBaseStore): room_id, stream_id ) + txn.call_after( + self.get_last_receipt_event_id_for_user.invalidate, + (user_id, room_id, receipt_type) + ) + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = (