From f55836929d3c64f3f8d883d8f3643a88b6c9cbca Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 29 Jun 2021 12:00:04 -0400 Subject: Do not recurse into non-spaces in the spaces summary. (#10256) Previously m.child.room events in non-space rooms would be treated as part of the room graph, but this is no longer supported. --- synapse/api/constants.py | 6 ++++++ synapse/handlers/space_summary.py | 11 +++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 414e4c019a..8363c2bb0f 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -201,6 +201,12 @@ class EventContentFields: ) +class RoomTypes: + """Understood values of the room_type field of m.room.create events.""" + + SPACE = "m.space" + + class RoomEncryptionAlgorithms: MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2" DEFAULT = MEGOLM_V1_AES_SHA2 diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py index 17fc47ce16..266f369883 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py @@ -25,6 +25,7 @@ from synapse.api.constants import ( EventTypes, HistoryVisibility, Membership, + RoomTypes, ) from synapse.events import EventBase from synapse.events.utils import format_event_for_client_v2 @@ -318,7 +319,8 @@ class SpaceSummaryHandler: Returns: A tuple of: - An iterable of a single value of the room. + The room information, if the room should be returned to the + user. None, otherwise. An iterable of the sorted children events. This may be limited to a maximum size or may include all children. @@ -328,7 +330,11 @@ class SpaceSummaryHandler: room_entry = await self._build_room_entry(room_id) - # look for child rooms/spaces. + # If the room is not a space, return just the room information. + if room_entry.get("room_type") != RoomTypes.SPACE: + return room_entry, () + + # Otherwise, look for child rooms/spaces. 
child_events = await self._get_child_events(room_id) if suggested_only: @@ -348,6 +354,7 @@ class SpaceSummaryHandler: event_format=format_event_for_client_v2, ) ) + return room_entry, events_result async def _summarize_remote_room( -- cgit 1.5.1 From d561367c18db3300804dee182e74b4a8fb7998e6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 29 Jun 2021 21:39:30 +0100 Subject: 1.37.1rc1 --- CHANGES.md | 9 +++++++++ changelog.d/10269.bugfix | 1 - changelog.d/10272.bugfix | 1 - synapse/__init__.py | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) delete mode 100644 changelog.d/10269.bugfix delete mode 100644 changelog.d/10272.bugfix (limited to 'synapse') diff --git a/CHANGES.md b/CHANGES.md index eac91ffe02..8de3bad906 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +Synapse 1.37.1rc1 (2021-06-29) +============================== + +Features +-------- + +- Handle inbound events from federation asynchronously. ([\#10269](https://github.com/matrix-org/synapse/issues/10269), [\#10272](https://github.com/matrix-org/synapse/issues/10272)) + + Synapse 1.37.0 (2021-06-29) =========================== diff --git a/changelog.d/10269.bugfix b/changelog.d/10269.bugfix deleted file mode 100644 index 3cefa05788..0000000000 --- a/changelog.d/10269.bugfix +++ /dev/null @@ -1 +0,0 @@ -Handle inbound events from federation asynchronously. diff --git a/changelog.d/10272.bugfix b/changelog.d/10272.bugfix deleted file mode 100644 index 3cefa05788..0000000000 --- a/changelog.d/10272.bugfix +++ /dev/null @@ -1 +0,0 @@ -Handle inbound events from federation asynchronously. 
diff --git a/synapse/__init__.py b/synapse/__init__.py index 0900492619..2070724c34 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.37.1a1" +__version__ = "1.37.1rc1" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when -- cgit 1.5.1 From 329ef5c715d81b538e8b071de046c698a82eae10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Jun 2021 12:07:16 +0100 Subject: Fix the inbound PDU metric (#10279) This broke in #10272 --- changelog.d/10279.bugfix | 1 + synapse/federation/federation_server.py | 37 ++++++------ synapse/storage/databases/main/event_federation.py | 66 ++++++++++++++++++---- synapse/storage/engines/_base.py | 6 ++ synapse/storage/engines/postgres.py | 5 ++ synapse/storage/engines/sqlite.py | 5 ++ 6 files changed, 93 insertions(+), 27 deletions(-) create mode 100644 changelog.d/10279.bugfix (limited to 'synapse') diff --git a/changelog.d/10279.bugfix b/changelog.d/10279.bugfix new file mode 100644 index 0000000000..ac8b64ead9 --- /dev/null +++ b/changelog.d/10279.bugfix @@ -0,0 +1 @@ +Fix the prometheus `synapse_federation_server_pdu_process_time` metric. Broke in v1.37.1. 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 742d29291e..e93b7577fe 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -369,22 +369,21 @@ class FederationServer(FederationBase): async def process_pdu(pdu: EventBase) -> JsonDict: event_id = pdu.event_id - with pdu_process_time.time(): - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - return {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - return {"error": str(e)} - except Exception as e: - f = failure.Failure() - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore - ) - return {"error": str(e)} + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + return {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + return {"error": str(e)} + except Exception as e: + f = failure.Failure() + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore + ) + return {"error": str(e)} await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT @@ -932,9 +931,13 @@ class FederationServer(FederationBase): exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore ) - await self.store.remove_received_event_from_staging( + received_ts = await self.store.remove_received_event_from_staging( origin, event.event_id ) + if received_ts is not None: + pdu_process_time.observe( + (self._clock.time_msec() - received_ts) / 1000 + ) # We need to do this check outside the lock to avoid a race between # a new event being inserted by another instance and it attempting diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py 
index f23f8c6ecf..f2d27ee893 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas self, origin: str, event_id: str, - ) -> None: - """Remove the given event from the staging area""" - await self.db_pool.simple_delete( - table="federation_inbound_events_staging", - keyvalues={ - "origin": origin, - "event_id": event_id, - }, - desc="remove_received_event_from_staging", - ) + ) -> Optional[int]: + """Remove the given event from the staging area. + + Returns: + The received_ts of the row that was deleted, if any. + """ + if self.db_pool.engine.supports_returning: + + def _remove_received_event_from_staging_txn(txn): + sql = """ + DELETE FROM federation_inbound_events_staging + WHERE origin = ? AND event_id = ? + RETURNING received_ts + """ + + txn.execute(sql, (origin, event_id)) + return txn.fetchone() + + row = await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + db_autocommit=True, + ) + if row is None: + return None + + return row[0] + + else: + + def _remove_received_event_from_staging_txn(txn): + received_ts = self.db_pool.simple_select_one_onecol_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + retcol="received_ts", + allow_none=True, + ) + self.db_pool.simple_delete_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + ) + + return received_ts + + return await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + ) async def get_next_staged_event_id_for_room( self, diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 1882bfd9cf..20cd63c330 100644 --- a/synapse/storage/engines/_base.py +++ 
b/synapse/storage/engines/_base.py @@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): """ ... + @property + @abc.abstractmethod + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + ... + @abc.abstractmethod def check_database( self, db_conn: ConnectionType, allow_outdated_version: bool = False diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 21411c5fea..30f948a0f7 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine): """Do we support using `a = ANY(?)` and passing a list""" return True + @property + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + return True + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 5fe1b205e1..70d17d4f2c 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): """Do we support using `a = ANY(?)` and passing a list""" return False + @property + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + return self.module.sqlite_version_info >= (3, 35, 0) + def check_database(self, db_conn, allow_outdated_version: bool = False): if not allow_outdated_version: version = self.module.sqlite_version_info -- cgit 1.5.1 From aaf7d1acb8804ddeeb007e21c2b2c915bd494898 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 30 Jun 2021 07:08:42 -0400 Subject: Correct type hints for synapse.event_auth. 
(#10253) --- changelog.d/10253.misc | 1 + synapse/api/auth.py | 5 ++-- synapse/event_auth.py | 5 ++-- synapse/events/__init__.py | 2 +- synapse/events/builder.py | 69 +++++++++++++++++++++++---------------------- synapse/handlers/message.py | 7 +++++ 6 files changed, 51 insertions(+), 38 deletions(-) create mode 100644 changelog.d/10253.misc (limited to 'synapse') diff --git a/changelog.d/10253.misc b/changelog.d/10253.misc new file mode 100644 index 0000000000..44d9217245 --- /dev/null +++ b/changelog.d/10253.misc @@ -0,0 +1 @@ +Fix type hints for computing auth events. diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 29cf257633..f8b068e563 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union import pymacaroons from netaddr import IPAddress @@ -31,6 +31,7 @@ from synapse.api.errors import ( from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.appservice import ApplicationService from synapse.events import EventBase +from synapse.events.builder import EventBuilder from synapse.http import get_request_user_agent from synapse.http.site import SynapseRequest from synapse.logging import opentracing as opentracing @@ -490,7 +491,7 @@ class Auth: def compute_auth_events( self, - event, + event: Union[EventBase, EventBuilder], current_state_ids: StateMap[str], for_verification: bool = False, ) -> List[str]: diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 33d7c60241..89bcf81515 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -14,7 +14,7 @@ # limitations under the License. 
import logging -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple, Union from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes @@ -29,6 +29,7 @@ from synapse.api.room_versions import ( RoomVersion, ) from synapse.events import EventBase +from synapse.events.builder import EventBuilder from synapse.types import StateMap, UserID, get_domain_from_id logger = logging.getLogger(__name__) @@ -724,7 +725,7 @@ def get_public_keys(invite_event: EventBase) -> List[Dict[str, Any]]: return public_keys -def auth_types_for_event(event: EventBase) -> Set[Tuple[str, str]]: +def auth_types_for_event(event: Union[EventBase, EventBuilder]) -> Set[Tuple[str, str]]: """Given an event, return a list of (EventType, StateKey) that may be needed to auth the event. The returned list may be a superset of what would actually be required depending on the full state of the room. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 0cb9c1cc1e..6286ad999a 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -118,7 +118,7 @@ class _EventInternalMetadata: proactively_send = DictProperty("proactively_send") # type: bool redacted = DictProperty("redacted") # type: bool txn_id = DictProperty("txn_id") # type: str - token_id = DictProperty("token_id") # type: str + token_id = DictProperty("token_id") # type: int historical = DictProperty("historical") # type: bool # XXX: These are set by StreamWorkerStore._set_before_and_after. diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 81bf8615b7..fb48ec8541 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -12,12 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import logging -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union import attr from nacl.signing import SigningKey -from synapse.api.auth import Auth from synapse.api.constants import MAX_DEPTH from synapse.api.errors import UnsupportedRoomVersionError from synapse.api.room_versions import ( @@ -34,10 +33,14 @@ from synapse.types import EventID, JsonDict from synapse.util import Clock from synapse.util.stringutils import random_string +if TYPE_CHECKING: + from synapse.api.auth import Auth + from synapse.server import HomeServer + logger = logging.getLogger(__name__) -@attr.s(slots=True, cmp=False, frozen=True) +@attr.s(slots=True, cmp=False, frozen=True, auto_attribs=True) class EventBuilder: """A format independent event builder used to build up the event content before signing the event. @@ -62,31 +65,30 @@ class EventBuilder: _signing_key: The signing key to use to sign the event as the server """ - _state = attr.ib(type=StateHandler) - _auth = attr.ib(type=Auth) - _store = attr.ib(type=DataStore) - _clock = attr.ib(type=Clock) - _hostname = attr.ib(type=str) - _signing_key = attr.ib(type=SigningKey) + _state: StateHandler + _auth: "Auth" + _store: DataStore + _clock: Clock + _hostname: str + _signing_key: SigningKey - room_version = attr.ib(type=RoomVersion) + room_version: RoomVersion - room_id = attr.ib(type=str) - type = attr.ib(type=str) - sender = attr.ib(type=str) + room_id: str + type: str + sender: str - content = attr.ib(default=attr.Factory(dict), type=JsonDict) - unsigned = attr.ib(default=attr.Factory(dict), type=JsonDict) + content: JsonDict = attr.Factory(dict) + unsigned: JsonDict = attr.Factory(dict) # These only exist on a subset of events, so they raise AttributeError if # someone tries to get them when they don't exist. 
- _state_key = attr.ib(default=None, type=Optional[str]) - _redacts = attr.ib(default=None, type=Optional[str]) - _origin_server_ts = attr.ib(default=None, type=Optional[int]) + _state_key: Optional[str] = None + _redacts: Optional[str] = None + _origin_server_ts: Optional[int] = None - internal_metadata = attr.ib( - default=attr.Factory(lambda: _EventInternalMetadata({})), - type=_EventInternalMetadata, + internal_metadata: _EventInternalMetadata = attr.Factory( + lambda: _EventInternalMetadata({}) ) @property @@ -184,7 +186,7 @@ class EventBuilder: class EventBuilderFactory: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.hostname = hs.hostname self.signing_key = hs.signing_key @@ -193,15 +195,14 @@ class EventBuilderFactory: self.state = hs.get_state_handler() self.auth = hs.get_auth() - def new(self, room_version, key_values): + def new(self, room_version: str, key_values: dict) -> EventBuilder: """Generate an event builder appropriate for the given room version Deprecated: use for_room_version with a RoomVersion object instead Args: - room_version (str): Version of the room that we're creating an event builder - for - key_values (dict): Fields used as the basis of the new event + room_version: Version of the room that we're creating an event builder for + key_values: Fields used as the basis of the new event Returns: EventBuilder @@ -212,13 +213,15 @@ class EventBuilderFactory: raise UnsupportedRoomVersionError() return self.for_room_version(v, key_values) - def for_room_version(self, room_version, key_values): + def for_room_version( + self, room_version: RoomVersion, key_values: dict + ) -> EventBuilder: """Generate an event builder appropriate for the given room version Args: - room_version (synapse.api.room_versions.RoomVersion): + room_version: Version of the room that we're creating an event builder for - key_values (dict): Fields used as the basis of the new event + key_values: Fields used as the basis 
of the new event Returns: EventBuilder @@ -286,15 +289,15 @@ def create_local_event_from_event_dict( _event_id_counter = 0 -def _create_event_id(clock, hostname): +def _create_event_id(clock: Clock, hostname: str) -> str: """Create a new event ID Args: - clock (Clock) - hostname (str): The server name for the event ID + clock + hostname: The server name for the event ID Returns: - str + The new event ID """ global _event_id_counter diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index db12abd59d..364c5cd2d3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -509,6 +509,8 @@ class EventCreationHandler: Should normally be left as None, which will cause them to be calculated based on the room state at the prev_events. + If non-None, prev_event_ids must also be provided. + require_consent: Whether to check if the requester has consented to the privacy policy. @@ -581,6 +583,9 @@ class EventCreationHandler: # Strip down the auth_event_ids to only what we need to auth the event. # For example, we don't need extra m.room.member that don't match event.sender if auth_event_ids is not None: + # If auth events are provided, prev events must be also. + assert prev_event_ids is not None + temp_event = await builder.build( prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, @@ -784,6 +789,8 @@ class EventCreationHandler: The event ids to use as the auth_events for the new event. Should normally be left as None, which will cause them to be calculated based on the room state at the prev_events. + + If non-None, prev_event_ids must also be provided. ratelimit: Whether to rate limit this send. txn_id: The transaction ID. 
ignore_shadow_ban: True if shadow-banned users should be allowed to -- cgit 1.5.1 From f193034d591f6fc38d6588a1c4e4ac86543e9a1b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Jun 2021 12:24:13 +0100 Subject: 1.37.1 --- CHANGES.md | 6 ++++++ debian/changelog | 6 ++++++ synapse/__init__.py | 2 +- 3 files changed, 13 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/CHANGES.md b/CHANGES.md index 8de3bad906..defec46f33 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +Synapse 1.37.1 (2021-06-30) +=========================== + +No significant changes. + + Synapse 1.37.1rc1 (2021-06-29) ============================== diff --git a/debian/changelog b/debian/changelog index cf190b7dba..35a0cddeaf 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.37.1) stable; urgency=medium + + * New synapse release 1.37.1. + + -- Synapse Packaging team Wed, 30 Jun 2021 12:24:06 +0100 + matrix-synapse-py3 (1.37.0) stable; urgency=medium * New synapse release 1.37.0. diff --git a/synapse/__init__.py b/synapse/__init__.py index 2070724c34..1bd03462ac 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.37.1rc1" +__version__ = "1.37.1" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when -- cgit 1.5.1 From 859dc05b3692a3672c1a0db8deaaa9274b6aa6f5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 30 Jun 2021 15:01:24 +0100 Subject: Rebuild other indexes using `stream_ordering` (#10282) We need to rebuild *all* of the indexes that use the current `stream_ordering` column. 
--- changelog.d/10282.bugfix | 1 + .../storage/databases/main/events_bg_updates.py | 50 ++++++++++++++++++++-- .../60/01recreate_stream_ordering.sql.postgres | 11 +++-- 3 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 changelog.d/10282.bugfix (limited to 'synapse') diff --git a/changelog.d/10282.bugfix b/changelog.d/10282.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10282.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 2<sup>31</sup> events were handled by the server. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index da3a7df27b..1c95c66648 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -29,13 +29,18 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) -_REPLACE_STREAM_ORDRING_SQL_COMMANDS = ( +_REPLACE_STREAM_ORDERING_SQL_COMMANDS = ( # there should be no leftover rows without a stream_ordering2, but just in case... "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL", - # finally, we can drop the rule and switch the columns + # now we can drop the rule and switch the columns "DROP RULE populate_stream_ordering2 ON events", "ALTER TABLE events DROP COLUMN stream_ordering", "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering", + # ... 
and finally, rename the indexes into place for consistency with sqlite + "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index", + "ALTER INDEX events_order_room2 RENAME TO events_order_room", + "ALTER INDEX events_room_stream2 RENAME TO events_room_stream", + "ALTER INDEX events_ts2 RENAME TO events_ts", ) @@ -45,6 +50,10 @@ class _BackgroundUpdates: DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2" INDEX_STREAM_ORDERING2 = "index_stream_ordering2" + INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url" + INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order" + INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream" + INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" @@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + ################################################################################ + # bg updates for replacing stream_ordering with a BIGINT # (these only run on postgres.) 
+ self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.POPULATE_STREAM_ORDERING2, self._background_populate_stream_ordering2, ) + # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2); self.db_pool.updates.register_background_index_update( _BackgroundUpdates.INDEX_STREAM_ORDERING2, index_name="events_stream_ordering", @@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): columns=["stream_ordering2"], unique=True, ) + # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false; + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL, + index_name="event_contains_url_index2", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering2"], + where_clause="contains_url = true AND outlier = false", + ) + # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER, + index_name="events_order_room2", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering2"], + ) + # CREATE INDEX events_room_stream ON events(room_id, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM, + index_name="events_room_stream2", + table="events", + columns=["room_id", "stream_ordering2"], + ) + # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS, + index_name="events_ts2", + table="events", + columns=["origin_server_ts", "stream_ordering2"], + ) self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN, self._background_replace_stream_ordering_column, ) + 
################################################################################ + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -1098,7 +1142,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place.""" def process(txn: Cursor) -> None: - for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS: + for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS: logger.info("completing stream_ordering migration: %s", sql) txn.execute(sql) diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres index 88c9f8bd0d..b5fb763ddd 100644 --- a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres +++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres @@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (6001, 'populate_stream_ordering2', '{}'); --- ... and another to build an index on it +-- ... and some more to build indexes on it. These aren't really interdependent +-- but the background_updates manager can only handle a single dependency per update. 
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES - (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'); + (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'), + (6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'), + (6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'), + (6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'), + (6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream'); -- ... and another to do the switcheroo INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES - (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2'); + (6003, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts'); -- cgit 1.5.1 From b6dbf89fae74af25ce1a6993de74e0e50705f105 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 30 Jun 2021 17:27:20 +0100 Subject: Change more stream_ordering columns to BIGINT (#10286) --- changelog.d/10286.bugfix | 1 + .../02change_stream_ordering_columns.sql.postgres | 30 ++++++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 changelog.d/10286.bugfix create mode 100644 synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres (limited to 'synapse') diff --git a/changelog.d/10286.bugfix b/changelog.d/10286.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10286.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 2<sup>31</sup> events were handled by the server.
diff --git a/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres new file mode 100644 index 0000000000..630c24fd9e --- /dev/null +++ b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres @@ -0,0 +1,30 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This migration is closely related to '01recreate_stream_ordering.sql.postgres'. +-- +-- It updates the other tables which use an INTEGER to refer to a stream ordering. +-- These tables are all small enough that a re-create is tractable. +ALTER TABLE pushers ALTER COLUMN last_stream_ordering SET DATA TYPE BIGINT; +ALTER TABLE federation_stream_position ALTER COLUMN stream_id SET DATA TYPE BIGINT; + +-- these aren't actually event stream orderings, but they are numbers where 2 billion +-- is a bit limiting, application_services_state is tiny, and I don't want to ever have +-- to do this again. +ALTER TABLE application_services_state ALTER COLUMN last_txn SET DATA TYPE BIGINT; +ALTER TABLE application_services_state ALTER COLUMN read_receipt_stream_id SET DATA TYPE BIGINT; +ALTER TABLE application_services_state ALTER COLUMN presence_stream_id SET DATA TYPE BIGINT; + + -- cgit 1.5.1