diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 89dbf3e2e9..a0bd2e0572 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -106,7 +106,6 @@ class ReplicationLayer(object):
self.query_handlers[query_type] = handler
- @defer.inlineCallbacks
@log_function
def send_pdu(self, pdu):
"""Informs the replication layer about a new PDU generated within the
@@ -135,7 +134,7 @@ class ReplicationLayer(object):
logger.debug("[%s] Persisting PDU", pdu.pdu_id)
# Save *before* trying to send
- yield self.store.persist_event(pdu=pdu)
+ # yield self.store.persist_event(pdu=pdu)
logger.debug("[%s] Persisted PDU", pdu.pdu_id)
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id)
@@ -359,12 +358,13 @@ class ReplicationLayer(object):
pdu_id, pdu_origin
)
else:
- results = yield self.store.get_current_state_for_context(
- context
- )
- pdus = [Pdu.from_pdu_tuple(p) for p in results]
-
- logger.debug("Context returning %d results", len(pdus))
+ raise NotImplementedError("Specify an event")
+ # results = yield self.store.get_current_state_for_context(
+ # context
+ # )
+ # pdus = [Pdu.from_pdu_tuple(p) for p in results]
+ #
+ # logger.debug("Context returning %d results", len(pdus))
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
@@ -456,7 +456,6 @@ class ReplicationLayer(object):
defer.returnValue(pdus)
- @defer.inlineCallbacks
@log_function
def _get_persisted_pdu(self, pdu_id, pdu_origin):
""" Get a PDU from the database with given origin and id.
@@ -464,9 +463,7 @@ class ReplicationLayer(object):
Returns:
Deferred: Results in a `Pdu`.
"""
- pdu_tuple = yield self.store.get_pdu(pdu_id, pdu_origin)
-
- defer.returnValue(Pdu.from_pdu_tuple(pdu_tuple))
+ return self.handler.get_persisted_pdu(pdu_id, pdu_origin)
def _transaction_from_pdus(self, pdu_list):
"""Returns a new Transaction containing the given PDUs suitable for
@@ -502,7 +499,9 @@ class ReplicationLayer(object):
# Get missing pdus if necessary.
if not pdu.outlier:
# We only backfill backwards to the min depth.
- min_depth = yield self.store.get_min_depth_for_context(pdu.context)
+ min_depth = yield self.handler.get_min_depth_for_context(
+ pdu.context
+ )
if min_depth and pdu.depth > min_depth:
for pdu_id, origin, hashes in pdu.prev_pdus:
@@ -529,7 +528,7 @@ class ReplicationLayer(object):
)
# Persist the Pdu, but don't mark it as processed yet.
- yield self.store.persist_event(pdu=pdu)
+ # yield self.store.persist_event(pdu=pdu)
if not backfilled:
ret = yield self.handler.on_receive_pdu(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9f457ce292..18cb1d4e97 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -415,6 +415,28 @@ class FederationHandler(BaseHandler):
for e in events
])
+ @defer.inlineCallbacks
+ @log_function
+ def get_persisted_pdu(self, pdu_id, origin):
+ """ Get a PDU from the database with given origin and id.
+
+ Returns:
+ Deferred: Results in a `Pdu`.
+ """
+ event = yield self.store.get_event(
+ self.pdu_codec.encode_event_id(pdu_id, origin),
+ allow_none=True,
+ )
+
+ if event:
+ defer.returnValue(self.pdu_codec.pdu_from_event(event))
+ else:
+ defer.returnValue(None)
+
+ @log_function
+ def get_min_depth_for_context(self, context):
+ return self.store.get_min_depth(context)
+
@log_function
def _on_user_joined(self, user, room_id):
waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 438b42c1da..8357071db6 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -78,6 +78,13 @@ class EventFederationStore(SQLBaseStore):
return results
+ def get_min_depth(self, room_id):
+ return self.runInteraction(
+ "get_min_depth",
+ self._get_min_depth_interaction,
+ room_id,
+ )
+
def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
txn,
|