diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 0cb632fb08..6388bb98e2 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -25,6 +25,7 @@ from .persistence import TransactionActions
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.events import FrozenEvent
import logging
@@ -74,6 +75,7 @@ class ReplicationLayer(object):
self._clock = hs.get_clock()
self.event_factory = hs.get_event_factory()
+ self.event_builder_factory = hs.get_event_builder_factory()
def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
@@ -112,7 +114,7 @@ class ReplicationLayer(object):
self.query_handlers[query_type] = handler
@log_function
- def send_pdu(self, pdu):
+ def send_pdu(self, pdu, destinations):
"""Informs the replication layer about a new PDU generated within the
home server that should be transmitted to others.
@@ -131,7 +133,7 @@ class ReplicationLayer(object):
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
# TODO, add errback, etc.
- self._transaction_queue.enqueue_pdu(pdu, order)
+ self._transaction_queue.enqueue_pdu(pdu, destinations, order)
logger.debug(
"[%s] transaction_layer.enqueue_pdu... done",
@@ -438,7 +440,9 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
def on_send_join_request(self, origin, content):
+ logger.debug("on_send_join_request: content: %s", content)
pdu = self.event_from_pdu_json(content)
+ logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
res_pdus = yield self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {
@@ -557,7 +561,13 @@ class ReplicationLayer(object):
origin, pdu.event_id, do_auth=False
)
- if existing and (not existing.outlier or pdu.outlier):
+ already_seen = (
+ existing and (
+ not existing.internal_metadata.outlier
+ or pdu.internal_metadata.outlier
+ )
+ )
+ if already_seen:
logger.debug("Already seen pdu %s", pdu.event_id)
defer.returnValue({})
return
@@ -595,7 +605,7 @@ class ReplicationLayer(object):
# )
# Get missing pdus if necessary.
- if not pdu.outlier:
+ if not pdu.internal_metadata.outlier:
# We only backfill backwards to the min depth.
min_depth = yield self.handler.get_min_depth_for_context(
pdu.room_id
@@ -658,19 +668,14 @@ class ReplicationLayer(object):
return "<ReplicationLayer(%s)>" % self.server_name
def event_from_pdu_json(self, pdu_json, outlier=False):
- #TODO: Check we have all the PDU keys here
- pdu_json.setdefault("hashes", {})
- pdu_json.setdefault("signatures", {})
- sender = pdu_json.pop("sender", None)
- if sender is not None:
- pdu_json["user_id"] = sender
- state_hash = pdu_json.get("unsigned", {}).pop("state_hash", None)
- if state_hash is not None:
- pdu_json["state_hash"] = state_hash
- return self.event_factory.create_event(
- pdu_json["type"], outlier=outlier, **pdu_json
+ event = FrozenEvent(
+ pdu_json
)
+ event.internal_metadata.outlier = outlier
+
+ return event
+
class _TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at
@@ -706,15 +711,13 @@ class _TransactionQueue(object):
@defer.inlineCallbacks
@log_function
- def enqueue_pdu(self, pdu, order):
+ def enqueue_pdu(self, pdu, destinations, order):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
- destinations = set([
- d for d in pdu.destinations
- if d != self.server_name
- ])
+ destinations = set(destinations)
+ destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
|