diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index cd3c962d50..ca89a0787c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -439,6 +439,25 @@ class FederationClient(FederationBase):
defer.returnValue(ret)
+ @defer.inlineCallbacks
+ def get_missing_events(self, destination, room_id, earliest_events,
+ latest_events, limit, min_depth):
+ content = yield self.transport_layer.get_missing_events(
+ destination, room_id, earliest_events, latest_events, limit,
+ min_depth,
+ )
+
+ events = [
+ self.event_from_pdu_json(e)
+ for e in content.get("events", [])
+ ]
+
+ signed_events = yield self._check_sigs_and_hash_and_fetch(
+ destination, events, outlier=True
+ )
+
+ defer.returnValue(signed_events)
+
def event_from_pdu_json(self, pdu_json, outlier=False):
event = FrozenEvent(
pdu_json
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bc9bac809a..4264d857be 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -125,6 +125,7 @@ class FederationServer(FederationBase):
results.append({"error": str(e)})
except Exception as e:
results.append({"error": str(e)})
+ logger.exception("Failed to handle PDU")
if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]:
@@ -297,6 +298,20 @@ class FederationServer(FederationBase):
(200, send_content)
)
+ @defer.inlineCallbacks
+ @log_function
+ def on_get_missing_events(self, origin, room_id, earliest_events,
+ latest_events, limit, min_depth):
+ missing_events = yield self.handler.on_get_missing_events(
+ origin, room_id, earliest_events, latest_events, limit, min_depth
+ )
+
+ time_now = self._clock.time_msec()
+
+ defer.returnValue({
+ "events": [ev.get_pdu_json(time_now) for ev in missing_events],
+ })
+
@log_function
def _get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
@@ -323,7 +338,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
- def _handle_new_pdu(self, origin, pdu, max_recursion=10):
+ def _handle_new_pdu(self, origin, pdu, get_missing=True):
# We reprocess pdus when we have seen them only as outliers
existing = yield self._get_persisted_pdu(
origin, pdu.event_id, do_auth=False
@@ -375,48 +390,50 @@ class FederationServer(FederationBase):
pdu.room_id, min_depth
)
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = set(have_seen.keys())
+
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
# message, to work around the fact that some events will
# reference really really old events we really don't want to
# send to the clients.
pdu.internal_metadata.outlier = True
- elif min_depth and pdu.depth > min_depth and max_recursion > 0:
- for event_id, hashes in pdu.prev_events:
- if event_id not in have_seen:
- logger.debug(
- "_handle_new_pdu requesting pdu %s",
- event_id
+ elif min_depth and pdu.depth > min_depth:
+ if get_missing and prevs - seen:
+ latest_tuples = yield self.store.get_latest_events_in_room(
+ pdu.room_id
+ )
+
+ # We add the prev events that we have seen to the latest
+ # list to ensure the remote server doesn't give them to us
+ latest = set(e_id for e_id, _, _ in latest_tuples)
+ latest |= seen
+
+ missing_events = yield self.get_missing_events(
+ origin,
+ pdu.room_id,
+ earliest_events=list(latest),
+ latest_events=[pdu.event_id],
+ limit=10,
+ min_depth=min_depth,
+ )
+
+ for e in missing_events:
+ yield self._handle_new_pdu(
+ origin,
+ e,
+ get_missing=False
)
- try:
- new_pdu = yield self.federation_client.get_pdu(
- [origin, pdu.origin],
- event_id=event_id,
- )
-
- if new_pdu:
- yield self._handle_new_pdu(
- origin,
- new_pdu,
- max_recursion=max_recursion-1
- )
-
- logger.debug("Processed pdu %s", event_id)
- else:
- logger.warn("Failed to get PDU %s", event_id)
- fetch_state = True
- except:
- # TODO(erikj): Do some more intelligent retries.
- logger.exception("Failed to get PDU")
- fetch_state = True
- else:
- prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
- if prevs - seen:
- fetch_state = True
- else:
- fetch_state = True
+ have_seen = yield self.store.have_events(
+ [ev for ev, _ in pdu.prev_events]
+ )
+
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = set(have_seen.keys())
+ if prevs - seen:
+ fetch_state = True
if fetch_state:
# We need to get the state at this event, since we haven't
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 7d30c924d1..741a4e7a1a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -224,6 +224,8 @@ class TransactionQueue(object):
]
try:
+ self.pending_transactions[destination] = 1
+
limiter = yield get_retry_limiter(
destination,
self._clock,
@@ -239,8 +241,6 @@ class TransactionQueue(object):
len(pending_failures)
)
- self.pending_transactions[destination] = 1
-
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
@@ -287,7 +287,7 @@ class TransactionQueue(object):
code = 200
if response:
- for e_id, r in getattr(response, "pdus", {}).items():
+ for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 8b137e7128..80d03012b7 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -219,3 +219,22 @@ class TransportLayerClient(object):
)
defer.returnValue(content)
+
+ @defer.inlineCallbacks
+ @log_function
+ def get_missing_events(self, destination, room_id, earliest_events,
+ latest_events, limit, min_depth):
+ path = PREFIX + "/get_missing_events/%s" % (room_id,)
+
+ content = yield self.client.post_json(
+ destination=destination,
+ path=path,
+ data={
+ "limit": int(limit),
+ "min_depth": int(min_depth),
+ "earliest_events": earliest_events,
+ "latest_events": latest_events,
+ }
+ )
+
+ defer.returnValue(content)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index fce9c0195e..ece6dbcf62 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -242,6 +242,7 @@ class TransportLayerServer(object):
)
)
)
+
self.server.register_path(
"POST",
re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"),
@@ -253,6 +254,17 @@ class TransportLayerServer(object):
)
)
+ self.server.register_path(
+ "POST",
+ re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"),
+ self._with_authentication(
+ lambda origin, content, query, room_id:
+ self._get_missing_events(
+ origin, content, room_id,
+ )
+ )
+ )
+
@defer.inlineCallbacks
@log_function
def _on_send_request(self, origin, content, query, transaction_id):
@@ -352,3 +364,22 @@ class TransportLayerServer(object):
)
defer.returnValue((200, new_content))
+
+ @defer.inlineCallbacks
+ @log_function
+ def _get_missing_events(self, origin, content, room_id):
+ limit = int(content.get("limit", 10))
+ min_depth = int(content.get("min_depth", 0))
+ earliest_events = content.get("earliest_events", [])
+ latest_events = content.get("latest_events", [])
+
+ content = yield self.request_handler.on_get_missing_events(
+ origin,
+ room_id=room_id,
+ earliest_events=earliest_events,
+ latest_events=latest_events,
+ min_depth=min_depth,
+ limit=limit,
+ )
+
+ defer.returnValue((200, content))
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7deed16f9c..ae4e9b316d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -581,12 +581,13 @@ class FederationHandler(BaseHandler):
defer.returnValue(event)
@defer.inlineCallbacks
- def get_state_for_pdu(self, origin, room_id, event_id):
+ def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
yield run_on_reactor()
- in_room = yield self.auth.check_host_in_room(room_id, origin)
- if not in_room:
- raise AuthError(403, "Host not in room.")
+ if do_auth:
+ in_room = yield self.auth.check_host_in_room(room_id, origin)
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
state_groups = yield self.store.get_state_groups(
[event_id]
@@ -789,6 +790,29 @@ class FederationHandler(BaseHandler):
defer.returnValue(ret)
@defer.inlineCallbacks
+ def on_get_missing_events(self, origin, room_id, earliest_events,
+ latest_events, limit, min_depth):
+ in_room = yield self.auth.check_host_in_room(
+ room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+
+ limit = min(limit, 20)
+ min_depth = max(min_depth, 0)
+
+ missing_events = yield self.store.get_missing_events(
+ room_id=room_id,
+ earliest_events=earliest_events,
+ latest_events=latest_events,
+ limit=limit,
+ min_depth=min_depth,
+ )
+
+ defer.returnValue(missing_events)
+
+ @defer.inlineCallbacks
@log_function
def do_auth(self, origin, event, context, auth_events):
# Check if we have all the auth events.
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3fbc090224..2deda8ac50 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -64,6 +64,9 @@ class EventFederationStore(SQLBaseStore):
for f in front:
txn.execute(base_sql, (f,))
new_front.update([r[0] for r in txn.fetchall()])
+
+ new_front -= results
+
front = new_front
results.update(front)
@@ -378,3 +381,51 @@ class EventFederationStore(SQLBaseStore):
event_results += new_front
return self._get_events_txn(txn, event_results)
+
+ def get_missing_events(self, room_id, earliest_events, latest_events,
+ limit, min_depth):
+ return self.runInteraction(
+ "get_missing_events",
+ self._get_missing_events,
+ room_id, earliest_events, latest_events, limit, min_depth
+ )
+
+ def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
+ limit, min_depth):
+
+ earliest_events = set(earliest_events)
+ front = set(latest_events) - earliest_events
+
+ event_results = set()
+
+ query = (
+ "SELECT prev_event_id FROM event_edges "
+ "WHERE room_id = ? AND event_id = ? AND is_state = 0 "
+ "LIMIT ?"
+ )
+
+ while front and len(event_results) < limit:
+ new_front = set()
+ for event_id in front:
+ txn.execute(
+ query,
+ (room_id, event_id, limit - len(event_results))
+ )
+
+ for e_id, in txn.fetchall():
+ new_front.add(e_id)
+
+ new_front -= earliest_events
+ new_front -= event_results
+
+ front = new_front
+ event_results |= new_front
+
+ events = self._get_events_txn(txn, event_results)
+
+ events = sorted(
+ [ev for ev in events if ev.depth >= min_depth],
+ key=lambda e: e.depth,
+ )
+
+ return events[:limit]
|