diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5d4f244377..aa664beead 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,6 +26,7 @@ from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination,
)
from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state
import synapse.metrics
@@ -54,6 +55,7 @@ class TransactionQueue(object):
self.server_name = hs.hostname
self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
self.transaction_actions = TransactionActions(self.store)
self.transport_layer = hs.get_federation_transport_client()
@@ -103,6 +105,9 @@ class TransactionQueue(object):
self._order = 1
+ self._is_processing = False
+ self._last_token = 0
+
def can_send_to(self, destination):
"""Can we send messages to the given server?
@@ -123,6 +128,33 @@ class TransactionQueue(object):
else:
return not destination.startswith("localhost")
+ @defer.inlineCallbacks
+ def notify_new_events(self, current_id):
+ if self._is_processing:
+ return
+
+ try:
+ self._is_processing = True
+ while True:
+ self._last_token, events = yield self.store.get_all_new_events_stream(
+ self._last_token, current_id, limit=20,
+ )
+
+ if not events:
+ break
+
+ for event in events:
+ users_in_room = yield self.state.get_current_user_in_room(
+ event.room_id, latest_event_ids=[event.event_id],
+ )
+
+ destinations = [
+ get_domain_from_id(user_id) for user_id in users_in_room
+ ]
+ self.send_pdu(event, destinations)
+ finally:
+ self._is_processing = False
+
def send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
|