-rw-r--r--  changelog.d/6126.feature                                  |  1
-rw-r--r--  changelog.d/6418.bugfix                                   |  1
-rw-r--r--  synapse/federation/sender/__init__.py                     | 20
-rw-r--r--  synapse/federation/sender/per_destination_queue.py        | 15
-rw-r--r--  synapse/federation/sender/transaction_manager.py          |  4
-rw-r--r--  synapse/handlers/message.py                               | 10
-rw-r--r--  synapse/handlers/room_list.py                             |  3
-rw-r--r--  synapse/handlers/room_member.py                           | 46
-rw-r--r--  synapse/handlers/sync.py                                  |  5
-rw-r--r--  synapse/http/federation/well_known_resolver.py            |  4
-rw-r--r--  synapse/push/httppusher.py                                |  4
-rw-r--r--  synapse/res/templates/saml_error.html                     |  2
-rw-r--r--  synapse/server_notices/resource_limits_server_notices.py  | 15
-rw-r--r--  synapse/storage/data_stores/main/client_ips.py            |  2
-rw-r--r--  synapse/storage/data_stores/main/monthly_active_users.py  | 19
-rw-r--r--  synapse/storage/data_stores/main/search.py                |  2
16 files changed, 114 insertions, 39 deletions
diff --git a/changelog.d/6126.feature b/changelog.d/6126.feature
new file mode 100644
index 0000000000..1207ba6206
--- /dev/null
+++ b/changelog.d/6126.feature
@@ -0,0 +1 @@
+Group events into larger federation transactions at times of high traffic.
diff --git a/changelog.d/6418.bugfix b/changelog.d/6418.bugfix
new file mode 100644
index 0000000000..a1f488d3a2
--- /dev/null
+++ b/changelog.d/6418.bugfix
@@ -0,0 +1 @@
+Fix phone home stats reporting.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d473576902..358268fcc5 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -152,10 +152,25 @@ class FederationSender(object):
             "process_event_queue_for_federation", self._process_event_queue_loop
         )
 
-    async def _process_event_queue_loop(self) -> None:
+    async def _process_event_queue_loop(self):
+        loop_start_time = self.clock.time_msec()
         try:
             self._is_processing = True
             while True:
+                # if we've been going around this loop for a long time without
+                # catching up, deprioritise transaction transmission. This should mean
+                # that events get batched into fewer transactions, which is more
+                # efficient, and hence give us a chance to catch up
+                if (
+                    self.clock.time_msec() - loop_start_time > 60 * 1000
+                    and not self._transaction_manager.deprioritise_transmission
+                ):
+                    logger.warning(
+                        "Event queue is getting behind: deprioritising transaction "
+                        "transmission"
+                    )
+                    self._transaction_manager.deprioritise_transmission = True
+
                 last_token = await self.store.get_federation_out_pos("events")
                 next_token, events = await self.store.get_all_new_events_stream(
                     last_token, self._last_poked_id, limit=100
@@ -251,6 +266,9 @@ class FederationSender(object):
 
         finally:
             self._is_processing = False
+            if self._transaction_manager.deprioritise_transmission:
+                logger.info("Event queue caught up: re-prioritising transmission")
+                self._transaction_manager.deprioritise_transmission = False
 
     def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         # We loop through all destinations to see whether we already have
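
The shape of this change is a simple back-pressure signal: the event-processing loop flips a shared flag once it has been running for over a minute without draining its queue, and clears it again in the finally block once it has caught up. A standalone sketch of the pattern (hypothetical names, plain time.monotonic() in place of Synapse's clock and storage layers):

import logging
import time

logger = logging.getLogger(__name__)

CATCH_UP_THRESHOLD_SECS = 60  # mirrors the 60 * 1000 msec check above


class QueueProcessor:
    def __init__(self) -> None:
        # shared with the transmitters, like deprioritise_transmission above
        self.deprioritise_transmission = False

    async def process_queue(self, get_batch) -> None:
        loop_start = time.monotonic()
        try:
            while True:
                behind = time.monotonic() - loop_start > CATCH_UP_THRESHOLD_SECS
                if behind and not self.deprioritise_transmission:
                    logger.warning("queue is behind: deprioritising transmission")
                    self.deprioritise_transmission = True

                batch = await get_batch()
                if not batch:
                    break  # drained, i.e. caught up
        finally:
            if self.deprioritise_transmission:
                logger.info("queue caught up: re-prioritising transmission")
                self.deprioritise_transmission = False
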
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 4e698981a4..1093ae0d91 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
+import random
 from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
 
 from prometheus_client import Counter
@@ -39,6 +40,8 @@ if TYPE_CHECKING:
 # This is defined in the Matrix spec and enforced by the receiver.
 MAX_EDUS_PER_TRANSACTION = 100
 
+DEPRIORITISE_SLEEP_TIME = 10
+
 logger = logging.getLogger(__name__)
@@ -199,6 +202,18 @@ class PerDestinationQueue(object):
             pending_pdus = []
             while True:
+                if self._transaction_manager.deprioritise_transmission:
+                    # if the event-processing loop has got behind, sleep to give it
+                    # a chance to catch up. Add some randomness so that the transmitters
+                    # don't all wake up in sync.
+                    sleeptime = random.uniform(
+                        DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2
+                    )
+                    logger.info(
+                        "TX [%s]: sleeping for %f seconds", self._destination, sleeptime
+                    )
+                    await self._clock.sleep(sleeptime)
+
                 # We have to keep 2 free slots for presence and rr_edus
                 limit = MAX_EDUS_PER_TRANSACTION - 2
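
Each per-destination queue above sleeps while the shared flag (defined on the TransactionManager in the next file) is set. The jitter is the important detail: with a fixed sleep, every transmitter would wake at the same instant and contend for the event loop again, so each one draws its own delay from [10, 20) seconds. A quick illustration of the spread (not Synapse code):

import random

DEPRIORITISE_SLEEP_TIME = 10  # seconds, matching the constant above

# each transmitter picks its own wake-up delay from [10, 20), smearing
# the waking herd of senders across a ten-second window
samples = [
    random.uniform(DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2)
    for _ in range(5)
]
print(["%.1f s" % s for s in samples])  # e.g. ['12.3 s', '19.0 s', ...]
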
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index a2752a54a5..fa6ad9efdc 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -52,6 +52,10 @@ class TransactionManager(object):
         # HACK to get unique tx id
         self._next_txn_id = int(self.clock.time_msec())
 
+        # the federation sender sometimes sets this to delay transaction transmission,
+        # if the sender gets behind.
+        self.deprioritise_transmission = False
+
     @measure_func("_send_new_transaction")
     async def send_new_transaction(
         self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8f362896a2..a83c03da9a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -232,7 +232,7 @@ class MessageHandler(object):
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
-        if requester.app_service and user_id not in users_with_profile:
+        if False and requester.app_service and user_id not in users_with_profile:
             for uid in users_with_profile:
                 if requester.app_service.is_interested_in_user(uid):
                     break
@@ -401,8 +401,10 @@ class EventCreationHandler(object):
         if self._block_events_without_consent_error:
             self._consent_uri_builder = ConsentURIBuilder(self.config)
 
+        self._is_worker_app = self.config.worker_app is not None
+
         if (
-            not self.config.worker_app
+            not self._is_worker_app
             and self.config.cleanup_extremities_with_dummy_events
         ):
             self.clock.looping_call(
@@ -826,7 +828,7 @@ class EventCreationHandler(object):
         success = False
         try:
             # If we're a worker we need to hit out to the master.
-            if self.config.worker_app:
+            if self._is_worker_app:
                 await self.send_event_to_master(
                     event_id=event.event_id,
                     store=self.store,
@@ -892,7 +894,7 @@ class EventCreationHandler(object):
 
         This should only be run on master.
         """
-        assert not self.config.worker_app
+        assert not self._is_worker_app
 
         if ratelimit:
             # We check if this is a room admin redacting an event so that we
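
config.worker_app holds the worker's application name (a string) or None on the master, so every "if self.config.worker_app:" was a truthiness test against the config object. The change computes "worker_app is not None" once in the constructor and tests the cached boolean everywhere else. Roughly (illustrative values only):

worker_app = None  # master process: the option is unset
is_worker_app = worker_app is not None  # computed once, in __init__
print(is_worker_app)  # False: master-only paths run, e.g. the assert above

worker_app = "synapse.app.federation_sender"  # hypothetical worker name
print(worker_app is not None)  # True: hit out to the master instead
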
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index e75dabcd77..2aeceeaa6c 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,7 +44,8 @@ class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
         self.enable_room_list_search = hs.config.enable_room_list_search
-        self.response_cache = ResponseCache(hs, "room_list")
+
+        self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
         self.remote_response_cache = ResponseCache(
             hs, "remote_room_list", timeout_ms=30 * 1000
         )
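
ResponseCache deduplicates concurrent requests for the same key and, with timeout_ms set, keeps the finished result around afterwards: here ten minutes for the public room list, where the previous call relied on the default of evicting a response as soon as it completed. A rough asyncio stand-in for the idea (simplified; Synapse's real ResponseCache is Twisted-based):

import asyncio
from typing import Any, Awaitable, Callable, Dict, Hashable


class SimpleResponseCache:
    """Share one in-flight call per key; keep its result for timeout_ms."""

    def __init__(self, timeout_ms: int = 0) -> None:
        self._timeout_s = timeout_ms / 1000.0
        self._entries: Dict[Hashable, asyncio.Task] = {}

    async def wrap(self, key: Hashable, fn: Callable[[], Awaitable[Any]]) -> Any:
        task = self._entries.get(key)
        if task is None:
            task = asyncio.ensure_future(fn())
            self._entries[key] = task
            # evict timeout_s after the call finishes
            task.add_done_callback(
                lambda _t: asyncio.get_running_loop().call_later(
                    self._timeout_s, self._entries.pop, key, None
                )
            )
        # shield: one caller cancelling must not kill the shared task
        return await asyncio.shield(task)

With a ten-minute timeout, a burst of identical room-directory requests performs the expensive walk once and every later caller inside the window gets the stored answer.
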
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index e51e1c32fe..619252b761 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -63,6 +63,7 @@ class RoomMemberHandler(object):
         self.event_creation_handler = hs.get_event_creation_handler()
 
         self.member_linearizer = Linearizer(name="member")
+        self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
 
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
@@ -266,19 +267,38 @@ class RoomMemberHandler(object):
     ) -> Union[EventBase, Optional[dict]]:
         key = (room_id,)
 
-        with (await self.member_linearizer.queue(key)):
-            result = await self._update_membership(
-                requester,
-                target,
-                room_id,
-                action,
-                txn_id=txn_id,
-                remote_room_hosts=remote_room_hosts,
-                third_party_signed=third_party_signed,
-                ratelimit=ratelimit,
-                content=content,
-                require_consent=require_consent,
-            )
+        as_id = object()
+        if requester.app_service:
+            as_id = requester.app_service.id
+
+        then = self.clock.time_msec()
+
+        with (await self.member_limiter.queue(as_id)):
+            diff = self.clock.time_msec() - then
+
+            if diff > 80 * 1000:
+                # haproxy would have timed the request out anyway...
+                raise SynapseError(504, "took too long to process")
+
+            with (await self.member_linearizer.queue(key)):
+                diff = self.clock.time_msec() - then
+
+                if diff > 80 * 1000:
+                    # haproxy would have timed the request out anyway...
+                    raise SynapseError(504, "took too long to process")
+
+                result = await self._update_membership(
+                    requester,
+                    target,
+                    room_id,
+                    action,
+                    txn_id=txn_id,
+                    remote_room_hosts=remote_room_hosts,
+                    third_party_signed=third_party_signed,
+                    ratelimit=ratelimit,
+                    content=content,
+                    require_consent=require_consent,
+                )
 
         return result
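
A Linearizer constructed with max_count=10 behaves like a keyed semaphore: up to ten membership requests per application-service ID run concurrently and the rest queue behind them. The elapsed-time checks then shed any request that queued for more than 80 s, on the assumption that haproxy in front of the deployment would already have returned a timeout to the client. A simplified asyncio rendering of the pattern (hypothetical helper, not Synapse's Linearizer):

import asyncio
import time
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, Hashable, TypeVar

T = TypeVar("T")
QUEUE_DEADLINE_SECS = 80  # matches the 80 * 1000 msec checks above


class RequestTimedOut(Exception):
    """The client (or its proxy) has given up; don't do the work."""


class KeyedLimiter:
    """Allow at most max_count concurrent holders per key."""

    def __init__(self, max_count: int) -> None:
        self._semaphores: DefaultDict[Hashable, asyncio.Semaphore] = defaultdict(
            lambda: asyncio.Semaphore(max_count)
        )

    async def run(self, key: Hashable, fn: Callable[[], Awaitable[T]]) -> T:
        start = time.monotonic()
        async with self._semaphores[key]:
            # we may have spent a long time queuing for a slot; if the
            # proxy has already timed the request out, skip the work
            if time.monotonic() - start > QUEUE_DEADLINE_SECS:
                raise RequestTimedOut("took too long to process")
            return await fn()
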
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 00718d7f2d..6c49e821d9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -50,6 +50,7 @@ logger = logging.getLogger(__name__)
 # Debug logger for https://github.com/matrix-org/synapse/issues/4422
 issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
 
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
 
 # Counts the number of times we returned a non-empty sync. `type` is one of
 # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -246,7 +247,9 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs, "sync")
+        self.response_cache = ResponseCache(
+            hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS
+        )
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
         self.storage = hs.get_storage()
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 7ddfad286d..b82c8a84f4 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -103,6 +103,10 @@ class WellKnownResolver(object):
         Returns:
             Deferred[WellKnownLookupResult]: The result of the lookup
         """
+
+        if server_name == b"kde.org":
+            return WellKnownLookupResult(delegated_server=b"kde.modular.im:443")
+
         try:
             prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
                 server_name
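
This short-circuit hardcodes what a normal .well-known lookup would discover: without it, the resolver fetches https://kde.org/.well-known/matrix/server and reads the delegated federation address from the response body. The equivalent delegation document, per the Matrix server-server spec, would be:

# body that https://kde.org/.well-known/matrix/server would need to
# serve for the same delegation to happen without the hardcoded branch
well_known_body = {"m.server": "kde.modular.im:443"}
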
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index eaaa7afc91..3efc825d15 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -98,6 +98,10 @@ class HttpPusher(object):
         if "url" not in self.data:
             raise PusherConfigException("'url' required in data for HTTP pusher")
         self.url = self.data["url"]
+        self.url = self.url.replace(
+            "https://matrix.org/_matrix/push/v1/notify",
+            "http://10.103.0.7/_matrix/push/v1/notify",
+        )
         self.http_client = hs.get_proxied_http_client()
         self.data_minus_url = {}
         self.data_minus_url.update(self.data)
diff --git a/synapse/res/templates/saml_error.html b/synapse/res/templates/saml_error.html
index bfd6449c5d..f8a5fccd38 100644
--- a/synapse/res/templates/saml_error.html
+++ b/synapse/res/templates/saml_error.html
@@ -42,4 +42,4 @@
     }
   </script>
 </body>
-</html>
\ No newline at end of file
+</html>
diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py
index d97166351e..73f2cedb5c 100644
--- a/synapse/server_notices/resource_limits_server_notices.py
+++ b/synapse/server_notices/resource_limits_server_notices.py
@@ -48,6 +48,12 @@ class ResourceLimitsServerNotices(object):
 
         self._notifier = hs.get_notifier()
 
+        self._enabled = (
+            hs.config.limit_usage_by_mau
+            and self._server_notices_manager.is_enabled()
+            and not hs.config.hs_disabled
+        )
+
     async def maybe_send_server_notice_to_user(self, user_id):
         """Check if we need to send a notice to this user, this will be true in
         two cases.
@@ -61,14 +67,7 @@ class ResourceLimitsServerNotices(object):
         Returns:
             Deferred
         """
-        if self._config.hs_disabled is True:
-            return
-
-        if self._config.limit_usage_by_mau is False:
-            return
-
-        if not self._server_notices_manager.is_enabled():
-            # Don't try and send server notices unless they've been enabled
+        if not self._enabled:
             return
 
         timestamp = await self._store.user_last_seen_monthly_active(user_id)
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 71f8d43a76..ea06f74535 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
 # times give more inserts into the database even for readonly API hits
 # 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
 
 
 class ClientIpBackgroundUpdateStore(SQLBaseStore):
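
This raises the write granularity from 120 000 ms (the 2 minutes the untouched comment still describes) to 600 000 ms, i.e. 10 minutes, so a continuously active client triggers at most one "last seen" write per 10 minutes rather than one per 2, a fivefold cut in this class of database update. The bucketing behaves like this (illustrative only):

LAST_SEEN_GRANULARITY = 10 * 60 * 1000  # msec: 10 minutes


def needs_update(last_seen_ms: int, now_ms: int) -> bool:
    """Write a fresh 'last seen' only once per granularity window."""
    return now_ms - last_seen_ms > LAST_SEEN_GRANULARITY


print(needs_update(0, 5 * 60 * 1000))   # False: still inside the window
print(needs_update(0, 11 * 60 * 1000))  # True: the window has elapsed
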
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index 925bc5691b..a624d1f1b6 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -122,6 +122,10 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
     def __init__(self, database: Database, db_conn, hs):
         super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
 
+        self._limit_usage_by_mau = hs.config.limit_usage_by_mau
+        self._mau_stats_only = hs.config.mau_stats_only
+        self._max_mau_value = hs.config.max_mau_value
+
         # Do not add more reserved users than the total allowable number
         # cur = LoggingTransaction(
         self.db.new_transaction(
@@ -130,7 +134,7 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
             [],
             [],
             self._initialise_reserved_users,
-            hs.config.mau_limits_reserved_threepids[: self.hs.config.max_mau_value],
+            hs.config.mau_limits_reserved_threepids[: self._max_mau_value],
         )
 
     def _initialise_reserved_users(self, txn, threepids):
@@ -191,8 +195,7 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
 
         txn.execute(sql, query_args)
 
-        max_mau_value = self.hs.config.max_mau_value
-        if self.hs.config.limit_usage_by_mau:
+        if self._limit_usage_by_mau:
             # If MAU user count still exceeds the MAU threshold, then delete on
             # a least recently active basis.
             # Note it is not possible to write this query using OFFSET due to
@@ -210,13 +213,13 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
                     LIMIT ?
                 )
                 """
-            txn.execute(sql, (max_mau_value,))
+            txn.execute(sql, (self._max_mau_value,))
         # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
         # when len(reserved_users) == 0. Works fine on sqlite.
         else:
             # Must be >= 0 for postgres
             num_of_non_reserved_users_to_remove = max(
-                max_mau_value - len(reserved_users), 0
+                self._max_mau_value - len(reserved_users), 0
             )
 
             # It is important to filter reserved users twice to guard
@@ -335,7 +338,7 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
         Args:
             user_id(str): the user_id to query
         """
-        if self.hs.config.limit_usage_by_mau or self.hs.config.mau_stats_only:
+        if self._limit_usage_by_mau or self._mau_stats_only:
             # Trial users and guests should not be included as part of MAU group
             is_guest = yield self.is_guest(user_id)
             if is_guest:
@@ -356,11 +359,11 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
         # In the case where mau_stats_only is True and limit_usage_by_mau is
         # False, there is no point in checking get_monthly_active_count - it
         # adds no value and will break the logic if max_mau_value is exceeded.
-        if not self.hs.config.limit_usage_by_mau:
+        if not self._limit_usage_by_mau:
             yield self.upsert_monthly_active_user(user_id)
         else:
             count = yield self.get_monthly_active_count()
-            if count < self.hs.config.max_mau_value:
+            if count < self._max_mau_value:
                 yield self.upsert_monthly_active_user(user_id)
             elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
                 yield self.upsert_monthly_active_user(user_id)
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 13f49d8060..3c38b1ac50 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -704,7 +704,7 @@ def _parse_query(database_engine, search_term):
     results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
 
     if isinstance(database_engine, PostgresEngine):
-        return " & ".join(result + ":*" for result in results)
+        return " & ".join(result for result in results)
     elif isinstance(database_engine, Sqlite3Engine):
         return " & ".join(result + "*" for result in results)
     else:
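
On Postgres the ":*" suffix makes each lexeme in a to_tsquery a prefix match; removing it, as this hunk does, restricts search to whole-lexeme matches, which is cheaper to evaluate but no longer matches partially typed words. The difference in the generated query strings (illustrative):

terms = ["feder", "send"]

prefix_query = " & ".join(t + ":*" for t in terms)  # "feder:* & send:*"
exact_query = " & ".join(terms)                     # "feder & send"

# either string is then passed to Postgres along the lines of:
#   SELECT ... WHERE vector @@ to_tsquery('english', ?)
# "feder:*" matches "federation"; plain "feder" does not.
print(prefix_query)
print(exact_query)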