summary refs log tree commit diff
path: root/synapse/storage/persist_events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-21 15:19:00 +0100
committerErik Johnston <erik@matrix.org>2020-05-21 15:19:00 +0100
commitcf92310da26f9b6e03e98055e55c7e9767225742 (patch)
tree832140c94907ed381004d01f859abbf2fa86dd3b /synapse/storage/persist_events.py
parentMerge branch 'rav/matrix_hacks' into matrix-org-hotfixes (diff)
parentStub out GET presence requests in the frontend proxy (#7545) (diff)
downloadsynapse-cf92310da26f9b6e03e98055e55c7e9767225742.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage/persist_events.py')
-rw-r--r--synapse/storage/persist_events.py31
1 files changed, 9 insertions, 22 deletions
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py

index 0f9ac1cf09..12e1ffb9a2 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py
@@ -23,7 +23,6 @@ from typing import Iterable, List, Optional, Set, Tuple from six import iteritems from six.moves import range -import attr from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -35,6 +34,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda from synapse.metrics.background_process_metrics import run_as_background_process from synapse.state import StateResolutionStore from synapse.storage.data_stores import DataStores +from synapse.storage.data_stores.main.events import DeltaState from synapse.types import StateMap from synapse.util.async_helpers import ObservableDeferred from synapse.util.metrics import Measure @@ -73,22 +73,6 @@ stale_forward_extremities_counter = Histogram( ) -@attr.s(slots=True) -class DeltaState: - """Deltas to use to update the `current_state_events` table. - - Attributes: - to_delete: List of type/state_keys to delete from current state - to_insert: Map of state to upsert into current state - no_longer_in_room: The server is not longer in the room, so the room - should e.g. be removed from `current_state_events` table. - """ - - to_delete = attr.ib(type=List[Tuple[str, str]]) - to_insert = attr.ib(type=StateMap[str]) - no_longer_in_room = attr.ib(type=bool, default=False) - - class _EventPeristenceQueue(object): """Queues up events so that they can be persisted in bulk with only one concurrent transaction per room. @@ -205,6 +189,7 @@ class EventsPersistenceStorage(object): # store for now. self.main_store = stores.main self.state_store = stores.state + self.persist_events_store = stores.persist_events self._clock = hs.get_clock() self.is_mine_id = hs.is_mine_id @@ -445,7 +430,7 @@ class EventsPersistenceStorage(object): if current_state is not None: current_state_for_room[room_id] = current_state - await self.main_store._persist_events_and_state_updates( + await self.persist_events_store._persist_events_and_state_updates( chunk, current_state_for_room=current_state_for_room, state_delta_for_room=state_delta_for_room, @@ -491,13 +476,15 @@ class EventsPersistenceStorage(object): ) # Remove any events which are prev_events of any existing events. - existing_prevs = await self.main_store._get_events_which_are_prevs(result) + existing_prevs = await self.persist_events_store._get_events_which_are_prevs( + result + ) result.difference_update(existing_prevs) # Finally handle the case where the new events have soft-failed prev # events. If they do we need to remove them and their prev events, # otherwise we end up with dangling extremities. - existing_prevs = await self.main_store._get_prevs_before_rejected( + existing_prevs = await self.persist_events_store._get_prevs_before_rejected( e_id for event in new_events for e_id in event.prev_event_ids() ) result.difference_update(existing_prevs) @@ -753,8 +740,8 @@ class EventsPersistenceStorage(object): # whose state has changed as we've already their new state above. users_to_ignore = [ state_key - for _, state_key in itertools.chain(delta.to_insert, delta.to_delete) - if self.is_mine_id(state_key) + for typ, state_key in itertools.chain(delta.to_insert, delta.to_delete) + if typ == EventTypes.Member and self.is_mine_id(state_key) ] if await self.main_store.is_local_host_in_room_ignoring_users(