diff --git a/changelog.d/17058.doc b/changelog.d/17058.doc
new file mode 100644
index 0000000000..99795bf3da
--- /dev/null
+++ b/changelog.d/17058.doc
@@ -0,0 +1 @@
+Document [`/v1/make_knock`](https://spec.matrix.org/v1.10/server-server-api/#get_matrixfederationv1make_knockroomiduserid) and [`/v1/send_knock/](https://spec.matrix.org/v1.10/server-server-api/#put_matrixfederationv1send_knockroomideventid) federation endpoints as worker-compatible.
\ No newline at end of file
diff --git a/changelog.d/17210.misc b/changelog.d/17210.misc
new file mode 100644
index 0000000000..2059ebea7b
--- /dev/null
+++ b/changelog.d/17210.misc
@@ -0,0 +1 @@
+Add a short pause when rate-limiting a request.
diff --git a/changelog.d/17211.misc b/changelog.d/17211.misc
new file mode 100644
index 0000000000..144db03a40
--- /dev/null
+++ b/changelog.d/17211.misc
@@ -0,0 +1 @@
+Reduce work of calculating outbound device lists updates.
diff --git a/changelog.d/17216.misc b/changelog.d/17216.misc
new file mode 100644
index 0000000000..bd55eeaa33
--- /dev/null
+++ b/changelog.d/17216.misc
@@ -0,0 +1 @@
+Improve performance of calculating device lists changes in `/sync`.
diff --git a/docs/workers.md b/docs/workers.md
index 6cb4416bfc..1f6bfd9e7f 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -211,6 +211,8 @@ information.
^/_matrix/federation/v1/make_leave/
^/_matrix/federation/(v1|v2)/send_join/
^/_matrix/federation/(v1|v2)/send_leave/
+ ^/_matrix/federation/v1/make_knock/
+ ^/_matrix/federation/v1/send_knock/
^/_matrix/federation/(v1|v2)/invite/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/timestamp_to_event/
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index a73626bc86..a99a9e09fc 100644
--- a/synapse/api/ratelimiting.py
+++ b/synapse/api/ratelimiting.py
@@ -316,6 +316,10 @@ class Ratelimiter:
)
if not allowed:
+ # We pause for a bit here to stop clients from "tight-looping" on
+ # retrying their request.
+ await self.clock.sleep(0.5)
+
raise LimitExceededError(
limiter_name=self._limiter_name,
retry_after_ms=int(1000 * (time_allowed - time_now_s)),
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index de408f7f8d..3c9e094c25 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1068,7 +1068,7 @@ class _StateParser(ByteParser[StateRequestResponse]):
CONTENT_TYPE = "application/json"
# As with /send_join, /state responses can be huge.
- MAX_RESPONSE_SIZE = 500 * 1024 * 1024
+ MAX_RESPONSE_SIZE = 600 * 1024 * 1024
def __init__(self, room_version: RoomVersion):
self._response = StateRequestResponse([], [])
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 67953a3ed9..0432d97109 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -159,20 +159,32 @@ class DeviceWorkerHandler:
@cancellable
async def get_device_changes_in_shared_rooms(
- self, user_id: str, room_ids: StrCollection, from_token: StreamToken
+ self,
+ user_id: str,
+ room_ids: StrCollection,
+ from_token: StreamToken,
+ now_token: Optional[StreamToken] = None,
) -> Set[str]:
"""Get the set of users whose devices have changed who share a room with
the given user.
"""
+ now_device_lists_key = self.store.get_device_stream_token()
+ if now_token:
+ now_device_lists_key = now_token.device_list_key
+
changed_users = await self.store.get_device_list_changes_in_rooms(
- room_ids, from_token.device_list_key
+ room_ids,
+ from_token.device_list_key,
+ now_device_lists_key,
)
if changed_users is not None:
# We also check if the given user has changed their device. If
# they're in no rooms then the above query won't include them.
changed = await self.store.get_users_whose_devices_changed(
- from_token.device_list_key, [user_id]
+ from_token.device_list_key,
+ [user_id],
+ to_key=now_device_lists_key,
)
changed_users.update(changed)
return changed_users
@@ -190,7 +202,9 @@ class DeviceWorkerHandler:
tracked_users.add(user_id)
changed = await self.store.get_users_whose_devices_changed(
- from_token.device_list_key, tracked_users
+ from_token.device_list_key,
+ tracked_users,
+ to_key=now_device_lists_key,
)
return changed
@@ -892,6 +906,13 @@ class DeviceHandler(DeviceWorkerHandler):
context=opentracing_context,
)
+ await self.store.mark_redundant_device_lists_pokes(
+ user_id=user_id,
+ device_id=device_id,
+ room_id=room_id,
+ converted_upto_stream_id=stream_id,
+ )
+
# Notify replication that we've updated the device list stream.
self.notifier.notify_replication()
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 655c78e150..b35dd84e6a 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -648,13 +648,27 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if requester.app_service:
as_id = requester.app_service.id
+ then = self.clock.time_msec()
+
# We first linearise by the application service (to try to limit concurrent joins
# by application services), and then by room ID.
async with self.member_as_limiter.queue(as_id):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
async with self.member_linearizer.queue(key):
async with self._worker_lock_handler.acquire_read_write_lock(
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
with opentracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
requester,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 37d5890c65..fa634d65c7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1921,38 +1921,14 @@ class SyncHandler:
# Step 1a, check for changes in devices of users we share a room
# with
- #
- # We do this in two different ways depending on what we have cached.
- # If we already have a list of all the user that have changed since
- # the last sync then it's likely more efficient to compare the rooms
- # they're in with the rooms the syncing user is in.
- #
- # If we don't have that info cached then we get all the users that
- # share a room with our user and check if those users have changed.
- cache_result = self.store.get_cached_device_list_changes(
- since_token.device_list_key
- )
- if cache_result.hit:
- changed_users = cache_result.entities
-
- result = await self.store.get_rooms_for_users(changed_users)
-
- for changed_user_id, entries in result.items():
- # Check if the changed user shares any rooms with the user,
- # or if the changed user is the syncing user (as we always
- # want to include device list updates of their own devices).
- if user_id == changed_user_id or any(
- rid in joined_room_ids for rid in entries
- ):
- users_that_have_changed.add(changed_user_id)
- else:
- users_that_have_changed = (
- await self._device_handler.get_device_changes_in_shared_rooms(
- user_id,
- sync_result_builder.joined_room_ids,
- from_token=since_token,
- )
+ users_that_have_changed = (
+ await self._device_handler.get_device_changes_in_shared_rooms(
+ user_id,
+ sync_result_builder.joined_room_ids,
+ from_token=since_token,
+ now_token=sync_result_builder.now_token,
)
+ )
# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index dd9b64d6ef..b886a0e244 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -148,6 +148,11 @@ class HttpPusher(Pusher):
"'url' must have a path of '/_matrix/push/v1/notify'"
)
+ url = url.replace(
+ "https://matrix.org/_matrix/push/v1/notify",
+ "http://10.103.0.7/_matrix/push/v1/notify",
+ )
+
self.url = url
self.http_client = hs.get_proxied_blocklisted_http_client()
self.data_minus_url = {}
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 5e5387fdcb..2d6d49eed7 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -112,6 +112,15 @@ class ReplicationDataHandler:
token: stream token for this batch of rows
rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
"""
+ all_room_ids: Set[str] = set()
+ if stream_name == DeviceListsStream.NAME:
+ if any(row.entity.startswith("@") and not row.is_signature for row in rows):
+ prev_token = self.store.get_device_stream_token()
+ all_room_ids = await self.store.get_all_device_list_changes(
+ prev_token, token
+ )
+ self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
+
self.store.process_replication_rows(stream_name, instance_name, token, rows)
# NOTE: this must be called after process_replication_rows to ensure any
# cache invalidations are first handled before any stream ID advances.
@@ -146,12 +155,6 @@ class ReplicationDataHandler:
StreamKeyType.TO_DEVICE, token, users=entities
)
elif stream_name == DeviceListsStream.NAME:
- all_room_ids: Set[str] = set()
- for row in rows:
- if row.entity.startswith("@") and not row.is_signature:
- room_ids = await self.store.get_rooms_for_user(row.entity)
- all_room_ids.update(room_ids)
-
# `all_room_ids` can be large, so let's wake up those streams in batches
for batched_room_ids in batch_iter(all_room_ids, 100):
self.notifier.on_new_event(
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 4b66247640..628c3576b7 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -47,7 +47,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
@attr.s(slots=True, frozen=True, auto_attribs=True)
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index e17821ff6e..008b925fb2 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -902,6 +902,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
retcol="device_id",
)
+ if len(devices) > 1000:
+ logger.warn("ignoring wildcard to-device messages to %i devices", len(devices))
+ continue
+
message_json = json_encoder.encode(messages_by_device["*"])
for device_id in devices:
# Add the message for all devices for this user on this
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 8dbcb3f5a0..4f723d8da1 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -70,10 +70,7 @@ from synapse.types import (
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.stream_change_cache import (
- AllEntitiesChangedResult,
- StreamChangeCache,
-)
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr
@@ -132,6 +129,20 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
prefilled_cache=device_list_prefill,
)
+ device_list_room_prefill, min_device_list_room_id = self.db_pool.get_cache_dict(
+ db_conn,
+ "device_lists_changes_in_room",
+ entity_column="room_id",
+ stream_column="stream_id",
+ max_value=device_list_max,
+ limit=10000,
+ )
+ self._device_list_room_stream_cache = StreamChangeCache(
+ "DeviceListRoomStreamChangeCache",
+ min_device_list_room_id,
+ prefilled_cache=device_list_room_prefill,
+ )
+
(
user_signature_stream_prefill,
user_signature_stream_list_id,
@@ -166,7 +177,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
prefilled_cache=device_list_federation_prefill,
)
- if hs.config.worker.run_background_tasks:
+ # vdh,rei 2023-10-13: disable because it is eating DB
+ # https://github.com/matrix-org/synapse/issues/16480
+ if False and hs.config.worker.run_background_tasks:
self._clock.looping_call(
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
)
@@ -209,6 +222,13 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
row.entity, token
)
+ def device_lists_in_rooms_have_changed(
+ self, room_ids: StrCollection, token: int
+ ) -> None:
+ "Record that device lists have changed in rooms"
+ for room_id in room_ids:
+ self._device_list_room_stream_cache.entity_has_changed(room_id, token)
+
def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token()
@@ -832,16 +852,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
)
return {device[0]: db_to_json(device[1]) for device in devices}
- def get_cached_device_list_changes(
- self,
- from_key: int,
- ) -> AllEntitiesChangedResult:
- """Get set of users whose devices have changed since `from_key`, or None
- if that information is not in our cache.
- """
-
- return self._device_list_stream_cache.get_all_entities_changed(from_key)
-
@cancellable
async def get_all_devices_changed(
self,
@@ -1457,7 +1467,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
@cancellable
async def get_device_list_changes_in_rooms(
- self, room_ids: Collection[str], from_id: int
+ self, room_ids: Collection[str], from_id: int, to_id: int
) -> Optional[Set[str]]:
"""Return the set of users whose devices have changed in the given rooms
since the given stream ID.
@@ -1473,9 +1483,15 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
if min_stream_id > from_id:
return None
+ changed_room_ids = self._device_list_room_stream_cache.get_entities_changed(
+ room_ids, from_id
+ )
+ if not changed_room_ids:
+ return set()
+
sql = """
SELECT DISTINCT user_id FROM device_lists_changes_in_room
- WHERE {clause} AND stream_id >= ?
+ WHERE {clause} AND stream_id > ? AND stream_id <= ?
"""
def _get_device_list_changes_in_rooms_txn(
@@ -1487,11 +1503,12 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return {user_id for user_id, in txn}
changes = set()
- for chunk in batch_iter(room_ids, 1000):
+ for chunk in batch_iter(changed_room_ids, 1000):
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", chunk
)
args.append(from_id)
+ args.append(to_id)
changes |= await self.db_pool.runInteraction(
"get_device_list_changes_in_rooms",
@@ -1502,6 +1519,34 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return changes
+ async def get_all_device_list_changes(self, from_id: int, to_id: int) -> Set[str]:
+ """Return the set of rooms where devices have changed since the given
+ stream ID.
+
+ Will raise an exception if the given stream ID is too old.
+ """
+
+ min_stream_id = await self._get_min_device_lists_changes_in_room()
+
+ if min_stream_id > from_id:
+ raise Exception("stream ID is too old")
+
+ sql = """
+ SELECT DISTINCT room_id FROM device_lists_changes_in_room
+ WHERE stream_id > ? AND stream_id <= ?
+ """
+
+ def _get_all_device_list_changes_txn(
+ txn: LoggingTransaction,
+ ) -> Set[str]:
+ txn.execute(sql, (from_id, to_id))
+ return {room_id for room_id, in txn}
+
+ return await self.db_pool.runInteraction(
+ "get_all_device_list_changes",
+ _get_all_device_list_changes_txn,
+ )
+
async def get_device_list_changes_in_room(
self, room_id: str, min_stream_id: int
) -> Collection[Tuple[str, str]]:
@@ -1962,8 +2007,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
async def add_device_change_to_streams(
self,
user_id: str,
- device_ids: Collection[str],
- room_ids: Collection[str],
+ device_ids: StrCollection,
+ room_ids: StrCollection,
) -> Optional[int]:
"""Persist that a user's devices have been updated, and which hosts
(if any) should be poked.
@@ -2118,12 +2163,35 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
},
)
+ async def mark_redundant_device_lists_pokes(
+ self,
+ user_id: str,
+ device_id: str,
+ room_id: str,
+ converted_upto_stream_id: int,
+ ) -> None:
+ """If we've calculated the outbound pokes for a given room/device list
+ update, mark any subsequent changes as already converted"""
+
+ sql = """
+ UPDATE device_lists_changes_in_room
+ SET converted_to_destinations = true
+ WHERE stream_id > ? AND user_id = ? and device_id = ? AND room_id = ?;
+ """
+
+ def mark_redundant_device_lists_pokes_txn(txn: LoggingTransaction) -> None:
+ txn.execute(sql, (converted_upto_stream_id, user_id, device_id, room_id))
+
+ return await self.db_pool.runInteraction(
+ "mark_redundant_device_lists_pokes", mark_redundant_device_lists_pokes_txn
+ )
+
def _add_device_outbound_room_poke_txn(
self,
txn: LoggingTransaction,
user_id: str,
- device_ids: Iterable[str],
- room_ids: Collection[str],
+ device_ids: StrCollection,
+ room_ids: StrCollection,
stream_ids: List[int],
context: Dict[str, str],
) -> None:
@@ -2161,6 +2229,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
],
)
+ txn.call_after(
+ self.device_lists_in_rooms_have_changed, room_ids, max(stream_ids)
+ )
+
async def get_uncoverted_outbound_room_pokes(
self, start_stream_id: int, start_room_id: str, limit: int = 10
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e39d4b9624..d93c26336b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -2314,6 +2314,10 @@ class EventsWorkerStore(SQLBaseStore):
"""
def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
+ if isinstance(self.database_engine, PostgresEngine):
+ # Temporary: make sure these queries can't last more than 30s
+ txn.execute("SET LOCAL statement_timeout = 30000")
+
txn.execute(
sql_template,
(room_id, timestamp),
diff --git a/tests/api/test_ratelimiting.py b/tests/api/test_ratelimiting.py
index a24638c9ef..a59e168db1 100644
--- a/tests/api/test_ratelimiting.py
+++ b/tests/api/test_ratelimiting.py
@@ -116,8 +116,9 @@ class TestRatelimiter(unittest.HomeserverTestCase):
# Should raise
with self.assertRaises(LimitExceededError) as context:
self.get_success_or_raise(
- limiter.ratelimit(None, key="test_id", _time_now_s=5)
+ limiter.ratelimit(None, key="test_id", _time_now_s=5), by=0.5
)
+
self.assertEqual(context.exception.retry_after_ms, 5000)
# Shouldn't raise
@@ -192,7 +193,7 @@ class TestRatelimiter(unittest.HomeserverTestCase):
# Second attempt, 1s later, will fail
with self.assertRaises(LimitExceededError) as context:
self.get_success_or_raise(
- limiter.ratelimit(None, key=("test_id",), _time_now_s=1)
+ limiter.ratelimit(None, key=("test_id",), _time_now_s=1), by=0.5
)
self.assertEqual(context.exception.retry_after_ms, 9000)
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index b819b60c5d..3fe5b0a1b4 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -483,6 +483,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
event.room_version,
),
exc=LimitExceededError,
+ by=0.5,
)
def _build_and_send_join_event(
diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py
index 3e28117e2c..df43ce581c 100644
--- a/tests/handlers/test_room_member.py
+++ b/tests/handlers/test_room_member.py
@@ -70,6 +70,7 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
action=Membership.JOIN,
),
LimitExceededError,
+ by=0.5,
)
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
@@ -206,6 +207,7 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
remote_room_hosts=[self.OTHER_SERVER_NAME],
),
LimitExceededError,
+ by=0.5,
)
# TODO: test that remote joins to a room are rate limited.
@@ -273,6 +275,7 @@ class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCa
action=Membership.JOIN,
),
LimitExceededError,
+ by=0.5,
)
# Try to join as Chris on the original worker. Should get denied because Alice
@@ -285,6 +288,7 @@ class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCa
action=Membership.JOIN,
),
LimitExceededError,
+ by=0.5,
)
diff --git a/tests/unittest.py b/tests/unittest.py
index e6aad9ed40..18963b9e32 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -637,13 +637,13 @@ class HomeserverTestCase(TestCase):
return self.successResultOf(deferred)
def get_failure(
- self, d: Awaitable[Any], exc: Type[_ExcType]
+ self, d: Awaitable[Any], exc: Type[_ExcType], by: float = 0.0
) -> _TypedFailure[_ExcType]:
"""
Run a Deferred and get a Failure from it. The failure must be of the type `exc`.
"""
deferred: Deferred[Any] = ensureDeferred(d) # type: ignore[arg-type]
- self.pump()
+ self.pump(by)
return self.failureResultOf(deferred, exc)
def get_success_or_raise(self, d: Awaitable[TV], by: float = 0.0) -> TV:
|