diff --git a/changelog.d/8452.misc b/changelog.d/8452.misc
new file mode 100644
index 0000000000..8288d91c78
--- /dev/null
+++ b/changelog.d/8452.misc
@@ -0,0 +1 @@
+Remove redundant database loads of stream_ordering for events we already have.
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index dc49df0812..7a51d0a22f 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -97,13 +97,16 @@ class DefaultDictProperty(DictProperty):
class _EventInternalMetadata:
- __slots__ = ["_dict"]
+ __slots__ = ["_dict", "stream_ordering"]
def __init__(self, internal_metadata_dict: JsonDict):
# we have to copy the dict, because it turns out that the same dict is
# reused. TODO: fix that
self._dict = dict(internal_metadata_dict)
+ # the stream ordering of this event. None, until it has been persisted.
+ self.stream_ordering = None # type: Optional[int]
+
outlier = DictProperty("outlier") # type: bool
out_of_band_membership = DictProperty("out_of_band_membership") # type: bool
send_on_behalf_of = DictProperty("send_on_behalf_of") # type: str
@@ -113,7 +116,6 @@ class _EventInternalMetadata:
redacted = DictProperty("redacted") # type: bool
txn_id = DictProperty("txn_id") # type: str
token_id = DictProperty("token_id") # type: str
- stream_ordering = DictProperty("stream_ordering") # type: int
# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
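
The hunk above moves stream_ordering off the JSON-backed _dict (where DictProperty kept it) and onto a plain slot attribute that defaults to None until the event is persisted. Below is a minimal sketch of that pattern, using a hypothetical _Metadata class rather than Synapse's real _EventInternalMetadata.

# --- illustrative sketch (hypothetical, not part of the diff) ---
from typing import Optional

class _Metadata:
    __slots__ = ["_dict", "stream_ordering"]

    def __init__(self, internal_metadata_dict: dict):
        # copy, because callers may reuse the same dict
        self._dict = dict(internal_metadata_dict)
        # None until the event has been written to the events table
        self.stream_ordering = None  # type: Optional[int]

meta = _Metadata({"outlier": False})
assert meta.stream_ordering is None   # not yet persisted
meta.stream_ordering = 42             # filled in by the persistence code
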
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 32c73d3413..355cbe05f1 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -49,6 +49,11 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event_dict, event.room_version, event.internal_metadata.get_dict()
)
+ # copy the internal fields
+ pruned_event.internal_metadata.stream_ordering = (
+ event.internal_metadata.stream_ordering
+ )
+
# Mark the event as redacted
pruned_event.internal_metadata.redacted = True
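
The explicit copy above is needed because prune_event builds a brand-new event object, and only the dict-backed metadata survives via get_dict(); a plain slot attribute such as stream_ordering would otherwise reset to None. A small, hedged demonstration of that behaviour, using a hypothetical Meta class rather than Synapse's real classes:

# --- illustrative sketch (hypothetical, not part of the diff) ---
from typing import Optional

class Meta:
    __slots__ = ["_dict", "stream_ordering"]

    def __init__(self, d: dict) -> None:
        self._dict = dict(d)
        self.stream_ordering = None  # type: Optional[int]

    def get_dict(self) -> dict:
        return dict(self._dict)

original = Meta({"redacted": False})
original.stream_ordering = 99  # pretend the event was persisted

# rebuilding from get_dict() alone loses the slot attribute ...
pruned = Meta(original.get_dict())
assert pruned.stream_ordering is None

# ... so it has to be copied across explicitly, as the hunk above does
pruned.stream_ordering = original.stream_ordering
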
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8bb17b3a05..e33b29a42c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -297,6 +297,8 @@ class FederationSender:
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
+ assert pdu.internal_metadata.stream_ordering
+
# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index bc99af3fdd..db8e456fe8 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -158,6 +158,7 @@ class PerDestinationQueue:
# yet know if we have anything to catch up (None)
self._pending_pdus.append(pdu)
else:
+ assert pdu.internal_metadata.stream_ordering
self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
self.attempt_new_transaction()
@@ -361,6 +362,7 @@ class PerDestinationQueue:
last_successful_stream_ordering = (
final_pdu.internal_metadata.stream_ordering
)
+ assert last_successful_stream_ordering
await self._store.set_destination_last_successful_stream_ordering(
self._destination, last_successful_stream_ordering
)
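
Because stream_ordering is now typed Optional[int], the new asserts serve two purposes: they document the invariant that the PDU has already been persisted, and they narrow the type for mypy before the value reaches APIs that expect a plain int. A hedged, hypothetical illustration of that narrowing (the function names below are stand-ins, not Synapse's):

# --- illustrative sketch (hypothetical, not part of the diff) ---
from typing import Optional

def record_last_successful_stream_ordering(stream_ordering: int) -> None:
    # stand-in for a store method that requires a concrete int
    print("caught up to", stream_ordering)

def on_transaction_succeeded(stream_ordering: Optional[int]) -> None:
    # without this assert, mypy would reject the call below, since the
    # value could still be None for an event that was never persisted
    assert stream_ordering
    record_last_successful_stream_ordering(stream_ordering)

on_transaction_succeeded(17)
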
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1a8144405a..5ac2fc5656 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -3008,6 +3008,9 @@ class FederationHandler(BaseHandler):
elif event.internal_metadata.is_outlier():
return
+ # the event has been persisted so it should have a stream ordering.
+ assert event.internal_metadata.stream_ordering
+
event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ee271e85e5..00513fbf37 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -682,7 +682,9 @@ class EventCreationHandler:
event.event_id,
prev_event.event_id,
)
- return await self.store.get_stream_id_for_event(prev_event.event_id)
+ # we know it was persisted, so must have a stream ordering
+ assert prev_event.internal_metadata.stream_ordering
+ return prev_event.internal_metadata.stream_ordering
return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
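
This hunk is the heart of the change: the deduplication path previously re-queried the database for the stream ID of an event object it already held in memory. A hedged before/after sketch, with store and prev_event standing in for the real Synapse objects:

# --- illustrative sketch (hypothetical, not part of the diff) ---
async def old_way(store, prev_event) -> int:
    # extra database round trip for an event we already have in memory
    return await store.get_stream_id_for_event(prev_event.event_id)

async def new_way(prev_event) -> int:
    # the event was persisted (or loaded from the events table), so its
    # cached stream ordering must already be populated
    assert prev_event.internal_metadata.stream_ordering
    return prev_event.internal_metadata.stream_ordering
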
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 567a14bd0a..13b749b7cb 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -194,8 +194,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
)
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
- _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
- return duplicate.event_id, stream_id
+ # we know it was persisted, so must have a stream ordering.
+ assert duplicate.internal_metadata.stream_ordering
+ return duplicate.event_id, duplicate.internal_metadata.stream_ordering
prev_state_ids = await context.get_prev_state_ids()
@@ -441,12 +442,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
same_membership = old_membership == effective_membership_state
same_sender = requester.user.to_string() == old_state.sender
if same_sender and same_membership and same_content:
- _, stream_id = await self.store.get_event_ordering(
- old_state.event_id
- )
+ # duplicate event.
+ # we know it was persisted, so must have a stream ordering.
+ assert old_state.internal_metadata.stream_ordering
return (
old_state.event_id,
- stream_id,
+ old_state.internal_metadata.stream_ordering,
)
if old_membership in ["ban", "leave"] and action == "kick":
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 57cac22252..789431ef25 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -57,6 +57,7 @@ from synapse.rest.admin.users import (
UsersRestServletV2,
WhoisRestServlet,
)
+from synapse.types import RoomStreamToken
from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
@@ -109,7 +110,9 @@ class PurgeHistoryRestServlet(RestServlet):
if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")
- room_token = await self.store.get_topological_token_for_event(event_id)
+ room_token = RoomStreamToken(
+ event.depth, event.internal_metadata.stream_ordering
+ )
token = await room_token.to_string(self.store)
logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 78e645592f..b4abd961b9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -331,6 +331,10 @@ class PersistEventsStore:
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
+ # stream orderings should have been assigned by now
+ assert min_stream_order
+ assert max_stream_order
+
self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremeties,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 723ced4ff0..b7ed8ca6ab 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -723,6 +723,7 @@ class EventsWorkerStore(SQLBaseStore):
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
+ original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
event_map[event_id] = original_ev
@@ -790,6 +791,8 @@ class EventsWorkerStore(SQLBaseStore):
* event_id (str)
+ * stream_ordering (int): stream ordering for this event
+
* json (str): json-encoded event structure
* internal_metadata (str): json-encoded internal metadata dict
@@ -822,13 +825,15 @@ class EventsWorkerStore(SQLBaseStore):
sql = """\
SELECT
e.event_id,
- e.internal_metadata,
- e.json,
- e.format_version,
+ e.stream_ordering,
+ ej.internal_metadata,
+ ej.json,
+ ej.format_version,
r.room_version,
rej.reason
- FROM event_json as e
- LEFT JOIN rooms r USING (room_id)
+ FROM events AS e
+ JOIN event_json AS ej USING (event_id)
+ LEFT JOIN rooms r ON r.room_id = e.room_id
LEFT JOIN rejections as rej USING (event_id)
WHERE """
@@ -842,11 +847,12 @@ class EventsWorkerStore(SQLBaseStore):
event_id = row[0]
event_dict[event_id] = {
"event_id": event_id,
- "internal_metadata": row[1],
- "json": row[2],
- "format_version": row[3],
- "room_version_id": row[4],
- "rejected_reason": row[5],
+ "stream_ordering": row[1],
+ "internal_metadata": row[2],
+ "json": row[3],
+ "format_version": row[4],
+ "room_version_id": row[5],
+ "rejected_reason": row[6],
"redactions": [],
}
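
The fetch query now starts from the events table (which holds stream_ordering) and joins event_json, so a single query returns both the ordering and the serialised event, and the value can be stashed on internal_metadata without a follow-up lookup. A simplified sketch of the reworked query and row handling follows; the WHERE clause is reduced to a single event ID here, and the helper function is hypothetical:

# --- illustrative sketch (hypothetical, not part of the diff) ---
FETCH_SQL = """
    SELECT
        e.event_id,
        e.stream_ordering,
        ej.internal_metadata,
        ej.json,
        ej.format_version,
        r.room_version,
        rej.reason
    FROM events AS e
    JOIN event_json AS ej USING (event_id)
    LEFT JOIN rooms AS r ON r.room_id = e.room_id
    LEFT JOIN rejections AS rej USING (event_id)
    WHERE e.event_id = ?
"""

def row_to_dict(row: tuple) -> dict:
    # mirrors the positional unpacking in the hunk above: stream_ordering
    # now sits at index 1, shifting the remaining columns along by one
    return {
        "event_id": row[0],
        "stream_ordering": row[1],
        "internal_metadata": row[2],
        "json": row[3],
        "format_version": row[4],
        "room_version_id": row[5],
        "rejected_reason": row[6],
        "redactions": [],
    }
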
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 1d27439536..a94bec1ac5 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -589,19 +589,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
)
return "t%d-%d" % (topo, token)
- async def get_stream_id_for_event(self, event_id: str) -> int:
- """The stream ID for an event
- Args:
- event_id: The id of the event to look up a stream token for.
- Raises:
- StoreError if the event wasn't in the database.
- Returns:
- A stream ID.
- """
- return await self.db_pool.runInteraction(
- "get_stream_id_for_event", self.get_stream_id_for_event_txn, event_id,
- )
-
def get_stream_id_for_event_txn(
self, txn: LoggingTransaction, event_id: str, allow_none=False,
) -> int:
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 72939f3984..4d2d88d1f0 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -248,6 +248,8 @@ class EventsPersistenceStorage:
await make_deferred_yieldable(deferred)
event_stream_id = event.internal_metadata.stream_ordering
+ # stream ordering should have been assigned by now
+ assert event_stream_id
pos = PersistedEventPosition(self._instance_name, event_stream_id)
return pos, self.main_store.get_room_max_token()
|