diff --git a/changelog.d/6126.feature b/changelog.d/6126.feature
new file mode 100644
index 0000000000..1207ba6206
--- /dev/null
+++ b/changelog.d/6126.feature
@@ -0,0 +1 @@
+Group events into larger federation transactions at times of high traffic.
diff --git a/changelog.d/6418.bugfix b/changelog.d/6418.bugfix
new file mode 100644
index 0000000000..a1f488d3a2
--- /dev/null
+++ b/changelog.d/6418.bugfix
@@ -0,0 +1 @@
+Fix phone home stats reporting.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 5b8faea4e7..f3d6b3a1c1 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -150,10 +150,25 @@ class FederationSender(object):
"process_event_queue_for_federation", self._process_event_queue_loop
)
- async def _process_event_queue_loop(self) -> None:
+ async def _process_event_queue_loop(self):
+ loop_start_time = self.clock.time_msec()
try:
self._is_processing = True
while True:
+ # if we've been going around this loop for a long time without
+ # catching up, deprioritise transaction transmission. This should mean
+ # that events get batched into fewer transactions, which is more
+ # efficient, and hence give us a chance to catch up
+ if (
+ self.clock.time_msec() - loop_start_time > 60 * 1000
+ and not self._transaction_manager.deprioritise_transmission
+ ):
+ logger.warning(
+ "Event queue is getting behind: deprioritising transaction "
+ "transmission"
+ )
+ self._transaction_manager.deprioritise_transmission = True
+
last_token = await self.store.get_federation_out_pos("events")
next_token, events = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
@@ -249,6 +264,9 @@ class FederationSender(object):
finally:
self._is_processing = False
+ if self._transaction_manager.deprioritise_transmission:
+ logger.info("Event queue caught up: re-prioritising transmission")
+ self._transaction_manager.deprioritise_transmission = False
def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 4e698981a4..1093ae0d91 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,6 +15,7 @@
# limitations under the License.
import datetime
import logging
+import random
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
from prometheus_client import Counter
@@ -39,6 +40,8 @@ if TYPE_CHECKING:
# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100
+DEPRIORITISE_SLEEP_TIME = 10
+
logger = logging.getLogger(__name__)
@@ -199,6 +202,18 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
+ if self._transaction_manager.deprioritise_transmission:
+ # if the event-processing loop has got behind, sleep to give it
+ # a chance to catch up. Add some randomness so that the transmitters
+ # don't all wake up in sync.
+ sleeptime = random.uniform(
+ DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2
+ )
+ logger.info(
+ "TX [%s]: sleeping for %f seconds", self._destination, sleeptime
+ )
+ await self._clock.sleep(sleeptime)
+
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index a2752a54a5..fa6ad9efdc 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -52,6 +52,10 @@ class TransactionManager(object):
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
+ # the federation sender sometimes sets this to delay transaction transmission,
+ # if the sender gets behind.
+ self.deprioritise_transmission = False
+
@measure_func("_send_new_transaction")
async def send_new_transaction(
self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 665ad19b5d..70b5cb0f89 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -231,7 +231,7 @@ class MessageHandler(object):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
@@ -402,8 +402,10 @@ class EventCreationHandler(object):
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
+ self._is_worker_app = self.config.worker_app is not None
+
if (
- not self.config.worker_app
+ not self._is_worker_app
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 5e05be6181..e3dbbcc052 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -42,7 +42,8 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
- self.response_cache = ResponseCache(hs, "room_list")
+
+ self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(
hs, "remote_room_list", timeout_ms=30 * 1000
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 27c479da9e..c0b8742586 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -60,6 +60,7 @@ class RoomMemberHandler(object):
self.event_creation_handler = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
+ self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -291,19 +292,38 @@ class RoomMemberHandler(object):
) -> Tuple[Optional[str], int]:
key = (room_id,)
- with (await self.member_linearizer.queue(key)):
- result = await self._update_membership(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- require_consent=require_consent,
- )
+ as_id = object()
+ if requester.app_service:
+ as_id = requester.app_service.id
+
+ then = self.clock.time_msec()
+
+ with (await self.member_limiter.queue(as_id)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ with (await self.member_linearizer.queue(key)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ result = await self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ require_consent=require_consent,
+ )
return result
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0b82aa72a6..b7c74d6219 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -48,6 +48,7 @@ logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -244,7 +245,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs, "sync")
+ self.response_cache = ResponseCache(
+ hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS
+ )
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 89a3b041ce..3a64ecf88d 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -110,6 +110,10 @@ class WellKnownResolver(object):
Returns:
Deferred[WellKnownLookupResult]: The result of the lookup
"""
+
+ if server_name == b"kde.org":
+ return WellKnownLookupResult(delegated_server=b"kde.modular.im:443")
+
try:
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
server_name
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index ed60dbc1bf..af7e365e90 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -98,6 +98,10 @@ class HttpPusher(object):
if "url" not in self.data:
raise PusherConfigException("'url' required in data for HTTP pusher")
self.url = self.data["url"]
+ self.url = self.url.replace(
+ "https://matrix.org/_matrix/push/v1/notify",
+ "http://10.103.0.7/_matrix/push/v1/notify",
+ )
self.http_client = hs.get_proxied_http_client()
self.data_minus_url = {}
self.data_minus_url.update(self.data)
diff --git a/synapse/res/templates/saml_error.html b/synapse/res/templates/saml_error.html
index bfd6449c5d..f8a5fccd38 100644
--- a/synapse/res/templates/saml_error.html
+++ b/synapse/res/templates/saml_error.html
@@ -42,4 +42,4 @@
}
</script>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 995d4764a9..1c035d51cb 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpBackgroundUpdateStore(SQLBaseStore):
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index e459cf49a0..6572f41971 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -199,7 +199,7 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
# a least recently active basis.
# Note it is not possible to write this query using OFFSET due to
# incompatibilities in how sqlite and postgres support the feature.
- # Sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present,
+ # Sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be presents,
# while Postgres does not require 'LIMIT', but also does not support
# negative LIMIT values. So there is no way to write it that both can
# support
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index a8381dc577..9d6540d142 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -702,7 +702,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
- return " & ".join(result + ":*" for result in results)
+ return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:
|