summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-21 11:28:37 +0000
committerErik Johnston <erik@matrix.org>2016-11-21 11:33:08 +0000
commit7c9cdb22453d1a442e5c280149aeeff4d46da215 (patch)
tree1434dca32f57320810b5e70314db521f5cc7a338 /synapse/federation/transaction_queue.py
parentHandle sending events and device messages over federation (diff)
downloadsynapse-7c9cdb22453d1a442e5c280149aeeff4d46da215.tar.xz
Store federation stream positions in the database
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py21
1 files changed, 17 insertions, 4 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index aa664beead..1b0ea070c2 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -106,7 +106,7 @@ class TransactionQueue(object):
         self._order = 1
 
         self._is_processing = False
-        self._last_token = 0
+        self._last_poked_id = -1
 
     def can_send_to(self, destination):
         """Can we send messages to the given server?
@@ -130,17 +130,22 @@ class TransactionQueue(object):
 
     @defer.inlineCallbacks
     def notify_new_events(self, current_id):
+        self._last_poked_id = max(current_id, self._last_poked_id)
+
         if self._is_processing:
             return
 
         try:
             self._is_processing = True
             while True:
-                self._last_token, events = yield self.store.get_all_new_events_stream(
-                    self._last_token, current_id, limit=20,
+                last_token = yield self.store.get_federation_out_pos("events")
+                next_token, events = yield self.store.get_all_new_events_stream(
+                    last_token, self._last_poked_id, limit=20,
                 )
 
-                if not events:
+                logger.debug("Handling %s -> %s", last_token, next_token)
+
+                if not events and next_token >= self._last_poked_id:
                     break
 
                 for event in events:
@@ -151,7 +156,15 @@ class TransactionQueue(object):
                     destinations = [
                         get_domain_from_id(user_id) for user_id in users_in_room
                     ]
+
+                    logger.debug("Sending %s to %r", event, destinations)
+
                     self.send_pdu(event, destinations)
+
+                yield self.store.update_federation_out_pos(
+                    "events", next_token
+                )
+
         finally:
             self._is_processing = False