diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-21 11:28:37 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-21 11:33:08 +0000 |
commit | 7c9cdb22453d1a442e5c280149aeeff4d46da215 (patch) | |
tree | 1434dca32f57320810b5e70314db521f5cc7a338 /synapse/federation/transaction_queue.py | |
parent | Handle sending events and device messages over federation (diff) | |
download | synapse-7c9cdb22453d1a442e5c280149aeeff4d46da215.tar.xz |
Store federation stream positions in the database
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r-- | synapse/federation/transaction_queue.py | 21 |
1 files changed, 17 insertions, 4 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index aa664beead..1b0ea070c2 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -106,7 +106,7 @@ class TransactionQueue(object): self._order = 1 self._is_processing = False - self._last_token = 0 + self._last_poked_id = -1 def can_send_to(self, destination): """Can we send messages to the given server? @@ -130,17 +130,22 @@ class TransactionQueue(object): @defer.inlineCallbacks def notify_new_events(self, current_id): + self._last_poked_id = max(current_id, self._last_poked_id) + if self._is_processing: return try: self._is_processing = True while True: - self._last_token, events = yield self.store.get_all_new_events_stream( - self._last_token, current_id, limit=20, + last_token = yield self.store.get_federation_out_pos("events") + next_token, events = yield self.store.get_all_new_events_stream( + last_token, self._last_poked_id, limit=20, ) - if not events: + logger.debug("Handling %s -> %s", last_token, next_token) + + if not events and next_token >= self._last_poked_id: break for event in events: @@ -151,7 +156,15 @@ class TransactionQueue(object): destinations = [ get_domain_from_id(user_id) for user_id in users_in_room ] + + logger.debug("Sending %s to %r", event, destinations) + self.send_pdu(event, destinations) + + yield self.store.update_federation_out_pos( + "events", next_token + ) + finally: self._is_processing = False |