summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2019-01-25 15:59:36 +0000
committerGitHub <noreply@github.com>2019-01-25 15:59:36 +0000
commitb6dce9b9fde07d450e22da479108b2859d3bd49d (patch)
tree13f3757fc0ad20bbf297830918b616da41ca3f1f /synapse
parentMerge pull request #4471 from matrix-org/erikj/sqlite_native_upsert (diff)
parentNewsfile (diff)
downloadsynapse-b6dce9b9fde07d450e22da479108b2859d3bd49d.tar.xz
Merge pull request #4470 from matrix-org/erikj/require_format_version
Require event format version to parse or create events
Diffstat (limited to 'synapse')
-rw-r--r--synapse/events/__init__.py24
-rw-r--r--synapse/events/builder.py51
-rw-r--r--synapse/federation/federation_base.py9
-rw-r--r--synapse/federation/federation_client.py60
-rw-r--r--synapse/federation/federation_server.py33
-rw-r--r--synapse/federation/transport/server.py4
-rw-r--r--synapse/handlers/federation.py72
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/replication/http/federation.py8
-rw-r--r--synapse/replication/http/send_event.py8
-rw-r--r--synapse/storage/events_worker.py8
11 files changed, 205 insertions, 82 deletions
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 38470ad176..3fe52aaa45 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -18,7 +18,11 @@ from distutils.util import strtobool
 
 import six
 
-from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions
+from synapse.api.constants import (
+    KNOWN_EVENT_FORMAT_VERSIONS,
+    KNOWN_ROOM_VERSIONS,
+    EventFormatVersions,
+)
 from synapse.util.caches import intern_dict
 from synapse.util.frozenutils import freeze
 
@@ -256,3 +260,21 @@ def room_version_to_event_format(room_version):
         raise RuntimeError("Unrecognized room version %s" % (room_version,))
 
     return EventFormatVersions.V1
+
+
+def event_type_from_format_version(format_version):
+    """Returns the python type to use to construct an Event object for the
+    given event format version.
+
+    Args:
+        format_version (int): The event format version
+
+    Returns:
+        type: A type that can be initialized as per the initializer of
+        `FrozenEvent`
+    """
+    if format_version not in KNOWN_EVENT_FORMAT_VERSIONS:
+        raise Exception(
+            "No event format %r" % (format_version,)
+        )
+    return FrozenEvent
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index e662eaef10..7e63371095 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -15,12 +15,39 @@
 
 import copy
 
+from synapse.api.constants import RoomVersions
 from synapse.types import EventID
 from synapse.util.stringutils import random_string
 
 from . import EventBase, FrozenEvent, _event_dict_property
 
 
+def get_event_builder(room_version, key_values={}, internal_metadata_dict={}):
+    """Generate an event builder appropriate for the given room version
+
+    Args:
+        room_version (str): Version of the room that we're creating an
+            event builder for
+        key_values (dict): Fields used as the basis of the new event
+        internal_metadata_dict (dict): Used to create the `_EventInternalMetadata`
+            object.
+
+    Returns:
+        EventBuilder
+    """
+    if room_version in {
+        RoomVersions.V1,
+        RoomVersions.V2,
+        RoomVersions.VDH_TEST,
+        RoomVersions.STATE_V2_TEST,
+    }:
+        return EventBuilder(key_values, internal_metadata_dict)
+    else:
+        raise Exception(
+            "No event format defined for version %r" % (room_version,)
+        )
+
+
 class EventBuilder(EventBase):
     def __init__(self, key_values={}, internal_metadata_dict={}):
         signatures = copy.deepcopy(key_values.pop("signatures", {}))
@@ -58,7 +85,29 @@ class EventBuilderFactory(object):
 
         return e_id.to_string()
 
-    def new(self, key_values={}):
+    def new(self, room_version, key_values={}):
+        """Generate an event builder appropriate for the given room version
+
+        Args:
+            room_version (str): Version of the room that we're creating an
+                event builder for
+            key_values (dict): Fields used as the basis of the new event
+
+        Returns:
+            EventBuilder
+        """
+
+        # There's currently only the one event version defined
+        if room_version not in {
+            RoomVersions.V1,
+            RoomVersions.V2,
+            RoomVersions.VDH_TEST,
+            RoomVersions.STATE_V2_TEST,
+        }:
+            raise Exception(
+                "No event format defined for version %r" % (room_version,)
+            )
+
         key_values["event_id"] = self.create_event_id()
 
         time_now = int(self.clock.time_msec())
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index d749bfdd3a..5c31e5f85f 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -23,7 +23,7 @@ from twisted.internet.defer import DeferredList
 from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import Codes, SynapseError
 from synapse.crypto.event_signing import check_event_content_hash
-from synapse.events import FrozenEvent
+from synapse.events import event_type_from_format_version
 from synapse.events.utils import prune_event
 from synapse.http.servlet import assert_params_in_dict
 from synapse.types import get_domain_from_id
@@ -302,11 +302,12 @@ def _is_invite_via_3pid(event):
     )
 
 
-def event_from_pdu_json(pdu_json, outlier=False):
+def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
     """Construct a FrozenEvent from an event json received over federation
 
     Args:
         pdu_json (object): pdu as received over federation
+        event_format_version (int): The event format version
         outlier (bool): True to mark this event as an outlier
 
     Returns:
@@ -330,8 +331,8 @@ def event_from_pdu_json(pdu_json, outlier=False):
     elif depth > MAX_DEPTH:
         raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
 
-    event = FrozenEvent(
-        pdu_json
+    event = event_type_from_format_version(event_format_version)(
+        pdu_json,
     )
 
     event.internal_metadata.outlier = outlier
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 33ecabca29..71809893c5 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -170,13 +170,13 @@ class FederationClient(FederationBase):
 
     @defer.inlineCallbacks
     @log_function
-    def backfill(self, dest, context, limit, extremities):
+    def backfill(self, dest, room_id, limit, extremities):
         """Requests some more historic PDUs for the given context from the
         given destination server.
 
         Args:
             dest (str): The remote home server to ask.
-            context (str): The context to backfill.
+            room_id (str): The room_id to backfill.
             limit (int): The maximum number of PDUs to return.
             extremities (list): List of PDU id and origins of the first pdus
                 we have seen from the context
@@ -191,12 +191,15 @@ class FederationClient(FederationBase):
             return
 
         transaction_data = yield self.transport_layer.backfill(
-            dest, context, extremities, limit)
+            dest, room_id, extremities, limit)
 
         logger.debug("backfill transaction_data=%s", repr(transaction_data))
 
+        room_version = yield self.store.get_room_version(room_id)
+        format_ver = room_version_to_event_format(room_version)
+
         pdus = [
-            event_from_pdu_json(p, outlier=False)
+            event_from_pdu_json(p, format_ver, outlier=False)
             for p in transaction_data["pdus"]
         ]
 
@@ -240,6 +243,8 @@ class FederationClient(FederationBase):
 
         pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
 
+        format_ver = room_version_to_event_format(room_version)
+
         signed_pdu = None
         for destination in destinations:
             now = self._clock.time_msec()
@@ -255,7 +260,7 @@ class FederationClient(FederationBase):
                 logger.debug("transaction_data %r", transaction_data)
 
                 pdu_list = [
-                    event_from_pdu_json(p, outlier=outlier)
+                    event_from_pdu_json(p, format_ver, outlier=outlier)
                     for p in transaction_data["pdus"]
                 ]
 
@@ -349,12 +354,16 @@ class FederationClient(FederationBase):
             destination, room_id, event_id=event_id,
         )
 
+        room_version = yield self.store.get_room_version(room_id)
+        format_ver = room_version_to_event_format(room_version)
+
         pdus = [
-            event_from_pdu_json(p, outlier=True) for p in result["pdus"]
+            event_from_pdu_json(p, format_ver, outlier=True)
+            for p in result["pdus"]
         ]
 
         auth_chain = [
-            event_from_pdu_json(p, outlier=True)
+            event_from_pdu_json(p, format_ver, outlier=True)
             for p in result.get("auth_chain", [])
         ]
 
@@ -362,8 +371,6 @@ class FederationClient(FederationBase):
             ev.event_id for ev in itertools.chain(pdus, auth_chain)
         ])
 
-        room_version = yield self.store.get_room_version(room_id)
-
         signed_pdus = yield self._check_sigs_and_hash_and_fetch(
             destination,
             [p for p in pdus if p.event_id not in seen_events],
@@ -462,13 +469,14 @@ class FederationClient(FederationBase):
             destination, room_id, event_id,
         )
 
+        room_version = yield self.store.get_room_version(room_id)
+        format_ver = room_version_to_event_format(room_version)
+
         auth_chain = [
-            event_from_pdu_json(p, outlier=True)
+            event_from_pdu_json(p, format_ver, outlier=True)
             for p in res["auth_chain"]
         ]
 
-        room_version = yield self.store.get_room_version(room_id)
-
         signed_auth = yield self._check_sigs_and_hash_and_fetch(
             destination, auth_chain,
             outlier=True, room_version=room_version,
@@ -605,7 +613,7 @@ class FederationClient(FederationBase):
             pdu_dict.pop("origin_server_ts", None)
             pdu_dict.pop("unsigned", None)
 
-            builder = self.event_builder_factory.new(pdu_dict)
+            builder = self.event_builder_factory.new(room_version, pdu_dict)
             add_hashes_and_signatures(
                 builder,
                 self.hs.hostname,
@@ -621,7 +629,7 @@ class FederationClient(FederationBase):
             "make_" + membership, destinations, send_request,
         )
 
-    def send_join(self, destinations, pdu):
+    def send_join(self, destinations, pdu, event_format_version):
         """Sends a join event to one of a list of homeservers.
 
         Doing so will cause the remote server to add the event to the graph,
@@ -631,6 +639,7 @@ class FederationClient(FederationBase):
             destinations (str): Candidate homeservers which are probably
                 participating in the room.
             pdu (BaseEvent): event to be sent
+            event_format_version (int): The event format version
 
         Return:
             Deferred: resolves to a dict with members ``origin`` (a string
@@ -676,12 +685,12 @@ class FederationClient(FederationBase):
             logger.debug("Got content: %s", content)
 
             state = [
-                event_from_pdu_json(p, outlier=True)
+                event_from_pdu_json(p, event_format_version, outlier=True)
                 for p in content.get("state", [])
             ]
 
             auth_chain = [
-                event_from_pdu_json(p, outlier=True)
+                event_from_pdu_json(p, event_format_version, outlier=True)
                 for p in content.get("auth_chain", [])
             ]
 
@@ -759,7 +768,10 @@ class FederationClient(FederationBase):
 
         logger.debug("Got response to send_invite: %s", pdu_dict)
 
-        pdu = event_from_pdu_json(pdu_dict)
+        room_version = yield self.store.get_room_version(room_id)
+        format_ver = room_version_to_event_format(room_version)
+
+        pdu = event_from_pdu_json(pdu_dict, format_ver)
 
         # Check signatures are correct.
         pdu = yield self._check_sigs_and_hash(pdu)
@@ -837,13 +849,14 @@ class FederationClient(FederationBase):
             content=send_content,
         )
 
+        room_version = yield self.store.get_room_version(room_id)
+        format_ver = room_version_to_event_format(room_version)
+
         auth_chain = [
-            event_from_pdu_json(e)
+            event_from_pdu_json(e, format_ver)
             for e in content["auth_chain"]
         ]
 
-        room_version = yield self.store.get_room_version(room_id)
-
         signed_auth = yield self._check_sigs_and_hash_and_fetch(
             destination, auth_chain, outlier=True, room_version=room_version,
         )
@@ -887,13 +900,14 @@ class FederationClient(FederationBase):
                 timeout=timeout,
             )
 
+            room_version = yield self.store.get_room_version(room_id)
+            format_ver = room_version_to_event_format(room_version)
+
             events = [
-                event_from_pdu_json(e)
+                event_from_pdu_json(e, format_ver)
                 for e in content.get("events", [])
             ]
 
-            room_version = yield self.store.get_room_version(room_id)
-
             signed_events = yield self._check_sigs_and_hash_and_fetch(
                 destination, events, outlier=False, room_version=room_version,
             )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index dde166e295..4aa04b9588 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -34,6 +34,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.crypto.event_signing import compute_event_signature
+from synapse.events import room_version_to_event_format
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
@@ -178,14 +179,13 @@ class FederationServer(FederationBase):
                 continue
 
             try:
-                # In future we will actually use the room version to parse the
-                # PDU into an event.
-                yield self.store.get_room_version(room_id)
+                room_version = yield self.store.get_room_version(room_id)
+                format_ver = room_version_to_event_format(room_version)
             except NotFoundError:
                 logger.info("Ignoring PDU for unknown room_id: %s", room_id)
                 continue
 
-            event = event_from_pdu_json(p)
+            event = event_from_pdu_json(p, format_ver)
             pdus_by_room.setdefault(room_id, []).append(event)
 
         pdu_results = {}
@@ -370,7 +370,9 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     def on_invite_request(self, origin, content, room_version):
-        pdu = event_from_pdu_json(content)
+        format_ver = room_version_to_event_format(room_version)
+
+        pdu = event_from_pdu_json(content, format_ver)
         origin_host, _ = parse_server_name(origin)
         yield self.check_server_matches_acl(origin_host, pdu.room_id)
         ret_pdu = yield self.handler.on_invite_request(origin, pdu)
@@ -378,9 +380,12 @@ class FederationServer(FederationBase):
         defer.returnValue({"event": ret_pdu.get_pdu_json(time_now)})
 
     @defer.inlineCallbacks
-    def on_send_join_request(self, origin, content):
+    def on_send_join_request(self, origin, content, room_id):
         logger.debug("on_send_join_request: content: %s", content)
-        pdu = event_from_pdu_json(content)
+
+        room_version = yield self.store.get_room_version(room_id)
+        format_ver = room_version_to_event_format(room_version)
+        pdu = event_from_pdu_json(content, format_ver)
 
         origin_host, _ = parse_server_name(origin)
         yield self.check_server_matches_acl(origin_host, pdu.room_id)
@@ -410,9 +415,12 @@ class FederationServer(FederationBase):
         })
 
     @defer.inlineCallbacks
-    def on_send_leave_request(self, origin, content):
+    def on_send_leave_request(self, origin, content, room_id):
         logger.debug("on_send_leave_request: content: %s", content)
-        pdu = event_from_pdu_json(content)
+
+        room_version = yield self.store.get_room_version(room_id)
+        format_ver = room_version_to_event_format(room_version)
+        pdu = event_from_pdu_json(content, format_ver)
 
         origin_host, _ = parse_server_name(origin)
         yield self.check_server_matches_acl(origin_host, pdu.room_id)
@@ -458,13 +466,14 @@ class FederationServer(FederationBase):
             origin_host, _ = parse_server_name(origin)
             yield self.check_server_matches_acl(origin_host, room_id)
 
+            room_version = yield self.store.get_room_version(room_id)
+            format_ver = room_version_to_event_format(room_version)
+
             auth_chain = [
-                event_from_pdu_json(e)
+                event_from_pdu_json(e, format_ver)
                 for e in content["auth_chain"]
             ]
 
-            room_version = yield self.store.get_room_version(room_id)
-
             signed_auth = yield self._check_sigs_and_hash_and_fetch(
                 origin, auth_chain, outlier=True, room_version=room_version,
             )
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 4557a9e66e..67ae0212c3 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -469,7 +469,7 @@ class FederationSendLeaveServlet(BaseFederationServlet):
 
     @defer.inlineCallbacks
     def on_PUT(self, origin, content, query, room_id, event_id):
-        content = yield self.handler.on_send_leave_request(origin, content)
+        content = yield self.handler.on_send_leave_request(origin, content, room_id)
         defer.returnValue((200, content))
 
 
@@ -487,7 +487,7 @@ class FederationSendJoinServlet(BaseFederationServlet):
     def on_PUT(self, origin, content, query, context, event_id):
         # TODO(paul): assert that context/event_id parsed from path actually
         #   match those given in content
-        content = yield self.handler.on_send_join_request(origin, content)
+        content = yield self.handler.on_send_join_request(origin, content, context)
         defer.returnValue((200, content))
 
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c52dca1b81..a4b771049c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1061,7 +1061,7 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        origin, event = yield self._make_and_verify_event(
+        origin, event, event_format_version = yield self._make_and_verify_event(
             target_hosts,
             room_id,
             joinee,
@@ -1091,7 +1091,9 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
-            ret = yield self.federation_client.send_join(target_hosts, event)
+            ret = yield self.federation_client.send_join(
+                target_hosts, event, event_format_version,
+            )
 
             origin = ret["origin"]
             state = ret["state"]
@@ -1164,13 +1166,18 @@ class FederationHandler(BaseHandler):
         """
         event_content = {"membership": Membership.JOIN}
 
-        builder = self.event_builder_factory.new({
-            "type": EventTypes.Member,
-            "content": event_content,
-            "room_id": room_id,
-            "sender": user_id,
-            "state_key": user_id,
-        })
+        room_version = yield self.store.get_room_version(room_id)
+
+        builder = self.event_builder_factory.new(
+            room_version,
+            {
+                "type": EventTypes.Member,
+                "content": event_content,
+                "room_id": room_id,
+                "sender": user_id,
+                "state_key": user_id,
+            }
+        )
 
         try:
             event, context = yield self.event_creation_handler.create_new_client_event(
@@ -1304,7 +1311,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
-        origin, event = yield self._make_and_verify_event(
+        origin, event, event_format_version = yield self._make_and_verify_event(
             target_hosts,
             room_id,
             user_id,
@@ -1336,7 +1343,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
                                content={}, params=None):
-        origin, pdu, _ = yield self.federation_client.make_membership_event(
+        origin, event, format_ver = yield self.federation_client.make_membership_event(
             target_hosts,
             room_id,
             user_id,
@@ -1345,9 +1352,7 @@ class FederationHandler(BaseHandler):
             params=params,
         )
 
-        logger.debug("Got response to make_%s: %s", membership, pdu)
-
-        event = pdu
+        logger.debug("Got response to make_%s: %s", membership, event)
 
         # We should assert some things.
         # FIXME: Do this in a nicer way
@@ -1355,7 +1360,7 @@ class FederationHandler(BaseHandler):
         assert(event.user_id == user_id)
         assert(event.state_key == user_id)
         assert(event.room_id == room_id)
-        defer.returnValue((origin, event))
+        defer.returnValue((origin, event, format_ver))
 
     @defer.inlineCallbacks
     @log_function
@@ -1364,13 +1369,17 @@ class FederationHandler(BaseHandler):
         leave event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
-        builder = self.event_builder_factory.new({
-            "type": EventTypes.Member,
-            "content": {"membership": Membership.LEAVE},
-            "room_id": room_id,
-            "sender": user_id,
-            "state_key": user_id,
-        })
+        room_version = yield self.store.get_room_version(room_id)
+        builder = self.event_builder_factory.new(
+            room_version,
+            {
+                "type": EventTypes.Member,
+                "content": {"membership": Membership.LEAVE},
+                "room_id": room_id,
+                "sender": user_id,
+                "state_key": user_id,
+            }
+        )
 
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
@@ -2266,14 +2275,16 @@ class FederationHandler(BaseHandler):
         }
 
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
-            builder = self.event_builder_factory.new(event_dict)
+            room_version = yield self.store.get_room_version(room_id)
+            builder = self.event_builder_factory.new(room_version, event_dict)
+
             EventValidator().validate_new(builder)
             event, context = yield self.event_creation_handler.create_new_client_event(
                 builder=builder
             )
 
             event, context = yield self.add_display_name_to_third_party_invite(
-                event_dict, event, context
+                room_version, event_dict, event, context
             )
 
             try:
@@ -2304,14 +2315,18 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred: resolves (to None)
         """
-        builder = self.event_builder_factory.new(event_dict)
+        room_version = yield self.store.get_room_version(room_id)
+
+        # NB: event_dict has a particular specced format we might need to fudge
+        # if we change event formats too much.
+        builder = self.event_builder_factory.new(room_version, event_dict)
 
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
         )
 
         event, context = yield self.add_display_name_to_third_party_invite(
-            event_dict, event, context
+            room_version, event_dict, event, context
         )
 
         try:
@@ -2331,7 +2346,8 @@ class FederationHandler(BaseHandler):
         yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
-    def add_display_name_to_third_party_invite(self, event_dict, event, context):
+    def add_display_name_to_third_party_invite(self, room_version, event_dict,
+                                               event, context):
         key = (
             EventTypes.ThirdPartyInvite,
             event.content["third_party_invite"]["signed"]["token"]
@@ -2355,7 +2371,7 @@ class FederationHandler(BaseHandler):
             # auth checks. If we need the invite and don't have it then the
             # auth check code will explode appropriately.
 
-        builder = self.event_builder_factory.new(event_dict)
+        builder = self.event_builder_factory.new(room_version, event_dict)
         EventValidator().validate_new(builder)
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a7cd779b02..7aaa4fba33 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -278,7 +278,15 @@ class EventCreationHandler(object):
         """
         yield self.auth.check_auth_blocking(requester.user.to_string())
 
-        builder = self.event_builder_factory.new(event_dict)
+        if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
+            room_version = event_dict["content"]["room_version"]
+        else:
+            try:
+                room_version = yield self.store.get_room_version(event_dict["room_id"])
+            except NotFoundError:
+                raise AuthError(403, "Unknown room")
+
+        builder = self.event_builder_factory.new(room_version, event_dict)
 
         self.validator.validate_new(builder)
 
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 64a79da162..2e16c69666 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,7 +17,7 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.events import FrozenEvent
+from synapse.events import event_type_from_format_version
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
@@ -70,6 +70,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
             event_payloads.append({
                 "event": event.get_pdu_json(),
+                "event_format_version": event.format_version,
                 "internal_metadata": event.internal_metadata.get_dict(),
                 "rejected_reason": event.rejected_reason,
                 "context": serialized_context,
@@ -94,9 +95,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
             event_and_contexts = []
             for event_payload in event_payloads:
                 event_dict = event_payload["event"]
+                format_ver = content["event_format_version"]
                 internal_metadata = event_payload["internal_metadata"]
                 rejected_reason = event_payload["rejected_reason"]
-                event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+                EventType = event_type_from_format_version(format_ver)
+                event = EventType(event_dict, internal_metadata, rejected_reason)
 
                 context = yield EventContext.deserialize(
                     self.store, event_payload["context"],
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 5b52c91650..3635015eda 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -17,7 +17,7 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.events import FrozenEvent
+from synapse.events import event_type_from_format_version
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
@@ -74,6 +74,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
 
         payload = {
             "event": event.get_pdu_json(),
+            "event_format_version": event.format_version,
             "internal_metadata": event.internal_metadata.get_dict(),
             "rejected_reason": event.rejected_reason,
             "context": serialized_context,
@@ -90,9 +91,12 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             content = parse_json_object_from_request(request)
 
             event_dict = content["event"]
+            format_ver = content["event_format_version"]
             internal_metadata = content["internal_metadata"]
             rejected_reason = content["rejected_reason"]
-            event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+            EventType = event_type_from_format_version(format_ver)
+            event = EventType(event_dict, internal_metadata, rejected_reason)
 
             requester = Requester.deserialize(self.store, content["requester"])
             context = yield EventContext.deserialize(self.store, content["context"])
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 599f892858..0a0ca58fc4 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventFormatVersions
 from synapse.api.errors import NotFoundError
-from synapse.events import FrozenEvent
+from synapse.events import FrozenEvent, event_type_from_format_version  # noqa: F401
 # these are only included to make the type annotations work
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
@@ -412,11 +412,7 @@ class EventsWorkerStore(SQLBaseStore):
                 # of a event format version, so it must be a V1 event.
                 format_version = EventFormatVersions.V1
 
-            # TODO: When we implement new event formats we'll need to use a
-            # different event python type
-            assert format_version == EventFormatVersions.V1
-
-            original_ev = FrozenEvent(
+            original_ev = event_type_from_format_version(format_version)(
                 event_dict=d,
                 internal_metadata_dict=internal_metadata,
                 rejected_reason=rejected_reason,