diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 94cc63001e..86a611c49c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -153,10 +153,25 @@ class FederationSender(object):
"process_event_queue_for_federation", self._process_event_queue_loop
)
     async def _process_event_queue_loop(self) -> None:
+        loop_start_time = self.clock.time_msec()
try:
self._is_processing = True
while True:
+ # if we've been going around this loop for a long time without
+ # catching up, deprioritise transaction transmission. This should mean
+ # that events get batched into fewer transactions, which is more
+ # efficient, and hence give us a chance to catch up
+ if (
+ self.clock.time_msec() - loop_start_time > 60 * 1000
+ and not self._transaction_manager.deprioritise_transmission
+ ):
+ logger.warning(
+ "Event queue is getting behind: deprioritising transaction "
+ "transmission"
+ )
+ self._transaction_manager.deprioritise_transmission = True
+
last_token = await self.store.get_federation_out_pos("events")
next_token, events = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
@@ -266,6 +281,9 @@ class FederationSender(object):
finally:
self._is_processing = False
+ if self._transaction_manager.deprioritise_transmission:
+ logger.info("Event queue caught up: re-prioritising transmission")
+ self._transaction_manager.deprioritise_transmission = False
def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index dd150f89a6..2fb1782f33 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,6 +15,7 @@
# limitations under the License.
import datetime
import logging
+import random
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
from prometheus_client import Counter
@@ -39,6 +40,10 @@ if TYPE_CHECKING:
# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100
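+# When transmission is deprioritised, each per-destination transmission loop
+# sleeps for between this many seconds and twice this many seconds per transaction.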
+DEPRIORITISE_SLEEP_TIME = 10
+
logger = logging.getLogger(__name__)
@@ -221,6 +226,18 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
+ if self._transaction_manager.deprioritise_transmission:
+ # if the event-processing loop has got behind, sleep to give it
+ # a chance to catch up. Add some randomness so that the transmitters
+ # don't all wake up in sync.
+ sleeptime = random.uniform(
+ DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2
+ )
+ logger.info(
+ "TX [%s]: sleeping for %f seconds", self._destination, sleeptime
+ )
+ await self._clock.sleep(sleeptime)
+
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index c7f6cb3d73..a66a24b392 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -52,6 +52,10 @@ class TransactionManager(object):
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
+        # The federation sender sometimes sets this to delay transaction
+        # transmission if its event-processing loop gets behind.
+ self.deprioritise_transmission = False
+
@measure_func("_send_new_transaction")
async def send_new_transaction(
self,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2643438e84..73e787f2f7 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -246,7 +246,9 @@ class MessageHandler(object):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
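+        # XXX: this check is deliberately short-circuited with `if False`,
+        # presumably because it is too expensive in large rooms.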
+ if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
@@ -415,8 +417,10 @@ class EventCreationHandler(object):
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
+ self._is_worker_app = self.config.worker_app is not None
+
if (
- not self.config.worker_app
+ not self._is_worker_app
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 5dd7b28391..0d678eee17 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -40,7 +40,10 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
- self.response_cache = ResponseCache(hs, "room_list")
+
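+        # Cache the public room list for 10 minutes, presumably because it is
+        # expensive to generate and changes only slowly.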
+ self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(
hs, "remote_room_list", timeout_ms=30 * 1000
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 31705cdbdb..4634f4df9d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -67,6 +67,9 @@ class RoomMemberHandler(object):
self.event_creation_handler = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
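+        # Limits the number of concurrent membership updates per appservice
+        # (see the limiter usage when updating membership below).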
+ self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -287,19 +290,41 @@
) -> Tuple[str, int]:
key = (room_id,)
- with (await self.member_linearizer.queue(key)):
- result = await self._update_membership(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- require_consent=require_consent,
- )
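+        # Use a fresh object as the limiter key for non-appservice requests, so
+        # that they are never queued behind one another; requests from a given
+        # appservice all share its ID and are capped at 10 in flight.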
+ as_id = object()
+ if requester.app_service:
+ as_id = requester.app_service.id
+
+ then = self.clock.time_msec()
+
+ with (await self.member_limiter.queue(as_id)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+                raise SynapseError(504, "took too long to process")
+
+ with (await self.member_linearizer.queue(key)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+                    raise SynapseError(504, "took too long to process")
+
+ result = await self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ require_consent=require_consent,
+ )
return result
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c42dac18f5..e4932a1939 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -48,6 +48,9 @@ logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
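+# How long to cache sync responses for, presumably so that repeated identical
+# requests (e.g. retries after a gateway timeout) can be served from the cache.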
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -244,7 +247,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs, "sync")
+ self.response_cache = ResponseCache(
+ hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS
+ )
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 89a3b041ce..3a64ecf88d 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -110,6 +110,12 @@ class WellKnownResolver(object):
Returns:
Deferred[WellKnownLookupResult]: The result of the lookup
"""
+
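+        # XXX: hard-coded delegation: route kde.org traffic to kde.modular.im,
+        # bypassing the usual .well-known lookup (presumably a temporary fix).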
+ if server_name == b"kde.org":
+ return WellKnownLookupResult(delegated_server=b"kde.modular.im:443")
+
try:
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
server_name
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 4c469efb20..9acbc9e34d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -98,6 +98,12 @@ class HttpPusher(object):
if "url" not in self.data:
raise PusherConfigException("'url' required in data for HTTP pusher")
self.url = self.data["url"]
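+        # XXX: rewrite the matrix.org push gateway URL to an internal address,
+        # presumably to bypass TLS and the external reverse proxy.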
+ self.url = self.url.replace(
+ "https://matrix.org/_matrix/push/v1/notify",
+ "http://10.103.0.7/_matrix/push/v1/notify",
+ )
self.http_client = hs.get_proxied_http_client()
self.data_minus_url = {}
self.data_minus_url.update(self.data)
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 216a5925fc..4e2b2a85ee 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
-# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+# 600 seconds == 10 minutes
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpBackgroundUpdateStore(SQLBaseStore):
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index e71cdd2cb4..1d4db758d4 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -199,7 +199,7 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
# a least recently active basis.
# Note it is not possible to write this query using OFFSET due to
# incompatibilities in how sqlite and postgres support the feature.
-                # Sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be presents,
+                # Sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present,
# while Postgres does not require 'LIMIT', but also does not support
# negative LIMIT values. So there is no way to write it that both can
# support
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index 7f8d1880e5..dcbdeab36e 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -703,7 +703,9 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
- return " & ".join(result + ":*" for result in results)
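+        # XXX: prefix matching (":*") disabled on postgres, presumably because
+        # it was too expensive on a large search index.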
+ return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:
|