diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index aa664beead..1b0ea070c2 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -106,7 +106,7 @@ class TransactionQueue(object):
self._order = 1
self._is_processing = False
- self._last_token = 0
+ self._last_poked_id = -1
def can_send_to(self, destination):
"""Can we send messages to the given server?
@@ -130,17 +130,22 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def notify_new_events(self, current_id):
+ self._last_poked_id = max(current_id, self._last_poked_id)
+
if self._is_processing:
return
try:
self._is_processing = True
while True:
- self._last_token, events = yield self.store.get_all_new_events_stream(
- self._last_token, current_id, limit=20,
+ last_token = yield self.store.get_federation_out_pos("events")
+ next_token, events = yield self.store.get_all_new_events_stream(
+ last_token, self._last_poked_id, limit=20,
)
- if not events:
+ logger.debug("Handling %s -> %s", last_token, next_token)
+
+ if not events and next_token >= self._last_poked_id:
break
for event in events:
@@ -151,7 +156,15 @@ class TransactionQueue(object):
destinations = [
get_domain_from_id(user_id) for user_id in users_in_room
]
+
+ logger.debug("Sending %s to %r", event, destinations)
+
self.send_pdu(event, destinations)
+
+ yield self.store.update_federation_out_pos(
+ "events", next_token
+ )
+
finally:
self._is_processing = False
|