diff options
Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3a2efd56ee..c11d1f6d31 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -105,34 +105,34 @@ class PerDestinationQueue: # catch-up at startup. # New events will only be sent once this is finished, at which point # _catching_up is flipped to False. - self._catching_up = True # type: bool + self._catching_up: bool = True # The stream_ordering of the most recent PDU that was discarded due to # being in catch-up mode. - self._catchup_last_skipped = 0 # type: int + self._catchup_last_skipped: int = 0 # Cache of the last successfully-transmitted stream ordering for this # destination (we are the only updater so this is safe) - self._last_successful_stream_ordering = None # type: Optional[int] + self._last_successful_stream_ordering: Optional[int] = None # a queue of pending PDUs - self._pending_pdus = [] # type: List[EventBase] + self._pending_pdus: List[EventBase] = [] # XXX this is never actually used: see # https://github.com/matrix-org/synapse/issues/7549 - self._pending_edus = [] # type: List[Edu] + self._pending_edus: List[Edu] = [] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered # based on their key (e.g. typing events by room_id) # Map of (edu_type, key) -> Edu - self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu] + self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {} # Map of user_id -> UserPresenceState of pending presence to be sent to this # destination - self._pending_presence = {} # type: Dict[str, UserPresenceState] + self._pending_presence: Dict[str, UserPresenceState] = {} # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]] + self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {} self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -171,14 +171,24 @@ class PerDestinationQueue: self.attempt_new_transaction() - def send_presence(self, states: Iterable[UserPresenceState]) -> None: - """Add presence updates to the queue. Start the transmission loop if necessary. + def send_presence( + self, states: Iterable[UserPresenceState], start_loop: bool = True + ) -> None: + """Add presence updates to the queue. + + Args: + states: Presence updates to send + start_loop: Whether to start the transmission loop if not already + running. Args: states: presence to send """ self._pending_presence.update({state.user_id: state for state in states}) - self.attempt_new_transaction() + self._new_data_to_send = True + + if start_loop: + self.attempt_new_transaction() def queue_read_receipt(self, receipt: ReadReceipt) -> None: """Add a RR to the list to be sent. Doesn't start the transmission loop yet @@ -243,7 +253,7 @@ class PerDestinationQueue: ) async def _transaction_transmission_loop(self) -> None: - pending_pdus = [] # type: List[EventBase] + pending_pdus: List[EventBase] = [] try: self.transmission_loop_running = True |