diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/logger.py | 25 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 11 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 11 | ||||
-rw-r--r-- | synapse/python_dependencies.py | 2 | ||||
-rw-r--r-- | synapse/storage/database.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql | 42 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql | 21 | ||||
-rw-r--r-- | synapse/storage/databases/main/transactions.py | 104 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 26 |
10 files changed, 233 insertions, 15 deletions
diff --git a/synapse/config/logger.py b/synapse/config/logger.py index c96e6ef62a..13d6f6a3ea 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -17,6 +17,7 @@ import logging import logging.config import os import sys +import threading from string import Template import yaml @@ -25,6 +26,7 @@ from twisted.logger import ( ILogObserver, LogBeginner, STDLibLogObserver, + eventAsText, globalLogBeginner, ) @@ -216,8 +218,9 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): # system. observer = STDLibLogObserver() - def _log(event): + threadlocal = threading.local() + def _log(event): if "log_text" in event: if event["log_text"].startswith("DNSDatagramProtocol starting on "): return @@ -228,7 +231,25 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): if event["log_text"].startswith("Timing out client"): return - return observer(event) + # this is a workaround to make sure we don't get stack overflows when the + # logging system raises an error which is written to stderr which is redirected + # to the logging system, etc. + if getattr(threadlocal, "active", False): + # write the text of the event, if any, to the *real* stderr (which may + # be redirected to /dev/null, but there's not much we can do) + try: + event_text = eventAsText(event) + print("logging during logging: %s" % event_text, file=sys.__stderr__) + except Exception: + # gah. + pass + return + + try: + threadlocal.active = True + return observer(event) + finally: + threadlocal.active = False logBeginner.beginLoggingTo([_log], redirectStandardIO=not config.no_redirect_stdio) if not config.no_redirect_stdio: diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 552519e82c..41a726878d 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -209,7 +209,7 @@ class FederationSender: logger.debug("Sending %s to %r", event, destinations) if destinations: - self._send_pdu(event, destinations) + await self._send_pdu(event, destinations) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -265,7 +265,7 @@ class FederationSender: finally: self._is_processing = False - def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: + async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -280,6 +280,13 @@ class FederationSender: sent_pdus_destination_dist_total.inc(len(destinations)) sent_pdus_destination_dist_count.inc() + # track the fact that we have a PDU for these destinations, + # to allow us to perform catch-up later on if the remote is unreachable + # for a while. + await self.store.store_destination_rooms_entries( + destinations, pdu.room_id, pdu.internal_metadata.stream_ordering, + ) + for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index defc228c23..9f0852b4a2 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -325,6 +325,17 @@ class PerDestinationQueue: self._last_device_stream_id = device_stream_id self._last_device_list_stream_id = dev_list_id + + if pending_pdus: + # we sent some PDUs and it was successful, so update our + # last_successful_stream_ordering in the destinations table. + final_pdu = pending_pdus[-1] + last_successful_stream_ordering = ( + final_pdu.internal_metadata.stream_ordering + ) + await self._store.set_destination_last_successful_stream_ordering( + self._destination, last_successful_stream_ordering + ) else: break except NotRetryingDestination as e: diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 2d995ec456..ff0c67228b 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -43,7 +43,7 @@ REQUIREMENTS = [ "jsonschema>=2.5.1", "frozendict>=1", "unpaddedbase64>=1.1.0", - "canonicaljson>=1.3.0", + "canonicaljson>=1.4.0", # we use the type definitions added in signedjson 1.1. "signedjson>=1.1.0", "pynacl>=1.2.1", diff --git a/synapse/storage/database.py b/synapse/storage/database.py index ed8a9bffb1..79ec8f119d 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -952,7 +952,7 @@ class DatabasePool: key_names: Collection[str], key_values: Collection[Iterable[Any]], value_names: Collection[str], - value_values: Iterable[Iterable[str]], + value_values: Iterable[Iterable[Any]], ) -> None: """ Upsert, many times. @@ -981,7 +981,7 @@ class DatabasePool: key_names: Iterable[str], key_values: Collection[Iterable[Any]], value_names: Collection[str], - value_values: Iterable[Iterable[str]], + value_values: Iterable[Iterable[Any]], ) -> None: """ Upsert, many times, but without native UPSERT support or batching. diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index ea833829ae..d7a03cbf7d 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -69,6 +69,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): # room_depth # state_groups # state_groups_state + # destination_rooms # we will build a temporary table listing the events so that we don't # have to keep shovelling the list back and forth across the @@ -336,6 +337,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): # and finally, the tables with an index on room_id (or no useful index) for table in ( "current_state_events", + "destination_rooms", "event_backward_extremities", "event_forward_extremities", "event_json", diff --git a/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql b/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql new file mode 100644 index 0000000000..ebfbed7925 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql @@ -0,0 +1,42 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-- This schema delta alters the schema to enable 'catching up' remote homeservers +-- after there has been a connectivity problem for any reason. + +-- This stores, for each (destination, room) pair, the stream_ordering of the +-- latest event for that destination. +CREATE TABLE IF NOT EXISTS destination_rooms ( + -- the destination in question. + destination TEXT NOT NULL REFERENCES destinations (destination), + -- the ID of the room in question + room_id TEXT NOT NULL REFERENCES rooms (room_id), + -- the stream_ordering of the event + stream_ordering BIGINT NOT NULL, + PRIMARY KEY (destination, room_id) + -- We don't declare a foreign key on stream_ordering here because that'd mean + -- we'd need to either maintain an index (expensive) or do a table scan of + -- destination_rooms whenever we delete an event (also potentially expensive). + -- In addition to that, a foreign key on stream_ordering would be redundant + -- as this row doesn't need to refer to a specific event; if the event gets + -- deleted then it doesn't affect the validity of the stream_ordering here. +); + +-- This index is needed to make it so that a deletion of a room (in the rooms +-- table) can be efficient, as otherwise a table scan would need to be performed +-- to check that no destination_rooms rows point to the room to be deleted. +-- Also: it makes it efficient to delete all the entries for a given room ID, +-- such as when purging a room. +CREATE INDEX IF NOT EXISTS destination_rooms_room_id + ON destination_rooms (room_id); diff --git a/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql new file mode 100644 index 0000000000..a67aa5e500 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql @@ -0,0 +1,21 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This column tracks the stream_ordering of the event that was most recently +-- successfully transmitted to the destination. +-- A value of NULL means that we have not sent an event successfully yet +-- (at least, not since the introduction of this column). +ALTER TABLE destinations + ADD COLUMN last_successful_stream_ordering BIGINT; diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 5b31aab700..c0a958252e 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -15,13 +15,14 @@ import logging from collections import namedtuple -from typing import Optional, Tuple +from typing import Iterable, Optional, Tuple from canonicaljson import encode_canonical_json from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util.caches.expiringcache import ExpiringCache @@ -164,7 +165,9 @@ class TransactionStore(SQLBaseStore): allow_none=True, ) - if result and result["retry_last_ts"] > 0: + # check we have a row and retry_last_ts is not null or zero + # (retry_last_ts can't be negative) + if result and result["retry_last_ts"]: return result else: return None @@ -273,3 +276,98 @@ class TransactionStore(SQLBaseStore): await self.db_pool.runInteraction( "_cleanup_transactions", _cleanup_transactions_txn ) + + async def store_destination_rooms_entries( + self, destinations: Iterable[str], room_id: str, stream_ordering: int, + ) -> None: + """ + Updates or creates `destination_rooms` entries in batch for a single event. + + Args: + destinations: list of destinations + room_id: the room_id of the event + stream_ordering: the stream_ordering of the event + """ + + return await self.db_pool.runInteraction( + "store_destination_rooms_entries", + self._store_destination_rooms_entries_txn, + destinations, + room_id, + stream_ordering, + ) + + def _store_destination_rooms_entries_txn( + self, + txn: LoggingTransaction, + destinations: Iterable[str], + room_id: str, + stream_ordering: int, + ) -> None: + + # ensure we have a `destinations` row for this destination, as there is + # a foreign key constraint. + if isinstance(self.database_engine, PostgresEngine): + q = """ + INSERT INTO destinations (destination) + VALUES (?) + ON CONFLICT DO NOTHING; + """ + elif isinstance(self.database_engine, Sqlite3Engine): + q = """ + INSERT OR IGNORE INTO destinations (destination) + VALUES (?); + """ + else: + raise RuntimeError("Unknown database engine") + + txn.execute_batch(q, ((destination,) for destination in destinations)) + + rows = [(destination, room_id) for destination in destinations] + + self.db_pool.simple_upsert_many_txn( + txn, + "destination_rooms", + ["destination", "room_id"], + rows, + ["stream_ordering"], + [(stream_ordering,)] * len(rows), + ) + + async def get_destination_last_successful_stream_ordering( + self, destination: str + ) -> Optional[int]: + """ + Gets the stream ordering of the PDU most-recently successfully sent + to the specified destination, or None if this information has not been + tracked yet. + + Args: + destination: the destination to query + """ + return await self.db_pool.simple_select_one_onecol( + "destinations", + {"destination": destination}, + "last_successful_stream_ordering", + allow_none=True, + desc="get_last_successful_stream_ordering", + ) + + async def set_destination_last_successful_stream_ordering( + self, destination: str, last_successful_stream_ordering: int + ) -> None: + """ + Marks that we have successfully sent the PDUs up to and including the + one specified. + + Args: + destination: the destination we have successfully sent to + last_successful_stream_ordering: the stream_ordering of the most + recent successfully-sent PDU + """ + return await self.db_pool.simple_upsert( + "destinations", + keyvalues={"destination": destination}, + values={"last_successful_stream_ordering": last_successful_stream_ordering}, + desc="set_last_successful_stream_ordering", + ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index ee60e2a718..a7f2dfb850 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -19,12 +19,15 @@ import logging import os import re from collections import Counter -from typing import TextIO +from typing import Optional, TextIO import attr +from synapse.config.homeserver import HomeServerConfig +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.engines.postgres import PostgresEngine -from synapse.storage.types import Cursor +from synapse.storage.types import Connection, Cursor +from synapse.types import Collection logger = logging.getLogger(__name__) @@ -63,7 +66,12 @@ UNAPPLIED_DELTA_ON_WORKER_ERROR = ( ) -def prepare_database(db_conn, database_engine, config, databases=["main", "state"]): +def prepare_database( + db_conn: Connection, + database_engine: BaseDatabaseEngine, + config: Optional[HomeServerConfig], + databases: Collection[str] = ["main", "state"], +): """Prepares a physical database for usage. Will either create all necessary tables or upgrade from an older schema version. @@ -73,16 +81,24 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state Args: db_conn: database_engine: - config (synapse.config.homeserver.HomeServerConfig|None): + config : application config, or None if we are connecting to an existing database which we expect to be configured already - databases (list[str]): The name of the databases that will be used + databases: The name of the databases that will be used with this physical database. Defaults to all databases. """ try: cur = db_conn.cursor() + # sqlite does not automatically start transactions for DDL / SELECT statements, + # so we start one before running anything. This ensures that any upgrades + # are either applied completely, or not at all. + # + # (psycopg2 automatically starts a transaction as soon as we run any statements + # at all, so this is redundant but harmless there.) + cur.execute("BEGIN TRANSACTION") + logger.info("%r: Checking existing schema version", databases) version_info = _get_or_create_schema_state(cur, database_engine) |