diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2020-08-26 12:22:25 +0100 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2020-08-26 12:22:25 +0100 |
commit | 7affcd01c76f495dfe70dbb9f68d964a2d58b9bd (patch) | |
tree | 7a42640f7b1c7bd068332a4fd9dce3c2a0dcecd6 /synapse/federation/sender | |
parent | Simplify medium and address assignment (diff) | |
parent | Add functions to `MultiWriterIdGen` used by events stream (#8164) (diff) | |
download | synapse-anoa/user_param_ui_auth.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/user_param_ui_auth anoa/user_param_ui_auth
* 'develop' of github.com:matrix-org/synapse: (369 commits) Add functions to `MultiWriterIdGen` used by events stream (#8164) Do not allow send_nonmember_event to be called with shadow-banned users. (#8158) Changelog fixes 1.19.1rc1 Make StreamIdGen `get_next` and `get_next_mult` async (#8161) Wording fixes to 'name' user admin api filter (#8163) Fix missing double-backtick in RST document Search in columns 'name' and 'displayname' in the admin users endpoint (#7377) Add type hints for state. (#8140) Stop shadow-banned users from sending non-member events. (#8142) Allow capping a room's retention policy (#8104) Add healthcheck for default localhost 8008 port on /health endpoint. (#8147) Fix flaky shadow-ban tests. (#8152) Fix join ratelimiter breaking profile updates and idempotency (#8153) Do not apply ratelimiting on joins to appservices (#8139) Don't fail /submit_token requests on incorrect session ID if request_token_inhibit_3pid_errors is turned on (#7991) Do not apply ratelimiting on joins to appservices (#8139) Micro-optimisations to get_auth_chain_ids (#8132) Allow denying or shadow banning registrations via the spam checker (#8034) Stop shadow-banned users from sending invites. (#8095) ...
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r-- | synapse/federation/sender/__init__.py | 79 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 50 | ||||
-rw-r--r-- | synapse/federation/sender/transaction_manager.py | 14 |
3 files changed, 117 insertions, 26 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index d473576902..4662008bfd 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -16,14 +16,13 @@ import logging from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple -from six import itervalues - from prometheus_client import Counter from twisted.internet import defer import synapse import synapse.metrics +from synapse.api.presence import UserPresenceState from synapse.events import EventBase from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager @@ -41,7 +40,6 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage.presence import UserPresenceState from synapse.types import ReadReceipt from synapse.util.metrics import Measure, measure_func @@ -71,6 +69,9 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) + self._instance_name = hs.get_instance_name() + self._federation_shard_config = hs.config.worker.federation_shard_config + # map from destination to PerDestinationQueue self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] @@ -193,7 +194,13 @@ class FederationSender(object): ) return - destinations = set(destinations) + destinations = { + d + for d in destinations + if self._federation_shard_config.should_handle( + self._instance_name, d + ) + } if send_on_behalf_of is not None: # If we are sending the event on behalf of another server @@ -203,7 +210,15 @@ class FederationSender(object): logger.debug("Sending %s to %r", event, destinations) - self._send_pdu(event, destinations) + if destinations: + self._send_pdu(event, destinations) + + now = self.clock.time_msec() + ts = await self.store.get_received_ts(event.event_id) + + synapse.metrics.event_processing_lag_by_event.labels( + "federation_sender" + ).observe((now - ts) / 1000) async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): @@ -218,7 +233,7 @@ class FederationSender(object): defer.gatherResults( [ run_in_background(handle_room_events, evs) - for evs in itervalues(events_by_room) + for evs in events_by_room.values() ], consumeErrors=True, ) @@ -273,8 +288,7 @@ class FederationSender(object): for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu, order) - @defer.inlineCallbacks - def send_read_receipt(self, receipt: ReadReceipt): + async def send_read_receipt(self, receipt: ReadReceipt) -> None: """Send a RR to any other servers in the room Args: @@ -315,8 +329,13 @@ class FederationSender(object): room_id = receipt.room_id # Work out which remote servers should be poked and poke them. - domains = yield self.state.get_current_hosts_in_room(room_id) - domains = [d for d in domains if d != self.server_name] + domains_set = await self.state.get_current_hosts_in_room(room_id) + domains = [ + d + for d in domains_set + if d != self.server_name + and self._federation_shard_config.should_handle(self._instance_name, d) + ] if not domains: return @@ -365,8 +384,7 @@ class FederationSender(object): queue.flush_read_receipts_for_room(room_id) @preserve_fn # the caller should not yield on this - @defer.inlineCallbacks - def send_presence(self, states: List[UserPresenceState]): + async def send_presence(self, states: List[UserPresenceState]): """Send the new presence states to the appropriate destinations. This actually queues up the presence states ready for sending and @@ -401,7 +419,7 @@ class FederationSender(object): if not states_map: break - yield self._process_presence_inner(list(states_map.values())) + await self._process_presence_inner(list(states_map.values())) except Exception: logger.exception("Error sending presence states to servers") finally: @@ -421,20 +439,29 @@ class FederationSender(object): for destination in destinations: if destination == self.server_name: continue + if not self._federation_shard_config.should_handle( + self._instance_name, destination + ): + continue self._get_per_destination_queue(destination).send_presence(states) @measure_func("txnqueue._process_presence") - @defer.inlineCallbacks - def _process_presence_inner(self, states: List[UserPresenceState]): + async def _process_presence_inner(self, states: List[UserPresenceState]): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination """ - hosts_and_states = yield get_interested_remotes(self.store, states, self.state) + hosts_and_states = await get_interested_remotes(self.store, states, self.state) for destinations, states in hosts_and_states: for destination in destinations: if destination == self.server_name: continue + + if not self._federation_shard_config.should_handle( + self._instance_name, destination + ): + continue + self._get_per_destination_queue(destination).send_presence(states) def build_and_send_edu( @@ -456,6 +483,11 @@ class FederationSender(object): logger.info("Not sending EDU to ourselves") return + if not self._federation_shard_config.should_handle( + self._instance_name, destination + ): + return + edu = Edu( origin=self.server_name, destination=destination, @@ -472,6 +504,11 @@ class FederationSender(object): edu: edu to send key: clobbering key for this edu """ + if not self._federation_shard_config.should_handle( + self._instance_name, edu.destination + ): + return + queue = self._get_per_destination_queue(edu.destination) if key: queue.send_keyed_edu(edu, key) @@ -483,6 +520,11 @@ class FederationSender(object): logger.warning("Not sending device update to ourselves") return + if not self._federation_shard_config.should_handle( + self._instance_name, destination + ): + return + self._get_per_destination_queue(destination).attempt_new_transaction() def wake_destination(self, destination: str): @@ -496,6 +538,11 @@ class FederationSender(object): logger.warning("Not waking up ourselves") return + if not self._federation_shard_config.should_handle( + self._instance_name, destination + ): + return + self._get_per_destination_queue(destination).attempt_new_transaction() @staticmethod diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 4e698981a4..c09ffcaf4c 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -24,12 +24,12 @@ from synapse.api.errors import ( HttpResponseException, RequestSendFailed, ) +from synapse.api.presence import UserPresenceState from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage.presence import UserPresenceState from synapse.types import ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter @@ -74,6 +74,20 @@ class PerDestinationQueue(object): self._clock = hs.get_clock() self._store = hs.get_datastore() self._transaction_manager = transaction_manager + self._instance_name = hs.get_instance_name() + self._federation_shard_config = hs.config.worker.federation_shard_config + + self._should_send_on_this_instance = True + if not self._federation_shard_config.should_handle( + self._instance_name, destination + ): + # We don't raise an exception here to avoid taking out any other + # processing. We have a guard in `attempt_new_transaction` that + # ensure we don't start sending stuff. + logger.error( + "Create a per destination queue for %s on wrong worker", destination, + ) + self._should_send_on_this_instance = False self._destination = destination self.transmission_loop_running = False @@ -119,7 +133,7 @@ class PerDestinationQueue(object): ) def send_pdu(self, pdu: EventBase, order: int) -> None: - """Add a PDU to the queue, and start the transmission loop if neccessary + """Add a PDU to the queue, and start the transmission loop if necessary Args: pdu: pdu to send @@ -129,7 +143,7 @@ class PerDestinationQueue(object): self.attempt_new_transaction() def send_presence(self, states: Iterable[UserPresenceState]) -> None: - """Add presence updates to the queue. Start the transmission loop if neccessary. + """Add presence updates to the queue. Start the transmission loop if necessary. Args: states: presence to send @@ -180,6 +194,14 @@ class PerDestinationQueue(object): logger.debug("TX [%s] Transaction already in progress", self._destination) return + if not self._should_send_on_this_instance: + # We don't raise an exception here to avoid taking out any other + # processing. + logger.error( + "Trying to start a transaction to %s on wrong worker", self._destination + ) + return + logger.debug("TX [%s] Starting transaction loop", self._destination) run_as_background_process( @@ -315,6 +337,28 @@ class PerDestinationQueue(object): (e.retry_last_ts + e.retry_interval) / 1000.0 ), ) + + if e.retry_interval > 60 * 60 * 1000: + # we won't retry for another hour! + # (this suggests a significant outage) + # We drop pending PDUs and EDUs because otherwise they will + # rack up indefinitely. + # Note that: + # - the EDUs that are being dropped here are those that we can + # afford to drop (specifically, only typing notifications, + # read receipts and presence updates are being dropped here) + # - Other EDUs such as to_device messages are queued with a + # different mechanism + # - this is all volatile state that would be lost if the + # federation sender restarted anyway + + # dropping read receipts is a bit sad but should be solved + # through another mechanism, because this is all volatile! + self._pending_pdus = [] + self._pending_edus = [] + self._pending_edus_keyed = {} + self._pending_presence = {} + self._pending_rrs = {} except FederationDeniedError as e: logger.info(e) except HttpResponseException as e: diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index a2752a54a5..9bd534a313 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, List - -from canonicaljson import json +from typing import TYPE_CHECKING, List, Tuple from synapse.api.errors import HttpResponseException from synapse.events import EventBase @@ -28,6 +26,7 @@ from synapse.logging.opentracing import ( tags, whitelisted_homeserver, ) +from synapse.util import json_decoder from synapse.util.metrics import measure_func if TYPE_CHECKING: @@ -54,15 +53,16 @@ class TransactionManager(object): @measure_func("_send_new_transaction") async def send_new_transaction( - self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu] + self, + destination: str, + pending_pdus: List[Tuple[EventBase, int]], + pending_edus: List[Edu], ): # Make a transaction-sending opentracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is # no active span here, so if the edus were not received by the remote the # span would have no causality and it would be forgotten. - # The span_contexts is a generator so that it won't be evaluated if - # opentracing is disabled. (Yay speed!) span_contexts = [] keep_destination = whitelisted_homeserver(destination) @@ -70,7 +70,7 @@ class TransactionManager(object): for edu in pending_edus: context = edu.get_context() if context: - span_contexts.append(extract_text_map(json.loads(context))) + span_contexts.append(extract_text_map(json_decoder.decode(context))) if keep_destination: edu.strip_context() |