diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2022-08-11 11:42:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-11 10:42:24 +0000 |
commit | 507c1cb3309e989d84ec3ff9557a96ae1fc7f369 (patch) | |
tree | d0ff6f14715ae9df656fbaff9df2d5974770b67b /synapse/storage/databases | |
parent | Use literals in place of `HTTPStatus` constants in tests (#13488) (diff) | |
download | synapse-507c1cb3309e989d84ec3ff9557a96ae1fc7f369.tar.xz |
Update the rejected state of events during resync (#13459)
Events can be un-rejected or newly-rejected during resync, so ensure we update the database and caches when that happens.
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 60 | ||||
-rw-r--r-- | synapse/storage/databases/main/state.py | 5 |
2 files changed, 65 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e9ff6cfb34..b07d812ae2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -2200,3 +2200,63 @@ class EventsWorkerStore(SQLBaseStore): (room_id,), ) return [row[0] for row in txn] + + def mark_event_rejected_txn( + self, + txn: LoggingTransaction, + event_id: str, + rejection_reason: Optional[str], + ) -> None: + """Mark an event that was previously accepted as rejected, or vice versa + + This can happen, for example, when resyncing state during a faster join. + + Args: + txn: + event_id: ID of event to update + rejection_reason: reason it has been rejected, or None if it is now accepted + """ + if rejection_reason is None: + logger.info( + "Marking previously-processed event %s as accepted", + event_id, + ) + self.db_pool.simple_delete_txn( + txn, + "rejections", + keyvalues={"event_id": event_id}, + ) + else: + logger.info( + "Marking previously-processed event %s as rejected(%s)", + event_id, + rejection_reason, + ) + self.db_pool.simple_upsert_txn( + txn, + table="rejections", + keyvalues={"event_id": event_id}, + values={ + "reason": rejection_reason, + "last_check": self._clock.time_msec(), + }, + ) + self.db_pool.simple_update_txn( + txn, + table="events", + keyvalues={"event_id": event_id}, + updatevalues={"rejection_reason": rejection_reason}, + ) + + self.invalidate_get_event_cache_after_txn(txn, event_id) + + # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just + # call '_send_invalidation_to_replication', but we actually need the other + # end to call _invalidate_local_get_event_cache() rather than (just) + # _get_event_cache.invalidate(). + # + # One solution might be to (somehow) get the workers to call + # _invalidate_caches_for_event() (though that will invalidate more than + # strictly necessary). + # + # https://github.com/matrix-org/synapse/issues/12994 diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index f70705a0af..0b10af0e58 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -430,6 +430,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): updatevalues={"state_group": state_group}, ) + # the event may now be rejected where it was not before, or vice versa, + # in which case we need to update the rejected flags. + if bool(context.rejected) != (event.rejected_reason is not None): + self.mark_event_rejected_txn(txn, event.event_id, context.rejected) + self.db_pool.simple_delete_one_txn( txn, table="partial_state_events", |