summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2023-10-23 09:12:42 -0400
committerGitHub <noreply@github.com>2023-10-23 09:12:42 -0400
commit91e65700bdb19ff14f3a8b995f340d5a019b0495 (patch)
tree1e03f4659b36c21fbb47dcc6aab3426c88e74f59 /synapse
parentDocumentation. (diff)
parentMention how to redirect the Jaeger traces to a specific Jaeger instance (#16531) (diff)
downloadsynapse-clokep/db-upgrades.tar.xz
Merge branch 'develop' into clokep/db-upgrades github/clokep/db-upgrades clokep/db-upgrades
Diffstat (limited to 'synapse')
-rw-r--r--synapse/_scripts/register_new_matrix_user.py4
-rw-r--r--synapse/handlers/device.py25
-rw-r--r--synapse/handlers/sync.py52
-rw-r--r--synapse/replication/tcp/streams/events.py45
-rw-r--r--synapse/storage/databases/main/cache.py8
-rw-r--r--synapse/storage/databases/main/client_ips.py46
-rw-r--r--synapse/storage/databases/main/deviceinbox.py15
-rw-r--r--synapse/storage/databases/main/events.py74
-rw-r--r--synapse/storage/databases/main/events_worker.py6
-rw-r--r--synapse/storage/databases/main/stream.py47
-rw-r--r--synapse/storage/schema/__init__.py5
-rw-r--r--synapse/storage/schema/main/delta/82/05gaps.sql25
-rw-r--r--synapse/util/retryutils.py30
13 files changed, 283 insertions, 99 deletions
diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py

index 19ca399d44..9293808640 100644 --- a/synapse/_scripts/register_new_matrix_user.py +++ b/synapse/_scripts/register_new_matrix_user.py
@@ -50,7 +50,7 @@ def request_registration( url = "%s/_synapse/admin/v1/register" % (server_location.rstrip("/"),) # Get the nonce - r = requests.get(url, verify=False) + r = requests.get(url) if r.status_code != 200: _print("ERROR! Received %d %s" % (r.status_code, r.reason)) @@ -88,7 +88,7 @@ def request_registration( } _print("Sending registration request...") - r = requests.post(url, json=data, verify=False) + r = requests.post(url, json=data) if r.status_code != 200: _print("ERROR! Received %d %s" % (r.status_code, r.reason)) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 50df4f2b06..3ce96ef3cb 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -14,17 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import ( - TYPE_CHECKING, - Any, - Dict, - Iterable, - List, - Mapping, - Optional, - Set, - Tuple, -) +from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple from synapse.api import errors from synapse.api.constants import EduTypes, EventTypes @@ -41,6 +31,7 @@ from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) +from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo from synapse.types import ( JsonDict, JsonMapping, @@ -601,6 +592,8 @@ class DeviceHandler(DeviceWorkerHandler): ) # Delete device messages asynchronously and in batches using the task scheduler + # We specify an upper stream id to avoid deleting non delivered messages + # if an user re-uses a device ID. await self._task_scheduler.schedule_task( DELETE_DEVICE_MSGS_TASK_NAME, resource_id=device_id, @@ -1008,14 +1001,14 @@ class DeviceHandler(DeviceWorkerHandler): def _update_device_from_client_ips( - device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]] + device: JsonDict, client_ips: Mapping[Tuple[str, str], DeviceLastConnectionInfo] ) -> None: - ip = client_ips.get((device["user_id"], device["device_id"]), {}) + ip = client_ips.get((device["user_id"], device["device_id"])) device.update( { - "last_seen_user_agent": ip.get("user_agent"), - "last_seen_ts": ip.get("last_seen"), - "last_seen_ip": ip.get("ip"), + "last_seen_user_agent": ip.user_agent if ip else None, + "last_seen_ts": ip.last_seen if ip else None, + "last_seen_ip": ip.ip if ip else None, } ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 60b4d95cd7..f131c0e8e0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -500,12 +500,27 @@ class SyncHandler: async def _load_filtered_recents( self, room_id: str, + sync_result_builder: "SyncResultBuilder", sync_config: SyncConfig, - now_token: StreamToken, + upto_token: StreamToken, since_token: Optional[StreamToken] = None, potential_recents: Optional[List[EventBase]] = None, newly_joined_room: bool = False, ) -> TimelineBatch: + """Create a timeline batch for the room + + Args: + room_id + sync_result_builder + sync_config + upto_token: The token up to which we should fetch (more) events. + If `potential_results` is non-empty then this is *start* of + the the list. + since_token + potential_recents: If non-empty, the events between the since token + and current token to send down to clients. + newly_joined_room + """ with Measure(self.clock, "load_filtered_recents"): timeline_limit = sync_config.filter_collection.timeline_limit() block_all_timeline = ( @@ -521,6 +536,20 @@ class SyncHandler: else: limited = False + # Check if there is a gap, if so we need to mark this as limited and + # recalculate which events to send down. + gap_token = await self.store.get_timeline_gaps( + room_id, + since_token.room_key if since_token else None, + sync_result_builder.now_token.room_key, + ) + if gap_token: + # There's a gap, so we need to ignore the passed in + # `potential_recents`, and reset `upto_token` to match. + potential_recents = None + upto_token = sync_result_builder.now_token + limited = True + log_kv({"limited": limited}) if potential_recents: @@ -559,10 +588,10 @@ class SyncHandler: recents = [] if not limited or block_all_timeline: - prev_batch_token = now_token + prev_batch_token = upto_token if recents: room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace( + prev_batch_token = upto_token.copy_and_replace( StreamKeyType.ROOM, room_key ) @@ -573,11 +602,15 @@ class SyncHandler: filtering_factor = 2 load_limit = max(timeline_limit * filtering_factor, 10) max_repeat = 5 # Only try a few times per room, otherwise - room_key = now_token.room_key + room_key = upto_token.room_key end_key = room_key since_key = None - if since_token and not newly_joined_room: + if since_token and gap_token: + # If there is a gap then we need to only include events after + # it. + since_key = gap_token + elif since_token and not newly_joined_room: since_key = since_token.room_key while limited and len(recents) < timeline_limit and max_repeat: @@ -647,7 +680,7 @@ class SyncHandler: recents = recents[-timeline_limit:] room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key) + prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key) # Don't bother to bundle aggregations if the timeline is unlimited, # as clients will have all the necessary information. @@ -662,7 +695,9 @@ class SyncHandler: return TimelineBatch( events=recents, prev_batch=prev_batch_token, - limited=limited or newly_joined_room, + # Also mark as limited if this is a new room or there has been a gap + # (to force client to paginate the gap). + limited=limited or newly_joined_room or gap_token is not None, bundled_aggregations=bundled_aggregations, ) @@ -2397,8 +2432,9 @@ class SyncHandler: batch = await self._load_filtered_recents( room_id, + sync_result_builder, sync_config, - now_token=upto_token, + upto_token=upto_token, since_token=since_token, potential_recents=events, newly_joined_room=newly_joined, diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ad9b760713..da6d948e1b 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import heapq +from collections import defaultdict from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Type, TypeVar, cast import attr @@ -51,8 +52,19 @@ data part are: * The state_key of the state which has changed * The event id of the new state +A "state-all" row is sent whenever the "current state" in a room changes, but there are +too many state updates for a particular room in the same update. This replaces any +"state" rows on a per-room basis. The fields in the data part are: + +* The room id for the state changes + """ +# Any room with more than _MAX_STATE_UPDATES_PER_ROOM will send a EventsStreamAllStateRow +# instead of individual EventsStreamEventRow. This is predominantly useful when +# purging large rooms. +_MAX_STATE_UPDATES_PER_ROOM = 150 + @attr.s(slots=True, frozen=True, auto_attribs=True) class EventsStreamRow: @@ -111,9 +123,17 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow): event_id: Optional[str] +@attr.s(slots=True, frozen=True, auto_attribs=True) +class EventsStreamAllStateRow(BaseEventsStreamRow): + TypeId = "state-all" + + room_id: str + + _EventRows: Tuple[Type[BaseEventsStreamRow], ...] = ( EventsStreamEventRow, EventsStreamCurrentStateRow, + EventsStreamAllStateRow, ) TypeToRow = {Row.TypeId: Row for Row in _EventRows} @@ -213,9 +233,28 @@ class EventsStream(Stream): if stream_id <= upper_limit ) + # Separate out rooms that have many state updates, listeners should clear + # all state for those rooms. + state_updates_by_room = defaultdict(list) + for stream_id, room_id, _type, _state_key, _event_id in state_rows: + state_updates_by_room[room_id].append(stream_id) + + state_all_rows = [ + (stream_ids[-1], room_id) + for room_id, stream_ids in state_updates_by_room.items() + if len(stream_ids) >= _MAX_STATE_UPDATES_PER_ROOM + ] + state_all_updates: Iterable[Tuple[int, Tuple]] = ( + (max_stream_id, (EventsStreamAllStateRow.TypeId, (room_id,))) + for (max_stream_id, room_id) in state_all_rows + ) + + # Any remaining state updates are sent individually. + state_all_rooms = {room_id for _, room_id in state_all_rows} state_updates: Iterable[Tuple[int, Tuple]] = ( (stream_id, (EventsStreamCurrentStateRow.TypeId, rest)) for (stream_id, *rest) in state_rows + if rest[0] not in state_all_rooms ) ex_outliers_updates: Iterable[Tuple[int, Tuple]] = ( @@ -224,7 +263,11 @@ class EventsStream(Stream): ) # we need to return a sorted list, so merge them together. - updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates)) + updates = list( + heapq.merge( + event_updates, state_all_updates, state_updates, ex_outliers_updates + ) + ) return updates, upper_limit, limited @classmethod diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2fbd389c71..4d0470ffd9 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -23,6 +23,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( EventsStream, + EventsStreamAllStateRow, EventsStreamCurrentStateRow, EventsStreamEventRow, EventsStreamRow, @@ -264,6 +265,13 @@ class CacheInvalidationWorkerStore(SQLBaseStore): (data.state_key,) ) self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined] + elif row.type == EventsStreamAllStateRow.TypeId: + assert isinstance(data, EventsStreamAllStateRow) + # Similar to the above, but the entire caches are invalidated. This is + # unfortunate for the membership caches, but should recover quickly. + self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] + self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined] + self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined] else: raise Exception("Unknown events stream row type %s" % (row.type,)) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 7da47c3dd7..8be1511859 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py
@@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union, cast +import attr from typing_extensions import TypedDict from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -42,7 +43,8 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -class DeviceLastConnectionInfo(TypedDict): +@attr.s(slots=True, frozen=True, auto_attribs=True) +class DeviceLastConnectionInfo: """Metadata for the last connection seen for a user and device combination""" # These types must match the columns in the `devices` table @@ -499,24 +501,29 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke device_id: If None fetches all devices for the user Returns: - A dictionary mapping a tuple of (user_id, device_id) to dicts, with - keys giving the column names from the devices table. + A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo. """ keyvalues = {"user_id": user_id} if device_id is not None: keyvalues["device_id"] = device_id - res = cast( - List[DeviceLastConnectionInfo], - await self.db_pool.simple_select_list( - table="devices", - keyvalues=keyvalues, - retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), - ), + res = await self.db_pool.simple_select_list( + table="devices", + keyvalues=keyvalues, + retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), ) - return {(d["user_id"], d["device_id"]): d for d in res} + return { + (d["user_id"], d["device_id"]): DeviceLastConnectionInfo( + user_id=d["user_id"], + device_id=d["device_id"], + ip=d["ip"], + user_agent=d["user_agent"], + last_seen=d["last_seen"], + ) + for d in res + } async def _get_user_ip_and_agents_from_database( self, user: UserID, since_ts: int = 0 @@ -683,8 +690,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke device_id: If None fetches all devices for the user Returns: - A dictionary mapping a tuple of (user_id, device_id) to dicts, with - keys giving the column names from the devices table. + A dictionary mapping a tuple of (user_id, device_id) to DeviceLastConnectionInfo. """ ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id) @@ -705,13 +711,13 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke continue if not device_id or did == device_id: - ret[(user_id, did)] = { - "user_id": user_id, - "ip": ip, - "user_agent": user_agent, - "device_id": did, - "last_seen": last_seen, - } + ret[(user_id, did)] = DeviceLastConnectionInfo( + user_id=user_id, + ip=ip, + user_agent=user_agent, + device_id=did, + last_seen=last_seen, + ) return ret async def get_user_ip_and_agents( diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 1faa6f04b2..3e7425d4a6 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -478,18 +478,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": "No changes in cache since last check"}) return 0 - ROW_ID_NAME = self.database_engine.row_id_name - def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: limit_statement = "" if limit is None else f"LIMIT {limit}" sql = f""" - DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN ( - SELECT {ROW_ID_NAME} FROM device_inbox - WHERE user_id = ? AND device_id = ? AND stream_id <= ? - {limit_statement} + DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= ( + SELECT MAX(stream_id) FROM ( + SELECT stream_id FROM device_inbox + WHERE user_id = ? AND device_id = ? AND stream_id <= ? + ORDER BY stream_id + {limit_statement} + ) AS q1 ) """ - txn.execute(sql, (user_id, device_id, up_to_stream_id)) + txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id)) return txn.rowcount count = await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ef6766b5e0..3c1492e3ad 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -2267,35 +2267,59 @@ class PersistEventsStore: Forward extremities are handled when we first start persisting the events. """ - # From the events passed in, add all of the prev events as backwards extremities. - # Ignore any events that are already backwards extrems or outliers. - query = ( - "INSERT INTO event_backward_extremities (event_id, room_id)" - " SELECT ?, ? WHERE NOT EXISTS (" - " SELECT 1 FROM event_backward_extremities" - " WHERE event_id = ? AND room_id = ?" - " )" - # 1. Don't add an event as a extremity again if we already persisted it - # as a non-outlier. - # 2. Don't add an outlier as an extremity if it has no prev_events - " AND NOT EXISTS (" - " SELECT 1 FROM events" - " LEFT JOIN event_edges edge" - " ON edge.event_id = events.event_id" - " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)" - " )" + + room_id = events[0].room_id + + potential_backwards_extremities = { + e_id + for ev in events + for e_id in ev.prev_event_ids() + if not ev.internal_metadata.is_outlier() + } + + if not potential_backwards_extremities: + return + + existing_events_outliers = self.db_pool.simple_select_many_txn( + txn, + table="events", + column="event_id", + iterable=potential_backwards_extremities, + keyvalues={"outlier": False}, + retcols=("event_id",), ) - txn.execute_batch( - query, - [ - (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id) - for ev in events - for e_id in ev.prev_event_ids() - if not ev.internal_metadata.is_outlier() - ], + potential_backwards_extremities.difference_update( + e for e, in existing_events_outliers ) + if potential_backwards_extremities: + self.db_pool.simple_upsert_many_txn( + txn, + table="event_backward_extremities", + key_names=("room_id", "event_id"), + key_values=[(room_id, ev) for ev in potential_backwards_extremities], + value_names=(), + value_values=(), + ) + + # Record the stream orderings where we have new gaps. + gap_events = [ + (room_id, self._instance_name, ev.internal_metadata.stream_ordering) + for ev in events + if any( + e_id in potential_backwards_extremities + for e_id in ev.prev_event_ids() + ) + ] + + self.db_pool.simple_insert_many_txn( + txn, + table="timeline_gaps", + keys=("room_id", "instance_name", "stream_ordering"), + values=gap_events, + ) + # Delete all these events that we've already fetched and now know that their # prev events are the new backwards extremeties. query = ( diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8af638d60f..5bf864c1fb 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -2096,12 +2096,6 @@ class EventsWorkerStore(SQLBaseStore): def _cleanup_old_transaction_ids_txn(txn: LoggingTransaction) -> None: one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000 sql = """ - DELETE FROM event_txn_id - WHERE inserted_ts < ? - """ - txn.execute(sql, (one_day_ago,)) - - sql = """ DELETE FROM event_txn_id_device_id WHERE inserted_ts < ? """ diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ea06e4eee0..872df6bda1 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcol="instance_name", desc="get_name_from_instance_id", ) + + async def get_timeline_gaps( + self, + room_id: str, + from_token: Optional[RoomStreamToken], + to_token: RoomStreamToken, + ) -> Optional[RoomStreamToken]: + """Check if there is a gap, and return a token that marks the position + of the gap in the stream. + """ + + sql = """ + SELECT instance_name, stream_ordering + FROM timeline_gaps + WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ? + ORDER BY stream_ordering + """ + + rows = await self.db_pool.execute( + "get_timeline_gaps", + None, + sql, + room_id, + from_token.stream if from_token else 0, + to_token.get_max_stream_pos(), + ) + + if not rows: + return None + + positions = [ + PersistedEventPosition(instance_name, stream_ordering) + for instance_name, stream_ordering in rows + ] + if from_token: + positions = [p for p in positions if p.persisted_after(from_token)] + + positions = [p for p in positions if not p.persisted_after(to_token)] + + if positions: + # We return a stream token that ensures the event *at* the position + # of the gap is included (as the gap is *before* the persisted + # event). + last_position = positions[-1] + return RoomStreamToken(stream=last_position.stream - 1) + + return None diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index eb461a151c..7438337f45 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 82 # remember to update the list below when updating +SCHEMA_VERSION = 83 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -121,6 +121,9 @@ Changes in SCHEMA_VERSION = 81 Changes in SCHEMA_VERSION = 82 - The insertion_events, insertion_event_extremities, insertion_event_edges, and batch_events tables are no longer purged in preparation for their removal. + +Changes in SCHEMA_VERSION = 83 + - The event_txn_id is no longer used. """ diff --git a/synapse/storage/schema/main/delta/82/05gaps.sql b/synapse/storage/schema/main/delta/82/05gaps.sql new file mode 100644
index 0000000000..6813b488ca --- /dev/null +++ b/synapse/storage/schema/main/delta/82/05gaps.sql
@@ -0,0 +1,25 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Records when we see a "gap in the timeline", due to missing events over +-- federation. We record this so that we can tell clients there is a gap (by +-- marking the timeline section of a sync request as limited). +CREATE TABLE IF NOT EXISTS timeline_gaps ( + room_id TEXT NOT NULL, + instance_name TEXT NOT NULL, + stream_ordering BIGINT NOT NULL +); + +CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering); diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 0e1f907667..547202c96b 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py
@@ -170,10 +170,10 @@ class RetryDestinationLimiter: database in milliseconds, or zero if the last request was successful. backoff_on_404: Back off if we get a 404 - backoff_on_failure: set to False if we should not increase the retry interval on a failure. - + notifier: A notifier used to mark servers as up. + replication_client A replication client used to mark servers as up. backoff_on_all_error_codes: Whether we should back off on any error code. """ @@ -237,6 +237,9 @@ class RetryDestinationLimiter: else: valid_err_code = False + # Whether previous requests to the destination had been failing. + previously_failing = bool(self.failure_ts) + if success: # We connected successfully. if not self.retry_interval: @@ -282,6 +285,9 @@ class RetryDestinationLimiter: if self.failure_ts is None: self.failure_ts = retry_last_ts + # Whether the current request to the destination had been failing. + currently_failing = bool(self.failure_ts) + async def store_retry_timings() -> None: try: await self.store.set_destination_retry_timings( @@ -291,17 +297,15 @@ class RetryDestinationLimiter: self.retry_interval, ) - if self.notifier: - # Inform the relevant places that the remote server is back up. - self.notifier.notify_remote_server_up(self.destination) - - if self.replication_client: - # If we're on a worker we try and inform master about this. The - # replication client doesn't hook into the notifier to avoid - # infinite loops where we send a `REMOTE_SERVER_UP` command to - # master, which then echoes it back to us which in turn pokes - # the notifier. - self.replication_client.send_remote_server_up(self.destination) + # If the server was previously failing, but is no longer. + if previously_failing and not currently_failing: + if self.notifier: + # Inform the relevant places that the remote server is back up. + self.notifier.notify_remote_server_up(self.destination) + + if self.replication_client: + # Inform other workers that the remote server is up. + self.replication_client.send_remote_server_up(self.destination) except Exception: logger.exception("Failed to store destination_retry_timings")