diff --git a/changelog.d/16223.feature b/changelog.d/16223.feature
new file mode 100644
index 0000000000..a52d66658b
--- /dev/null
+++ b/changelog.d/16223.feature
@@ -0,0 +1 @@
+Improve resource usage when sending data to a large number of remote hosts that are marked as "down".
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index fb448f2155..6520795635 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -49,7 +49,7 @@ from synapse.api.presence import UserPresenceState
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util.metrics import Measure
from .units import Edu
@@ -229,7 +229,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
"""
# nothing to do here: the replication listener will handle it.
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""As per FederationSender
@@ -245,7 +245,9 @@ class FederationRemoteSendQueue(AbstractFederationSender):
self.notifier.on_new_replication_data()
- def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+ async def send_device_messages(
+ self, destinations: StrCollection, immediate: bool = True
+ ) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
@@ -463,7 +465,7 @@ class ParsedFederationStreamData:
edus: Dict[str, List[Edu]]
-def process_rows_for_federation(
+async def process_rows_for_federation(
transaction_queue: FederationSender,
rows: List[FederationStream.FederationStreamRow],
) -> None:
@@ -496,7 +498,7 @@ def process_rows_for_federation(
parsed_row.add_to_buffer(buff)
for state, destinations in buff.presence_destinations:
- transaction_queue.send_presence_to_destinations(
+ await transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations
)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 97abbdee18..fb20fd8a10 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -147,7 +147,10 @@ from twisted.internet import defer
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
-from synapse.federation.sender.per_destination_queue import PerDestinationQueue
+from synapse.federation.sender.per_destination_queue import (
+ CATCHUP_RETRY_INTERVAL,
+ PerDestinationQueue,
+)
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -161,9 +164,10 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util import Clock
from synapse.util.metrics import Measure
+from synapse.util.retryutils import filter_destinations_by_retry_limiter
if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
@@ -213,7 +217,7 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
@@ -242,9 +246,11 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+ async def send_device_messages(
+ self, destinations: StrCollection, immediate: bool = True
+ ) -> None:
"""Tells the sender that a new device message is ready to be sent to the
- destination. The `immediate` flag specifies whether the messages should
+ destinations. The `immediate` flag specifies whether the messages should
be tried to be sent immediately, or whether it can be delayed for a
short while (to aid performance).
"""
@@ -716,6 +722,13 @@ class FederationSender(AbstractFederationSender):
pdu.internal_metadata.stream_ordering,
)
+ destinations = await filter_destinations_by_retry_limiter(
+ destinations,
+ clock=self.clock,
+ store=self.store,
+ retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+ )
+
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)
@@ -763,12 +776,20 @@ class FederationSender(AbstractFederationSender):
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
- domains = [
+ domains: StrCollection = [
d
for d in domains_set
if not self.is_mine_server_name(d)
and self._federation_shard_config.should_handle(self._instance_name, d)
]
+
+ domains = await filter_destinations_by_retry_limiter(
+ domains,
+ clock=self.clock,
+ store=self.store,
+ retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+ )
+
if not domains:
return
@@ -816,7 +837,7 @@ class FederationSender(AbstractFederationSender):
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
@@ -831,13 +852,20 @@ class FederationSender(AbstractFederationSender):
for state in states:
assert self.is_mine_id(state.user_id)
+ destinations = await filter_destinations_by_retry_limiter(
+ [
+ d
+ for d in destinations
+ if self._federation_shard_config.should_handle(self._instance_name, d)
+ ],
+ clock=self.clock,
+ store=self.store,
+ retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+ )
+
for destination in destinations:
if self.is_mine_server_name(destination):
continue
- if not self._federation_shard_config.should_handle(
- self._instance_name, destination
- ):
- continue
self._get_per_destination_queue(destination).send_presence(
states, start_loop=False
@@ -896,21 +924,29 @@ class FederationSender(AbstractFederationSender):
else:
queue.send_edu(edu)
- def send_device_messages(self, destination: str, immediate: bool = True) -> None:
- if self.is_mine_server_name(destination):
- logger.warning("Not sending device update to ourselves")
- return
-
- if not self._federation_shard_config.should_handle(
- self._instance_name, destination
- ):
- return
+ async def send_device_messages(
+ self, destinations: StrCollection, immediate: bool = True
+ ) -> None:
+ destinations = await filter_destinations_by_retry_limiter(
+ [
+ destination
+ for destination in destinations
+ if self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ )
+ and not self.is_mine_server_name(destination)
+ ],
+ clock=self.clock,
+ store=self.store,
+ retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+ )
- if immediate:
- self._get_per_destination_queue(destination).attempt_new_transaction()
- else:
- self._get_per_destination_queue(destination).mark_new_data()
- self._destination_wakeup_queue.add_to_queue(destination)
+ for destination in destinations:
+ if immediate:
+ self._get_per_destination_queue(destination).attempt_new_transaction()
+ else:
+ self._get_per_destination_queue(destination).mark_new_data()
+ self._destination_wakeup_queue.add_to_queue(destination)
def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 31c5c2b7de..9105ba664c 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -59,6 +59,10 @@ sent_edus_by_type = Counter(
)
+# If the retry interval is larger than this then we enter "catchup" mode
+CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
+
+
class PerDestinationQueue:
"""
Manages the per-destination transmission queues.
@@ -370,7 +374,7 @@ class PerDestinationQueue:
),
)
- if e.retry_interval > 60 * 60 * 1000:
+ if e.retry_interval > CATCHUP_RETRY_INTERVAL:
# we won't retry for another hour!
# (this suggests a significant outage)
# We drop pending EDUs because otherwise they will
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5ae427d52c..763f56dfc1 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -836,17 +836,16 @@ class DeviceHandler(DeviceWorkerHandler):
user_id,
hosts,
)
- for host in hosts:
- self.federation_sender.send_device_messages(
- host, immediate=False
- )
- # TODO: when called, this isn't in a logging context.
- # This leads to log spam, sentry event spam, and massive
- # memory usage.
- # See https://github.com/matrix-org/synapse/issues/12552.
- # log_kv(
- # {"message": "sent device update to host", "host": host}
- # )
+ await self.federation_sender.send_device_messages(
+ hosts, immediate=False
+ )
+ # TODO: when called, this isn't in a logging context.
+ # This leads to log spam, sentry event spam, and massive
+ # memory usage.
+ # See https://github.com/matrix-org/synapse/issues/12552.
+ # log_kv(
+ # {"message": "sent device update to host", "host": host}
+ # )
if current_stream_id != stream_id:
# Clear the set of hosts we've already sent to as we're
@@ -951,8 +950,9 @@ class DeviceHandler(DeviceWorkerHandler):
# Notify things that device lists need to be sent out.
self.notifier.notify_replication()
- for host in potentially_changed_hosts:
- self.federation_sender.send_device_messages(host, immediate=False)
+ await self.federation_sender.send_device_messages(
+ potentially_changed_hosts, immediate=False
+ )
def _update_device_from_client_ips(
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 798c7039f9..1c79f7a61e 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -302,10 +302,9 @@ class DeviceMessageHandler:
)
if self.federation_sender:
- for destination in remote_messages.keys():
- # Enqueue a new federation transaction to send the new
- # device messages to each remote destination.
- self.federation_sender.send_device_messages(destination)
+ # Enqueue a new federation transaction to send the new
+ # device messages to each remote destination.
+ await self.federation_sender.send_device_messages(remote_messages.keys())
async def get_events_for_dehydrated_device(
self,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2f841863ae..f31e18328b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -354,7 +354,9 @@ class BasePresenceHandler(abc.ABC):
)
for destination, host_states in hosts_to_states.items():
- self._federation.send_presence_to_destinations(host_states, [destination])
+ await self._federation.send_presence_to_destinations(
+ host_states, [destination]
+ )
async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
"""
@@ -936,7 +938,7 @@ class PresenceHandler(BasePresenceHandler):
)
for destination, states in hosts_to_states.items():
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
states, [destination]
)
@@ -1508,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler):
or state.status_msg is not None
]
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)
@@ -1519,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler):
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)
@@ -2182,7 +2184,7 @@ class PresenceFederationQueue:
index = bisect(self._queue, (clear_before,))
self._queue = self._queue[index:]
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Collection[UserPresenceState], destinations: StrCollection
) -> None:
"""Send the presence states to the given destinations.
@@ -2202,7 +2204,7 @@ class PresenceFederationQueue:
return
if self._federation:
- self._federation.send_presence_to_destinations(
+ await self._federation.send_presence_to_destinations(
states=states,
destinations=destinations,
)
@@ -2325,7 +2327,7 @@ class PresenceFederationQueue:
for host, user_ids in hosts_to_users.items():
states = await self._presence_handler.current_state_for_users(user_ids)
- self._federation.send_presence_to_destinations(
+ await self._federation.send_presence_to_destinations(
states=states.values(),
destinations=[host],
)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 7aeae5319c..4b4227003d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -26,9 +26,10 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.replication.tcp.streams import TypingStream
from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, StreamKeyType, UserID
+from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
+from synapse.util.retryutils import filter_destinations_by_retry_limiter
from synapse.util.wheel_timer import WheelTimer
if TYPE_CHECKING:
@@ -150,8 +151,15 @@ class FollowerTypingHandler:
now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
)
- hosts = await self._storage_controllers.state.get_current_hosts_in_room(
- member.room_id
+ hosts: StrCollection = (
+ await self._storage_controllers.state.get_current_hosts_in_room(
+ member.room_id
+ )
+ )
+ hosts = await filter_destinations_by_retry_limiter(
+ hosts,
+ clock=self.clock,
+ store=self.store,
)
for domain in hosts:
if not self.is_mine_server_name(domain):
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 9ad8e038ae..2f00a7ba20 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -1180,7 +1180,7 @@ class ModuleApi:
# Send to remote destinations.
destination = UserID.from_string(user).domain
- presence_handler.get_federation_queue().send_presence_to_destinations(
+ await presence_handler.get_federation_queue().send_presence_to_destinations(
presence_events, [destination]
)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 3b88dc68ea..51285e6d33 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -422,7 +422,7 @@ class FederationSenderHandler:
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
- send_queue.process_rows_for_federation(self.federation_sender, rows)
+ await send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token)
# ... and when new receipts happen
@@ -439,16 +439,14 @@ class FederationSenderHandler:
for row in rows
if not row.entity.startswith("@") and not row.is_signature
}
- for host in hosts:
- self.federation_sender.send_device_messages(host, immediate=False)
+ await self.federation_sender.send_device_messages(hosts, immediate=False)
elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local
# clients and remote servers, so we ignore entities that start with
# '@' (since they'll be local users rather than destinations).
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
- for host in hosts:
- self.federation_sender.send_device_messages(host)
+ await self.federation_sender.send_device_messages(hosts)
async def _on_new_receipts(
self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow]
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 860bbf7c0f..efd21b5bfc 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
import logging
from enum import Enum
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, cast
import attr
from canonicaljson import encode_canonical_json
@@ -28,8 +28,8 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.types import JsonDict
-from synapse.util.caches.descriptors import cached
+from synapse.types import JsonDict, StrCollection
+from synapse.util.caches.descriptors import cached, cachedList
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -205,6 +205,26 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
else:
return None
+ @cachedList(
+ cached_method_name="get_destination_retry_timings", list_name="destinations"
+ )
+ async def get_destination_retry_timings_batch(
+ self, destinations: StrCollection
+ ) -> Dict[str, Optional[DestinationRetryTimings]]:
+ rows = await self.db_pool.simple_select_many_batch(
+ table="destinations",
+ iterable=destinations,
+ column="destination",
+ retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
+ desc="get_destination_retry_timings_batch",
+ )
+
+ return {
+ row.pop("destination"): DestinationRetryTimings(**row)
+ for row in rows
+ if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"]
+ }
+
async def set_destination_retry_timings(
self,
destination: str,
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 9d2065372c..0e1f907667 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Optional, Type
from synapse.api.errors import CodeMessageException
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import DataStore
+from synapse.types import StrCollection
from synapse.util import Clock
if TYPE_CHECKING:
@@ -116,6 +117,30 @@ async def get_retry_limiter(
)
+async def filter_destinations_by_retry_limiter(
+ destinations: StrCollection,
+ clock: Clock,
+ store: DataStore,
+ retry_due_within_ms: int = 0,
+) -> StrCollection:
+ """Filter down the list of destinations to only those that will are either
+ alive or due for a retry (within `retry_due_within_ms`)
+ """
+ if not destinations:
+ return destinations
+
+ retry_timings = await store.get_destination_retry_timings_batch(destinations)
+
+ now = int(clock.time_msec())
+
+ return [
+ destination
+ for destination, timings in retry_timings.items()
+ if timings is None
+ or timings.retry_last_ts + timings.retry_interval <= now + retry_due_within_ms
+ ]
+
+
class RetryDestinationLimiter:
def __init__(
self,
diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py
index 7bd3d06859..caf04b54cb 100644
--- a/tests/federation/test_federation_sender.py
+++ b/tests/federation/test_federation_sender.py
@@ -75,7 +75,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
thread_id=None,
data={"ts": 1234},
)
- self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
+ self.get_success(sender.send_read_receipt(receipt))
self.pump()
@@ -111,6 +111,9 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
# * The same room / user on multiple threads.
# * A different user in the same room.
sender = self.hs.get_federation_sender()
+ # Hack so that we have a txn in-flight so we batch up read receipts
+ # below
+ sender.wake_destination("host2")
for user, thread in (
("alice", None),
("alice", "thread"),
@@ -125,9 +128,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
thread_id=thread,
data={"ts": 1234},
)
- self.successResultOf(
- defer.ensureDeferred(sender.send_read_receipt(receipt))
- )
+ defer.ensureDeferred(sender.send_read_receipt(receipt))
self.pump()
@@ -191,7 +192,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
thread_id=None,
data={"ts": 1234},
)
- self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
+ self.get_success(sender.send_read_receipt(receipt))
self.pump()
@@ -342,7 +343,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
self.reactor.advance(1)
# a second call should produce no new device EDUs
- self.hs.get_federation_sender().send_device_messages("host2")
+ self.get_success(
+ self.hs.get_federation_sender().send_device_messages(["host2"])
+ )
self.assertEqual(self.edus, [])
# a second device
@@ -550,7 +553,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
# recover the server
mock_send_txn.side_effect = self.record_transaction
- self.hs.get_federation_sender().send_device_messages("host2")
+ self.get_success(
+ self.hs.get_federation_sender().send_device_messages(["host2"])
+ )
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
@@ -601,7 +606,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
# recover the server
mock_send_txn.side_effect = self.record_transaction
- self.hs.get_federation_sender().send_device_messages("host2")
+ self.get_success(
+ self.hs.get_federation_sender().send_device_messages(["host2"])
+ )
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
@@ -656,7 +663,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
# recover the server
mock_send_txn.side_effect = self.record_transaction
- self.hs.get_federation_sender().send_device_messages("host2")
+ self.get_success(
+ self.hs.get_federation_sender().send_device_messages(["host2"])
+ )
# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index a987267308..88a16193a3 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -909,8 +909,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
prev_token = self.queue.get_current_token(self.instance_name)
- self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ self.get_success(
+ self.queue.send_presence_to_destinations(
+ (state1, state2), ("dest1", "dest2")
+ )
+ )
+ self.get_success(
+ self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ )
now_token = self.queue.get_current_token(self.instance_name)
@@ -946,11 +952,17 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
prev_token = self.queue.get_current_token(self.instance_name)
- self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
+ self.get_success(
+ self.queue.send_presence_to_destinations(
+ (state1, state2), ("dest1", "dest2")
+ )
+ )
now_token = self.queue.get_current_token(self.instance_name)
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ self.get_success(
+ self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ )
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", prev_token, now_token, 10)
@@ -989,8 +1001,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
prev_token = self.queue.get_current_token(self.instance_name)
- self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ self.get_success(
+ self.queue.send_presence_to_destinations(
+ (state1, state2), ("dest1", "dest2")
+ )
+ )
+ self.get_success(
+ self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ )
self.reactor.advance(10 * 60 * 1000)
@@ -1005,8 +1023,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
prev_token = self.queue.get_current_token(self.instance_name)
- self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ self.get_success(
+ self.queue.send_presence_to_destinations(
+ (state1, state2), ("dest1", "dest2")
+ )
+ )
+ self.get_success(
+ self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ )
now_token = self.queue.get_current_token(self.instance_name)
@@ -1033,11 +1057,17 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
prev_token = self.queue.get_current_token(self.instance_name)
- self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
+ self.get_success(
+ self.queue.send_presence_to_destinations(
+ (state1, state2), ("dest1", "dest2")
+ )
+ )
self.reactor.advance(2 * 60 * 1000)
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ self.get_success(
+ self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ )
self.reactor.advance(4 * 60 * 1000)
@@ -1053,8 +1083,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
prev_token = self.queue.get_current_token(self.instance_name)
- self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ self.get_success(
+ self.queue.send_presence_to_destinations(
+ (state1, state2), ("dest1", "dest2")
+ )
+ )
+ self.get_success(
+ self.queue.send_presence_to_destinations((state3,), ("dest3",))
+ )
now_token = self.queue.get_current_token(self.instance_name)
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 43c513b157..95106ec8f3 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -120,8 +120,6 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.datastore = hs.get_datastores().main
- self.datastore.get_destination_retry_timings = AsyncMock(return_value=None)
-
self.datastore.get_device_updates_by_remote = AsyncMock( # type: ignore[method-assign]
return_value=(0, [])
)
|