summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-10-05 14:43:14 +0100
committerGitHub <noreply@github.com>2020-10-05 14:43:14 +0100
commitf31f8e63198cfe46af48d788dbb294aba9155e5a (patch)
tree603352c2adac33e3348be771f54f4c8d130eb582
parentUpdate manhole documentation for async/await. (#8462) (diff)
downloadsynapse-f31f8e63198cfe46af48d788dbb294aba9155e5a.tar.xz
Remove stream ordering from Metadata dict (#8452)
There's no need for it to be in the dict as well as the events table. Instead,
we store it in a separate attribute in the EventInternalMetadata object, and
populate that on load.

This means that we can rely on it being correctly populated for any event which
has been persited to the database.
-rw-r--r--changelog.d/8452.misc1
-rw-r--r--synapse/events/__init__.py6
-rw-r--r--synapse/events/utils.py5
-rw-r--r--synapse/federation/sender/__init__.py2
-rw-r--r--synapse/federation/sender/per_destination_queue.py2
-rw-r--r--synapse/handlers/federation.py3
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/room_member.py13
-rw-r--r--synapse/rest/admin/__init__.py5
-rw-r--r--synapse/storage/databases/main/events.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py26
-rw-r--r--synapse/storage/databases/main/stream.py13
-rw-r--r--synapse/storage/persist_events.py2
13 files changed, 53 insertions, 33 deletions
diff --git a/changelog.d/8452.misc b/changelog.d/8452.misc
new file mode 100644
index 0000000000..8288d91c78
--- /dev/null
+++ b/changelog.d/8452.misc
@@ -0,0 +1 @@
+Remove redundant databae loads of stream_ordering for events we already have.
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index dc49df0812..7a51d0a22f 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -97,13 +97,16 @@ class DefaultDictProperty(DictProperty):
 
 
 class _EventInternalMetadata:
-    __slots__ = ["_dict"]
+    __slots__ = ["_dict", "stream_ordering"]
 
     def __init__(self, internal_metadata_dict: JsonDict):
         # we have to copy the dict, because it turns out that the same dict is
         # reused. TODO: fix that
         self._dict = dict(internal_metadata_dict)
 
+        # the stream ordering of this event. None, until it has been persisted.
+        self.stream_ordering = None  # type: Optional[int]
+
     outlier = DictProperty("outlier")  # type: bool
     out_of_band_membership = DictProperty("out_of_band_membership")  # type: bool
     send_on_behalf_of = DictProperty("send_on_behalf_of")  # type: str
@@ -113,7 +116,6 @@ class _EventInternalMetadata:
     redacted = DictProperty("redacted")  # type: bool
     txn_id = DictProperty("txn_id")  # type: str
     token_id = DictProperty("token_id")  # type: str
-    stream_ordering = DictProperty("stream_ordering")  # type: int
 
     # XXX: These are set by StreamWorkerStore._set_before_and_after.
     # I'm pretty sure that these are never persisted to the database, so shouldn't
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 32c73d3413..355cbe05f1 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -49,6 +49,11 @@ def prune_event(event: EventBase) -> EventBase:
         pruned_event_dict, event.room_version, event.internal_metadata.get_dict()
     )
 
+    # copy the internal fields
+    pruned_event.internal_metadata.stream_ordering = (
+        event.internal_metadata.stream_ordering
+    )
+
     # Mark the event as redacted
     pruned_event.internal_metadata.redacted = True
 
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8bb17b3a05..e33b29a42c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -297,6 +297,8 @@ class FederationSender:
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()
 
+        assert pdu.internal_metadata.stream_ordering
+
         # track the fact that we have a PDU for these destinations,
         # to allow us to perform catch-up later on if the remote is unreachable
         # for a while.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index bc99af3fdd..db8e456fe8 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -158,6 +158,7 @@ class PerDestinationQueue:
             # yet know if we have anything to catch up (None)
             self._pending_pdus.append(pdu)
         else:
+            assert pdu.internal_metadata.stream_ordering
             self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
 
         self.attempt_new_transaction()
@@ -361,6 +362,7 @@ class PerDestinationQueue:
                         last_successful_stream_ordering = (
                             final_pdu.internal_metadata.stream_ordering
                         )
+                        assert last_successful_stream_ordering
                         await self._store.set_destination_last_successful_stream_ordering(
                             self._destination, last_successful_stream_ordering
                         )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1a8144405a..5ac2fc5656 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -3008,6 +3008,9 @@ class FederationHandler(BaseHandler):
         elif event.internal_metadata.is_outlier():
             return
 
+        # the event has been persisted so it should have a stream ordering.
+        assert event.internal_metadata.stream_ordering
+
         event_pos = PersistedEventPosition(
             self._instance_name, event.internal_metadata.stream_ordering
         )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ee271e85e5..00513fbf37 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -682,7 +682,9 @@ class EventCreationHandler:
                     event.event_id,
                     prev_event.event_id,
                 )
-                return await self.store.get_stream_id_for_event(prev_event.event_id)
+                # we know it was persisted, so must have a stream ordering
+                assert prev_event.internal_metadata.stream_ordering
+                return prev_event.internal_metadata.stream_ordering
 
         return await self.handle_new_client_event(
             requester=requester, event=event, context=context, ratelimit=ratelimit
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 567a14bd0a..13b749b7cb 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -194,8 +194,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         )
         if duplicate is not None:
             # Discard the new event since this membership change is a no-op.
-            _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
-            return duplicate.event_id, stream_id
+            # we know it was persisted, so must have a stream ordering.
+            assert duplicate.internal_metadata.stream_ordering
+            return duplicate.event_id, duplicate.internal_metadata.stream_ordering
 
         prev_state_ids = await context.get_prev_state_ids()
 
@@ -441,12 +442,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 same_membership = old_membership == effective_membership_state
                 same_sender = requester.user.to_string() == old_state.sender
                 if same_sender and same_membership and same_content:
-                    _, stream_id = await self.store.get_event_ordering(
-                        old_state.event_id
-                    )
+                    # duplicate event.
+                    # we know it was persisted, so must have a stream ordering.
+                    assert old_state.internal_metadata.stream_ordering
                     return (
                         old_state.event_id,
-                        stream_id,
+                        old_state.internal_metadata.stream_ordering,
                     )
 
             if old_membership in ["ban", "leave"] and action == "kick":
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 57cac22252..789431ef25 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -57,6 +57,7 @@ from synapse.rest.admin.users import (
     UsersRestServletV2,
     WhoisRestServlet,
 )
+from synapse.types import RoomStreamToken
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
@@ -109,7 +110,9 @@ class PurgeHistoryRestServlet(RestServlet):
             if event.room_id != room_id:
                 raise SynapseError(400, "Event is for wrong room.")
 
-            room_token = await self.store.get_topological_token_for_event(event_id)
+            room_token = RoomStreamToken(
+                event.depth, event.internal_metadata.stream_ordering
+            )
             token = await room_token.to_string(self.store)
 
             logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 78e645592f..b4abd961b9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -331,6 +331,10 @@ class PersistEventsStore:
         min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
+        # stream orderings should have been assigned by now
+        assert min_stream_order
+        assert max_stream_order
+
         self._update_forward_extremities_txn(
             txn,
             new_forward_extremities=new_forward_extremeties,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 723ced4ff0..b7ed8ca6ab 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -723,6 +723,7 @@ class EventsWorkerStore(SQLBaseStore):
                 internal_metadata_dict=internal_metadata,
                 rejected_reason=rejected_reason,
             )
+            original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
 
             event_map[event_id] = original_ev
 
@@ -790,6 +791,8 @@ class EventsWorkerStore(SQLBaseStore):
 
          * event_id (str)
 
+         * stream_ordering (int): stream ordering for this event
+
          * json (str): json-encoded event structure
 
          * internal_metadata (str): json-encoded internal metadata dict
@@ -822,13 +825,15 @@ class EventsWorkerStore(SQLBaseStore):
             sql = """\
                 SELECT
                   e.event_id,
-                  e.internal_metadata,
-                  e.json,
-                  e.format_version,
+                  e.stream_ordering,
+                  ej.internal_metadata,
+                  ej.json,
+                  ej.format_version,
                   r.room_version,
                   rej.reason
-                FROM event_json as e
-                  LEFT JOIN rooms r USING (room_id)
+                FROM events AS e
+                  JOIN event_json AS ej USING (event_id)
+                  LEFT JOIN rooms r ON r.room_id = e.room_id
                   LEFT JOIN rejections as rej USING (event_id)
                 WHERE """
 
@@ -842,11 +847,12 @@ class EventsWorkerStore(SQLBaseStore):
                 event_id = row[0]
                 event_dict[event_id] = {
                     "event_id": event_id,
-                    "internal_metadata": row[1],
-                    "json": row[2],
-                    "format_version": row[3],
-                    "room_version_id": row[4],
-                    "rejected_reason": row[5],
+                    "stream_ordering": row[1],
+                    "internal_metadata": row[2],
+                    "json": row[3],
+                    "format_version": row[4],
+                    "room_version_id": row[5],
+                    "rejected_reason": row[6],
                     "redactions": [],
                 }
 
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 1d27439536..a94bec1ac5 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -589,19 +589,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             )
             return "t%d-%d" % (topo, token)
 
-    async def get_stream_id_for_event(self, event_id: str) -> int:
-        """The stream ID for an event
-        Args:
-            event_id: The id of the event to look up a stream token for.
-        Raises:
-            StoreError if the event wasn't in the database.
-        Returns:
-            A stream ID.
-        """
-        return await self.db_pool.runInteraction(
-            "get_stream_id_for_event", self.get_stream_id_for_event_txn, event_id,
-        )
-
     def get_stream_id_for_event_txn(
         self, txn: LoggingTransaction, event_id: str, allow_none=False,
     ) -> int:
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 72939f3984..4d2d88d1f0 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -248,6 +248,8 @@ class EventsPersistenceStorage:
         await make_deferred_yieldable(deferred)
 
         event_stream_id = event.internal_metadata.stream_ordering
+        # stream ordering should have been assigned by now
+        assert event_stream_id
 
         pos = PersistedEventPosition(self._instance_name, event_stream_id)
         return pos, self.main_store.get_room_max_token()