summary refs log tree commit diff
path: root/synapse/federation/replication.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r--synapse/federation/replication.py41
1 files changed, 30 insertions, 11 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 9f8aadccca..a4c29b484b 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -256,31 +256,35 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def get_state_for_context(self, destination, context, event_id=None):
+    def get_state_for_context(self, destination, context, event_id):
         """Requests all of the `current` state PDUs for a given context from
         a remote home server.
 
         Args:
             destination (str): The remote homeserver to query for the state.
             context (str): The context we're interested in.
+            event_id (str): The id of the event we want the state at.
 
         Returns:
             Deferred: Results in a list of PDUs.
         """
 
-        transaction_data = yield self.transport_layer.get_context_state(
+        result = yield self.transport_layer.get_context_state(
             destination,
             context,
             event_id=event_id,
         )
 
-        transaction = Transaction(**transaction_data)
         pdus = [
+            self.event_from_pdu_json(p, outlier=True) for p in result["pdus"]
+        ]
+
+        auth_chain = [
             self.event_from_pdu_json(p, outlier=True)
-            for p in transaction.pdus
+            for p in result.get("auth_chain", [])
         ]
 
-        defer.returnValue(pdus)
+        defer.returnValue((pdus, auth_chain))
 
     @defer.inlineCallbacks
     @log_function
@@ -383,10 +387,16 @@ class ReplicationLayer(object):
                 context,
                 event_id,
             )
+            auth_chain = yield self.store.get_auth_chain(
+                [pdu.event_id for pdu in pdus]
+            )
         else:
             raise NotImplementedError("Specify an event")
 
-        defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
+        defer.returnValue((200, {
+            "pdus": [pdu.get_pdu_json() for pdu in pdus],
+            "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
+        }))
 
     @defer.inlineCallbacks
     @log_function
@@ -562,8 +572,8 @@ class ReplicationLayer(object):
 
         already_seen = (
             existing and (
-                not existing.internal_metadata.outlier
-                or pdu.internal_metadata.outlier
+                not existing.internal_metadata.is_outlier()
+                or pdu.internal_metadata.is_outlier()
             )
         )
         if already_seen:
@@ -573,6 +583,8 @@ class ReplicationLayer(object):
 
         state = None
 
+        auth_chain = []
+
         # We need to make sure we have all the auth events.
         # for e_id, _ in pdu.auth_events:
         #     exists = yield self._get_persisted_pdu(
@@ -604,7 +616,7 @@ class ReplicationLayer(object):
         #             )
 
         # Get missing pdus if necessary.
-        if not pdu.internal_metadata.outlier:
+        if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
             min_depth = yield self.handler.get_min_depth_for_context(
                 pdu.room_id
@@ -645,7 +657,7 @@ class ReplicationLayer(object):
                     "_handle_new_pdu getting state for %s",
                     pdu.room_id
                 )
-                state = yield self.get_state_for_context(
+                state, auth_chain = yield self.get_state_for_context(
                     origin, pdu.room_id, pdu.event_id,
                 )
 
@@ -655,6 +667,7 @@ class ReplicationLayer(object):
                 pdu,
                 backfilled=backfilled,
                 state=state,
+                auth_chain=auth_chain,
             )
         else:
             ret = None
@@ -717,6 +730,7 @@ class _TransactionQueue(object):
 
         destinations = set(destinations)
         destinations.discard(self.server_name)
+        destinations.discard("localhost")
 
         logger.debug("Sending to: %s", str(destinations))
 
@@ -801,6 +815,8 @@ class _TransactionQueue(object):
             else:
                 logger.info("TX [%s] is ready for retry", destination)
 
+        logger.info("TX [%s] _attempt_new_transaction", destination)
+        
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
             # request at which point pending_pdus_by_dest just keeps growing.
@@ -813,6 +829,9 @@ class _TransactionQueue(object):
         pending_edus = self.pending_edus_by_dest.pop(destination, [])
         pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
+        if pending_pdus:
+            logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus))
+
         if not pending_pdus and not pending_edus and not pending_failures:
             return