summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/logger.py25
-rw-r--r--synapse/federation/sender/__init__.py11
-rw-r--r--synapse/federation/sender/per_destination_queue.py11
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/storage/database.py4
-rw-r--r--synapse/storage/databases/main/purge_events.py2
-rw-r--r--synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql42
-rw-r--r--synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql21
-rw-r--r--synapse/storage/databases/main/transactions.py104
-rw-r--r--synapse/storage/prepare_database.py26
10 files changed, 233 insertions, 15 deletions
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index c96e6ef62a..13d6f6a3ea 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -17,6 +17,7 @@ import logging
 import logging.config
 import os
 import sys
+import threading
 from string import Template
 
 import yaml
@@ -25,6 +26,7 @@ from twisted.logger import (
     ILogObserver,
     LogBeginner,
     STDLibLogObserver,
+    eventAsText,
     globalLogBeginner,
 )
 
@@ -216,8 +218,9 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
     # system.
     observer = STDLibLogObserver()
 
-    def _log(event):
+    threadlocal = threading.local()
 
+    def _log(event):
         if "log_text" in event:
             if event["log_text"].startswith("DNSDatagramProtocol starting on "):
                 return
@@ -228,7 +231,25 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
             if event["log_text"].startswith("Timing out client"):
                 return
 
-        return observer(event)
+        # this is a workaround to make sure we don't get stack overflows when the
+        # logging system raises an error which is written to stderr which is redirected
+        # to the logging system, etc.
+        if getattr(threadlocal, "active", False):
+            # write the text of the event, if any, to the *real* stderr (which may
+            # be redirected to /dev/null, but there's not much we can do)
+            try:
+                event_text = eventAsText(event)
+                print("logging during logging: %s" % event_text, file=sys.__stderr__)
+            except Exception:
+                # gah.
+                pass
+            return
+
+        try:
+            threadlocal.active = True
+            return observer(event)
+        finally:
+            threadlocal.active = False
 
     logBeginner.beginLoggingTo([_log], redirectStandardIO=not config.no_redirect_stdio)
     if not config.no_redirect_stdio:
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 552519e82c..41a726878d 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -209,7 +209,7 @@ class FederationSender:
                     logger.debug("Sending %s to %r", event, destinations)
 
                     if destinations:
-                        self._send_pdu(event, destinations)
+                        await self._send_pdu(event, destinations)
 
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
@@ -265,7 +265,7 @@ class FederationSender:
         finally:
             self._is_processing = False
 
-    def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
@@ -280,6 +280,13 @@ class FederationSender:
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()
 
+        # track the fact that we have a PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations, pdu.room_id, pdu.internal_metadata.stream_ordering,
+        )
+
         for destination in destinations:
             self._get_per_destination_queue(destination).send_pdu(pdu)
 
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index defc228c23..9f0852b4a2 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -325,6 +325,17 @@ class PerDestinationQueue:
 
                     self._last_device_stream_id = device_stream_id
                     self._last_device_list_stream_id = dev_list_id
+
+                    if pending_pdus:
+                        # we sent some PDUs and it was successful, so update our
+                        # last_successful_stream_ordering in the destinations table.
+                        final_pdu = pending_pdus[-1]
+                        last_successful_stream_ordering = (
+                            final_pdu.internal_metadata.stream_ordering
+                        )
+                        await self._store.set_destination_last_successful_stream_ordering(
+                            self._destination, last_successful_stream_ordering
+                        )
                 else:
                     break
         except NotRetryingDestination as e:
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 2d995ec456..ff0c67228b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -43,7 +43,7 @@ REQUIREMENTS = [
     "jsonschema>=2.5.1",
     "frozendict>=1",
     "unpaddedbase64>=1.1.0",
-    "canonicaljson>=1.3.0",
+    "canonicaljson>=1.4.0",
     # we use the type definitions added in signedjson 1.1.
     "signedjson>=1.1.0",
     "pynacl>=1.2.1",
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ed8a9bffb1..79ec8f119d 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -952,7 +952,7 @@ class DatabasePool:
         key_names: Collection[str],
         key_values: Collection[Iterable[Any]],
         value_names: Collection[str],
-        value_values: Iterable[Iterable[str]],
+        value_values: Iterable[Iterable[Any]],
     ) -> None:
         """
         Upsert, many times.
@@ -981,7 +981,7 @@ class DatabasePool:
         key_names: Iterable[str],
         key_values: Collection[Iterable[Any]],
         value_names: Collection[str],
-        value_values: Iterable[Iterable[str]],
+        value_values: Iterable[Iterable[Any]],
     ) -> None:
         """
         Upsert, many times, but without native UPSERT support or batching.
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ea833829ae..d7a03cbf7d 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -69,6 +69,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         #     room_depth
         #     state_groups
         #     state_groups_state
+        #     destination_rooms
 
         # we will build a temporary table listing the events so that we don't
         # have to keep shovelling the list back and forth across the
@@ -336,6 +337,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         # and finally, the tables with an index on room_id (or no useful index)
         for table in (
             "current_state_events",
+            "destination_rooms",
             "event_backward_extremities",
             "event_forward_extremities",
             "event_json",
diff --git a/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql b/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql
new file mode 100644
index 0000000000..ebfbed7925
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql
@@ -0,0 +1,42 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- This schema delta alters the schema to enable 'catching up' remote homeservers
+-- after there has been a connectivity problem for any reason.
+
+-- This stores, for each (destination, room) pair, the stream_ordering of the
+-- latest event for that destination.
+CREATE TABLE IF NOT EXISTS destination_rooms (
+  -- the destination in question.
+  destination TEXT NOT NULL REFERENCES destinations (destination),
+  -- the ID of the room in question
+  room_id TEXT NOT NULL REFERENCES rooms (room_id),
+  -- the stream_ordering of the event
+  stream_ordering BIGINT NOT NULL,
+  PRIMARY KEY (destination, room_id)
+  -- We don't declare a foreign key on stream_ordering here because that'd mean
+  -- we'd need to either maintain an index (expensive) or do a table scan of
+  -- destination_rooms whenever we delete an event (also potentially expensive).
+  -- In addition to that, a foreign key on stream_ordering would be redundant
+  -- as this row doesn't need to refer to a specific event; if the event gets
+  -- deleted then it doesn't affect the validity of the stream_ordering here.
+);
+
+-- This index is needed to make it so that a deletion of a room (in the rooms
+-- table) can be efficient, as otherwise a table scan would need to be performed
+-- to check that no destination_rooms rows point to the room to be deleted.
+-- Also: it makes it efficient to delete all the entries for a given room ID,
+-- such as when purging a room.
+CREATE INDEX IF NOT EXISTS destination_rooms_room_id
+    ON destination_rooms (room_id);
diff --git a/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql
new file mode 100644
index 0000000000..a67aa5e500
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql
@@ -0,0 +1,21 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This column tracks the stream_ordering of the event that was most recently
+-- successfully transmitted to the destination.
+-- A value of NULL means that we have not sent an event successfully yet
+-- (at least, not since the introduction of this column).
+ALTER TABLE destinations
+    ADD COLUMN last_successful_stream_ordering BIGINT;
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 5b31aab700..c0a958252e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -15,13 +15,14 @@
 
 import logging
 from collections import namedtuple
-from typing import Optional, Tuple
+from typing import Iterable, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import JsonDict
 from synapse.util.caches.expiringcache import ExpiringCache
 
@@ -164,7 +165,9 @@ class TransactionStore(SQLBaseStore):
             allow_none=True,
         )
 
-        if result and result["retry_last_ts"] > 0:
+        # check we have a row and retry_last_ts is not null or zero
+        # (retry_last_ts can't be negative)
+        if result and result["retry_last_ts"]:
             return result
         else:
             return None
@@ -273,3 +276,98 @@ class TransactionStore(SQLBaseStore):
         await self.db_pool.runInteraction(
             "_cleanup_transactions", _cleanup_transactions_txn
         )
+
+    async def store_destination_rooms_entries(
+        self, destinations: Iterable[str], room_id: str, stream_ordering: int,
+    ) -> None:
+        """
+        Updates or creates `destination_rooms` entries in batch for a single event.
+
+        Args:
+            destinations: list of destinations
+            room_id: the room_id of the event
+            stream_ordering: the stream_ordering of the event
+        """
+
+        return await self.db_pool.runInteraction(
+            "store_destination_rooms_entries",
+            self._store_destination_rooms_entries_txn,
+            destinations,
+            room_id,
+            stream_ordering,
+        )
+
+    def _store_destination_rooms_entries_txn(
+        self,
+        txn: LoggingTransaction,
+        destinations: Iterable[str],
+        room_id: str,
+        stream_ordering: int,
+    ) -> None:
+
+        # ensure we have a `destinations` row for this destination, as there is
+        # a foreign key constraint.
+        if isinstance(self.database_engine, PostgresEngine):
+            q = """
+                INSERT INTO destinations (destination)
+                    VALUES (?)
+                    ON CONFLICT DO NOTHING;
+            """
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            q = """
+                INSERT OR IGNORE INTO destinations (destination)
+                    VALUES (?);
+            """
+        else:
+            raise RuntimeError("Unknown database engine")
+
+        txn.execute_batch(q, ((destination,) for destination in destinations))
+
+        rows = [(destination, room_id) for destination in destinations]
+
+        self.db_pool.simple_upsert_many_txn(
+            txn,
+            "destination_rooms",
+            ["destination", "room_id"],
+            rows,
+            ["stream_ordering"],
+            [(stream_ordering,)] * len(rows),
+        )
+
+    async def get_destination_last_successful_stream_ordering(
+        self, destination: str
+    ) -> Optional[int]:
+        """
+        Gets the stream ordering of the PDU most-recently successfully sent
+        to the specified destination, or None if this information has not been
+        tracked yet.
+
+        Args:
+            destination: the destination to query
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            "destinations",
+            {"destination": destination},
+            "last_successful_stream_ordering",
+            allow_none=True,
+            desc="get_last_successful_stream_ordering",
+        )
+
+    async def set_destination_last_successful_stream_ordering(
+        self, destination: str, last_successful_stream_ordering: int
+    ) -> None:
+        """
+        Marks that we have successfully sent the PDUs up to and including the
+        one specified.
+
+        Args:
+            destination: the destination we have successfully sent to
+            last_successful_stream_ordering: the stream_ordering of the most
+                recent successfully-sent PDU
+        """
+        return await self.db_pool.simple_upsert(
+            "destinations",
+            keyvalues={"destination": destination},
+            values={"last_successful_stream_ordering": last_successful_stream_ordering},
+            desc="set_last_successful_stream_ordering",
+        )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index ee60e2a718..a7f2dfb850 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -19,12 +19,15 @@ import logging
 import os
 import re
 from collections import Counter
-from typing import TextIO
+from typing import Optional, TextIO
 
 import attr
 
+from synapse.config.homeserver import HomeServerConfig
+from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.engines.postgres import PostgresEngine
-from synapse.storage.types import Cursor
+from synapse.storage.types import Connection, Cursor
+from synapse.types import Collection
 
 logger = logging.getLogger(__name__)
 
@@ -63,7 +66,12 @@ UNAPPLIED_DELTA_ON_WORKER_ERROR = (
 )
 
 
-def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
+def prepare_database(
+    db_conn: Connection,
+    database_engine: BaseDatabaseEngine,
+    config: Optional[HomeServerConfig],
+    databases: Collection[str] = ["main", "state"],
+):
     """Prepares a physical database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
 
@@ -73,16 +81,24 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
     Args:
         db_conn:
         database_engine:
-        config (synapse.config.homeserver.HomeServerConfig|None):
+        config :
             application config, or None if we are connecting to an existing
             database which we expect to be configured already
-        databases (list[str]): The name of the databases that will be used
+        databases: The name of the databases that will be used
             with this physical database. Defaults to all databases.
     """
 
     try:
         cur = db_conn.cursor()
 
+        # sqlite does not automatically start transactions for DDL / SELECT statements,
+        # so we start one before running anything. This ensures that any upgrades
+        # are either applied completely, or not at all.
+        #
+        # (psycopg2 automatically starts a transaction as soon as we run any statements
+        # at all, so this is redundant but harmless there.)
+        cur.execute("BEGIN TRANSACTION")
+
         logger.info("%r: Checking existing schema version", databases)
         version_info = _get_or_create_schema_state(cur, database_engine)