summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events.py23
-rw-r--r--synapse/storage/persist_events.py25
2 files changed, 25 insertions, 23 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e53e84054a..23fa089bca 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -130,7 +130,7 @@ class PersistEventsStore:
         *,
         current_state_for_room: Dict[str, StateMap[str]],
         state_delta_for_room: Dict[str, DeltaState],
-        new_forward_extremeties: Dict[str, List[str]],
+        new_forward_extremities: Dict[str, Set[str]],
         use_negative_stream_ordering: bool = False,
         inhibit_local_membership_updates: bool = False,
     ) -> None:
@@ -143,7 +143,7 @@ class PersistEventsStore:
                 the room based on forward extremities
             state_delta_for_room: Map from room_id to the delta to apply to
                 room state
-            new_forward_extremities: Map from room_id to list of event IDs
+            new_forward_extremities: Map from room_id to set of event IDs
                 that are the new forward extremities of the room.
             use_negative_stream_ordering: Whether to start stream_ordering on
                 the negative side and decrement. This should be set as True
@@ -193,7 +193,7 @@ class PersistEventsStore:
                 events_and_contexts=events_and_contexts,
                 inhibit_local_membership_updates=inhibit_local_membership_updates,
                 state_delta_for_room=state_delta_for_room,
-                new_forward_extremeties=new_forward_extremeties,
+                new_forward_extremities=new_forward_extremities,
             )
             persist_event_counter.inc(len(events_and_contexts))
 
@@ -220,7 +220,7 @@ class PersistEventsStore:
             for room_id, new_state in current_state_for_room.items():
                 self.store.get_current_state_ids.prefill((room_id,), new_state)
 
-            for room_id, latest_event_ids in new_forward_extremeties.items():
+            for room_id, latest_event_ids in new_forward_extremities.items():
                 self.store.get_latest_event_ids_in_room.prefill(
                     (room_id,), list(latest_event_ids)
                 )
@@ -334,8 +334,8 @@ class PersistEventsStore:
         events_and_contexts: List[Tuple[EventBase, EventContext]],
         inhibit_local_membership_updates: bool = False,
         state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
-        new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
-    ):
+        new_forward_extremities: Optional[Dict[str, Set[str]]] = None,
+    ) -> None:
         """Insert some number of room events into the necessary database tables.
 
         Rejected events are only inserted into the events table, the events_json table,
@@ -353,13 +353,13 @@ class PersistEventsStore:
                 from the database. This is useful when retrying due to
                 IntegrityError.
             state_delta_for_room: The current-state delta for each room.
-            new_forward_extremetie: The new forward extremities for each room.
+            new_forward_extremities: The new forward extremities for each room.
                 For each room, a list of the event ids which are the forward
                 extremities.
 
         """
         state_delta_for_room = state_delta_for_room or {}
-        new_forward_extremeties = new_forward_extremeties or {}
+        new_forward_extremities = new_forward_extremities or {}
 
         all_events_and_contexts = events_and_contexts
 
@@ -372,7 +372,7 @@ class PersistEventsStore:
 
         self._update_forward_extremities_txn(
             txn,
-            new_forward_extremities=new_forward_extremeties,
+            new_forward_extremities=new_forward_extremities,
             max_stream_order=max_stream_order,
         )
 
@@ -1158,7 +1158,10 @@ class PersistEventsStore:
             )
 
     def _update_forward_extremities_txn(
-        self, txn, new_forward_extremities, max_stream_order
+        self,
+        txn: LoggingTransaction,
+        new_forward_extremities: Dict[str, Set[str]],
+        max_stream_order: int,
     ):
         for room_id in new_forward_extremities.keys():
             self.db_pool.simple_delete_txn(
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 428d66a617..7d543fdbe0 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -427,21 +427,21 @@ class EventsPersistenceStorage:
             # NB: Assumes that we are only persisting events for one room
             # at a time.
 
-            # map room_id->list[event_ids] giving the new forward
+            # map room_id->set[event_ids] giving the new forward
             # extremities in each room
-            new_forward_extremeties = {}
+            new_forward_extremities: Dict[str, Set[str]] = {}
 
             # map room_id->(type,state_key)->event_id tracking the full
             # state in each room after adding these events.
             # This is simply used to prefill the get_current_state_ids
             # cache
-            current_state_for_room = {}
+            current_state_for_room: Dict[str, StateMap[str]] = {}
 
             # map room_id->(to_delete, to_insert) where to_delete is a list
             # of type/state keys to remove from current state, and to_insert
             # is a map (type,key)->event_id giving the state delta in each
             # room
-            state_delta_for_room = {}
+            state_delta_for_room: Dict[str, DeltaState] = {}
 
             # Set of remote users which were in rooms the server has left. We
             # should check if we still share any rooms and if not we mark their
@@ -460,14 +460,13 @@ class EventsPersistenceStorage:
                         )
 
                     for room_id, ev_ctx_rm in events_by_room.items():
-                        latest_event_ids = (
+                        latest_event_ids = set(
                             await self.main_store.get_latest_event_ids_in_room(room_id)
                         )
                         new_latest_event_ids = await self._calculate_new_extremities(
                             room_id, ev_ctx_rm, latest_event_ids
                         )
 
-                        latest_event_ids = set(latest_event_ids)
                         if new_latest_event_ids == latest_event_ids:
                             # No change in extremities, so no change in state
                             continue
@@ -478,7 +477,7 @@ class EventsPersistenceStorage:
                         # extremities, so we'll `continue` above and skip this bit.)
                         assert new_latest_event_ids, "No forward extremities left!"
 
-                        new_forward_extremeties[room_id] = new_latest_event_ids
+                        new_forward_extremities[room_id] = new_latest_event_ids
 
                         len_1 = (
                             len(latest_event_ids) == 1
@@ -533,7 +532,7 @@ class EventsPersistenceStorage:
                             # extremities, so we'll `continue` above and skip this bit.)
                             assert new_latest_event_ids, "No forward extremities left!"
 
-                            new_forward_extremeties[room_id] = new_latest_event_ids
+                            new_forward_extremities[room_id] = new_latest_event_ids
 
                         # If either are not None then there has been a change,
                         # and we need to work out the delta (or use that
@@ -567,7 +566,7 @@ class EventsPersistenceStorage:
                             )
                             if not is_still_joined:
                                 logger.info("Server no longer in room %s", room_id)
-                                latest_event_ids = []
+                                latest_event_ids = set()
                                 current_state = {}
                                 delta.no_longer_in_room = True
 
@@ -582,7 +581,7 @@ class EventsPersistenceStorage:
                 chunk,
                 current_state_for_room=current_state_for_room,
                 state_delta_for_room=state_delta_for_room,
-                new_forward_extremeties=new_forward_extremeties,
+                new_forward_extremities=new_forward_extremities,
                 use_negative_stream_ordering=backfilled,
                 inhibit_local_membership_updates=backfilled,
             )
@@ -596,7 +595,7 @@ class EventsPersistenceStorage:
         room_id: str,
         event_contexts: List[Tuple[EventBase, EventContext]],
         latest_event_ids: Collection[str],
-    ):
+    ) -> Set[str]:
         """Calculates the new forward extremities for a room given events to
         persist.
 
@@ -906,9 +905,9 @@ class EventsPersistenceStorage:
             # Ideally we'd figure out a way of still being able to drop old
             # dummy events that reference local events, but this is good enough
             # as a first cut.
-            events_to_check = [event]
+            events_to_check: Collection[EventBase] = [event]
             while events_to_check:
-                new_events = set()
+                new_events: Set[str] = set()
                 for event_to_check in events_to_check:
                     if self.is_mine_id(event_to_check.sender):
                         if event_to_check.type != EventTypes.Dummy: