summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/6126.feature1
-rw-r--r--changelog.d/6250.misc1
-rw-r--r--changelog.d/6251.misc1
-rw-r--r--synapse/crypto/event_signing.py6
-rw-r--r--synapse/federation/federation_client.py2
-rw-r--r--synapse/federation/sender/__init__.py18
-rw-r--r--synapse/federation/sender/per_destination_queue.py15
-rw-r--r--synapse/federation/sender/transaction_manager.py4
-rw-r--r--synapse/federation/transport/client.py8
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/room_list.py3
-rw-r--r--synapse/handlers/room_member.py46
-rw-r--r--synapse/handlers/sync.py5
-rw-r--r--synapse/http/federation/well_known_resolver.py4
-rw-r--r--synapse/push/httppusher.py4
-rw-r--r--synapse/replication/tcp/streams/_base.py2
-rw-r--r--synapse/rest/client/v2_alpha/sync.py11
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py14
-rw-r--r--synapse/storage/data_stores/main/client_ips.py2
-rw-r--r--synapse/storage/data_stores/main/event_federation.py4
-rw-r--r--synapse/storage/data_stores/main/search.py2
-rw-r--r--synapse/storage/data_stores/main/stats.py2
22 files changed, 118 insertions, 39 deletions
diff --git a/changelog.d/6126.feature b/changelog.d/6126.feature
new file mode 100644

index 0000000000..1207ba6206 --- /dev/null +++ b/changelog.d/6126.feature
@@ -0,0 +1 @@ +Group events into larger federation transactions at times of high traffic. diff --git a/changelog.d/6250.misc b/changelog.d/6250.misc new file mode 100644
index 0000000000..12e3fe66b0 --- /dev/null +++ b/changelog.d/6250.misc
@@ -0,0 +1 @@ +Reduce verbosity of user/room stats. diff --git a/changelog.d/6251.misc b/changelog.d/6251.misc new file mode 100644
index 0000000000..371c6983be --- /dev/null +++ b/changelog.d/6251.misc
@@ -0,0 +1 @@ +Reduce impact of debug logging. diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index 694fb2c816..ccaa8a9920 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py
@@ -125,9 +125,11 @@ def compute_event_signature(event_dict, signature_name, signing_key): redact_json = prune_event_dict(event_dict) redact_json.pop("age_ts", None) redact_json.pop("unsigned", None) - logger.debug("Signing event: %s", encode_canonical_json(redact_json)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Signing event: %s", encode_canonical_json(redact_json)) redact_json = sign_json(redact_json, signature_name, signing_key) - logger.debug("Signed event: %s", encode_canonical_json(redact_json)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Signed event: %s", encode_canonical_json(redact_json)) return redact_json["signatures"] diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5b22a39b7f..f5c1632916 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -196,7 +196,7 @@ class FederationClient(FederationBase): dest, room_id, extremities, limit ) - logger.debug("backfill transaction_data=%s", repr(transaction_data)) + logger.debug("backfill transaction_data=%r", transaction_data) room_version = yield self.store.get_room_version(room_id) format_ver = room_version_to_event_format(room_version) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 2b2ee8612a..788b26446d 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -152,9 +152,24 @@ class FederationSender(object): @defer.inlineCallbacks def _process_event_queue_loop(self): + loop_start_time = self.clock.time_msec() try: self._is_processing = True while True: + # if we've been going around this loop for a long time without + # catching up, deprioritise transaction transmission. This should mean + # that events get batched into fewer transactions, which is more + # efficient, and hence give us a chance to catch up + if ( + self.clock.time_msec() - loop_start_time > 60 * 1000 + and not self._transaction_manager.deprioritise_transmission + ): + logger.warning( + "Event queue is getting behind: deprioritising transaction " + "transmission" + ) + self._transaction_manager.deprioritise_transmission = True + last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 @@ -252,6 +267,9 @@ class FederationSender(object): finally: self._is_processing = False + if self._transaction_manager.deprioritise_transmission: + logger.info("Event queue caught up: re-prioritising transmission") + self._transaction_manager.deprioritise_transmission = False def _send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index cc75c39476..bd217cf69f 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,6 +15,7 @@ # limitations under the License. import datetime import logging +import random from prometheus_client import Counter @@ -36,6 +37,8 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter # This is defined in the Matrix spec and enforced by the receiver. MAX_EDUS_PER_TRANSACTION = 100 +DEPRIORITISE_SLEEP_TIME = 10 + logger = logging.getLogger(__name__) @@ -189,6 +192,18 @@ class PerDestinationQueue(object): pending_pdus = [] while True: + if self._transaction_manager.deprioritise_transmission: + # if the event-processing loop has got behind, sleep to give it + # a chance to catch up. Add some randomness so that the transmitters + # don't all wake up in sync. + sleeptime = random.uniform( + DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2 + ) + logger.info( + "TX [%s]: sleeping for %f seconds", self._destination, sleeptime + ) + yield self._clock.sleep(sleeptime) + # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 5b6c79c51a..69679dbf65 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -49,6 +49,10 @@ class TransactionManager(object): # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) + # the federation sender sometimes sets this to delay transaction transmission, + # if the sender gets behind. + self.deprioritise_transmission = False + @measure_func("_send_new_transaction") @defer.inlineCallbacks def send_new_transaction(self, destination, pending_pdus, pending_edus): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 7b18408144..87e5821c04 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -122,10 +122,10 @@ class TransportLayerClient(object): Deferred: Results in a dict received from the remote homeserver. """ logger.debug( - "backfill dest=%s, room_id=%s, event_tuples=%s, limit=%s", + "backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s", destination, room_id, - repr(event_tuples), + event_tuples, str(limit), ) @@ -175,7 +175,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - path = _create_v1_path("/send/%s", transaction.transaction_id) + path = _create_v1_path("/send/%s/", transaction.transaction_id) response = yield self.client.put_json( transaction.destination, @@ -184,7 +184,7 @@ class TransportLayerClient(object): json_data_callback=json_data_callback, long_retries=True, backoff_on_404=True, # If we get a 404 the other side has gone - try_trailing_slash_on_400=True, + # try_trailing_slash_on_400=True, ) return response diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0f8cce8ffe..7214699971 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -205,7 +205,7 @@ class MessageHandler(object): # If this is an AS, double check that they are allowed to see the members. # This can either be because the AS user is in the room or because there # is a user in the room that the AS is "interested in" - if requester.app_service and user_id not in users_with_profile: + if False and requester.app_service and user_id not in users_with_profile: for uid in users_with_profile: if requester.app_service.is_interested_in_user(uid): break diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index c615206df1..2252a86f77 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py
@@ -43,7 +43,8 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.enable_room_list_search = hs.config.enable_room_list_search - self.response_cache = ResponseCache(hs, "room_list") + + self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000) self.remote_response_cache = ResponseCache( hs, "remote_room_list", timeout_ms=30 * 1000 ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 380e2fad5e..ab5ed0c7cd 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -62,6 +62,7 @@ class RoomMemberHandler(object): self.event_creation_handler = hs.get_event_creation_handler() self.member_linearizer = Linearizer(name="member") + self.member_limiter = Linearizer(max_count=10, name="member_as_limiter") self.clock = hs.get_clock() self.spam_checker = hs.get_spam_checker() @@ -270,19 +271,38 @@ class RoomMemberHandler(object): ): key = (room_id,) - with (yield self.member_linearizer.queue(key)): - result = yield self._update_membership( - requester, - target, - room_id, - action, - txn_id=txn_id, - remote_room_hosts=remote_room_hosts, - third_party_signed=third_party_signed, - ratelimit=ratelimit, - content=content, - require_consent=require_consent, - ) + as_id = object() + if requester.app_service: + as_id = requester.app_service.id + + then = self.clock.time_msec() + + with (yield self.member_limiter.queue(as_id)): + diff = self.clock.time_msec() - then + + if diff > 80 * 1000: + # haproxy would have timed the request out anyway... + raise SynapseError(504, "took to long to process") + + with (yield self.member_linearizer.queue(key)): + diff = self.clock.time_msec() - then + + if diff > 80 * 1000: + # haproxy would have timed the request out anyway... + raise SynapseError(504, "took to long to process") + + result = yield self._update_membership( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + content=content, + require_consent=require_consent, + ) return result diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d99160e9d7..f1741eaeec 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -42,6 +42,7 @@ logger = logging.getLogger(__name__) # Debug logger for https://github.com/matrix-org/synapse/issues/4422 issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug") +SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000 # Counts the number of times we returned a non-empty sync. `type` is one of # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is @@ -227,7 +228,9 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache(hs, "sync") + self.response_cache = ResponseCache( + hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS + ) self.state = hs.get_state_handler() self.auth = hs.get_auth() diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 7ddfad286d..b82c8a84f4 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py
@@ -103,6 +103,10 @@ class WellKnownResolver(object): Returns: Deferred[WellKnownLookupResult]: The result of the lookup """ + + if server_name == b"kde.org": + return WellKnownLookupResult(delegated_server=b"kde.modular.im:443") + try: prev_result, expiry, ttl = self._well_known_cache.get_with_expiry( server_name diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 6299587808..5a3e3812e0 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py
@@ -102,6 +102,10 @@ class HttpPusher(object): if "url" not in self.data: raise PusherConfigException("'url' required in data for HTTP pusher") self.url = self.data["url"] + self.url = self.url.replace( + "https://matrix.org/_matrix/push/v1/notify", + "http://10.101.0.14/_matrix/push/v1/notify", + ) self.http_client = hs.get_simple_http_client() self.data_minus_url = {} self.data_minus_url.update(self.data) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index f03111c259..b4572c35b2 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -24,7 +24,7 @@ from twisted.internet import defer logger = logging.getLogger(__name__) -MAX_EVENTS_BEHIND = 10000 +MAX_EVENTS_BEHIND = 500000 BackfillStreamRow = namedtuple( "BackfillStreamRow", diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index a883c8adda..541a6b0e10 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py
@@ -112,9 +112,14 @@ class SyncRestServlet(RestServlet): full_state = parse_boolean(request, "full_state", default=False) logger.debug( - "/sync: user=%r, timeout=%r, since=%r," - " set_presence=%r, filter_id=%r, device_id=%r" - % (user, timeout, since, set_presence, filter_id, device_id) + "/sync: user=%r, timeout=%r, since=%r, " + "set_presence=%r, filter_id=%r, device_id=%r", + user, + timeout, + since, + set_presence, + filter_id, + device_id, ) request_key = (user, timeout, since, filter_id, full_state, device_id) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index ec9c4619c9..999fcc2f04 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -120,8 +120,10 @@ class PreviewUrlResource(DirectServeResource): pattern = entry[attrib] value = getattr(url_tuple, attrib) logger.debug( - ("Matching attrib '%s' with value '%s' against" " pattern '%s'") - % (attrib, value, pattern) + "Matching attrib '%s' with value '%s' against" " pattern '%s'", + attrib, + value, + pattern, ) if value is None: @@ -189,7 +191,7 @@ class PreviewUrlResource(DirectServeResource): media_info = yield self._download_url(url, user) - logger.debug("got media_info of '%s'" % media_info) + logger.debug("got media_info of '%s'", media_info) if _is_media(media_info["media_type"]): file_id = media_info["filesystem_id"] @@ -257,7 +259,7 @@ class PreviewUrlResource(DirectServeResource): og["og:image:width"] = dims["width"] og["og:image:height"] = dims["height"] else: - logger.warn("Couldn't get dims for %s" % og["og:image"]) + logger.warn("Couldn't get dims for %s", og["og:image"]) og["og:image"] = "mxc://%s/%s" % ( self.server_name, @@ -283,7 +285,7 @@ class PreviewUrlResource(DirectServeResource): for k in keys_to_remove: del og[k] - logger.debug("Calculated OG for %s as %s" % (url, og)) + logger.debug("Calculated OG for %s as %s", url, og) jsonog = json.dumps(og) @@ -312,7 +314,7 @@ class PreviewUrlResource(DirectServeResource): with self.media_storage.store_into_file(file_info) as (f, fname, finish): try: - logger.debug("Trying to get url '%s'" % url) + logger.debug("Trying to get url '%s'", url) length, headers, uri, code = yield self.client.get_file( url, output_stream=f, max_size=self.max_spider_size ) diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 706c6a1f3f..62b8e06fb4 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits # 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120 * 1000 +LAST_SEEN_GRANULARITY = 10 * 60 * 1000 class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore): diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index a470a48e0f..90bef0cd2c 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py
@@ -364,9 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) def _get_backfill_events(self, txn, room_id, event_list, limit): - logger.debug( - "_get_backfill_events: %s, %s, %s", room_id, repr(event_list), limit - ) + logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit) event_results = set() diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 0e08497452..e9aa95f3e7 100644 --- a/synapse/storage/data_stores/main/search.py +++ b/synapse/storage/data_stores/main/search.py
@@ -704,7 +704,7 @@ def _parse_query(database_engine, search_term): results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) if isinstance(database_engine, PostgresEngine): - return " & ".join(result + ":*" for result in results) + return " & ".join(result for result in results) elif isinstance(database_engine, Sqlite3Engine): return " & ".join(result + "*" for result in results) else: diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index 5ab639b2ad..4d59b7833f 100644 --- a/synapse/storage/data_stores/main/stats.py +++ b/synapse/storage/data_stores/main/stats.py
@@ -332,7 +332,7 @@ class StatsStore(StateDeltasStore): def _bulk_update_stats_delta_txn(txn): for stats_type, stats_updates in updates.items(): for stats_id, fields in stats_updates.items(): - logger.info( + logger.debug( "Updating %s stats for %s: %s", stats_type, stats_id, fields ) self._update_stats_delta_txn(