diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a0f5d40eb3..7918d3e442 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -16,7 +16,9 @@ import logging
from synapse.api.errors import SynapseError
from synapse.crypto.event_signing import check_event_content_hash
+from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
+from synapse.http.servlet import assert_params_in_request
from synapse.util import unwrapFirstError, logcontext
from twisted.internet import defer
@@ -169,3 +171,28 @@ class FederationBase(object):
)
return deferreds
+
+
+def event_from_pdu_json(pdu_json, outlier=False):
+ """Construct a FrozenEvent from an event json received over federation
+
+ Args:
+ pdu_json (object): pdu as received over federation
+ outlier (bool): True to mark this event as an outlier
+
+ Returns:
+ FrozenEvent
+
+ Raises:
+ SynapseError: if the pdu is missing required fields
+ """
+ # we could probably enforce a bunch of other fields here (room_id, sender,
+ # origin, etc etc)
+ assert_params_in_request(pdu_json, ('event_id', 'type'))
+ event = FrozenEvent(
+ pdu_json
+ )
+
+ event.internal_metadata.outlier = outlier
+
+ return event
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b8f02f5391..b1fe03f702 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -14,29 +14,29 @@
# limitations under the License.
+import copy
+import itertools
+import logging
+import random
+
from twisted.internet import defer
-from .federation_base import FederationBase
from synapse.api.constants import Membership
-
from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
-from synapse.util import unwrapFirstError, logcontext
+from synapse.events import builder
+from synapse.federation.federation_base import (
+ FederationBase,
+ event_from_pdu_json,
+)
+import synapse.metrics
+from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logutils import log_function
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
-from synapse.events import FrozenEvent, builder
-import synapse.metrics
-
+from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
-import copy
-import itertools
-import logging
-import random
-
-
logger = logging.getLogger(__name__)
@@ -184,7 +184,7 @@ class FederationClient(FederationBase):
logger.debug("backfill transaction_data=%s", repr(transaction_data))
pdus = [
- self.event_from_pdu_json(p, outlier=False)
+ event_from_pdu_json(p, outlier=False)
for p in transaction_data["pdus"]
]
@@ -244,7 +244,7 @@ class FederationClient(FederationBase):
logger.debug("transaction_data %r", transaction_data)
pdu_list = [
- self.event_from_pdu_json(p, outlier=outlier)
+ event_from_pdu_json(p, outlier=outlier)
for p in transaction_data["pdus"]
]
@@ -336,11 +336,11 @@ class FederationClient(FederationBase):
)
pdus = [
- self.event_from_pdu_json(p, outlier=True) for p in result["pdus"]
+ event_from_pdu_json(p, outlier=True) for p in result["pdus"]
]
auth_chain = [
- self.event_from_pdu_json(p, outlier=True)
+ event_from_pdu_json(p, outlier=True)
for p in result.get("auth_chain", [])
]
@@ -441,7 +441,7 @@ class FederationClient(FederationBase):
)
auth_chain = [
- self.event_from_pdu_json(p, outlier=True)
+ event_from_pdu_json(p, outlier=True)
for p in res["auth_chain"]
]
@@ -570,12 +570,12 @@ class FederationClient(FederationBase):
logger.debug("Got content: %s", content)
state = [
- self.event_from_pdu_json(p, outlier=True)
+ event_from_pdu_json(p, outlier=True)
for p in content.get("state", [])
]
auth_chain = [
- self.event_from_pdu_json(p, outlier=True)
+ event_from_pdu_json(p, outlier=True)
for p in content.get("auth_chain", [])
]
@@ -650,7 +650,7 @@ class FederationClient(FederationBase):
logger.debug("Got response to send_invite: %s", pdu_dict)
- pdu = self.event_from_pdu_json(pdu_dict)
+ pdu = event_from_pdu_json(pdu_dict)
# Check signatures are correct.
pdu = yield self._check_sigs_and_hash(pdu)
@@ -740,7 +740,7 @@ class FederationClient(FederationBase):
)
auth_chain = [
- self.event_from_pdu_json(e)
+ event_from_pdu_json(e)
for e in content["auth_chain"]
]
@@ -788,7 +788,7 @@ class FederationClient(FederationBase):
)
events = [
- self.event_from_pdu_json(e)
+ event_from_pdu_json(e)
for e in content.get("events", [])
]
@@ -805,15 +805,6 @@ class FederationClient(FederationBase):
defer.returnValue(signed_events)
- def event_from_pdu_json(self, pdu_json, outlier=False):
- event = FrozenEvent(
- pdu_json
- )
-
- event.internal_metadata.outlier = outlier
-
- return event
-
@defer.inlineCallbacks
def forward_third_party_invite(self, destinations, room_id, event_dict):
for destination in destinations:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a2327f24b6..9849953c9b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -12,25 +12,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
-from .federation_base import FederationBase
-from .units import Transaction, Edu
+import simplejson as json
+from twisted.internet import defer
+from synapse.api.errors import AuthError, FederationError, SynapseError
+from synapse.crypto.event_signing import compute_event_signature
+from synapse.federation.federation_base import (
+ FederationBase,
+ event_from_pdu_json,
+)
+from synapse.federation.units import Edu, Transaction
+import synapse.metrics
+from synapse.types import get_domain_from_id
from synapse.util import async
+from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function
-from synapse.util.caches.response_cache import ResponseCache
-from synapse.events import FrozenEvent
-from synapse.types import get_domain_from_id
-import synapse.metrics
-
-from synapse.api.errors import AuthError, FederationError, SynapseError
-
-from synapse.crypto.event_signing import compute_event_signature
-
-import simplejson as json
-import logging
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
@@ -172,7 +171,7 @@ class FederationServer(FederationBase):
p["age_ts"] = request_time - int(p["age"])
del p["age"]
- event = self.event_from_pdu_json(p)
+ event = event_from_pdu_json(p)
room_id = event.room_id
pdus_by_room.setdefault(room_id, []).append(event)
@@ -346,7 +345,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_invite_request(self, origin, content):
- pdu = self.event_from_pdu_json(content)
+ pdu = event_from_pdu_json(content)
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
@@ -354,7 +353,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_send_join_request(self, origin, content):
logger.debug("on_send_join_request: content: %s", content)
- pdu = self.event_from_pdu_json(content)
+ pdu = event_from_pdu_json(content)
logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
res_pdus = yield self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
@@ -374,7 +373,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_send_leave_request(self, origin, content):
logger.debug("on_send_leave_request: content: %s", content)
- pdu = self.event_from_pdu_json(content)
+ pdu = event_from_pdu_json(content)
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
yield self.handler.on_send_leave_request(origin, pdu)
defer.returnValue((200, {}))
@@ -411,7 +410,7 @@ class FederationServer(FederationBase):
"""
with (yield self._server_linearizer.queue((origin, room_id))):
auth_chain = [
- self.event_from_pdu_json(e)
+ event_from_pdu_json(e)
for e in content["auth_chain"]
]
@@ -586,15 +585,6 @@ class FederationServer(FederationBase):
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
- def event_from_pdu_json(self, pdu_json, outlier=False):
- event = FrozenEvent(
- pdu_json
- )
-
- event.internal_metadata.outlier = outlier
-
- return event
-
@defer.inlineCallbacks
def exchange_third_party_invite(
self,
|