summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py26
-rw-r--r--synapse/storage/databases/main/stream.py13
-rw-r--r--synapse/storage/persist_events.py2
4 files changed, 22 insertions, 23 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 78e645592f..b4abd961b9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -331,6 +331,10 @@ class PersistEventsStore:
         min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
+        # stream orderings should have been assigned by now
+        assert min_stream_order
+        assert max_stream_order
+
         self._update_forward_extremities_txn(
             txn,
             new_forward_extremities=new_forward_extremeties,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 723ced4ff0..b7ed8ca6ab 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -723,6 +723,7 @@ class EventsWorkerStore(SQLBaseStore):
                 internal_metadata_dict=internal_metadata,
                 rejected_reason=rejected_reason,
             )
+            original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
 
             event_map[event_id] = original_ev
 
@@ -790,6 +791,8 @@ class EventsWorkerStore(SQLBaseStore):
 
          * event_id (str)
 
+         * stream_ordering (int): stream ordering for this event
+
          * json (str): json-encoded event structure
 
          * internal_metadata (str): json-encoded internal metadata dict
@@ -822,13 +825,15 @@ class EventsWorkerStore(SQLBaseStore):
             sql = """\
                 SELECT
                   e.event_id,
-                  e.internal_metadata,
-                  e.json,
-                  e.format_version,
+                  e.stream_ordering,
+                  ej.internal_metadata,
+                  ej.json,
+                  ej.format_version,
                   r.room_version,
                   rej.reason
-                FROM event_json as e
-                  LEFT JOIN rooms r USING (room_id)
+                FROM events AS e
+                  JOIN event_json AS ej USING (event_id)
+                  LEFT JOIN rooms r ON r.room_id = e.room_id
                   LEFT JOIN rejections as rej USING (event_id)
                 WHERE """
 
@@ -842,11 +847,12 @@ class EventsWorkerStore(SQLBaseStore):
                 event_id = row[0]
                 event_dict[event_id] = {
                     "event_id": event_id,
-                    "internal_metadata": row[1],
-                    "json": row[2],
-                    "format_version": row[3],
-                    "room_version_id": row[4],
-                    "rejected_reason": row[5],
+                    "stream_ordering": row[1],
+                    "internal_metadata": row[2],
+                    "json": row[3],
+                    "format_version": row[4],
+                    "room_version_id": row[5],
+                    "rejected_reason": row[6],
                     "redactions": [],
                 }
 
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 1d27439536..a94bec1ac5 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -589,19 +589,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             )
             return "t%d-%d" % (topo, token)
 
-    async def get_stream_id_for_event(self, event_id: str) -> int:
-        """The stream ID for an event
-        Args:
-            event_id: The id of the event to look up a stream token for.
-        Raises:
-            StoreError if the event wasn't in the database.
-        Returns:
-            A stream ID.
-        """
-        return await self.db_pool.runInteraction(
-            "get_stream_id_for_event", self.get_stream_id_for_event_txn, event_id,
-        )
-
     def get_stream_id_for_event_txn(
         self, txn: LoggingTransaction, event_id: str, allow_none=False,
     ) -> int:
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 72939f3984..4d2d88d1f0 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -248,6 +248,8 @@ class EventsPersistenceStorage:
         await make_deferred_yieldable(deferred)
 
         event_stream_id = event.internal_metadata.stream_ordering
+        # stream ordering should have been assigned by now
+        assert event_stream_id
 
         pos = PersistedEventPosition(self._instance_name, event_stream_id)
         return pos, self.main_store.get_room_max_token()