summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-17 15:46:44 +0000
committerErik Johnston <erik@matrix.org>2016-11-17 15:48:04 +0000
commitf8ee66250a16cb9dd3af01fb1150ff18cfebbc39 (patch)
tree9920bd4e8164f705b4e27c714d6c053082dcf7a5 /synapse/federation/transaction_queue.py
parentHook up the send queue and create a federation sender worker (diff)
downloadsynapse-f8ee66250a16cb9dd3af01fb1150ff18cfebbc39.tar.xz
Handle sending events and device messages over federation
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py32
1 files changed, 32 insertions, 0 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5d4f244377..aa664beead 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,6 +26,7 @@ from synapse.util.retryutils import (
     get_retry_limiter, NotRetryingDestination,
 )
 from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id
 from synapse.handlers.presence import format_user_presence_state
 import synapse.metrics
 
@@ -54,6 +55,7 @@ class TransactionQueue(object):
         self.server_name = hs.hostname
 
         self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
         self.transaction_actions = TransactionActions(self.store)
 
         self.transport_layer = hs.get_federation_transport_client()
@@ -103,6 +105,9 @@ class TransactionQueue(object):
 
         self._order = 1
 
+        self._is_processing = False
+        self._last_token = 0
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -123,6 +128,33 @@ class TransactionQueue(object):
         else:
             return not destination.startswith("localhost")
 
+    @defer.inlineCallbacks
+    def notify_new_events(self, current_id):
+        if self._is_processing:
+            return
+
+        try:
+            self._is_processing = True
+            while True:
+                self._last_token, events = yield self.store.get_all_new_events_stream(
+                    self._last_token, current_id, limit=20,
+                )
+
+                if not events:
+                    break
+
+                for event in events:
+                    users_in_room = yield self.state.get_current_user_in_room(
+                        event.room_id, latest_event_ids=[event.event_id],
+                    )
+
+                    destinations = [
+                        get_domain_from_id(user_id) for user_id in users_in_room
+                    ]
+                    self.send_pdu(event, destinations)
+        finally:
+            self._is_processing = False
+
     def send_pdu(self, pdu, destinations):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus