author     Richard van der Hoff <richard@matrix.org>  2021-07-01 08:59:54 +0100
committer  Richard van der Hoff <richard@matrix.org>  2021-07-01 08:59:54 +0100
commit     7eea8de9de653bc160dada0bf0f87eb6d1256acc (patch)
tree       e363a57eb7a28a2b32fcc1e2b349e402728c87ab /synapse
parent     bump background update rate (diff)
parent     Fix the homeserver config example in presence router docs (#10288) (diff)
download   synapse-7eea8de9de653bc160dada0bf0f87eb6d1256acc.tar.xz

Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py                                                            2
-rw-r--r--  synapse/api/auth.py                                                            5
-rw-r--r--  synapse/api/constants.py                                                       6
-rw-r--r--  synapse/event_auth.py                                                          5
-rw-r--r--  synapse/events/__init__.py                                                     2
-rw-r--r--  synapse/events/builder.py                                                     69
-rw-r--r--  synapse/federation/federation_server.py                                       37
-rw-r--r--  synapse/handlers/message.py                                                    7
-rw-r--r--  synapse/handlers/space_summary.py                                             11
-rw-r--r--  synapse/storage/databases/main/event_federation.py                            66
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py                           50
-rw-r--r--  synapse/storage/engines/_base.py                                               6
-rw-r--r--  synapse/storage/engines/postgres.py                                            5
-rw-r--r--  synapse/storage/engines/sqlite.py                                              5
-rw-r--r--  synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres  11
-rw-r--r--  synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres  30
16 files changed, 243 insertions, 74 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py

index 0900492619..1bd03462ac 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -47,7 +47,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.37.1a1"
+__version__ = "1.37.1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 29cf257633..f8b068e563 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 import pymacaroons
 from netaddr import IPAddress
@@ -31,6 +31,7 @@ from synapse.api.errors import (
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.appservice import ApplicationService
 from synapse.events import EventBase
+from synapse.events.builder import EventBuilder
 from synapse.http import get_request_user_agent
 from synapse.http.site import SynapseRequest
 from synapse.logging import opentracing as opentracing
@@ -490,7 +491,7 @@ class Auth:
 
     def compute_auth_events(
         self,
-        event,
+        event: Union[EventBase, EventBuilder],
         current_state_ids: StateMap[str],
         for_verification: bool = False,
     ) -> List[str]:
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 414e4c019a..8363c2bb0f 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -201,6 +201,12 @@ class EventContentFields:
     )
 
 
+class RoomTypes:
+    """Understood values of the room_type field of m.room.create events."""
+
+    SPACE = "m.space"
+
+
 class RoomEncryptionAlgorithms:
     MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
     DEFAULT = MEGOLM_V1_AES_SHA2
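The new RoomTypes class is just a namespaced string constant: callers compare it against the room type declared in the content of an m.room.create event (surfaced as "room_type" in room summary entries, as in the space_summary change further down). A minimal standalone sketch of that check, not part of this commit and with illustrative names only:

class RoomTypes:
    """Mirrors the constant added above."""

    SPACE = "m.space"


def is_space(create_content: dict) -> bool:
    # MSC1772 puts the room type in the "type" field of m.room.create content.
    return create_content.get("type") == RoomTypes.SPACE


print(is_space({"type": "m.space"}))              # True
print(is_space({"creator": "@alice:example.org"}))  # False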
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 33d7c60241..89bcf81515 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
 
 from canonicaljson import encode_canonical_json
 from signedjson.key import decode_verify_key_bytes
@@ -29,6 +29,7 @@ from synapse.api.room_versions import (
     RoomVersion,
 )
 from synapse.events import EventBase
+from synapse.events.builder import EventBuilder
 from synapse.types import StateMap, UserID, get_domain_from_id
 
 logger = logging.getLogger(__name__)
@@ -724,7 +725,7 @@ def get_public_keys(invite_event: EventBase) -> List[Dict[str, Any]]:
     return public_keys
 
 
-def auth_types_for_event(event: EventBase) -> Set[Tuple[str, str]]:
+def auth_types_for_event(event: Union[EventBase, EventBuilder]) -> Set[Tuple[str, str]]:
     """Given an event, return a list of (EventType, StateKey) that may be
     needed to auth the event. The returned list may be a superset of what
     would actually be required depending on the full state of the room.
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 0cb9c1cc1e..6286ad999a 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -118,7 +118,7 @@ class _EventInternalMetadata:
     proactively_send = DictProperty("proactively_send")  # type: bool
     redacted = DictProperty("redacted")  # type: bool
     txn_id = DictProperty("txn_id")  # type: str
-    token_id = DictProperty("token_id")  # type: str
+    token_id = DictProperty("token_id")  # type: int
     historical = DictProperty("historical")  # type: bool
 
     # XXX: These are set by StreamWorkerStore._set_before_and_after.
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 81bf8615b7..fb48ec8541 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -12,12 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 import attr
 from nacl.signing import SigningKey
 
-from synapse.api.auth import Auth
 from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import UnsupportedRoomVersionError
 from synapse.api.room_versions import (
@@ -34,10 +33,14 @@ from synapse.types import EventID, JsonDict
 from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
+if TYPE_CHECKING:
+    from synapse.api.auth import Auth
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
-@attr.s(slots=True, cmp=False, frozen=True)
+@attr.s(slots=True, cmp=False, frozen=True, auto_attribs=True)
 class EventBuilder:
     """A format independent event builder used to build up the event content
     before signing the event.
@@ -62,31 +65,30 @@ class EventBuilder:
         _signing_key: The signing key to use to sign the event as the server
     """
 
-    _state = attr.ib(type=StateHandler)
-    _auth = attr.ib(type=Auth)
-    _store = attr.ib(type=DataStore)
-    _clock = attr.ib(type=Clock)
-    _hostname = attr.ib(type=str)
-    _signing_key = attr.ib(type=SigningKey)
+    _state: StateHandler
+    _auth: "Auth"
+    _store: DataStore
+    _clock: Clock
+    _hostname: str
+    _signing_key: SigningKey
 
-    room_version = attr.ib(type=RoomVersion)
+    room_version: RoomVersion
 
-    room_id = attr.ib(type=str)
-    type = attr.ib(type=str)
-    sender = attr.ib(type=str)
+    room_id: str
+    type: str
+    sender: str
 
-    content = attr.ib(default=attr.Factory(dict), type=JsonDict)
-    unsigned = attr.ib(default=attr.Factory(dict), type=JsonDict)
+    content: JsonDict = attr.Factory(dict)
+    unsigned: JsonDict = attr.Factory(dict)
 
     # These only exist on a subset of events, so they raise AttributeError if
     # someone tries to get them when they don't exist.
-    _state_key = attr.ib(default=None, type=Optional[str])
-    _redacts = attr.ib(default=None, type=Optional[str])
-    _origin_server_ts = attr.ib(default=None, type=Optional[int])
+    _state_key: Optional[str] = None
+    _redacts: Optional[str] = None
+    _origin_server_ts: Optional[int] = None
 
-    internal_metadata = attr.ib(
-        default=attr.Factory(lambda: _EventInternalMetadata({})),
-        type=_EventInternalMetadata,
+    internal_metadata: _EventInternalMetadata = attr.Factory(
+        lambda: _EventInternalMetadata({})
     )
 
     @property
@@ -184,7 +186,7 @@ class EventBuilder:
 
 
 class EventBuilderFactory:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
         self.hostname = hs.hostname
         self.signing_key = hs.signing_key
@@ -193,15 +195,14 @@ class EventBuilderFactory:
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
 
-    def new(self, room_version, key_values):
+    def new(self, room_version: str, key_values: dict) -> EventBuilder:
         """Generate an event builder appropriate for the given room version
 
         Deprecated: use for_room_version with a RoomVersion object instead
 
         Args:
-            room_version (str): Version of the room that we're creating an event builder
-                for
-            key_values (dict): Fields used as the basis of the new event
+            room_version: Version of the room that we're creating an event builder for
+            key_values: Fields used as the basis of the new event
 
         Returns:
             EventBuilder
@@ -212,13 +213,15 @@ class EventBuilderFactory:
             raise UnsupportedRoomVersionError()
         return self.for_room_version(v, key_values)
 
-    def for_room_version(self, room_version, key_values):
+    def for_room_version(
+        self, room_version: RoomVersion, key_values: dict
+    ) -> EventBuilder:
         """Generate an event builder appropriate for the given room version
 
         Args:
-            room_version (synapse.api.room_versions.RoomVersion):
+            room_version:
                 Version of the room that we're creating an event builder for
-            key_values (dict): Fields used as the basis of the new event
+            key_values: Fields used as the basis of the new event
 
         Returns:
             EventBuilder
@@ -286,15 +289,15 @@ def create_local_event_from_event_dict(
 
 _event_id_counter = 0
 
 
-def _create_event_id(clock, hostname):
+def _create_event_id(clock: Clock, hostname: str) -> str:
    """Create a new event ID
 
     Args:
-        clock (Clock)
-        hostname (str): The server name for the event ID
+        clock
+        hostname: The server name for the event ID
 
     Returns:
-        str
+        The new event ID
     """
     global _event_id_counter
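The builder.py changes above are a mechanical conversion to attrs' auto_attribs style: explicit attr.ib(type=...) declarations become plain type annotations, and attr.Factory still supplies per-instance mutable defaults. A minimal generic illustration of the pattern (not Synapse code):

import attr


# Old style: attributes declared with attr.ib() and a separate type argument.
@attr.s(slots=True, frozen=True)
class OldStyle:
    name = attr.ib(type=str)
    labels = attr.ib(default=attr.Factory(dict), type=dict)


# New style: auto_attribs=True reads the type annotations directly.
@attr.s(slots=True, frozen=True, auto_attribs=True)
class NewStyle:
    name: str
    labels: dict = attr.Factory(dict)


assert OldStyle("a").labels == {} == NewStyle("a").labels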
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 742d29291e..e93b7577fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -369,22 +369,21 @@ class FederationServer(FederationBase):
 
         async def process_pdu(pdu: EventBase) -> JsonDict:
             event_id = pdu.event_id
-            with pdu_process_time.time():
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        return {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        return {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
-                        )
-                        return {"error": str(e)}
+            with nested_logging_context(event_id):
+                try:
+                    await self._handle_received_pdu(origin, pdu)
+                    return {}
+                except FederationError as e:
+                    logger.warning("Error handling PDU %s: %s", event_id, e)
+                    return {"error": str(e)}
+                except Exception as e:
+                    f = failure.Failure()
+                    logger.error(
+                        "Failed to handle PDU %s",
+                        event_id,
+                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                    )
+                    return {"error": str(e)}
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -932,9 +931,13 @@ class FederationServer(FederationBase):
                     exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
                 )
 
-            await self.store.remove_received_event_from_staging(
+            received_ts = await self.store.remove_received_event_from_staging(
                 origin, event.event_id
            )
+            if received_ts is not None:
+                pdu_process_time.observe(
+                    (self._clock.time_msec() - received_ts) / 1000
+                )
 
             # We need to do this check outside the lock to avoid a race between
             # a new event being inserted by another instance and it attempting
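The federation_server.py change above stops wrapping the PDU handler in pdu_process_time.time() and instead observes, when the event is removed from the staging table, how long it spent there (a millisecond timestamp converted to seconds). The two prometheus_client Histogram usage patterns, shown as a rough standalone sketch with made-up names rather than the Synapse metric wiring:

import time

from prometheus_client import Histogram

pdu_process_time = Histogram("pdu_process_time", "Example histogram (seconds)")


def handle_pdu() -> None:
    time.sleep(0.01)  # stand-in for real work


# Pattern 1 (old code): time the handler itself with a context manager.
with pdu_process_time.time():
    handle_pdu()

# Pattern 2 (new code): record an externally measured duration, e.g. derived
# from a received_ts stored when the event was queued; note the explicit /1000
# when the stored timestamp is in milliseconds.
received_ts_ms = time.time() * 1000
handle_pdu()
pdu_process_time.observe((time.time() * 1000 - received_ts_ms) / 1000)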
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2c1b10f652..2a7a6e6982 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -509,6 +509,8 @@ class EventCreationHandler:
                 Should normally be left as None, which will cause them to be calculated
                 based on the room state at the prev_events.
 
+                If non-None, prev_event_ids must also be provided.
+
             require_consent: Whether to check if the requester has consented
                 to the privacy policy.
 
@@ -581,6 +583,9 @@
         # Strip down the auth_event_ids to only what we need to auth the event.
         # For example, we don't need extra m.room.member that don't match event.sender
         if auth_event_ids is not None:
+            # If auth events are provided, prev events must be also.
+            assert prev_event_ids is not None
+
             temp_event = await builder.build(
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
@@ -784,6 +789,8 @@
                 The event ids to use as the auth_events for the new event.
                 Should normally be left as None, which will cause them to be calculated
                 based on the room state at the prev_events.
+
+                If non-None, prev_event_ids must also be provided.
             ratelimit: Whether to rate limit this send.
             txn_id: The transaction ID.
             ignore_shadow_ban: True if shadow-banned users should be allowed to
diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index 17fc47ce16..266f369883 100644
--- a/synapse/handlers/space_summary.py
+++ b/synapse/handlers/space_summary.py
@@ -25,6 +25,7 @@ from synapse.api.constants import (
     EventTypes,
     HistoryVisibility,
     Membership,
+    RoomTypes,
 )
 from synapse.events import EventBase
 from synapse.events.utils import format_event_for_client_v2
@@ -318,7 +319,8 @@ class SpaceSummaryHandler:
 
         Returns:
             A tuple of:
-                An iterable of a single value of the room.
+                The room information, if the room should be returned to the
+                user. None, otherwise.
 
                 An iterable of the sorted children events. This may be limited
                 to a maximum size or may include all children.
@@ -328,7 +330,11 @@ class SpaceSummaryHandler:
 
         room_entry = await self._build_room_entry(room_id)
 
-        # look for child rooms/spaces.
+        # If the room is not a space, return just the room information.
+        if room_entry.get("room_type") != RoomTypes.SPACE:
+            return room_entry, ()
+
+        # Otherwise, look for child rooms/spaces.
         child_events = await self._get_child_events(room_id)
 
         if suggested_only:
@@ -348,6 +354,7 @@ class SpaceSummaryHandler:
                     event_format=format_event_for_client_v2,
                 )
             )
+
         return room_entry, events_result
 
     async def _summarize_remote_room(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..f2d27ee893 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         self,
         origin: str,
         event_id: str,
-    ) -> None:
-        """Remove the given event from the staging area"""
-        await self.db_pool.simple_delete(
-            table="federation_inbound_events_staging",
-            keyvalues={
-                "origin": origin,
-                "event_id": event_id,
-            },
-            desc="remove_received_event_from_staging",
-        )
+    ) -> Optional[int]:
+        """Remove the given event from the staging area.
+
+        Returns:
+            The received_ts of the row that was deleted, if any.
+        """
+        if self.db_pool.engine.supports_returning:
+
+            def _remove_received_event_from_staging_txn(txn):
+                sql = """
+                    DELETE FROM federation_inbound_events_staging
+                    WHERE origin = ? AND event_id = ?
+                    RETURNING received_ts
+                """
+
+                txn.execute(sql, (origin, event_id))
+                return txn.fetchone()
+
+            row = await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+                db_autocommit=True,
+            )
+            if row is None:
+                return None
+
+            return row[0]
+
+        else:
+
+            def _remove_received_event_from_staging_txn(txn):
+                received_ts = self.db_pool.simple_select_one_onecol_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                    retcol="received_ts",
+                    allow_none=True,
+                )
+                self.db_pool.simple_delete_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                )
+
+                return received_ts
+
+            return await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+            )
 
     async def get_next_staged_event_id_for_room(
         self,
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index da3a7df27b..1c95c66648 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -29,13 +29,18 @@ from synapse.types import JsonDict
 logger = logging.getLogger(__name__)
 
 
-_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
+_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
     # there should be no leftover rows without a stream_ordering2, but just in case...
     "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
-    # finally, we can drop the rule and switch the columns
+    # now we can drop the rule and switch the columns
     "DROP RULE populate_stream_ordering2 ON events",
     "ALTER TABLE events DROP COLUMN stream_ordering",
     "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+    # ... and finally, rename the indexes into place for consistency with sqlite
+    "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
+    "ALTER INDEX events_order_room2 RENAME TO events_order_room",
+    "ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
+    "ALTER INDEX events_ts2 RENAME TO events_ts",
 )
 
 
@@ -45,6 +50,10 @@ class _BackgroundUpdates:
     DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
     POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
     INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
+    INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
+    INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
+    INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
 
 
@@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._purged_chain_cover_index,
         )
 
+        ################################################################################
+
         # bg updates for replacing stream_ordering with a BIGINT
         # (these only run on postgres.)
+
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
             self._background_populate_stream_ordering2,
         )
+        # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
         self.db_pool.updates.register_background_index_update(
             _BackgroundUpdates.INDEX_STREAM_ORDERING2,
             index_name="events_stream_ordering",
@@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             columns=["stream_ordering2"],
             unique=True,
         )
+        # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
+            index_name="event_contains_url_index2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+            where_clause="contains_url = true AND outlier = false",
+        )
+        # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
+            index_name="events_order_room2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+        )
+        # CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
+            index_name="events_room_stream2",
+            table="events",
+            columns=["room_id", "stream_ordering2"],
+        )
+        # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
+            index_name="events_ts2",
+            table="events",
+            columns=["origin_server_ts", "stream_ordering2"],
+        )
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
             self._background_replace_stream_ordering_column,
         )
 
+        ################################################################################
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -1098,7 +1142,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its
         place."""
 
         def process(txn: Cursor) -> None:
-            for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
+            for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
                 logger.info("completing stream_ordering migration: %s", sql)
                 txn.execute(sql)
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 1882bfd9cf..20cd63c330 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
         """
         ...
 
+    @property
+    @abc.abstractmethod
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        ...
+
     @abc.abstractmethod
     def check_database(
         self, db_conn: ConnectionType, allow_outdated_version: bool = False
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 21411c5fea..30f948a0f7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
 
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return True
+
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 5fe1b205e1..70d17d4f2c 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
         """Do we support using `a = ANY(?)` and passing a list"""
         return False
 
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return self.module.sqlite_version_info >= (3, 35, 0)
+
     def check_database(self, db_conn, allow_outdated_version: bool = False):
         if not allow_outdated_version:
             version = self.module.sqlite_version_info
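supports_returning is gated on SQLite 3.35.0 because that release introduced the RETURNING clause; PostgreSQL has supported it since 8.2, hence the unconditional True in the postgres engine above. A standalone sqlite3 sketch of the same branch that remove_received_event_from_staging takes (illustrative only, with made-up table contents, not Synapse code):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (origin TEXT, event_id TEXT, received_ts INTEGER)")
conn.execute("INSERT INTO staging VALUES ('example.org', '$abc', 1625000000000)")

if sqlite3.sqlite_version_info >= (3, 35, 0):
    # Delete the row and read received_ts back in a single statement.
    row = conn.execute(
        "DELETE FROM staging WHERE origin = ? AND event_id = ? RETURNING received_ts",
        ("example.org", "$abc"),
    ).fetchone()
else:
    # Older SQLite: select first, then delete, within the same transaction.
    row = conn.execute(
        "SELECT received_ts FROM staging WHERE origin = ? AND event_id = ?",
        ("example.org", "$abc"),
    ).fetchone()
    conn.execute(
        "DELETE FROM staging WHERE origin = ? AND event_id = ?",
        ("example.org", "$abc"),
    )

print(row[0] if row else None)  # 1625000000000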
diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
index 88c9f8bd0d..b5fb763ddd 100644
--- a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
+++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
@@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
   (6001, 'populate_stream_ordering2', '{}');
 
--- ... and another to build an index on it
+-- ... and some more to build indexes on it. These aren't really interdependent
+-- but the backround_updates manager can only handle a single dependency per update.
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
+  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'),
+  (6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'),
+  (6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'),
+  (6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'),
+  (6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream');
 
 -- ... and another to do the switcheroo
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-  (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');
+  (6003, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts');
diff --git a/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
new file mode 100644
index 0000000000..630c24fd9e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This migration is closely related to '01recreate_stream_ordering.sql.postgres'.
+--
+-- It updates the other tables which use an INTEGER to refer to a stream ordering.
+-- These tables are all small enough that a re-create is tractable.
+ALTER TABLE pushers ALTER COLUMN last_stream_ordering SET DATA TYPE BIGINT;
+ALTER TABLE federation_stream_position ALTER COLUMN stream_id SET DATA TYPE BIGINT;
+
+-- these aren't actually event stream orderings, but they are numbers where 2 billion
+-- is a bit limiting, application_services_state is tiny, and I don't want to ever have
+-- to do this again.
+ALTER TABLE application_services_state ALTER COLUMN last_txn SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN read_receipt_stream_id SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN presence_stream_id SET DATA TYPE BIGINT;