summary refs log tree commit diff
path: root/synapse/storage/persist_events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-29 11:01:32 +0000
committerGitHub <noreply@github.com>2020-01-29 11:01:32 +0000
commit611215a49cedf8d5f63c53168173763731d02260 (patch)
tree8c0ad8ba10fb7fcb3e8f73aca662f5adf190fba5 /synapse/storage/persist_events.py
parentFix bug when querying remote user keys that require a resync. (#6796) (diff)
downloadsynapse-611215a49cedf8d5f63c53168173763731d02260.tar.xz
Delete current state when server leaves a room (#6792)
Otherwise its just stale data, which may get deleted later anyway so
can't be relied on. It's also a bit of a shotgun if we're trying to get
the current state of a room we're not in.
Diffstat (limited to 'synapse/storage/persist_events.py')
-rw-r--r--synapse/storage/persist_events.py89
1 files changed, 86 insertions, 3 deletions
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 368c457321..d060c8b992 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import logging
 from collections import deque, namedtuple
 from typing import Iterable, List, Optional, Tuple
@@ -27,7 +28,7 @@ from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -72,17 +73,20 @@ stale_forward_extremities_counter = Histogram(
 )
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
 
     Attributes:
         to_delete: List of type/state_keys to delete from current state
         to_insert: Map of state to upsert into current state
+        no_longer_in_room: The server is not longer in the room, so the room
+            should e.g. be removed from `current_state_events` table.
     """
 
     to_delete = attr.ib(type=List[Tuple[str, str]])
     to_insert = attr.ib(type=StateMap[str])
+    no_longer_in_room = attr.ib(type=bool, default=False)
 
 
 class _EventPeristenceQueue(object):
@@ -396,11 +400,12 @@ class EventsPersistenceStorage(object):
                         # If either are not None then there has been a change,
                         # and we need to work out the delta (or use that
                         # given)
+                        delta = None
                         if delta_ids is not None:
                             # If there is a delta we know that we've
                             # only added or replaced state, never
                             # removed keys entirely.
-                            state_delta_for_room[room_id] = DeltaState([], delta_ids)
+                            delta = DeltaState([], delta_ids)
                         elif current_state is not None:
                             with Measure(
                                 self._clock, "persist_events.calculate_state_delta"
@@ -408,6 +413,22 @@ class EventsPersistenceStorage(object):
                                 delta = await self._calculate_state_delta(
                                     room_id, current_state
                                 )
+
+                        if delta:
+                            # If we have a change of state then lets check
+                            # whether we're actually still a member of the room,
+                            # or if our last user left. If we're no longer in
+                            # the room then we delete the current state and
+                            # extremities.
+                            is_still_joined = await self._is_server_still_joined(
+                                room_id, ev_ctx_rm, delta, current_state
+                            )
+                            if not is_still_joined:
+                                logger.info("Server no longer in room %s", room_id)
+                                latest_event_ids = []
+                                current_state = {}
+                                delta.no_longer_in_room = True
+
                             state_delta_for_room[room_id] = delta
 
                         # If we have the current_state then lets prefill
@@ -660,3 +681,65 @@ class EventsPersistenceStorage(object):
         }
 
         return DeltaState(to_delete=to_delete, to_insert=to_insert)
+
+    async def _is_server_still_joined(
+        self,
+        room_id: str,
+        ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
+        delta: DeltaState,
+        current_state: Optional[StateMap[str]],
+    ) -> bool:
+        """Check if the server will still be joined after the given events have
+        been persised.
+
+        Args:
+            room_id
+            ev_ctx_rm
+            delta: The delta of current state between what is in the database
+                and what the new current state will be.
+            current_state: The new current state if it already been calculated,
+                otherwise None.
+        """
+
+        if not any(
+            self.is_mine_id(state_key)
+            for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
+            if typ == EventTypes.Member
+        ):
+            # There have been no changes to membership of our users, so nothing
+            # has changed and we assume we're still in the room.
+            return True
+
+        # Check if any of the given events are a local join that appear in the
+        # current state
+        for (typ, state_key), event_id in delta.to_insert.items():
+            if typ != EventTypes.Member or not self.is_mine_id(state_key):
+                continue
+
+            for event, _ in ev_ctx_rm:
+                if event_id == event.event_id:
+                    if event.membership == Membership.JOIN:
+                        return True
+
+        # There's been a change of membership but we don't have a local join
+        # event in the new events, so we need to check the full state.
+        if current_state is None:
+            current_state = await self.main_store.get_current_state_ids(room_id)
+            current_state = dict(current_state)
+            for key in delta.to_delete:
+                current_state.pop(key, None)
+
+            current_state.update(delta.to_insert)
+
+        event_ids = [
+            event_id
+            for (typ, state_key,), event_id in current_state.items()
+            if typ == EventTypes.Member and self.is_mine_id(state_key)
+        ]
+
+        rows = await self.main_store.get_membership_from_event_ids(event_ids)
+        is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
+        if is_still_joined:
+            return True
+        else:
+            return False