summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/events/__init__.py6
-rw-r--r--synapse/events/utils.py5
-rw-r--r--synapse/federation/sender/__init__.py2
-rw-r--r--synapse/federation/sender/per_destination_queue.py2
-rw-r--r--synapse/handlers/federation.py3
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/room_member.py13
-rw-r--r--synapse/rest/admin/__init__.py5
-rw-r--r--synapse/storage/databases/main/events.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py26
-rw-r--r--synapse/storage/databases/main/stream.py13
-rw-r--r--synapse/storage/persist_events.py2
12 files changed, 52 insertions, 33 deletions
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index dc49df0812..7a51d0a22f 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -97,13 +97,16 @@ class DefaultDictProperty(DictProperty):
 
 
 class _EventInternalMetadata:
-    __slots__ = ["_dict"]
+    __slots__ = ["_dict", "stream_ordering"]
 
     def __init__(self, internal_metadata_dict: JsonDict):
         # we have to copy the dict, because it turns out that the same dict is
         # reused. TODO: fix that
         self._dict = dict(internal_metadata_dict)
 
+        # the stream ordering of this event. None, until it has been persisted.
+        self.stream_ordering = None  # type: Optional[int]
+
     outlier = DictProperty("outlier")  # type: bool
     out_of_band_membership = DictProperty("out_of_band_membership")  # type: bool
     send_on_behalf_of = DictProperty("send_on_behalf_of")  # type: str
@@ -113,7 +116,6 @@ class _EventInternalMetadata:
     redacted = DictProperty("redacted")  # type: bool
     txn_id = DictProperty("txn_id")  # type: str
     token_id = DictProperty("token_id")  # type: str
-    stream_ordering = DictProperty("stream_ordering")  # type: int
 
     # XXX: These are set by StreamWorkerStore._set_before_and_after.
     # I'm pretty sure that these are never persisted to the database, so shouldn't
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 32c73d3413..355cbe05f1 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -49,6 +49,11 @@ def prune_event(event: EventBase) -> EventBase:
         pruned_event_dict, event.room_version, event.internal_metadata.get_dict()
     )
 
+    # copy the internal fields
+    pruned_event.internal_metadata.stream_ordering = (
+        event.internal_metadata.stream_ordering
+    )
+
     # Mark the event as redacted
     pruned_event.internal_metadata.redacted = True
 
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8bb17b3a05..e33b29a42c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -297,6 +297,8 @@ class FederationSender:
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()
 
+        assert pdu.internal_metadata.stream_ordering
+
         # track the fact that we have a PDU for these destinations,
         # to allow us to perform catch-up later on if the remote is unreachable
         # for a while.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index bc99af3fdd..db8e456fe8 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -158,6 +158,7 @@ class PerDestinationQueue:
             # yet know if we have anything to catch up (None)
             self._pending_pdus.append(pdu)
         else:
+            assert pdu.internal_metadata.stream_ordering
             self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
 
         self.attempt_new_transaction()
@@ -361,6 +362,7 @@ class PerDestinationQueue:
                         last_successful_stream_ordering = (
                             final_pdu.internal_metadata.stream_ordering
                         )
+                        assert last_successful_stream_ordering
                         await self._store.set_destination_last_successful_stream_ordering(
                             self._destination, last_successful_stream_ordering
                         )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1a8144405a..5ac2fc5656 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -3008,6 +3008,9 @@ class FederationHandler(BaseHandler):
         elif event.internal_metadata.is_outlier():
             return
 
+        # the event has been persisted so it should have a stream ordering.
+        assert event.internal_metadata.stream_ordering
+
         event_pos = PersistedEventPosition(
             self._instance_name, event.internal_metadata.stream_ordering
         )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ee271e85e5..00513fbf37 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -682,7 +682,9 @@ class EventCreationHandler:
                     event.event_id,
                     prev_event.event_id,
                 )
-                return await self.store.get_stream_id_for_event(prev_event.event_id)
+                # we know it was persisted, so must have a stream ordering
+                assert prev_event.internal_metadata.stream_ordering
+                return prev_event.internal_metadata.stream_ordering
 
         return await self.handle_new_client_event(
             requester=requester, event=event, context=context, ratelimit=ratelimit
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 567a14bd0a..13b749b7cb 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -194,8 +194,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         )
         if duplicate is not None:
             # Discard the new event since this membership change is a no-op.
-            _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
-            return duplicate.event_id, stream_id
+            # we know it was persisted, so must have a stream ordering.
+            assert duplicate.internal_metadata.stream_ordering
+            return duplicate.event_id, duplicate.internal_metadata.stream_ordering
 
         prev_state_ids = await context.get_prev_state_ids()
 
@@ -441,12 +442,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 same_membership = old_membership == effective_membership_state
                 same_sender = requester.user.to_string() == old_state.sender
                 if same_sender and same_membership and same_content:
-                    _, stream_id = await self.store.get_event_ordering(
-                        old_state.event_id
-                    )
+                    # duplicate event.
+                    # we know it was persisted, so must have a stream ordering.
+                    assert old_state.internal_metadata.stream_ordering
                     return (
                         old_state.event_id,
-                        stream_id,
+                        old_state.internal_metadata.stream_ordering,
                     )
 
             if old_membership in ["ban", "leave"] and action == "kick":
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 57cac22252..789431ef25 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -57,6 +57,7 @@ from synapse.rest.admin.users import (
     UsersRestServletV2,
     WhoisRestServlet,
 )
+from synapse.types import RoomStreamToken
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
@@ -109,7 +110,9 @@ class PurgeHistoryRestServlet(RestServlet):
             if event.room_id != room_id:
                 raise SynapseError(400, "Event is for wrong room.")
 
-            room_token = await self.store.get_topological_token_for_event(event_id)
+            room_token = RoomStreamToken(
+                event.depth, event.internal_metadata.stream_ordering
+            )
             token = await room_token.to_string(self.store)
 
             logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 78e645592f..b4abd961b9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -331,6 +331,10 @@ class PersistEventsStore:
         min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
+        # stream orderings should have been assigned by now
+        assert min_stream_order
+        assert max_stream_order
+
         self._update_forward_extremities_txn(
             txn,
             new_forward_extremities=new_forward_extremeties,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 723ced4ff0..b7ed8ca6ab 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -723,6 +723,7 @@ class EventsWorkerStore(SQLBaseStore):
                 internal_metadata_dict=internal_metadata,
                 rejected_reason=rejected_reason,
             )
+            original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
 
             event_map[event_id] = original_ev
 
@@ -790,6 +791,8 @@ class EventsWorkerStore(SQLBaseStore):
 
          * event_id (str)
 
+         * stream_ordering (int): stream ordering for this event
+
          * json (str): json-encoded event structure
 
          * internal_metadata (str): json-encoded internal metadata dict
@@ -822,13 +825,15 @@ class EventsWorkerStore(SQLBaseStore):
             sql = """\
                 SELECT
                   e.event_id,
-                  e.internal_metadata,
-                  e.json,
-                  e.format_version,
+                  e.stream_ordering,
+                  ej.internal_metadata,
+                  ej.json,
+                  ej.format_version,
                   r.room_version,
                   rej.reason
-                FROM event_json as e
-                  LEFT JOIN rooms r USING (room_id)
+                FROM events AS e
+                  JOIN event_json AS ej USING (event_id)
+                  LEFT JOIN rooms r ON r.room_id = e.room_id
                   LEFT JOIN rejections as rej USING (event_id)
                 WHERE """
 
@@ -842,11 +847,12 @@ class EventsWorkerStore(SQLBaseStore):
                 event_id = row[0]
                 event_dict[event_id] = {
                     "event_id": event_id,
-                    "internal_metadata": row[1],
-                    "json": row[2],
-                    "format_version": row[3],
-                    "room_version_id": row[4],
-                    "rejected_reason": row[5],
+                    "stream_ordering": row[1],
+                    "internal_metadata": row[2],
+                    "json": row[3],
+                    "format_version": row[4],
+                    "room_version_id": row[5],
+                    "rejected_reason": row[6],
                     "redactions": [],
                 }
 
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 1d27439536..a94bec1ac5 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -589,19 +589,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             )
             return "t%d-%d" % (topo, token)
 
-    async def get_stream_id_for_event(self, event_id: str) -> int:
-        """The stream ID for an event
-        Args:
-            event_id: The id of the event to look up a stream token for.
-        Raises:
-            StoreError if the event wasn't in the database.
-        Returns:
-            A stream ID.
-        """
-        return await self.db_pool.runInteraction(
-            "get_stream_id_for_event", self.get_stream_id_for_event_txn, event_id,
-        )
-
     def get_stream_id_for_event_txn(
         self, txn: LoggingTransaction, event_id: str, allow_none=False,
     ) -> int:
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 72939f3984..4d2d88d1f0 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -248,6 +248,8 @@ class EventsPersistenceStorage:
         await make_deferred_yieldable(deferred)
 
         event_stream_id = event.internal_metadata.stream_ordering
+        # stream ordering should have been assigned by now
+        assert event_stream_id
 
         pos = PersistedEventPosition(self._instance_name, event_stream_id)
         return pos, self.main_store.get_room_max_token()