summary refs log tree commit diff
path: root/synapse/storage/databases/main/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r--synapse/storage/databases/main/events.py107
1 files changed, 34 insertions, 73 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 4e528612ea..06832221ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import itertools
 import logging
-from collections import OrderedDict
+from collections import OrderedDict, namedtuple
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -41,10 +41,9 @@ from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.logging.utils import log_function
 from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool, LoggingTransaction
-from synapse.storage.databases.main.events_worker import EventCacheEntry
 from synapse.storage.databases.main.search import SearchEntry
 from synapse.storage.types import Connection
-from synapse.storage.util.id_generators import AbstractStreamIdGenerator
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.storage.util.sequence import SequenceGenerator
 from synapse.types import StateMap, get_domain_from_id
 from synapse.util import json_encoder
@@ -65,6 +64,9 @@ event_counter = Counter(
 )
 
 
+_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+
+
 @attr.s(slots=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
@@ -106,30 +108,23 @@ class PersistEventsStore:
         self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages
         self.is_mine_id = hs.is_mine_id
 
+        # Ideally we'd move these ID gens here, unfortunately some other ID
+        # generators are chained off them so doing so is a bit of a PITA.
+        self._backfill_id_gen: MultiWriterIdGenerator = self.store._backfill_id_gen
+        self._stream_id_gen: MultiWriterIdGenerator = self.store._stream_id_gen
+
         # This should only exist on instances that are configured to write
         assert (
             hs.get_instance_name() in hs.config.worker.writers.events
         ), "Can only instantiate EventsStore on master"
 
-        # Since we have been configured to write, we ought to have id generators,
-        # rather than id trackers.
-        assert isinstance(self.store._backfill_id_gen, AbstractStreamIdGenerator)
-        assert isinstance(self.store._stream_id_gen, AbstractStreamIdGenerator)
-
-        # Ideally we'd move these ID gens here, unfortunately some other ID
-        # generators are chained off them so doing so is a bit of a PITA.
-        self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
-        self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
-
     async def _persist_events_and_state_updates(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        *,
         current_state_for_room: Dict[str, StateMap[str]],
         state_delta_for_room: Dict[str, DeltaState],
         new_forward_extremeties: Dict[str, List[str]],
-        use_negative_stream_ordering: bool = False,
-        inhibit_local_membership_updates: bool = False,
+        backfilled: bool = False,
     ) -> None:
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
@@ -142,14 +137,7 @@ class PersistEventsStore:
                 room state
             new_forward_extremities: Map from room_id to list of event IDs
                 that are the new forward extremities of the room.
-            use_negative_stream_ordering: Whether to start stream_ordering on
-                the negative side and decrement. This should be set as True
-                for backfilled events because backfilled events get a negative
-                stream ordering so they don't come down incremental `/sync`.
-            inhibit_local_membership_updates: Stop the local_current_membership
-                from being updated by these events. This should be set to True
-                for backfilled events because backfilled events in the past do
-                not affect the current local state.
+            backfilled
 
         Returns:
             Resolves when the events have been persisted
@@ -171,7 +159,7 @@ class PersistEventsStore:
         #
         # Note: Multiple instances of this function cannot be in flight at
         # the same time for the same room.
-        if use_negative_stream_ordering:
+        if backfilled:
             stream_ordering_manager = self._backfill_id_gen.get_next_mult(
                 len(events_and_contexts)
             )
@@ -188,13 +176,13 @@ class PersistEventsStore:
                 "persist_events",
                 self._persist_events_txn,
                 events_and_contexts=events_and_contexts,
-                inhibit_local_membership_updates=inhibit_local_membership_updates,
+                backfilled=backfilled,
                 state_delta_for_room=state_delta_for_room,
                 new_forward_extremeties=new_forward_extremeties,
             )
             persist_event_counter.inc(len(events_and_contexts))
 
-            if stream < 0:
+            if not backfilled:
                 # backfilled events have negative stream orderings, so we don't
                 # want to set the event_persisted_position to that.
                 synapse.metrics.event_persisted_position.set(
@@ -328,9 +316,8 @@ class PersistEventsStore:
     def _persist_events_txn(
         self,
         txn: LoggingTransaction,
-        *,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        inhibit_local_membership_updates: bool = False,
+        backfilled: bool,
         state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
         new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
     ):
@@ -343,10 +330,7 @@ class PersistEventsStore:
         Args:
             txn
             events_and_contexts: events to persist
-            inhibit_local_membership_updates: Stop the local_current_membership
-                from being updated by these events. This should be set to True
-                for backfilled events because backfilled events in the past do
-                not affect the current local state.
+            backfilled: True if the events were backfilled
             delete_existing True to purge existing table rows for the events
                 from the database. This is useful when retrying due to
                 IntegrityError.
@@ -379,7 +363,9 @@ class PersistEventsStore:
             events_and_contexts
         )
 
-        self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts)
+        self._update_room_depths_txn(
+            txn, events_and_contexts=events_and_contexts, backfilled=backfilled
+        )
 
         # _update_outliers_txn filters out any events which have already been
         # persisted, and returns the filtered list.
@@ -412,7 +398,7 @@ class PersistEventsStore:
             txn,
             events_and_contexts=events_and_contexts,
             all_events_and_contexts=all_events_and_contexts,
-            inhibit_local_membership_updates=inhibit_local_membership_updates,
+            backfilled=backfilled,
         )
 
         # We call this last as it assumes we've inserted the events into
@@ -575,9 +561,9 @@ class PersistEventsStore:
         # fetch their auth event info.
         while missing_auth_chains:
             sql = """
-                SELECT event_id, events.type, se.state_key, chain_id, sequence_number
+                SELECT event_id, events.type, state_key, chain_id, sequence_number
                 FROM events
-                INNER JOIN state_events AS se USING (event_id)
+                INNER JOIN state_events USING (event_id)
                 LEFT JOIN event_auth_chains USING (event_id)
                 WHERE
             """
@@ -1214,6 +1200,7 @@ class PersistEventsStore:
         self,
         txn,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
+        backfilled: bool,
     ):
         """Update min_depth for each room
 
@@ -1221,18 +1208,13 @@ class PersistEventsStore:
             txn (twisted.enterprise.adbapi.Connection): db connection
             events_and_contexts (list[(EventBase, EventContext)]): events
                 we are persisting
+            backfilled (bool): True if the events were backfilled
         """
         depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
             txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
-            # Then update the `stream_ordering` position to mark the latest
-            # event as the front of the room. This should not be done for
-            # backfilled events because backfilled events have negative
-            # stream_ordering and happened in the past so we know that we don't
-            # need to update the stream_ordering tip/front for the room.
-            assert event.internal_metadata.stream_ordering is not None
-            if event.internal_metadata.stream_ordering >= 0:
+            if not backfilled:
                 txn.call_after(
                     self.store._events_stream_cache.entity_has_changed,
                     event.room_id,
@@ -1445,12 +1427,7 @@ class PersistEventsStore:
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
     def _update_metadata_tables_txn(
-        self,
-        txn,
-        *,
-        events_and_contexts,
-        all_events_and_contexts,
-        inhibit_local_membership_updates: bool = False,
+        self, txn, events_and_contexts, all_events_and_contexts, backfilled
     ):
         """Update all the miscellaneous tables for new events
 
@@ -1462,10 +1439,7 @@ class PersistEventsStore:
                 events that we were going to persist. This includes events
                 we've already persisted, etc, that wouldn't appear in
                 events_and_context.
-            inhibit_local_membership_updates: Stop the local_current_membership
-                from being updated by these events. This should be set to True
-                for backfilled events because backfilled events in the past do
-                not affect the current local state.
+            backfilled (bool): True if the events were backfilled
         """
 
         # Insert all the push actions into the event_push_actions table.
@@ -1539,7 +1513,7 @@ class PersistEventsStore:
                 for event, _ in events_and_contexts
                 if event.type == EventTypes.Member
             ],
-            inhibit_local_membership_updates=inhibit_local_membership_updates,
+            backfilled=backfilled,
         )
 
         # Insert event_reference_hashes table.
@@ -1579,13 +1553,11 @@ class PersistEventsStore:
         for row in rows:
             event = ev_map[row["event_id"]]
             if not row["rejects"] and not row["redacts"]:
-                to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
+                to_prefill.append(_EventCacheEntry(event=event, redacted_event=None))
 
         def prefill():
             for cache_entry in to_prefill:
-                self.store._get_event_cache.set(
-                    (cache_entry.event.event_id,), cache_entry
-                )
+                self.store._get_event_cache.set((cache_entry[0].event_id,), cache_entry)
 
         txn.call_after(prefill)
 
@@ -1666,19 +1638,8 @@ class PersistEventsStore:
             txn, table="event_reference_hashes", values=vals
         )
 
-    def _store_room_members_txn(
-        self, txn, events, *, inhibit_local_membership_updates: bool = False
-    ):
-        """
-        Store a room member in the database.
-        Args:
-            txn: The transaction to use.
-            events: List of events to store.
-            inhibit_local_membership_updates: Stop the local_current_membership
-                from being updated by these events. This should be set to True
-                for backfilled events because backfilled events in the past do
-                not affect the current local state.
-        """
+    def _store_room_members_txn(self, txn, events, backfilled):
+        """Store a room member in the database."""
 
         def non_null_str_or_none(val: Any) -> Optional[str]:
             return val if isinstance(val, str) and "\u0000" not in val else None
@@ -1721,7 +1682,7 @@ class PersistEventsStore:
             # band membership", like a remote invite or a rejection of a remote invite.
             if (
                 self.is_mine_id(event.state_key)
-                and not inhibit_local_membership_updates
+                and not backfilled
                 and event.internal_metadata.is_outlier()
                 and event.internal_metadata.is_out_of_band_membership()
             ):