summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/replication.py27
-rw-r--r--synapse/handlers/federation.py22
-rw-r--r--synapse/storage/event_federation.py7
3 files changed, 42 insertions, 14 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 89dbf3e2e9..a0bd2e0572 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -106,7 +106,6 @@ class ReplicationLayer(object):
 
         self.query_handlers[query_type] = handler
 
-    @defer.inlineCallbacks
     @log_function
     def send_pdu(self, pdu):
         """Informs the replication layer about a new PDU generated within the
@@ -135,7 +134,7 @@ class ReplicationLayer(object):
         logger.debug("[%s] Persisting PDU", pdu.pdu_id)
 
         # Save *before* trying to send
-        yield self.store.persist_event(pdu=pdu)
+        # yield self.store.persist_event(pdu=pdu)
 
         logger.debug("[%s] Persisted PDU", pdu.pdu_id)
         logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.pdu_id)
@@ -359,12 +358,13 @@ class ReplicationLayer(object):
                 pdu_id, pdu_origin
             )
         else:
-            results = yield self.store.get_current_state_for_context(
-                context
-            )
-            pdus = [Pdu.from_pdu_tuple(p) for p in results]
-
-        logger.debug("Context returning %d results", len(pdus))
+            raise NotImplementedError("Specify an event")
+        #     results = yield self.store.get_current_state_for_context(
+        #         context
+        #     )
+        #     pdus = [Pdu.from_pdu_tuple(p) for p in results]
+        #
+        # logger.debug("Context returning %d results", len(pdus))
 
         defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
 
@@ -456,7 +456,6 @@ class ReplicationLayer(object):
 
         defer.returnValue(pdus)
 
-    @defer.inlineCallbacks
     @log_function
     def _get_persisted_pdu(self, pdu_id, pdu_origin):
         """ Get a PDU from the database with given origin and id.
@@ -464,9 +463,7 @@ class ReplicationLayer(object):
         Returns:
             Deferred: Results in a `Pdu`.
         """
-        pdu_tuple = yield self.store.get_pdu(pdu_id, pdu_origin)
-
-        defer.returnValue(Pdu.from_pdu_tuple(pdu_tuple))
+        return self.handler.get_persisted_pdu(pdu_id, pdu_origin)
 
     def _transaction_from_pdus(self, pdu_list):
         """Returns a new Transaction containing the given PDUs suitable for
@@ -502,7 +499,9 @@ class ReplicationLayer(object):
         # Get missing pdus if necessary.
         if not pdu.outlier:
             # We only backfill backwards to the min depth.
-            min_depth = yield self.store.get_min_depth_for_context(pdu.context)
+            min_depth = yield self.handler.get_min_depth_for_context(
+                pdu.context
+            )
 
             if min_depth and pdu.depth > min_depth:
                 for pdu_id, origin, hashes in pdu.prev_pdus:
@@ -529,7 +528,7 @@ class ReplicationLayer(object):
                 )
 
         # Persist the Pdu, but don't mark it as processed yet.
-        yield self.store.persist_event(pdu=pdu)
+        # yield self.store.persist_event(pdu=pdu)
 
         if not backfilled:
             ret = yield self.handler.on_receive_pdu(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9f457ce292..18cb1d4e97 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -415,6 +415,28 @@ class FederationHandler(BaseHandler):
             for e in events
         ])
 
+    @defer.inlineCallbacks
+    @log_function
+    def get_persisted_pdu(self, pdu_id, origin):
+        """ Get a PDU from the database with given origin and id.
+
+        Returns:
+            Deferred: Results in a `Pdu`.
+        """
+        event = yield self.store.get_event(
+            self.pdu_codec.encode_event_id(pdu_id, origin),
+            allow_none=True,
+        )
+
+        if event:
+            defer.returnValue(self.pdu_codec.pdu_from_event(event))
+        else:
+            defer.returnValue(None)
+
+    @log_function
+    def get_min_depth_for_context(self, context):
+        return self.store.get_min_depth(context)
+
     @log_function
     def _on_user_joined(self, user, room_id):
         waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 438b42c1da..8357071db6 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -78,6 +78,13 @@ class EventFederationStore(SQLBaseStore):
 
         return results
 
+    def get_min_depth(self, room_id):
+        return self.runInteraction(
+            "get_min_depth",
+            self._get_min_depth_interaction,
+            room_id,
+        )
+
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
             txn,