diff --git a/synapse/config/server.py b/synapse/config/server.py
index c6d58effd4..6d88231843 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -505,6 +505,9 @@ class ServerConfig(Config):
"cleanup_extremities_with_dummy_events", True
)
+ # The number of forward extremities in a room needed to send a dummy event.
+ self.dummy_events_threshold = config.get("dummy_events_threshold", 10)
+
self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False)
# Inhibits the /requestToken endpoints from returning an error that might leak
@@ -823,6 +826,18 @@ class ServerConfig(Config):
# bind_addresses: ['::1', '127.0.0.1']
# type: manhole
+ # Forward extremities can build up in a room due to networking delays between
+ # homeservers. Once this happens in a large room, calculation of the state of
+ # that room can become quite expensive. To mitigate this, once the number of
+ # forward extremities reaches a given threshold, Synapse will send an
+ # org.matrix.dummy_event event, which will reduce the forward extremities
+ # in the room.
+ #
+ # This setting defines the threshold (i.e. number of forward extremities in the
+ # room) at which dummy events are sent. The default value is 10.
+ #
+ #dummy_events_threshold: 5
+
## Homeserver blocking ##
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index 1eec3874b6..27b0c02655 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -46,7 +46,6 @@ from twisted.internet import defer
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
-from synapse.util.async_helpers import yieldable_gather_results
logger = logging.getLogger(__name__)
@@ -208,6 +207,5 @@ class GroupAttestionRenewer(object):
"Error renewing attestation of %r in %r", user_id, group_id
)
- await yieldable_gather_results(
- _renew_attestation, ((row["group_id"], row["user_id"]) for row in rows)
- )
+ for row in rows:
+ await _renew_attestation((row["group_id"], row["user_id"]))
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a324f09340..a622a600b4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -419,6 +419,8 @@ class EventCreationHandler(object):
self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+ self._dummy_events_threshold = hs.config.dummy_events_threshold
+
@defer.inlineCallbacks
def create_event(
self,
@@ -1085,7 +1087,7 @@ class EventCreationHandler(object):
"""
self._expire_rooms_to_exclude_from_dummy_event_insertion()
room_ids = await self.store.get_rooms_with_many_extremities(
- min_count=10,
+ min_count=self._dummy_events_threshold,
limit=5,
room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(),
)
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index 856534e91a..8b9c4e38bd 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -431,15 +431,7 @@ class LoggingContext(object):
return
utime_delta, stime_delta = self._get_cputime(rusage)
- self._resource_usage.ru_utime += utime_delta
- self._resource_usage.ru_stime += stime_delta
-
- # if we have a parent, pass our CPU usage stats on
- if self.parent_context:
- self.parent_context._resource_usage += self._resource_usage
-
- # reset them in case we get entered again
- self._resource_usage.reset()
+ self.add_cputime(utime_delta, stime_delta)
finally:
self.usage_start = None
@@ -497,30 +489,52 @@ class LoggingContext(object):
return utime_delta, stime_delta
+ def add_cputime(self, utime_delta: float, stime_delta: float) -> None:
+ """Update the CPU time usage of this context (and any parents, recursively).
+
+ Args:
+ utime_delta: additional user time, in seconds, spent in this context.
+ stime_delta: additional system time, in seconds, spent in this context.
+ """
+ self._resource_usage.ru_utime += utime_delta
+ self._resource_usage.ru_stime += stime_delta
+ if self.parent_context:
+ self.parent_context.add_cputime(utime_delta, stime_delta)
+
def add_database_transaction(self, duration_sec: float) -> None:
+ """Record the use of a database transaction and the length of time it took.
+
+ Args:
+ duration_sec: The number of seconds the database transaction took.
+ """
if duration_sec < 0:
raise ValueError("DB txn time can only be non-negative")
self._resource_usage.db_txn_count += 1
self._resource_usage.db_txn_duration_sec += duration_sec
+ if self.parent_context:
+ self.parent_context.add_database_transaction(duration_sec)
def add_database_scheduled(self, sched_sec: float) -> None:
"""Record a use of the database pool
Args:
- sched_sec (float): number of seconds it took us to get a
- connection
+ sched_sec: number of seconds it took us to get a connection
"""
if sched_sec < 0:
raise ValueError("DB scheduling time can only be non-negative")
self._resource_usage.db_sched_duration_sec += sched_sec
+ if self.parent_context:
+ self.parent_context.add_database_scheduled(sched_sec)
def record_event_fetch(self, event_count: int) -> None:
"""Record a number of events being fetched from the db
Args:
- event_count (int): number of events being fetched
+ event_count: number of events being fetched
"""
self._resource_usage.evt_db_fetch_count += event_count
+ if self.parent_context:
+ self.parent_context.record_event_fetch(event_count)
class LoggingContextFilter(logging.Filter):
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 733c51b758..39c99a2802 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -98,7 +98,9 @@ CONDITIONAL_REQUIREMENTS = {
"sentry": ["sentry-sdk>=0.7.2"],
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
"jwt": ["pyjwt>=1.6.4"],
- "redis": ["txredisapi>=1.4.7"],
+ # hiredis is not a *strict* dependency, but it makes things much faster.
+ # (if it is not installed, we fall back to slow code.)
+ "redis": ["txredisapi>=1.4.7", "hiredis"],
}
ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]
|