diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3a2efd56ee..c11d1f6d31 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -105,34 +105,34 @@ class PerDestinationQueue:
# catch-up at startup.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
- self._catching_up = True # type: bool
+ self._catching_up: bool = True
# The stream_ordering of the most recent PDU that was discarded due to
# being in catch-up mode.
- self._catchup_last_skipped = 0 # type: int
+ self._catchup_last_skipped: int = 0
# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
- self._last_successful_stream_ordering = None # type: Optional[int]
+ self._last_successful_stream_ordering: Optional[int] = None
# a queue of pending PDUs
- self._pending_pdus = [] # type: List[EventBase]
+ self._pending_pdus: List[EventBase] = []
# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
- self._pending_edus = [] # type: List[Edu]
+ self._pending_edus: List[Edu] = []
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
- self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu]
+ self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {}
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
- self._pending_presence = {} # type: Dict[str, UserPresenceState]
+ self._pending_presence: Dict[str, UserPresenceState] = {}
# room_id -> receipt_type -> user_id -> receipt_dict
- self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]]
+ self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@@ -171,14 +171,24 @@ class PerDestinationQueue:
self.attempt_new_transaction()
- def send_presence(self, states: Iterable[UserPresenceState]) -> None:
- """Add presence updates to the queue. Start the transmission loop if necessary.
+ def send_presence(
+ self, states: Iterable[UserPresenceState], start_loop: bool = True
+ ) -> None:
+ """Add presence updates to the queue.
+
+ Args:
+ states: Presence updates to send
+ start_loop: Whether to start the transmission loop if not already
+ running.
Args:
states: presence to send
"""
self._pending_presence.update({state.user_id: state for state in states})
- self.attempt_new_transaction()
+ self._new_data_to_send = True
+
+ if start_loop:
+ self.attempt_new_transaction()
def queue_read_receipt(self, receipt: ReadReceipt) -> None:
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
@@ -243,7 +253,7 @@ class PerDestinationQueue:
)
async def _transaction_transmission_loop(self) -> None:
- pending_pdus = [] # type: List[EventBase]
+ pending_pdus: List[EventBase] = []
try:
self.transmission_loop_running = True
|