diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py
index 68243d31d0..984c1558e9 100644
--- a/synapse/federation/handler.py
+++ b/synapse/federation/handler.py
@@ -74,10 +74,18 @@ class FederationEventHandler(object):
@log_function
@defer.inlineCallbacks
- def backfill(self, room_id, limit):
- # TODO: Work out which destinations to ask for backfill
- # self.replication_layer.backfill(dest, room_id, limit)
- pass
+ def backfill(self, dest, room_id, limit):
+ pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+
+ if not pdus:
+ defer.returnValue([])
+
+ events = [
+ self.pdu_codec.event_from_pdu(pdu)
+ for pdu in pdus
+ ]
+
+ defer.returnValue(events)
@log_function
def get_state_for_room(self, destination, room_id):
@@ -87,7 +95,7 @@ class FederationEventHandler(object):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, pdu):
+ def on_receive_pdu(self, pdu, backfilled):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it throught the StateHandler.
"""
@@ -95,7 +103,7 @@ class FederationEventHandler(object):
try:
with (yield self.lock_manager.lock(pdu.context)):
- if event.is_state:
+ if event.is_state and not backfilled:
is_new_state = yield self.state_handler.handle_new_state(
pdu
)
@@ -104,7 +112,7 @@ class FederationEventHandler(object):
else:
is_new_state = False
- yield self.event_handler.on_receive(event, is_new_state)
+ yield self.event_handler.on_receive(event, is_new_state, backfilled)
except AuthError:
# TODO: Implement something in federation that allows us to
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index bc9df2f214..3e5f1a4108 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -208,7 +208,7 @@ class ReplicationLayer(object):
pdus = [Pdu(outlier=False, **p) for p in transaction.pdus]
for pdu in pdus:
- yield self._handle_new_pdu(pdu)
+ yield self._handle_new_pdu(pdu, backfilled=True)
defer.returnValue(pdus)
@@ -415,7 +415,7 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def _handle_new_pdu(self, pdu):
+ def _handle_new_pdu(self, pdu, backfilled=False):
# We reprocess pdus when we have seen them only as outliers
existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
@@ -451,7 +451,10 @@ class ReplicationLayer(object):
# Persist the Pdu, but don't mark it as processed yet.
yield self.pdu_actions.persist_received(pdu)
- ret = yield self.handler.on_receive_pdu(pdu)
+ if not backfilled:
+ ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled)
+ else:
+ ret = None
yield self.pdu_actions.mark_as_processed(pdu)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7026df90a2..ef9ed274df 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -35,7 +35,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive(self, event, is_new_state):
+ def on_receive(self, event, is_new_state, backfilled):
if hasattr(event, "state_key") and not is_new_state:
logger.debug("Ignoring old state.")
return
@@ -70,6 +70,21 @@ class FederationHandler(BaseHandler):
else:
with (yield self.room_lock.lock(event.room_id)):
- store_id = yield self.store.persist_event(event)
+ store_id = yield self.store.persist_event(event, backfilled)
- yield self.notifier.on_new_room_event(event, store_id)
+ if not backfilled:
+ yield self.notifier.on_new_room_event(event, store_id)
+
+
+ @log_function
+ @defer.inlineCallbacks
+ def backfill(self, dest, room_id, limit):
+ events = yield self.hs.get_federation().backfill(dest, room_id, limit)
+
+ for event in events:
+ try:
+ yield self.store.persist_event(event, backfilled=True)
+ except:
+ logger.debug("Failed to persiste event: %s", event)
+
+ defer.returnValue(events)
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index dfb2aabe70..89ea9f0d25 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -383,6 +383,21 @@ class RoomMessageListRestServlet(RestServlet):
defer.returnValue((200, msgs))
+class RoomTriggerBackfill(RestServlet):
+ PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/backfill$")
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id):
+ remote_server = urllib.unquote(request.args["remote"][0])
+ room_id = urllib.unquote(room_id)
+ limit = int(request.args["limit"][0])
+
+ handler = self.handlers.federation_handler
+ events = yield handler.backfill(remote_server, room_id, limit)
+
+ res = [event.get_dict() for event in events]
+ defer.returnValue((200, res))
+
def _parse_json(request):
try:
content = json.loads(request.content.read())
@@ -403,3 +418,4 @@ def register_servlets(hs, http_server):
RoomMemberListRestServlet(hs).register(http_server)
RoomMessageListRestServlet(hs).register(http_server)
JoinRoomAliasServlet(hs).register(http_server)
+ RoomTriggerBackfill(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b846081d49..2243a710db 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,6 +20,8 @@ from synapse.api.events.room import (
RoomConfigEvent, RoomNameEvent,
)
+from synapse.util.logutils import log_function
+
from .directory import DirectoryStore
from .feedback import FeedbackStore
from .presence import PresenceStore
@@ -32,9 +34,13 @@ from .pdu import StatePduStore, PduStore
from .transactions import TransactionStore
import json
+import logging
import os
+logger = logging.getLogger(__name__)
+
+
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, PduStore, StatePduStore, TransactionStore,
@@ -49,6 +55,7 @@ class DataStore(RoomMemberStore, RoomStore,
self.min_token = None
@defer.inlineCallbacks
+ @log_function
def persist_event(self, event, backfilled=False):
if event.type == RoomMemberEvent.TYPE:
yield self._store_room_member(event)
@@ -83,6 +90,7 @@ class DataStore(RoomMemberStore, RoomStore,
defer.returnValue(event)
@defer.inlineCallbacks
+ @log_function
def _store_event(self, event, backfilled):
# FIXME (erikj): This should be removed when we start amalgamating
# event and pdu storage
@@ -101,7 +109,7 @@ class DataStore(RoomMemberStore, RoomStore,
if not self.min_token_deferred.called:
yield self.min_token_deferred
self.min_token -= 1
- vals["token_ordering"] = self.min_token
+ vals["stream_ordering"] = self.min_token
unrec = {
k: v
@@ -110,7 +118,11 @@ class DataStore(RoomMemberStore, RoomStore,
}
vals["unrecognized_keys"] = json.dumps(unrec)
- yield self._simple_insert("events", vals)
+ try:
+ yield self._simple_insert("events", vals)
+ except:
+ logger.exception("Failed to persist, probably duplicate")
+ return
if not backfilled and hasattr(event, "state_key"):
vals = {
@@ -161,10 +173,12 @@ class DataStore(RoomMemberStore, RoomStore,
def _get_min_token(self):
row = yield self._execute(
None,
- "SELECT MIN(token_ordering) FROM events"
+ "SELECT MIN(stream_ordering) FROM events"
)
- self.min_token = rows[0][0] if rows and rows[0] else 0
+ self.min_token = min(row[0][0], -1) if row and row[0] else -1
+
+ logger.debug("min_token is: %s", self.min_token)
defer.returnValue(self.min_token)
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index a24ce7ab78..7655f43ede 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore, Table, JoinHelper
from synapse.util.logutils import log_function
@@ -319,6 +321,7 @@ class PduStore(SQLBaseStore):
return [(row[0], row[1], row[2]) for row in results]
+ @defer.inlineCallbacks
def get_oldest_pdus_in_context(self, context):
"""Get a list of Pdus that we haven't backfilled beyond yet (and haven't
seen). This list is used when we want to backfill backwards and is the
@@ -331,17 +334,14 @@ class PduStore(SQLBaseStore):
Returns:
list: A list of PduIdTuple.
"""
- return self._db_pool.runInteraction(
- self._get_oldest_pdus_in_context, context
- )
-
- def _get_oldest_pdus_in_context(self, txn, context):
- txn.execute(
+ results = yield self._execute(
+ None,
"SELECT pdu_id, origin FROM %(back)s WHERE context = ?"
% {"back": PduBackwardExtremitiesTable.table_name, },
- (context,)
+ context
)
- return [PduIdTuple(i, o) for i, o in txn.fetchall()]
+
+ defer.returnValue([PduIdTuple(i, o) for i, o in results])
def is_pdu_new(self, pdu_id, origin, context, depth):
"""For a given Pdu, try and figure out if it's 'new', i.e., if it's
|