diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 0f9302a6a8..98722ae543 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -162,8 +162,30 @@ class FederationServer(FederationBase):
p["age_ts"] = request_time - int(p["age"])
del p["age"]
+ # We try and pull out an event ID so that if later checks fail we
+ # can log something sensible. We don't mandate an event ID here in
+ # case future event formats get rid of the key.
+ possible_event_id = p.get("event_id", "<Unknown>")
+
+ # Now we get the room ID so that we can check that we know the
+ # version of the room.
+ room_id = p.get("room_id")
+ if not room_id:
+ logger.info(
+ "Ignoring PDU as does not have a room_id. Event ID: %s",
+ possible_event_id,
+ )
+ continue
+
+ try:
+ # In future we will actually use the room version to parse the
+ # PDU into an event.
+ yield self.store.get_room_version(room_id)
+ except NotFoundError:
+ logger.info("Ignoring PDU for unknown room_id: %s", room_id)
+ continue
+
event = event_from_pdu_json(p)
- room_id = event.room_id
pdus_by_room.setdefault(room_id, []).append(event)
pdu_results = {}
@@ -324,11 +346,6 @@ class FederationServer(FederationBase):
defer.returnValue((404, ""))
@defer.inlineCallbacks
- @log_function
- def on_pull_request(self, origin, versions):
- raise NotImplementedError("Pull transactions not implemented")
-
- @defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 3fdd63be95..099ace28c1 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -183,9 +183,7 @@ class TransactionQueue(object):
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
- event.room_id, latest_event_ids=[
- prev_id for prev_id, _ in event.prev_events
- ],
+ event.room_id, latest_event_ids=event.prev_event_ids(),
)
except Exception:
logger.exception(
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 7288d49074..3553f418f1 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -362,14 +362,6 @@ class FederationSendServlet(BaseFederationServlet):
defer.returnValue((code, response))
-class FederationPullServlet(BaseFederationServlet):
- PATH = "/pull/"
-
- # This is for when someone asks us for everything since version X
- def on_GET(self, origin, content, query):
- return self.handler.on_pull_request(query["origin"][0], query["v"])
-
-
class FederationEventServlet(BaseFederationServlet):
PATH = "/event/(?P<event_id>[^/]*)/"
@@ -1261,7 +1253,6 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
- FederationPullServlet,
FederationEventServlet,
FederationStateServlet,
FederationStateIdsServlet,
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index c5ab14314e..025a79c022 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -117,9 +117,6 @@ class Transaction(JsonEncodedObject):
"Require 'transaction_id' to construct a Transaction"
)
- for p in pdus:
- p.transaction_id = kwargs["transaction_id"]
-
kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
return Transaction(**kwargs)
|