summary refs log tree commit diff
path: root/synapse/federation/replication.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r--synapse/federation/replication.py72
1 files changed, 29 insertions, 43 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 159af4eed7..838e660a46 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -111,14 +111,6 @@ class ReplicationLayer(object):
         """Informs the replication layer about a new PDU generated within the
         home server that should be transmitted to others.
 
-        This will fill out various attributes on the PDU object, e.g. the
-        `prev_pdus` key.
-
-        *Note:* The home server should always call `send_pdu` even if it knows
-        that it does not need to be replicated to other home servers. This is
-        in case e.g. someone else joins via a remote home server and then
-        backfills.
-
         TODO: Figure out when we should actually resolve the deferred.
 
         Args:
@@ -131,18 +123,12 @@ class ReplicationLayer(object):
         order = self._order
         self._order += 1
 
-        logger.debug("[%s] Persisting PDU", pdu.pdu_id)
-
-        # Save *before* trying to send
-        # yield self.store.persist_event(pdu=pdu)
-
-        logger.debug("[%s] Persisted PDU", pdu.pdu_id)
-        logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id)
+        logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
 
         # TODO, add errback, etc.
         self._transaction_queue.enqueue_pdu(pdu, order)
 
-        logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.pdu_id)
+        logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.event_id)
 
     @log_function
     def send_edu(self, destination, edu_type, content):
@@ -215,7 +201,7 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def get_pdu(self, destination, pdu_origin, pdu_id, outlier=False):
+    def get_pdu(self, destination, event_id, outlier=False):
         """Requests the PDU with given origin and ID from the remote home
         server.
 
@@ -224,7 +210,7 @@ class ReplicationLayer(object):
         Args:
             destination (str): Which home server to query
             pdu_origin (str): The home server that originally sent the pdu.
-            pdu_id (str)
+            event_id (str)
             outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
                 it's from an arbitary point in the context as opposed to part
                 of the current block of PDUs. Defaults to `False`
@@ -233,8 +219,9 @@ class ReplicationLayer(object):
             Deferred: Results in the requested PDU.
         """
 
-        transaction_data = yield self.transport_layer.get_pdu(
-            destination, pdu_origin, pdu_id)
+        transaction_data = yield self.transport_layer.get_event(
+            destination, event_id
+        )
 
         transaction = Transaction(**transaction_data)
 
@@ -249,8 +236,7 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def get_state_for_context(self, destination, context, pdu_id=None,
-                              pdu_origin=None):
+    def get_state_for_context(self, destination, context, event_id=None):
         """Requests all of the `current` state PDUs for a given context from
         a remote home server.
 
@@ -263,7 +249,9 @@ class ReplicationLayer(object):
         """
 
         transaction_data = yield self.transport_layer.get_context_state(
-            destination, context, pdu_id=pdu_id, pdu_origin=pdu_origin,
+            destination,
+            context,
+            event_id=event_id,
         )
 
         transaction = Transaction(**transaction_data)
@@ -352,10 +340,10 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def on_context_state_request(self, context, pdu_id, pdu_origin):
-        if pdu_id and pdu_origin:
+    def on_context_state_request(self, context, event_id):
+        if event_id:
             pdus = yield self.handler.get_state_for_pdu(
-                pdu_id, pdu_origin
+                event_id
             )
         else:
             raise NotImplementedError("Specify an event")
@@ -370,8 +358,8 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def on_pdu_request(self, pdu_origin, pdu_id):
-        pdu = yield self._get_persisted_pdu(pdu_id, pdu_origin)
+    def on_pdu_request(self, event_id):
+        pdu = yield self._get_persisted_pdu(event_id)
 
         if pdu:
             defer.returnValue(
@@ -443,9 +431,8 @@ class ReplicationLayer(object):
     def send_join(self, destination, pdu):
         _, content = yield self.transport_layer.send_join(
             destination,
-            pdu.context,
-            pdu.pdu_id,
-            pdu.origin,
+            pdu.room_id,
+            pdu.event_id,
             pdu.get_dict(),
         )
 
@@ -457,13 +444,13 @@ class ReplicationLayer(object):
         defer.returnValue(pdus)
 
     @log_function
-    def _get_persisted_pdu(self, pdu_id, pdu_origin):
+    def _get_persisted_pdu(self, event_id):
         """ Get a PDU from the database with given origin and id.
 
         Returns:
             Deferred: Results in a `Pdu`.
         """
-        return self.handler.get_persisted_pdu(pdu_id, pdu_origin)
+        return self.handler.get_persisted_pdu(event_id)
 
     def _transaction_from_pdus(self, pdu_list):
         """Returns a new Transaction containing the given PDUs suitable for
@@ -487,10 +474,10 @@ class ReplicationLayer(object):
     @log_function
     def _handle_new_pdu(self, origin, pdu, backfilled=False):
         # We reprocess pdus when we have seen them only as outliers
-        existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
+        existing = yield self._get_persisted_pdu(pdu.event_id)
 
         if existing and (not existing.outlier or pdu.outlier):
-            logger.debug("Already seen pdu %s %s", pdu.pdu_id, pdu.origin)
+            logger.debug("Already seen pdu %s", pdu.event_id)
             defer.returnValue({})
             return
 
@@ -500,23 +487,22 @@ class ReplicationLayer(object):
         if not pdu.outlier:
             # We only backfill backwards to the min depth.
             min_depth = yield self.handler.get_min_depth_for_context(
-                pdu.context
+                pdu.room_id
             )
 
             if min_depth and pdu.depth > min_depth:
-                for pdu_id, origin, hashes in pdu.prev_pdus:
-                    exists = yield self._get_persisted_pdu(pdu_id, origin)
+                for event_id, hashes in pdu.prev_events:
+                    exists = yield self._get_persisted_pdu(event_id)
 
                     if not exists:
-                        logger.debug("Requesting pdu %s %s", pdu_id, origin)
+                        logger.debug("Requesting pdu %s", event_id)
 
                         try:
                             yield self.get_pdu(
                                 pdu.origin,
-                                pdu_id=pdu_id,
-                                pdu_origin=origin
+                                event_id=event_id,
                             )
-                            logger.debug("Processed pdu %s %s", pdu_id, origin)
+                            logger.debug("Processed pdu %s", event_id)
                         except:
                             # TODO(erikj): Do some more intelligent retries.
                             logger.exception("Failed to get PDU")
@@ -524,7 +510,7 @@ class ReplicationLayer(object):
                 # We need to get the state at this event, since we have reached
                 # a backward extremity edge.
                 state = yield self.get_state_for_context(
-                    origin, pdu.context, pdu.pdu_id, pdu.origin,
+                    origin, pdu.room_id, pdu.event_id,
                 )
 
         # Persist the Pdu, but don't mark it as processed yet.