diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 368c457321..af3fd67ab9 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -15,9 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import itertools
import logging
from collections import deque, namedtuple
-from typing import Iterable, List, Optional, Tuple
+from typing import Iterable, List, Optional, Set, Tuple
from six import iteritems
from six.moves import range
@@ -27,7 +28,7 @@ from prometheus_client import Counter, Histogram
from twisted.internet import defer
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -72,17 +73,20 @@ stale_forward_extremities_counter = Histogram(
)
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True)
class DeltaState:
"""Deltas to use to update the `current_state_events` table.
Attributes:
to_delete: List of type/state_keys to delete from current state
to_insert: Map of state to upsert into current state
+ no_longer_in_room: The server is not longer in the room, so the room
+ should e.g. be removed from `current_state_events` table.
"""
to_delete = attr.ib(type=List[Tuple[str, str]])
to_insert = attr.ib(type=StateMap[str])
+ no_longer_in_room = attr.ib(type=bool, default=False)
class _EventPeristenceQueue(object):
@@ -314,6 +318,11 @@ class EventsPersistenceStorage(object):
# room
state_delta_for_room = {}
+ # Set of remote users which were in rooms the server has left. We
+ # should check if we still share any rooms and if not we mark their
+ # device lists as stale.
+ potentially_left_users = set() # type: Set[str]
+
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
@@ -396,11 +405,12 @@ class EventsPersistenceStorage(object):
# If either are not None then there has been a change,
# and we need to work out the delta (or use that
# given)
+ delta = None
if delta_ids is not None:
# If there is a delta we know that we've
# only added or replaced state, never
# removed keys entirely.
- state_delta_for_room[room_id] = DeltaState([], delta_ids)
+ delta = DeltaState([], delta_ids)
elif current_state is not None:
with Measure(
self._clock, "persist_events.calculate_state_delta"
@@ -408,6 +418,26 @@ class EventsPersistenceStorage(object):
delta = await self._calculate_state_delta(
room_id, current_state
)
+
+ if delta:
+ # If we have a change of state then lets check
+ # whether we're actually still a member of the room,
+ # or if our last user left. If we're no longer in
+ # the room then we delete the current state and
+ # extremities.
+ is_still_joined = await self._is_server_still_joined(
+ room_id,
+ ev_ctx_rm,
+ delta,
+ current_state,
+ potentially_left_users,
+ )
+ if not is_still_joined:
+ logger.info("Server no longer in room %s", room_id)
+ latest_event_ids = []
+ current_state = {}
+ delta.no_longer_in_room = True
+
state_delta_for_room[room_id] = delta
# If we have the current_state then lets prefill
@@ -423,6 +453,8 @@ class EventsPersistenceStorage(object):
backfilled=backfilled,
)
+ await self._handle_potentially_left_users(potentially_left_users)
+
async def _calculate_new_extremities(
self,
room_id: str,
@@ -629,7 +661,7 @@ class EventsPersistenceStorage(object):
break
if not room_version:
- room_version = await self.main_store.get_room_version(room_id)
+ room_version = await self.main_store.get_room_version_id(room_id)
logger.debug("calling resolve_state_groups from preserve_events")
res = await self._state_resolution_handler.resolve_state_groups(
@@ -660,3 +692,97 @@ class EventsPersistenceStorage(object):
}
return DeltaState(to_delete=to_delete, to_insert=to_insert)
+
+ async def _is_server_still_joined(
+ self,
+ room_id: str,
+ ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
+ delta: DeltaState,
+ current_state: Optional[StateMap[str]],
+ potentially_left_users: Set[str],
+ ) -> bool:
+ """Check if the server will still be joined after the given events have
+ been persised.
+
+ Args:
+ room_id
+ ev_ctx_rm
+ delta: The delta of current state between what is in the database
+ and what the new current state will be.
+ current_state: The new current state if it already been calculated,
+ otherwise None.
+ potentially_left_users: If the server has left the room, then joined
+ remote users will be added to this set to indicate that the
+ server may no longer be sharing a room with them.
+ """
+
+ if not any(
+ self.is_mine_id(state_key)
+ for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
+ if typ == EventTypes.Member
+ ):
+ # There have been no changes to membership of our users, so nothing
+ # has changed and we assume we're still in the room.
+ return True
+
+ # Check if any of the given events are a local join that appear in the
+ # current state
+ for (typ, state_key), event_id in delta.to_insert.items():
+ if typ != EventTypes.Member or not self.is_mine_id(state_key):
+ continue
+
+ for event, _ in ev_ctx_rm:
+ if event_id == event.event_id:
+ if event.membership == Membership.JOIN:
+ return True
+
+ # There's been a change of membership but we don't have a local join
+ # event in the new events, so we need to check the full state.
+ if current_state is None:
+ current_state = await self.main_store.get_current_state_ids(room_id)
+ current_state = dict(current_state)
+ for key in delta.to_delete:
+ current_state.pop(key, None)
+
+ current_state.update(delta.to_insert)
+
+ event_ids = [
+ event_id
+ for (typ, state_key,), event_id in current_state.items()
+ if typ == EventTypes.Member and self.is_mine_id(state_key)
+ ]
+
+ rows = await self.main_store.get_membership_from_event_ids(event_ids)
+ is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
+ if is_still_joined:
+ return True
+
+ # The server will leave the room, so we go and find out which remote
+ # users will still be joined when we leave.
+ remote_event_ids = [
+ event_id
+ for (typ, state_key,), event_id in current_state.items()
+ if typ == EventTypes.Member and not self.is_mine_id(state_key)
+ ]
+ rows = await self.main_store.get_membership_from_event_ids(remote_event_ids)
+ potentially_left_users.update(
+ row["user_id"] for row in rows if row["membership"] == Membership.JOIN
+ )
+
+ return False
+
+ async def _handle_potentially_left_users(self, user_ids: Set[str]):
+ """Given a set of remote users check if the server still shares a room with
+ them. If not then mark those users' device cache as stale.
+ """
+
+ if not user_ids:
+ return
+
+ joined_users = await self.main_store.get_users_server_still_shares_room_with(
+ user_ids
+ )
+ left_users = user_ids - joined_users
+
+ for user_id in left_users:
+ await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
|