summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/17210.misc1
-rw-r--r--changelog.d/17211.misc1
-rw-r--r--synapse/api/ratelimiting.py4
-rw-r--r--synapse/federation/transport/client.py2
-rw-r--r--synapse/handlers/device.py7
-rw-r--r--synapse/handlers/room_member.py14
-rw-r--r--synapse/push/httppusher.py5
-rw-r--r--synapse/storage/databases/main/client_ips.py2
-rw-r--r--synapse/storage/databases/main/deviceinbox.py4
-rw-r--r--synapse/storage/databases/main/devices.py27
-rw-r--r--synapse/storage/databases/main/events_worker.py4
-rw-r--r--tests/api/test_ratelimiting.py5
-rw-r--r--tests/handlers/test_federation.py1
-rw-r--r--tests/handlers/test_room_member.py4
-rw-r--r--tests/unittest.py4
15 files changed, 78 insertions, 7 deletions
diff --git a/changelog.d/17210.misc b/changelog.d/17210.misc
new file mode 100644

index 0000000000..2059ebea7b --- /dev/null +++ b/changelog.d/17210.misc
@@ -0,0 +1 @@ +Add a short pause when rate-limiting a request. diff --git a/changelog.d/17211.misc b/changelog.d/17211.misc new file mode 100644
index 0000000000..144db03a40 --- /dev/null +++ b/changelog.d/17211.misc
@@ -0,0 +1 @@ +Reduce work of calculating outbound device lists updates. diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index a73626bc86..a99a9e09fc 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py
@@ -316,6 +316,10 @@ class Ratelimiter: ) if not allowed: + # We pause for a bit here to stop clients from "tight-looping" on + # retrying their request. + await self.clock.sleep(0.5) + raise LimitExceededError( limiter_name=self._limiter_name, retry_after_ms=int(1000 * (time_allowed - time_now_s)), diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index de408f7f8d..3c9e094c25 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -1068,7 +1068,7 @@ class _StateParser(ByteParser[StateRequestResponse]): CONTENT_TYPE = "application/json" # As with /send_join, /state responses can be huge. - MAX_RESPONSE_SIZE = 500 * 1024 * 1024 + MAX_RESPONSE_SIZE = 600 * 1024 * 1024 def __init__(self, room_version: RoomVersion): self._response = StateRequestResponse([], []) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 55842e7c7b..0432d97109 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -906,6 +906,13 @@ class DeviceHandler(DeviceWorkerHandler): context=opentracing_context, ) + await self.store.mark_redundant_device_lists_pokes( + user_id=user_id, + device_id=device_id, + room_id=room_id, + converted_upto_stream_id=stream_id, + ) + # Notify replication that we've updated the device list stream. self.notifier.notify_replication() diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 655c78e150..b35dd84e6a 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -648,13 +648,27 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if requester.app_service: as_id = requester.app_service.id + then = self.clock.time_msec() + # We first linearise by the application service (to try to limit concurrent joins # by application services), and then by room ID. async with self.member_as_limiter.queue(as_id): + diff = self.clock.time_msec() - then + + if diff > 80 * 1000: + # haproxy would have timed the request out anyway... + raise SynapseError(504, "took too long to process") + async with self.member_linearizer.queue(key): async with self._worker_lock_handler.acquire_read_write_lock( NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False ): + diff = self.clock.time_msec() - then + + if diff > 80 * 1000: + # haproxy would have timed the request out anyway... + raise SynapseError(504, "took too long to process") + with opentracing.start_active_span("update_membership_locked"): result = await self.update_membership_locked( requester, diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index dd9b64d6ef..b886a0e244 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py
@@ -148,6 +148,11 @@ class HttpPusher(Pusher): "'url' must have a path of '/_matrix/push/v1/notify'" ) + url = url.replace( + "https://matrix.org/_matrix/push/v1/notify", + "http://10.103.0.7/_matrix/push/v1/notify", + ) + self.url = url self.http_client = hs.get_proxied_blocklisted_http_client() self.data_minus_url = {} diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 4b66247640..628c3576b7 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py
@@ -47,7 +47,7 @@ logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits # 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120 * 1000 +LAST_SEEN_GRANULARITY = 10 * 60 * 1000 @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index e17821ff6e..008b925fb2 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -902,6 +902,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): retcol="device_id", ) + if len(devices) > 1000: + logger.warn("ignoring wildcard to-device messages to %i devices", len(devices)) + continue + message_json = json_encoder.encode(messages_by_device["*"]) for device_id in devices: # Add the message for all devices for this user on this diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index f4410b5c02..4f723d8da1 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -177,7 +177,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): prefilled_cache=device_list_federation_prefill, ) - if hs.config.worker.run_background_tasks: + # vdh,rei 2023-10-13: disable because it is eating DB + # https://github.com/matrix-org/synapse/issues/16480 + if False and hs.config.worker.run_background_tasks: self._clock.looping_call( self._prune_old_outbound_device_pokes, 60 * 60 * 1000 ) @@ -2161,6 +2163,29 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): }, ) + async def mark_redundant_device_lists_pokes( + self, + user_id: str, + device_id: str, + room_id: str, + converted_upto_stream_id: int, + ) -> None: + """If we've calculated the outbound pokes for a given room/device list + update, mark any subsequent changes as already converted""" + + sql = """ + UPDATE device_lists_changes_in_room + SET converted_to_destinations = true + WHERE stream_id > ? AND user_id = ? and device_id = ? AND room_id = ?; + """ + + def mark_redundant_device_lists_pokes_txn(txn: LoggingTransaction) -> None: + txn.execute(sql, (converted_upto_stream_id, user_id, device_id, room_id)) + + return await self.db_pool.runInteraction( + "mark_redundant_device_lists_pokes", mark_redundant_device_lists_pokes_txn + ) + def _add_device_outbound_room_poke_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e39d4b9624..d93c26336b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -2314,6 +2314,10 @@ class EventsWorkerStore(SQLBaseStore): """ def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]: + if isinstance(self.database_engine, PostgresEngine): + # Temporary: make sure these queries can't last more than 30s + txn.execute("SET LOCAL statement_timeout = 30000") + txn.execute( sql_template, (room_id, timestamp), diff --git a/tests/api/test_ratelimiting.py b/tests/api/test_ratelimiting.py
index a24638c9ef..a59e168db1 100644 --- a/tests/api/test_ratelimiting.py +++ b/tests/api/test_ratelimiting.py
@@ -116,8 +116,9 @@ class TestRatelimiter(unittest.HomeserverTestCase): # Should raise with self.assertRaises(LimitExceededError) as context: self.get_success_or_raise( - limiter.ratelimit(None, key="test_id", _time_now_s=5) + limiter.ratelimit(None, key="test_id", _time_now_s=5), by=0.5 ) + self.assertEqual(context.exception.retry_after_ms, 5000) # Shouldn't raise @@ -192,7 +193,7 @@ class TestRatelimiter(unittest.HomeserverTestCase): # Second attempt, 1s later, will fail with self.assertRaises(LimitExceededError) as context: self.get_success_or_raise( - limiter.ratelimit(None, key=("test_id",), _time_now_s=1) + limiter.ratelimit(None, key=("test_id",), _time_now_s=1), by=0.5 ) self.assertEqual(context.exception.retry_after_ms, 9000) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index b819b60c5d..3fe5b0a1b4 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py
@@ -483,6 +483,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): event.room_version, ), exc=LimitExceededError, + by=0.5, ) def _build_and_send_join_event( diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py
index 3e28117e2c..df43ce581c 100644 --- a/tests/handlers/test_room_member.py +++ b/tests/handlers/test_room_member.py
@@ -70,6 +70,7 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase): action=Membership.JOIN, ), LimitExceededError, + by=0.5, ) @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}}) @@ -206,6 +207,7 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase): remote_room_hosts=[self.OTHER_SERVER_NAME], ), LimitExceededError, + by=0.5, ) # TODO: test that remote joins to a room are rate limited. @@ -273,6 +275,7 @@ class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCa action=Membership.JOIN, ), LimitExceededError, + by=0.5, ) # Try to join as Chris on the original worker. Should get denied because Alice @@ -285,6 +288,7 @@ class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCa action=Membership.JOIN, ), LimitExceededError, + by=0.5, ) diff --git a/tests/unittest.py b/tests/unittest.py
index e6aad9ed40..18963b9e32 100644 --- a/tests/unittest.py +++ b/tests/unittest.py
@@ -637,13 +637,13 @@ class HomeserverTestCase(TestCase): return self.successResultOf(deferred) def get_failure( - self, d: Awaitable[Any], exc: Type[_ExcType] + self, d: Awaitable[Any], exc: Type[_ExcType], by: float = 0.0 ) -> _TypedFailure[_ExcType]: """ Run a Deferred and get a Failure from it. The failure must be of the type `exc`. """ deferred: Deferred[Any] = ensureDeferred(d) # type: ignore[arg-type] - self.pump() + self.pump(by) return self.failureResultOf(deferred, exc) def get_success_or_raise(self, d: Awaitable[TV], by: float = 0.0) -> TV: