summary refs log tree commit diff
path: root/synapse/federation/replication.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r--synapse/federation/replication.py44
1 files changed, 23 insertions, 21 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 0cb632fb08..9f8aadccca 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -25,6 +25,7 @@ from .persistence import TransactionActions
 
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.events import FrozenEvent
 
 import logging
 
@@ -73,7 +74,7 @@ class ReplicationLayer(object):
 
         self._clock = hs.get_clock()
 
-        self.event_factory = hs.get_event_factory()
+        self.event_builder_factory = hs.get_event_builder_factory()
 
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
@@ -112,7 +113,7 @@ class ReplicationLayer(object):
         self.query_handlers[query_type] = handler
 
     @log_function
-    def send_pdu(self, pdu):
+    def send_pdu(self, pdu, destinations):
         """Informs the replication layer about a new PDU generated within the
         home server that should be transmitted to others.
 
@@ -131,7 +132,7 @@ class ReplicationLayer(object):
         logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
 
         # TODO, add errback, etc.
-        self._transaction_queue.enqueue_pdu(pdu, order)
+        self._transaction_queue.enqueue_pdu(pdu, destinations, order)
 
         logger.debug(
             "[%s] transaction_layer.enqueue_pdu... done",
@@ -438,7 +439,9 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     def on_send_join_request(self, origin, content):
+        logger.debug("on_send_join_request: content: %s", content)
         pdu = self.event_from_pdu_json(content)
+        logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
         res_pdus = yield self.handler.on_send_join_request(origin, pdu)
         time_now = self._clock.time_msec()
         defer.returnValue((200, {
@@ -557,7 +560,13 @@ class ReplicationLayer(object):
             origin, pdu.event_id, do_auth=False
         )
 
-        if existing and (not existing.outlier or pdu.outlier):
+        already_seen = (
+            existing and (
+                not existing.internal_metadata.outlier
+                or pdu.internal_metadata.outlier
+            )
+        )
+        if already_seen:
             logger.debug("Already seen pdu %s", pdu.event_id)
             defer.returnValue({})
             return
@@ -595,7 +604,7 @@ class ReplicationLayer(object):
         #             )
 
         # Get missing pdus if necessary.
-        if not pdu.outlier:
+        if not pdu.internal_metadata.outlier:
             # We only backfill backwards to the min depth.
             min_depth = yield self.handler.get_min_depth_for_context(
                 pdu.room_id
@@ -658,19 +667,14 @@ class ReplicationLayer(object):
         return "<ReplicationLayer(%s)>" % self.server_name
 
     def event_from_pdu_json(self, pdu_json, outlier=False):
-        #TODO: Check we have all the PDU keys here
-        pdu_json.setdefault("hashes", {})
-        pdu_json.setdefault("signatures", {})
-        sender = pdu_json.pop("sender", None)
-        if sender is not None:
-            pdu_json["user_id"] = sender
-        state_hash = pdu_json.get("unsigned", {}).pop("state_hash", None)
-        if state_hash is not None:
-            pdu_json["state_hash"] = state_hash
-        return self.event_factory.create_event(
-            pdu_json["type"], outlier=outlier, **pdu_json
+        event = FrozenEvent(
+            pdu_json
         )
 
+        event.internal_metadata.outlier = outlier
+
+        return event
+
 
 class _TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -706,15 +710,13 @@ class _TransactionQueue(object):
 
     @defer.inlineCallbacks
     @log_function
-    def enqueue_pdu(self, pdu, order):
+    def enqueue_pdu(self, pdu, destinations, order):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
 
-        destinations = set([
-            d for d in pdu.destinations
-            if d != self.server_name
-        ])
+        destinations = set(destinations)
+        destinations.discard(self.server_name)
 
         logger.debug("Sending to: %s", str(destinations))