diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 0df160d2b0..1f6558c3df 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -39,7 +39,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class DeviceLastConnectionInfo(TypedDict):
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 48a54d9cb8..50899b2949 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -818,6 +818,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
retcol="device_id",
)
+ if len(devices) > 1000:
+ logger.warn("ignoring wildcard to-device messages to %i devices", len(devices))
+ continue
+
message_json = json_encoder.encode(messages_by_device["*"])
for device_id in devices:
# Add the message for all devices for this user on this
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index a5bb4d404e..a921332cb0 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -54,7 +54,7 @@ from synapse.storage.util.id_generators import (
AbstractStreamIdTracker,
StreamIdGenerator,
)
-from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
+from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
@@ -1062,16 +1062,30 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return {row["user_id"] for row in rows}
- async def mark_remote_user_device_cache_as_stale(self, user_id: str) -> None:
+ async def mark_remote_users_device_caches_as_stale(
+ self, user_ids: StrCollection
+ ) -> None:
"""Records that the server has reason to believe the cache of the devices
for the remote users is out of date.
"""
- await self.db_pool.simple_upsert(
- table="device_lists_remote_resync",
- keyvalues={"user_id": user_id},
- values={},
- insertion_values={"added_ts": self._clock.time_msec()},
- desc="mark_remote_user_device_cache_as_stale",
+
+ def _mark_remote_users_device_caches_as_stale_txn(
+ txn: LoggingTransaction,
+ ) -> None:
+ # TODO add insertion_values support to simple_upsert_many and use
+ # that!
+ for user_id in user_ids:
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="device_lists_remote_resync",
+ keyvalues={"user_id": user_id},
+ values={},
+ insertion_values={"added_ts": self._clock.time_msec()},
+ )
+
+ await self.db_pool.runInteraction(
+ "mark_remote_users_device_caches_as_stale",
+ _mark_remote_users_device_caches_as_stale_txn,
)
async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None:
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 761b15a815..f80b494edb 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -2276,6 +2276,10 @@ class EventsWorkerStore(SQLBaseStore):
"""
def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
+ if isinstance(self.database_engine, PostgresEngine):
+ # Temporary: make sure these queries can't last more than 30s
+ txn.execute("SET LOCAL statement_timeout = 30000")
+
txn.execute(
sql_template,
(room_id, timestamp),
|