diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 0f9ac1cf09..f159400a87 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -23,7 +23,6 @@ from typing import Iterable, List, Optional, Set, Tuple
from six import iteritems
from six.moves import range
-import attr
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -35,6 +34,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.data_stores import DataStores
+from synapse.storage.data_stores.main.events import DeltaState
from synapse.types import StateMap
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.metrics import Measure
@@ -73,22 +73,6 @@ stale_forward_extremities_counter = Histogram(
)
-@attr.s(slots=True)
-class DeltaState:
- """Deltas to use to update the `current_state_events` table.
-
- Attributes:
- to_delete: List of type/state_keys to delete from current state
- to_insert: Map of state to upsert into current state
- no_longer_in_room: The server is not longer in the room, so the room
- should e.g. be removed from `current_state_events` table.
- """
-
- to_delete = attr.ib(type=List[Tuple[str, str]])
- to_insert = attr.ib(type=StateMap[str])
- no_longer_in_room = attr.ib(type=bool, default=False)
-
-
class _EventPeristenceQueue(object):
"""Queues up events so that they can be persisted in bulk with only one
concurrent transaction per room.
@@ -205,6 +189,7 @@ class EventsPersistenceStorage(object):
# store for now.
self.main_store = stores.main
self.state_store = stores.state
+ self.persist_events_store = stores.persist_events
self._clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
@@ -445,7 +430,7 @@ class EventsPersistenceStorage(object):
if current_state is not None:
current_state_for_room[room_id] = current_state
- await self.main_store._persist_events_and_state_updates(
+ await self.persist_events_store._persist_events_and_state_updates(
chunk,
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
@@ -491,13 +476,15 @@ class EventsPersistenceStorage(object):
)
# Remove any events which are prev_events of any existing events.
- existing_prevs = await self.main_store._get_events_which_are_prevs(result)
+ existing_prevs = await self.persist_events_store._get_events_which_are_prevs(
+ result
+ )
result.difference_update(existing_prevs)
# Finally handle the case where the new events have soft-failed prev
# events. If they do we need to remove them and their prev events,
# otherwise we end up with dangling extremities.
- existing_prevs = await self.main_store._get_prevs_before_rejected(
+ existing_prevs = await self.persist_events_store._get_prevs_before_rejected(
e_id for event in new_events for e_id in event.prev_event_ids()
)
result.difference_update(existing_prevs)
@@ -753,8 +740,8 @@ class EventsPersistenceStorage(object):
# whose state has changed as we've already their new state above.
users_to_ignore = [
state_key
- for _, state_key in itertools.chain(delta.to_insert, delta.to_delete)
- if self.is_mine_id(state_key)
+ for typ, state_key in itertools.chain(delta.to_insert, delta.to_delete)
+ if typ == EventTypes.Member and self.is_mine_id(state_key)
]
if await self.main_store.is_local_host_in_room_ignoring_users(
@@ -799,3 +786,9 @@ class EventsPersistenceStorage(object):
for user_id in left_users:
await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
+
+ async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
+ """Mark the invite has having been rejected even though we failed to
+ create a leave event for it.
+ """
+ return await self.persist_events_store.locally_reject_invite(user_id, room_id)
|