diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index 0112588656..7517c529d4 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 73dc844d59..85c82a4623 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 9f8aadccca..a4c29b484b 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -256,31 +256,35 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def get_state_for_context(self, destination, context, event_id=None):
+ def get_state_for_context(self, destination, context, event_id):
"""Requests all of the `current` state PDUs for a given context from
a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
context (str): The context we're interested in.
+ event_id (str): The id of the event we want the state at.
Returns:
Deferred: Results in a list of PDUs.
"""
- transaction_data = yield self.transport_layer.get_context_state(
+ result = yield self.transport_layer.get_context_state(
destination,
context,
event_id=event_id,
)
- transaction = Transaction(**transaction_data)
pdus = [
+ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"]
+ ]
+
+ auth_chain = [
self.event_from_pdu_json(p, outlier=True)
- for p in transaction.pdus
+ for p in result.get("auth_chain", [])
]
- defer.returnValue(pdus)
+ defer.returnValue((pdus, auth_chain))
@defer.inlineCallbacks
@log_function
@@ -383,10 +387,16 @@ class ReplicationLayer(object):
context,
event_id,
)
+ auth_chain = yield self.store.get_auth_chain(
+ [pdu.event_id for pdu in pdus]
+ )
else:
raise NotImplementedError("Specify an event")
- defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
+ defer.returnValue((200, {
+ "pdus": [pdu.get_pdu_json() for pdu in pdus],
+ "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
+ }))
@defer.inlineCallbacks
@log_function
@@ -562,8 +572,8 @@ class ReplicationLayer(object):
already_seen = (
existing and (
- not existing.internal_metadata.outlier
- or pdu.internal_metadata.outlier
+ not existing.internal_metadata.is_outlier()
+ or pdu.internal_metadata.is_outlier()
)
)
if already_seen:
@@ -573,6 +583,8 @@ class ReplicationLayer(object):
state = None
+ auth_chain = []
+
# We need to make sure we have all the auth events.
# for e_id, _ in pdu.auth_events:
# exists = yield self._get_persisted_pdu(
@@ -604,7 +616,7 @@ class ReplicationLayer(object):
# )
# Get missing pdus if necessary.
- if not pdu.internal_metadata.outlier:
+ if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = yield self.handler.get_min_depth_for_context(
pdu.room_id
@@ -645,7 +657,7 @@ class ReplicationLayer(object):
"_handle_new_pdu getting state for %s",
pdu.room_id
)
- state = yield self.get_state_for_context(
+ state, auth_chain = yield self.get_state_for_context(
origin, pdu.room_id, pdu.event_id,
)
@@ -655,6 +667,7 @@ class ReplicationLayer(object):
pdu,
backfilled=backfilled,
state=state,
+ auth_chain=auth_chain,
)
else:
ret = None
@@ -717,6 +730,7 @@ class _TransactionQueue(object):
destinations = set(destinations)
destinations.discard(self.server_name)
+ destinations.discard("localhost")
logger.debug("Sending to: %s", str(destinations))
@@ -801,6 +815,8 @@ class _TransactionQueue(object):
else:
logger.info("TX [%s] is ready for retry", destination)
+ logger.info("TX [%s] _attempt_new_transaction", destination)
+
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
@@ -813,6 +829,9 @@ class _TransactionQueue(object):
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, [])
+ if pending_pdus:
+ logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus))
+
if not pending_pdus and not pending_edus and not pending_failures:
return
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index 0f11c6d491..1f0f06e0fe 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 1bcd0548c2..816f55bf39 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
|