diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 2a2c9e6f13..bb33345be6 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -15,10 +15,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import json
from typing import List
import jsonschema
-from canonicaljson import json
from jsonschema import FormatChecker
from synapse.api.constants import EventContentFields
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index b6c9085670..7d309b1bb0 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -14,13 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
+import json
import logging
import os
import sys
import tempfile
-from canonicaljson import json
-
from twisted.internet import defer, task
import synapse
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index c96e6ef62a..13d6f6a3ea 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -17,6 +17,7 @@ import logging
import logging.config
import os
import sys
+import threading
from string import Template
import yaml
@@ -25,6 +26,7 @@ from twisted.logger import (
ILogObserver,
LogBeginner,
STDLibLogObserver,
+ eventAsText,
globalLogBeginner,
)
@@ -216,8 +218,9 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
# system.
observer = STDLibLogObserver()
- def _log(event):
+ threadlocal = threading.local()
+ def _log(event):
if "log_text" in event:
if event["log_text"].startswith("DNSDatagramProtocol starting on "):
return
@@ -228,7 +231,25 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
if event["log_text"].startswith("Timing out client"):
return
- return observer(event)
+ # this is a workaround to make sure we don't get stack overflows when the
+ # logging system raises an error which is written to stderr which is redirected
+ # to the logging system, etc.
+ if getattr(threadlocal, "active", False):
+ # write the text of the event, if any, to the *real* stderr (which may
+ # be redirected to /dev/null, but there's not much we can do)
+ try:
+ event_text = eventAsText(event)
+ print("logging during logging: %s" % event_text, file=sys.__stderr__)
+ except Exception:
+ # gah.
+ pass
+ return
+
+ try:
+ threadlocal.active = True
+ return observer(event)
+ finally:
+ threadlocal.active = False
logBeginner.beginLoggingTo([_log], redirectStandardIO=not config.no_redirect_stdio)
if not config.no_redirect_stdio:
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 552519e82c..41a726878d 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -209,7 +209,7 @@ class FederationSender:
logger.debug("Sending %s to %r", event, destinations)
if destinations:
- self._send_pdu(event, destinations)
+ await self._send_pdu(event, destinations)
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -265,7 +265,7 @@ class FederationSender:
finally:
self._is_processing = False
- def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
@@ -280,6 +280,13 @@ class FederationSender:
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
+ # track the fact that we have a PDU for these destinations,
+ # to allow us to perform catch-up later on if the remote is unreachable
+ # for a while.
+ await self.store.store_destination_rooms_entries(
+ destinations, pdu.room_id, pdu.internal_metadata.stream_ordering,
+ )
+
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index defc228c23..9f0852b4a2 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -325,6 +325,17 @@ class PerDestinationQueue:
self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
+
+ if pending_pdus:
+ # we sent some PDUs and it was successful, so update our
+ # last_successful_stream_ordering in the destinations table.
+ final_pdu = pending_pdus[-1]
+ last_successful_stream_ordering = (
+ final_pdu.internal_metadata.stream_ordering
+ )
+ await self._store.set_destination_last_successful_stream_ordering(
+ self._destination, last_successful_stream_ordering
+ )
else:
break
except NotRetryingDestination as e:
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 2d995ec456..ff0c67228b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -43,7 +43,7 @@ REQUIREMENTS = [
"jsonschema>=2.5.1",
"frozendict>=1",
"unpaddedbase64>=1.1.0",
- "canonicaljson>=1.3.0",
+ "canonicaljson>=1.4.0",
# we use the type definitions added in signedjson 1.1.
"signedjson>=1.1.0",
"pynacl>=1.2.1",
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ed8a9bffb1..79ec8f119d 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -952,7 +952,7 @@ class DatabasePool:
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
- value_values: Iterable[Iterable[str]],
+ value_values: Iterable[Iterable[Any]],
) -> None:
"""
Upsert, many times.
@@ -981,7 +981,7 @@ class DatabasePool:
key_names: Iterable[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
- value_values: Iterable[Iterable[str]],
+ value_values: Iterable[Iterable[Any]],
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ea833829ae..d7a03cbf7d 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -69,6 +69,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
# room_depth
# state_groups
# state_groups_state
+ # destination_rooms
# we will build a temporary table listing the events so that we don't
# have to keep shovelling the list back and forth across the
@@ -336,6 +337,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
# and finally, the tables with an index on room_id (or no useful index)
for table in (
"current_state_events",
+ "destination_rooms",
"event_backward_extremities",
"event_forward_extremities",
"event_json",
diff --git a/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql b/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql
new file mode 100644
index 0000000000..ebfbed7925
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql
@@ -0,0 +1,42 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- This schema delta alters the schema to enable 'catching up' remote homeservers
+-- after there has been a connectivity problem for any reason.
+
+-- This stores, for each (destination, room) pair, the stream_ordering of the
+-- latest event for that destination.
+CREATE TABLE IF NOT EXISTS destination_rooms (
+ -- the destination in question.
+ destination TEXT NOT NULL REFERENCES destinations (destination),
+ -- the ID of the room in question
+ room_id TEXT NOT NULL REFERENCES rooms (room_id),
+ -- the stream_ordering of the event
+ stream_ordering BIGINT NOT NULL,
+ PRIMARY KEY (destination, room_id)
+ -- We don't declare a foreign key on stream_ordering here because that'd mean
+ -- we'd need to either maintain an index (expensive) or do a table scan of
+ -- destination_rooms whenever we delete an event (also potentially expensive).
+ -- In addition to that, a foreign key on stream_ordering would be redundant
+ -- as this row doesn't need to refer to a specific event; if the event gets
+ -- deleted then it doesn't affect the validity of the stream_ordering here.
+);
+
+-- This index is needed to make it so that a deletion of a room (in the rooms
+-- table) can be efficient, as otherwise a table scan would need to be performed
+-- to check that no destination_rooms rows point to the room to be deleted.
+-- Also: it makes it efficient to delete all the entries for a given room ID,
+-- such as when purging a room.
+CREATE INDEX IF NOT EXISTS destination_rooms_room_id
+ ON destination_rooms (room_id);
diff --git a/synapse/storage/databases/main/schema/delta/58/16populate_stats_process_rooms_fix.sql b/synapse/storage/databases/main/schema/delta/58/16populate_stats_process_rooms_fix.sql
new file mode 100644
index 0000000000..55f5d0f732
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/16populate_stats_process_rooms_fix.sql
@@ -0,0 +1,22 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- This delta file fixes a regression introduced by 58/12room_stats.sql, removing the hacky
+-- populate_stats_process_rooms_2 background job and restores the functionality under the
+-- original name.
+-- See https://github.com/matrix-org/synapse/issues/8238 for details
+
+DELETE FROM background_updates WHERE update_name = 'populate_stats_process_rooms';
+UPDATE background_updates SET update_name = 'populate_stats_process_rooms'
+ WHERE update_name = 'populate_stats_process_rooms_2';
diff --git a/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql
new file mode 100644
index 0000000000..a67aa5e500
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql
@@ -0,0 +1,21 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This column tracks the stream_ordering of the event that was most recently
+-- successfully transmitted to the destination.
+-- A value of NULL means that we have not sent an event successfully yet
+-- (at least, not since the introduction of this column).
+ALTER TABLE destinations
+ ADD COLUMN last_successful_stream_ordering BIGINT;
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 55a250ef06..30840dbbaa 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -74,9 +74,6 @@ class StatsStore(StateDeltasStore):
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
self.db_pool.updates.register_background_update_handler(
- "populate_stats_process_rooms_2", self._populate_stats_process_rooms_2
- )
- self.db_pool.updates.register_background_update_handler(
"populate_stats_process_users", self._populate_stats_process_users
)
# we no longer need to perform clean-up, but we will give ourselves
@@ -148,31 +145,10 @@ class StatsStore(StateDeltasStore):
return len(users_to_work_on)
async def _populate_stats_process_rooms(self, progress, batch_size):
- """
- This was a background update which regenerated statistics for rooms.
-
- It has been replaced by StatsStore._populate_stats_process_rooms_2. This background
- job has been scheduled to run as part of Synapse v1.0.0, and again now. To ensure
- someone upgrading from <v1.0.0, this background task has been turned into a no-op
- so that the potentially expensive task is not run twice.
-
- Further context: https://github.com/matrix-org/synapse/pull/7977
- """
- await self.db_pool.updates._end_background_update(
- "populate_stats_process_rooms"
- )
- return 1
-
- async def _populate_stats_process_rooms_2(self, progress, batch_size):
- """
- This is a background update which regenerates statistics for rooms.
-
- It replaces StatsStore._populate_stats_process_rooms. See its docstring for the
- reasoning.
- """
+ """This is a background update which regenerates statistics for rooms."""
if not self.stats_enabled:
await self.db_pool.updates._end_background_update(
- "populate_stats_process_rooms_2"
+ "populate_stats_process_rooms"
)
return 1
@@ -189,13 +165,13 @@ class StatsStore(StateDeltasStore):
return [r for r, in txn]
rooms_to_work_on = await self.db_pool.runInteraction(
- "populate_stats_rooms_2_get_batch", _get_next_batch
+ "populate_stats_rooms_get_batch", _get_next_batch
)
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
await self.db_pool.updates._end_background_update(
- "populate_stats_process_rooms_2"
+ "populate_stats_process_rooms"
)
return 1
@@ -204,9 +180,9 @@ class StatsStore(StateDeltasStore):
progress["last_room_id"] = room_id
await self.db_pool.runInteraction(
- "_populate_stats_process_rooms_2",
+ "_populate_stats_process_rooms",
self.db_pool.updates._background_update_progress_txn,
- "populate_stats_process_rooms_2",
+ "populate_stats_process_rooms",
progress,
)
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 5b31aab700..c0a958252e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -15,13 +15,14 @@
import logging
from collections import namedtuple
-from typing import Optional, Tuple
+from typing import Iterable, Optional, Tuple
from canonicaljson import encode_canonical_json
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache
@@ -164,7 +165,9 @@ class TransactionStore(SQLBaseStore):
allow_none=True,
)
- if result and result["retry_last_ts"] > 0:
+ # check we have a row and retry_last_ts is not null or zero
+ # (retry_last_ts can't be negative)
+ if result and result["retry_last_ts"]:
return result
else:
return None
@@ -273,3 +276,98 @@ class TransactionStore(SQLBaseStore):
await self.db_pool.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)
+
+ async def store_destination_rooms_entries(
+ self, destinations: Iterable[str], room_id: str, stream_ordering: int,
+ ) -> None:
+ """
+ Updates or creates `destination_rooms` entries in batch for a single event.
+
+ Args:
+ destinations: list of destinations
+ room_id: the room_id of the event
+ stream_ordering: the stream_ordering of the event
+ """
+
+ return await self.db_pool.runInteraction(
+ "store_destination_rooms_entries",
+ self._store_destination_rooms_entries_txn,
+ destinations,
+ room_id,
+ stream_ordering,
+ )
+
+ def _store_destination_rooms_entries_txn(
+ self,
+ txn: LoggingTransaction,
+ destinations: Iterable[str],
+ room_id: str,
+ stream_ordering: int,
+ ) -> None:
+
+ # ensure we have a `destinations` row for this destination, as there is
+ # a foreign key constraint.
+ if isinstance(self.database_engine, PostgresEngine):
+ q = """
+ INSERT INTO destinations (destination)
+ VALUES (?)
+ ON CONFLICT DO NOTHING;
+ """
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ q = """
+ INSERT OR IGNORE INTO destinations (destination)
+ VALUES (?);
+ """
+ else:
+ raise RuntimeError("Unknown database engine")
+
+ txn.execute_batch(q, ((destination,) for destination in destinations))
+
+ rows = [(destination, room_id) for destination in destinations]
+
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ "destination_rooms",
+ ["destination", "room_id"],
+ rows,
+ ["stream_ordering"],
+ [(stream_ordering,)] * len(rows),
+ )
+
+ async def get_destination_last_successful_stream_ordering(
+ self, destination: str
+ ) -> Optional[int]:
+ """
+ Gets the stream ordering of the PDU most-recently successfully sent
+ to the specified destination, or None if this information has not been
+ tracked yet.
+
+ Args:
+ destination: the destination to query
+ """
+ return await self.db_pool.simple_select_one_onecol(
+ "destinations",
+ {"destination": destination},
+ "last_successful_stream_ordering",
+ allow_none=True,
+ desc="get_last_successful_stream_ordering",
+ )
+
+ async def set_destination_last_successful_stream_ordering(
+ self, destination: str, last_successful_stream_ordering: int
+ ) -> None:
+ """
+ Marks that we have successfully sent the PDUs up to and including the
+ one specified.
+
+ Args:
+ destination: the destination we have successfully sent to
+ last_successful_stream_ordering: the stream_ordering of the most
+ recent successfully-sent PDU
+ """
+ return await self.db_pool.simple_upsert(
+ "destinations",
+ keyvalues={"destination": destination},
+ values={"last_successful_stream_ordering": last_successful_stream_ordering},
+ desc="set_last_successful_stream_ordering",
+ )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index ee60e2a718..a7f2dfb850 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -19,12 +19,15 @@ import logging
import os
import re
from collections import Counter
-from typing import TextIO
+from typing import Optional, TextIO
import attr
+from synapse.config.homeserver import HomeServerConfig
+from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.engines.postgres import PostgresEngine
-from synapse.storage.types import Cursor
+from synapse.storage.types import Connection, Cursor
+from synapse.types import Collection
logger = logging.getLogger(__name__)
@@ -63,7 +66,12 @@ UNAPPLIED_DELTA_ON_WORKER_ERROR = (
)
-def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
+def prepare_database(
+ db_conn: Connection,
+ database_engine: BaseDatabaseEngine,
+ config: Optional[HomeServerConfig],
+ databases: Collection[str] = ["main", "state"],
+):
"""Prepares a physical database for usage. Will either create all necessary tables
or upgrade from an older schema version.
@@ -73,16 +81,24 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
Args:
db_conn:
database_engine:
- config (synapse.config.homeserver.HomeServerConfig|None):
+ config :
application config, or None if we are connecting to an existing
database which we expect to be configured already
- databases (list[str]): The name of the databases that will be used
+ databases: The name of the databases that will be used
with this physical database. Defaults to all databases.
"""
try:
cur = db_conn.cursor()
+ # sqlite does not automatically start transactions for DDL / SELECT statements,
+ # so we start one before running anything. This ensures that any upgrades
+ # are either applied completely, or not at all.
+ #
+ # (psycopg2 automatically starts a transaction as soon as we run any statements
+ # at all, so this is redundant but harmless there.)
+ cur.execute("BEGIN TRANSACTION")
+
logger.info("%r: Checking existing schema version", databases)
version_info = _get_or_create_schema_state(cur, database_engine)
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 3ad4b28fc7..b2355700ad 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import json
import logging
import re
import attr
-from canonicaljson import json
from twisted.internet import defer, task
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 0e445e01d7..bf094c9386 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from canonicaljson import json
+import json
+
from frozendict import frozendict
@@ -66,5 +67,5 @@ def _handle_frozendict(obj):
# A JSONEncoder which is capable of encoding frozendicts without barfing.
# Additionally reduce the whitespace produced by JSON encoding.
frozendict_json_encoder = json.JSONEncoder(
- default=_handle_frozendict, separators=(",", ":"),
+ allow_nan=False, separators=(",", ":"), default=_handle_frozendict,
)
|