summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/federation/handler.py22
-rw-r--r--synapse/federation/replication.py9
-rw-r--r--synapse/handlers/federation.py21
-rw-r--r--synapse/rest/room.py16
-rw-r--r--synapse/storage/__init__.py22
-rw-r--r--synapse/storage/pdu.py16
6 files changed, 81 insertions, 25 deletions
diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py
index 68243d31d0..984c1558e9 100644
--- a/synapse/federation/handler.py
+++ b/synapse/federation/handler.py
@@ -74,10 +74,18 @@ class FederationEventHandler(object):
 
     @log_function
     @defer.inlineCallbacks
-    def backfill(self, room_id, limit):
-        # TODO: Work out which destinations to ask for backfill
-        # self.replication_layer.backfill(dest, room_id, limit)
-        pass
+    def backfill(self, dest, room_id, limit):
+        pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+
+        if not pdus:
+            defer.returnValue([])
+
+        events = [
+            self.pdu_codec.event_from_pdu(pdu)
+            for pdu in pdus
+        ]
+
+        defer.returnValue(events)
 
     @log_function
     def get_state_for_room(self, destination, room_id):
@@ -87,7 +95,7 @@ class FederationEventHandler(object):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, pdu):
+    def on_receive_pdu(self, pdu, backfilled):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it throught the StateHandler.
         """
@@ -95,7 +103,7 @@ class FederationEventHandler(object):
 
         try:
             with (yield self.lock_manager.lock(pdu.context)):
-                if event.is_state:
+                if event.is_state and not backfilled:
                     is_new_state = yield self.state_handler.handle_new_state(
                         pdu
                     )
@@ -104,7 +112,7 @@ class FederationEventHandler(object):
                 else:
                     is_new_state = False
 
-            yield self.event_handler.on_receive(event, is_new_state)
+            yield self.event_handler.on_receive(event, is_new_state, backfilled)
 
         except AuthError:
             # TODO: Implement something in federation that allows us to
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index bc9df2f214..3e5f1a4108 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -208,7 +208,7 @@ class ReplicationLayer(object):
 
         pdus = [Pdu(outlier=False, **p) for p in transaction.pdus]
         for pdu in pdus:
-            yield self._handle_new_pdu(pdu)
+            yield self._handle_new_pdu(pdu, backfilled=True)
 
         defer.returnValue(pdus)
 
@@ -415,7 +415,7 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_pdu(self, pdu):
+    def _handle_new_pdu(self, pdu, backfilled=False):
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
 
@@ -451,7 +451,10 @@ class ReplicationLayer(object):
         # Persist the Pdu, but don't mark it as processed yet.
         yield self.pdu_actions.persist_received(pdu)
 
-        ret = yield self.handler.on_receive_pdu(pdu)
+        if not backfilled:
+            ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled)
+        else:
+            ret = None
 
         yield self.pdu_actions.mark_as_processed(pdu)
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7026df90a2..ef9ed274df 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -35,7 +35,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive(self, event, is_new_state):
+    def on_receive(self, event, is_new_state, backfilled):
         if hasattr(event, "state_key") and not is_new_state:
             logger.debug("Ignoring old state.")
             return
@@ -70,6 +70,21 @@ class FederationHandler(BaseHandler):
 
         else:
             with (yield self.room_lock.lock(event.room_id)):
-                store_id = yield self.store.persist_event(event)
+                store_id = yield self.store.persist_event(event, backfilled)
 
-            yield self.notifier.on_new_room_event(event, store_id)
+            if not backfilled:
+                yield self.notifier.on_new_room_event(event, store_id)
+
+
+    @log_function
+    @defer.inlineCallbacks
+    def backfill(self, dest, room_id, limit):
+        events = yield self.hs.get_federation().backfill(dest, room_id, limit)
+
+        for event in events:
+            try:
+                yield self.store.persist_event(event, backfilled=True)
+            except:
+                logger.debug("Failed to persiste event: %s", event)
+
+        defer.returnValue(events)
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index dfb2aabe70..89ea9f0d25 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -383,6 +383,21 @@ class RoomMessageListRestServlet(RestServlet):
         defer.returnValue((200, msgs))
 
 
+class RoomTriggerBackfill(RestServlet):
+    PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/backfill$")
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        remote_server = urllib.unquote(request.args["remote"][0])
+        room_id = urllib.unquote(room_id)
+        limit = int(request.args["limit"][0])
+
+        handler = self.handlers.federation_handler
+        events = yield handler.backfill(remote_server, room_id, limit)
+
+        res = [event.get_dict() for event in events]
+        defer.returnValue((200, res))
+
 def _parse_json(request):
     try:
         content = json.loads(request.content.read())
@@ -403,3 +418,4 @@ def register_servlets(hs, http_server):
     RoomMemberListRestServlet(hs).register(http_server)
     RoomMessageListRestServlet(hs).register(http_server)
     JoinRoomAliasServlet(hs).register(http_server)
+    RoomTriggerBackfill(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b846081d49..2243a710db 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,6 +20,8 @@ from synapse.api.events.room import (
     RoomConfigEvent, RoomNameEvent,
 )
 
+from synapse.util.logutils import log_function
+
 from .directory import DirectoryStore
 from .feedback import FeedbackStore
 from .presence import PresenceStore
@@ -32,9 +34,13 @@ from .pdu import StatePduStore, PduStore
 from .transactions import TransactionStore
 
 import json
+import logging
 import os
 
 
+logger = logging.getLogger(__name__)
+
+
 class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, PduStore, StatePduStore, TransactionStore,
@@ -49,6 +55,7 @@ class DataStore(RoomMemberStore, RoomStore,
         self.min_token = None
 
     @defer.inlineCallbacks
+    @log_function
     def persist_event(self, event, backfilled=False):
         if event.type == RoomMemberEvent.TYPE:
             yield self._store_room_member(event)
@@ -83,6 +90,7 @@ class DataStore(RoomMemberStore, RoomStore,
         defer.returnValue(event)
 
     @defer.inlineCallbacks
+    @log_function
     def _store_event(self, event, backfilled):
         # FIXME (erikj): This should be removed when we start amalgamating
         # event and pdu storage
@@ -101,7 +109,7 @@ class DataStore(RoomMemberStore, RoomStore,
             if not self.min_token_deferred.called:
                 yield self.min_token_deferred
             self.min_token -= 1
-            vals["token_ordering"] = self.min_token
+            vals["stream_ordering"] = self.min_token
 
         unrec = {
             k: v
@@ -110,7 +118,11 @@ class DataStore(RoomMemberStore, RoomStore,
         }
         vals["unrecognized_keys"] = json.dumps(unrec)
 
-        yield self._simple_insert("events", vals)
+        try:
+            yield self._simple_insert("events", vals)
+        except:
+            logger.exception("Failed to persist, probably duplicate")
+            return
 
         if not backfilled and hasattr(event, "state_key"):
             vals = {
@@ -161,10 +173,12 @@ class DataStore(RoomMemberStore, RoomStore,
     def _get_min_token(self):
         row = yield self._execute(
             None,
-            "SELECT MIN(token_ordering) FROM events"
+            "SELECT MIN(stream_ordering) FROM events"
         )
 
-        self.min_token = rows[0][0] if rows and rows[0] else 0
+        self.min_token = min(row[0][0], -1) if row and row[0] else -1
+
+        logger.debug("min_token is: %s", self.min_token)
 
         defer.returnValue(self.min_token)
 
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index a24ce7ab78..7655f43ede 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore, Table, JoinHelper
 
 from synapse.util.logutils import log_function
@@ -319,6 +321,7 @@ class PduStore(SQLBaseStore):
 
         return [(row[0], row[1], row[2]) for row in results]
 
+    @defer.inlineCallbacks
     def get_oldest_pdus_in_context(self, context):
         """Get a list of Pdus that we haven't backfilled beyond yet (and haven't    
         seen). This list is used when we want to backfill backwards and is the 
@@ -331,17 +334,14 @@ class PduStore(SQLBaseStore):
         Returns:
             list: A list of PduIdTuple.
         """
-        return self._db_pool.runInteraction(
-            self._get_oldest_pdus_in_context, context
-        )
-
-    def _get_oldest_pdus_in_context(self, txn, context):
-        txn.execute(
+        results = yield self._execute(
+            None,
             "SELECT pdu_id, origin FROM %(back)s WHERE context = ?"
             % {"back": PduBackwardExtremitiesTable.table_name, },
-            (context,)
+            context
         )
-        return [PduIdTuple(i, o) for i, o in txn.fetchall()]
+
+        defer.returnValue([PduIdTuple(i, o) for i, o in results])
 
     def is_pdu_new(self, pdu_id, origin, context, depth):
         """For a given Pdu, try and figure out if it's 'new', i.e., if it's