summary refs log tree commit diff
path: root/synapse/storage/persist_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/persist_events.py')
-rw-r--r--synapse/storage/persist_events.py136
1 files changed, 131 insertions, 5 deletions
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 368c457321..af3fd67ab9 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -15,9 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import logging
 from collections import deque, namedtuple
-from typing import Iterable, List, Optional, Tuple
+from typing import Iterable, List, Optional, Set, Tuple
 
 from six import iteritems
 from six.moves import range
@@ -27,7 +28,7 @@ from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -72,17 +73,20 @@ stale_forward_extremities_counter = Histogram(
 )
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
 
     Attributes:
         to_delete: List of type/state_keys to delete from current state
         to_insert: Map of state to upsert into current state
+        no_longer_in_room: The server is not longer in the room, so the room
+            should e.g. be removed from `current_state_events` table.
     """
 
     to_delete = attr.ib(type=List[Tuple[str, str]])
     to_insert = attr.ib(type=StateMap[str])
+    no_longer_in_room = attr.ib(type=bool, default=False)
 
 
 class _EventPeristenceQueue(object):
@@ -314,6 +318,11 @@ class EventsPersistenceStorage(object):
             # room
             state_delta_for_room = {}
 
+            # Set of remote users which were in rooms the server has left. We
+            # should check if we still share any rooms and if not we mark their
+            # device lists as stale.
+            potentially_left_users = set()  # type: Set[str]
+
             if not backfilled:
                 with Measure(self._clock, "_calculate_state_and_extrem"):
                     # Work out the new "current state" for each room.
@@ -396,11 +405,12 @@ class EventsPersistenceStorage(object):
                         # If either are not None then there has been a change,
                         # and we need to work out the delta (or use that
                         # given)
+                        delta = None
                         if delta_ids is not None:
                             # If there is a delta we know that we've
                             # only added or replaced state, never
                             # removed keys entirely.
-                            state_delta_for_room[room_id] = DeltaState([], delta_ids)
+                            delta = DeltaState([], delta_ids)
                         elif current_state is not None:
                             with Measure(
                                 self._clock, "persist_events.calculate_state_delta"
@@ -408,6 +418,26 @@ class EventsPersistenceStorage(object):
                                 delta = await self._calculate_state_delta(
                                     room_id, current_state
                                 )
+
+                        if delta:
+                            # If we have a change of state then lets check
+                            # whether we're actually still a member of the room,
+                            # or if our last user left. If we're no longer in
+                            # the room then we delete the current state and
+                            # extremities.
+                            is_still_joined = await self._is_server_still_joined(
+                                room_id,
+                                ev_ctx_rm,
+                                delta,
+                                current_state,
+                                potentially_left_users,
+                            )
+                            if not is_still_joined:
+                                logger.info("Server no longer in room %s", room_id)
+                                latest_event_ids = []
+                                current_state = {}
+                                delta.no_longer_in_room = True
+
                             state_delta_for_room[room_id] = delta
 
                         # If we have the current_state then lets prefill
@@ -423,6 +453,8 @@ class EventsPersistenceStorage(object):
                 backfilled=backfilled,
             )
 
+            await self._handle_potentially_left_users(potentially_left_users)
+
     async def _calculate_new_extremities(
         self,
         room_id: str,
@@ -629,7 +661,7 @@ class EventsPersistenceStorage(object):
                 break
 
         if not room_version:
-            room_version = await self.main_store.get_room_version(room_id)
+            room_version = await self.main_store.get_room_version_id(room_id)
 
         logger.debug("calling resolve_state_groups from preserve_events")
         res = await self._state_resolution_handler.resolve_state_groups(
@@ -660,3 +692,97 @@ class EventsPersistenceStorage(object):
         }
 
         return DeltaState(to_delete=to_delete, to_insert=to_insert)
+
+    async def _is_server_still_joined(
+        self,
+        room_id: str,
+        ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
+        delta: DeltaState,
+        current_state: Optional[StateMap[str]],
+        potentially_left_users: Set[str],
+    ) -> bool:
+        """Check if the server will still be joined after the given events have
+        been persised.
+
+        Args:
+            room_id
+            ev_ctx_rm
+            delta: The delta of current state between what is in the database
+                and what the new current state will be.
+            current_state: The new current state if it already been calculated,
+                otherwise None.
+            potentially_left_users: If the server has left the room, then joined
+                remote users will be added to this set to indicate that the
+                server may no longer be sharing a room with them.
+        """
+
+        if not any(
+            self.is_mine_id(state_key)
+            for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
+            if typ == EventTypes.Member
+        ):
+            # There have been no changes to membership of our users, so nothing
+            # has changed and we assume we're still in the room.
+            return True
+
+        # Check if any of the given events are a local join that appear in the
+        # current state
+        for (typ, state_key), event_id in delta.to_insert.items():
+            if typ != EventTypes.Member or not self.is_mine_id(state_key):
+                continue
+
+            for event, _ in ev_ctx_rm:
+                if event_id == event.event_id:
+                    if event.membership == Membership.JOIN:
+                        return True
+
+        # There's been a change of membership but we don't have a local join
+        # event in the new events, so we need to check the full state.
+        if current_state is None:
+            current_state = await self.main_store.get_current_state_ids(room_id)
+            current_state = dict(current_state)
+            for key in delta.to_delete:
+                current_state.pop(key, None)
+
+            current_state.update(delta.to_insert)
+
+        event_ids = [
+            event_id
+            for (typ, state_key,), event_id in current_state.items()
+            if typ == EventTypes.Member and self.is_mine_id(state_key)
+        ]
+
+        rows = await self.main_store.get_membership_from_event_ids(event_ids)
+        is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
+        if is_still_joined:
+            return True
+
+        # The server will leave the room, so we go and find out which remote
+        # users will still be joined when we leave.
+        remote_event_ids = [
+            event_id
+            for (typ, state_key,), event_id in current_state.items()
+            if typ == EventTypes.Member and not self.is_mine_id(state_key)
+        ]
+        rows = await self.main_store.get_membership_from_event_ids(remote_event_ids)
+        potentially_left_users.update(
+            row["user_id"] for row in rows if row["membership"] == Membership.JOIN
+        )
+
+        return False
+
+    async def _handle_potentially_left_users(self, user_ids: Set[str]):
+        """Given a set of remote users check if the server still shares a room with
+        them. If not then mark those users' device cache as stale.
+        """
+
+        if not user_ids:
+            return
+
+        joined_users = await self.main_store.get_users_server_still_shares_room_with(
+            user_ids
+        )
+        left_users = user_ids - joined_users
+
+        for user_id in left_users:
+            await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)