diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/events/__init__.py | 6 | ||||
-rw-r--r-- | synapse/events/utils.py | 5 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 2 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 2 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 3 | ||||
-rw-r--r-- | synapse/handlers/message.py | 4 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 13 | ||||
-rw-r--r-- | synapse/rest/admin/__init__.py | 5 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 26 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 13 | ||||
-rw-r--r-- | synapse/storage/persist_events.py | 2 |
12 files changed, 52 insertions, 33 deletions
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index dc49df0812..7a51d0a22f 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -97,13 +97,16 @@ class DefaultDictProperty(DictProperty): class _EventInternalMetadata: - __slots__ = ["_dict"] + __slots__ = ["_dict", "stream_ordering"] def __init__(self, internal_metadata_dict: JsonDict): # we have to copy the dict, because it turns out that the same dict is # reused. TODO: fix that self._dict = dict(internal_metadata_dict) + # the stream ordering of this event. None, until it has been persisted. + self.stream_ordering = None # type: Optional[int] + outlier = DictProperty("outlier") # type: bool out_of_band_membership = DictProperty("out_of_band_membership") # type: bool send_on_behalf_of = DictProperty("send_on_behalf_of") # type: str @@ -113,7 +116,6 @@ class _EventInternalMetadata: redacted = DictProperty("redacted") # type: bool txn_id = DictProperty("txn_id") # type: str token_id = DictProperty("token_id") # type: str - stream_ordering = DictProperty("stream_ordering") # type: int # XXX: These are set by StreamWorkerStore._set_before_and_after. # I'm pretty sure that these are never persisted to the database, so shouldn't diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 32c73d3413..355cbe05f1 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -49,6 +49,11 @@ def prune_event(event: EventBase) -> EventBase: pruned_event_dict, event.room_version, event.internal_metadata.get_dict() ) + # copy the internal fields + pruned_event.internal_metadata.stream_ordering = ( + event.internal_metadata.stream_ordering + ) + # Mark the event as redacted pruned_event.internal_metadata.redacted = True diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 8bb17b3a05..e33b29a42c 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -297,6 +297,8 @@ class FederationSender: sent_pdus_destination_dist_total.inc(len(destinations)) sent_pdus_destination_dist_count.inc() + assert pdu.internal_metadata.stream_ordering + # track the fact that we have a PDU for these destinations, # to allow us to perform catch-up later on if the remote is unreachable # for a while. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index bc99af3fdd..db8e456fe8 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -158,6 +158,7 @@ class PerDestinationQueue: # yet know if we have anything to catch up (None) self._pending_pdus.append(pdu) else: + assert pdu.internal_metadata.stream_ordering self._catchup_last_skipped = pdu.internal_metadata.stream_ordering self.attempt_new_transaction() @@ -361,6 +362,7 @@ class PerDestinationQueue: last_successful_stream_ordering = ( final_pdu.internal_metadata.stream_ordering ) + assert last_successful_stream_ordering await self._store.set_destination_last_successful_stream_ordering( self._destination, last_successful_stream_ordering ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1a8144405a..5ac2fc5656 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -3008,6 +3008,9 @@ class FederationHandler(BaseHandler): elif event.internal_metadata.is_outlier(): return + # the event has been persisted so it should have a stream ordering. + assert event.internal_metadata.stream_ordering + event_pos = PersistedEventPosition( self._instance_name, event.internal_metadata.stream_ordering ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ee271e85e5..00513fbf37 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -682,7 +682,9 @@ class EventCreationHandler: event.event_id, prev_event.event_id, ) - return await self.store.get_stream_id_for_event(prev_event.event_id) + # we know it was persisted, so must have a stream ordering + assert prev_event.internal_metadata.stream_ordering + return prev_event.internal_metadata.stream_ordering return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 567a14bd0a..13b749b7cb 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -194,8 +194,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ) if duplicate is not None: # Discard the new event since this membership change is a no-op. - _, stream_id = await self.store.get_event_ordering(duplicate.event_id) - return duplicate.event_id, stream_id + # we know it was persisted, so must have a stream ordering. + assert duplicate.internal_metadata.stream_ordering + return duplicate.event_id, duplicate.internal_metadata.stream_ordering prev_state_ids = await context.get_prev_state_ids() @@ -441,12 +442,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): same_membership = old_membership == effective_membership_state same_sender = requester.user.to_string() == old_state.sender if same_sender and same_membership and same_content: - _, stream_id = await self.store.get_event_ordering( - old_state.event_id - ) + # duplicate event. + # we know it was persisted, so must have a stream ordering. + assert old_state.internal_metadata.stream_ordering return ( old_state.event_id, - stream_id, + old_state.internal_metadata.stream_ordering, ) if old_membership in ["ban", "leave"] and action == "kick": diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 57cac22252..789431ef25 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -57,6 +57,7 @@ from synapse.rest.admin.users import ( UsersRestServletV2, WhoisRestServlet, ) +from synapse.types import RoomStreamToken from synapse.util.versionstring import get_version_string logger = logging.getLogger(__name__) @@ -109,7 +110,9 @@ class PurgeHistoryRestServlet(RestServlet): if event.room_id != room_id: raise SynapseError(400, "Event is for wrong room.") - room_token = await self.store.get_topological_token_for_event(event_id) + room_token = RoomStreamToken( + event.depth, event.internal_metadata.stream_ordering + ) token = await room_token.to_string(self.store) logger.info("[purge] purging up to token %s (event_id %s)", token, event_id) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 78e645592f..b4abd961b9 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -331,6 +331,10 @@ class PersistEventsStore: min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering + # stream orderings should have been assigned by now + assert min_stream_order + assert max_stream_order + self._update_forward_extremities_txn( txn, new_forward_extremities=new_forward_extremeties, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 723ced4ff0..b7ed8ca6ab 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -723,6 +723,7 @@ class EventsWorkerStore(SQLBaseStore): internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) + original_ev.internal_metadata.stream_ordering = row["stream_ordering"] event_map[event_id] = original_ev @@ -790,6 +791,8 @@ class EventsWorkerStore(SQLBaseStore): * event_id (str) + * stream_ordering (int): stream ordering for this event + * json (str): json-encoded event structure * internal_metadata (str): json-encoded internal metadata dict @@ -822,13 +825,15 @@ class EventsWorkerStore(SQLBaseStore): sql = """\ SELECT e.event_id, - e.internal_metadata, - e.json, - e.format_version, + e.stream_ordering, + ej.internal_metadata, + ej.json, + ej.format_version, r.room_version, rej.reason - FROM event_json as e - LEFT JOIN rooms r USING (room_id) + FROM events AS e + JOIN event_json AS ej USING (event_id) + LEFT JOIN rooms r ON r.room_id = e.room_id LEFT JOIN rejections as rej USING (event_id) WHERE """ @@ -842,11 +847,12 @@ class EventsWorkerStore(SQLBaseStore): event_id = row[0] event_dict[event_id] = { "event_id": event_id, - "internal_metadata": row[1], - "json": row[2], - "format_version": row[3], - "room_version_id": row[4], - "rejected_reason": row[5], + "stream_ordering": row[1], + "internal_metadata": row[2], + "json": row[3], + "format_version": row[4], + "room_version_id": row[5], + "rejected_reason": row[6], "redactions": [], } diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 1d27439536..a94bec1ac5 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -589,19 +589,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): ) return "t%d-%d" % (topo, token) - async def get_stream_id_for_event(self, event_id: str) -> int: - """The stream ID for an event - Args: - event_id: The id of the event to look up a stream token for. - Raises: - StoreError if the event wasn't in the database. - Returns: - A stream ID. - """ - return await self.db_pool.runInteraction( - "get_stream_id_for_event", self.get_stream_id_for_event_txn, event_id, - ) - def get_stream_id_for_event_txn( self, txn: LoggingTransaction, event_id: str, allow_none=False, ) -> int: diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 72939f3984..4d2d88d1f0 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -248,6 +248,8 @@ class EventsPersistenceStorage: await make_deferred_yieldable(deferred) event_stream_id = event.internal_metadata.stream_ordering + # stream ordering should have been assigned by now + assert event_stream_id pos = PersistedEventPosition(self._instance_name, event_stream_id) return pos, self.main_store.get_room_max_token() |