diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index a73626bc86..a99a9e09fc 100644
--- a/synapse/api/ratelimiting.py
+++ b/synapse/api/ratelimiting.py
@@ -316,6 +316,10 @@ class Ratelimiter:
)
if not allowed:
+ # We pause for a bit here to stop clients from "tight-looping" on
+ # retrying their request.
+ await self.clock.sleep(0.5)
+
raise LimitExceededError(
limiter_name=self._limiter_name,
retry_after_ms=int(1000 * (time_allowed - time_now_s)),
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index de408f7f8d..3c9e094c25 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1068,7 +1068,7 @@ class _StateParser(ByteParser[StateRequestResponse]):
CONTENT_TYPE = "application/json"
# As with /send_join, /state responses can be huge.
- MAX_RESPONSE_SIZE = 500 * 1024 * 1024
+ MAX_RESPONSE_SIZE = 600 * 1024 * 1024
def __init__(self, room_version: RoomVersion):
self._response = StateRequestResponse([], [])
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 55842e7c7b..0432d97109 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -906,6 +906,13 @@ class DeviceHandler(DeviceWorkerHandler):
context=opentracing_context,
)
+ await self.store.mark_redundant_device_lists_pokes(
+ user_id=user_id,
+ device_id=device_id,
+ room_id=room_id,
+ converted_upto_stream_id=stream_id,
+ )
+
# Notify replication that we've updated the device list stream.
self.notifier.notify_replication()
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 655c78e150..b35dd84e6a 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -648,13 +648,27 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if requester.app_service:
as_id = requester.app_service.id
+ then = self.clock.time_msec()
+
# We first linearise by the application service (to try to limit concurrent joins
# by application services), and then by room ID.
async with self.member_as_limiter.queue(as_id):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
async with self.member_linearizer.queue(key):
async with self._worker_lock_handler.acquire_read_write_lock(
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
with opentracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
requester,
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index dd9b64d6ef..b886a0e244 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -148,6 +148,11 @@ class HttpPusher(Pusher):
"'url' must have a path of '/_matrix/push/v1/notify'"
)
+ url = url.replace(
+ "https://matrix.org/_matrix/push/v1/notify",
+ "http://10.103.0.7/_matrix/push/v1/notify",
+ )
+
self.url = url
self.http_client = hs.get_proxied_blocklisted_http_client()
self.data_minus_url = {}
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 4b66247640..628c3576b7 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -47,7 +47,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
@attr.s(slots=True, frozen=True, auto_attribs=True)
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index e17821ff6e..008b925fb2 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -902,6 +902,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
retcol="device_id",
)
+ if len(devices) > 1000:
+ logger.warn("ignoring wildcard to-device messages to %i devices", len(devices))
+ continue
+
message_json = json_encoder.encode(messages_by_device["*"])
for device_id in devices:
# Add the message for all devices for this user on this
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index f4410b5c02..4f723d8da1 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -177,7 +177,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
prefilled_cache=device_list_federation_prefill,
)
- if hs.config.worker.run_background_tasks:
+ # vdh,rei 2023-10-13: disable because it is eating DB
+ # https://github.com/matrix-org/synapse/issues/16480
+ if False and hs.config.worker.run_background_tasks:
self._clock.looping_call(
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
)
@@ -2161,6 +2163,29 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
},
)
+ async def mark_redundant_device_lists_pokes(
+ self,
+ user_id: str,
+ device_id: str,
+ room_id: str,
+ converted_upto_stream_id: int,
+ ) -> None:
+ """If we've calculated the outbound pokes for a given room/device list
+ update, mark any subsequent changes as already converted"""
+
+ sql = """
+ UPDATE device_lists_changes_in_room
+ SET converted_to_destinations = true
+ WHERE stream_id > ? AND user_id = ? and device_id = ? AND room_id = ?;
+ """
+
+ def mark_redundant_device_lists_pokes_txn(txn: LoggingTransaction) -> None:
+ txn.execute(sql, (converted_upto_stream_id, user_id, device_id, room_id))
+
+ return await self.db_pool.runInteraction(
+ "mark_redundant_device_lists_pokes", mark_redundant_device_lists_pokes_txn
+ )
+
def _add_device_outbound_room_poke_txn(
self,
txn: LoggingTransaction,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e39d4b9624..d93c26336b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -2314,6 +2314,10 @@ class EventsWorkerStore(SQLBaseStore):
"""
def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
+ if isinstance(self.database_engine, PostgresEngine):
+ # Temporary: make sure these queries can't last more than 30s
+ txn.execute("SET LOCAL statement_timeout = 30000")
+
txn.execute(
sql_template,
(room_id, timestamp),
|