diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 159af4eed7..838e660a46 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -111,14 +111,6 @@ class ReplicationLayer(object):
"""Informs the replication layer about a new PDU generated within the
home server that should be transmitted to others.
- This will fill out various attributes on the PDU object, e.g. the
- `prev_pdus` key.
-
- *Note:* The home server should always call `send_pdu` even if it knows
- that it does not need to be replicated to other home servers. This is
- in case e.g. someone else joins via a remote home server and then
- backfills.
-
TODO: Figure out when we should actually resolve the deferred.
Args:
@@ -131,18 +123,12 @@ class ReplicationLayer(object):
order = self._order
self._order += 1
- logger.debug("[%s] Persisting PDU", pdu.pdu_id)
-
- # Save *before* trying to send
- # yield self.store.persist_event(pdu=pdu)
-
- logger.debug("[%s] Persisted PDU", pdu.pdu_id)
- logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id)
+ logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
# TODO, add errback, etc.
self._transaction_queue.enqueue_pdu(pdu, order)
- logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.pdu_id)
+ logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.event_id)
@log_function
def send_edu(self, destination, edu_type, content):
@@ -215,7 +201,7 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def get_pdu(self, destination, pdu_origin, pdu_id, outlier=False):
+ def get_pdu(self, destination, event_id, outlier=False):
"""Requests the PDU with given origin and ID from the remote home
server.
@@ -224,7 +210,7 @@ class ReplicationLayer(object):
Args:
destination (str): Which home server to query
pdu_origin (str): The home server that originally sent the pdu.
- pdu_id (str)
+ event_id (str)
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
@@ -233,8 +219,9 @@ class ReplicationLayer(object):
Deferred: Results in the requested PDU.
"""
- transaction_data = yield self.transport_layer.get_pdu(
- destination, pdu_origin, pdu_id)
+ transaction_data = yield self.transport_layer.get_event(
+ destination, event_id
+ )
transaction = Transaction(**transaction_data)
@@ -249,8 +236,7 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def get_state_for_context(self, destination, context, pdu_id=None,
- pdu_origin=None):
+ def get_state_for_context(self, destination, context, event_id=None):
"""Requests all of the `current` state PDUs for a given context from
a remote home server.
@@ -263,7 +249,9 @@ class ReplicationLayer(object):
"""
transaction_data = yield self.transport_layer.get_context_state(
- destination, context, pdu_id=pdu_id, pdu_origin=pdu_origin,
+ destination,
+ context,
+ event_id=event_id,
)
transaction = Transaction(**transaction_data)
@@ -352,10 +340,10 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def on_context_state_request(self, context, pdu_id, pdu_origin):
- if pdu_id and pdu_origin:
+ def on_context_state_request(self, context, event_id):
+ if event_id:
pdus = yield self.handler.get_state_for_pdu(
- pdu_id, pdu_origin
+ event_id
)
else:
raise NotImplementedError("Specify an event")
@@ -370,8 +358,8 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def on_pdu_request(self, pdu_origin, pdu_id):
- pdu = yield self._get_persisted_pdu(pdu_id, pdu_origin)
+ def on_pdu_request(self, event_id):
+ pdu = yield self._get_persisted_pdu(event_id)
if pdu:
defer.returnValue(
@@ -443,9 +431,8 @@ class ReplicationLayer(object):
def send_join(self, destination, pdu):
_, content = yield self.transport_layer.send_join(
destination,
- pdu.context,
- pdu.pdu_id,
- pdu.origin,
+ pdu.room_id,
+ pdu.event_id,
pdu.get_dict(),
)
@@ -457,13 +444,13 @@ class ReplicationLayer(object):
defer.returnValue(pdus)
@log_function
- def _get_persisted_pdu(self, pdu_id, pdu_origin):
+ def _get_persisted_pdu(self, event_id):
""" Get a PDU from the database with given origin and id.
Returns:
Deferred: Results in a `Pdu`.
"""
- return self.handler.get_persisted_pdu(pdu_id, pdu_origin)
+ return self.handler.get_persisted_pdu(event_id)
def _transaction_from_pdus(self, pdu_list):
"""Returns a new Transaction containing the given PDUs suitable for
@@ -487,10 +474,10 @@ class ReplicationLayer(object):
@log_function
def _handle_new_pdu(self, origin, pdu, backfilled=False):
# We reprocess pdus when we have seen them only as outliers
- existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
+ existing = yield self._get_persisted_pdu(pdu.event_id)
if existing and (not existing.outlier or pdu.outlier):
- logger.debug("Already seen pdu %s %s", pdu.pdu_id, pdu.origin)
+ logger.debug("Already seen pdu %s", pdu.event_id)
defer.returnValue({})
return
@@ -500,23 +487,22 @@ class ReplicationLayer(object):
if not pdu.outlier:
# We only backfill backwards to the min depth.
min_depth = yield self.handler.get_min_depth_for_context(
- pdu.context
+ pdu.room_id
)
if min_depth and pdu.depth > min_depth:
- for pdu_id, origin, hashes in pdu.prev_pdus:
- exists = yield self._get_persisted_pdu(pdu_id, origin)
+ for event_id, hashes in pdu.prev_events:
+ exists = yield self._get_persisted_pdu(event_id)
if not exists:
- logger.debug("Requesting pdu %s %s", pdu_id, origin)
+ logger.debug("Requesting pdu %s", event_id)
try:
yield self.get_pdu(
pdu.origin,
- pdu_id=pdu_id,
- pdu_origin=origin
+ event_id=event_id,
)
- logger.debug("Processed pdu %s %s", pdu_id, origin)
+ logger.debug("Processed pdu %s", event_id)
except:
# TODO(erikj): Do some more intelligent retries.
logger.exception("Failed to get PDU")
@@ -524,7 +510,7 @@ class ReplicationLayer(object):
# We need to get the state at this event, since we have reached
# a backward extremity edge.
state = yield self.get_state_for_context(
- origin, pdu.context, pdu.pdu_id, pdu.origin,
+ origin, pdu.room_id, pdu.event_id,
)
# Persist the Pdu, but don't mark it as processed yet.
|