diff --git a/changelog.d/4437.misc b/changelog.d/4437.misc
new file mode 100644
index 0000000000..43f8963614
--- /dev/null
+++ b/changelog.d/4437.misc
@@ -0,0 +1 @@
+Add infrastructure to support different event formats
diff --git a/changelog.d/4447.misc b/changelog.d/4447.misc
new file mode 100644
index 0000000000..43f8963614
--- /dev/null
+++ b/changelog.d/4447.misc
@@ -0,0 +1 @@
+Add infrastructure to support different event formats
diff --git a/changelog.d/4448.misc b/changelog.d/4448.misc
new file mode 100644
index 0000000000..43f8963614
--- /dev/null
+++ b/changelog.d/4448.misc
@@ -0,0 +1 @@
+Add infrastructure to support different event formats
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 46c4b4b9dc..51ee078bc3 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -120,6 +120,19 @@ KNOWN_ROOM_VERSIONS = {
RoomVersions.STATE_V2_TEST,
}
+
+class EventFormatVersions(object):
+ """This is an internal enum for tracking the version of the event format,
+ independently from the room version.
+ """
+ V1 = 1
+
+
+KNOWN_EVENT_FORMAT_VERSIONS = {
+ EventFormatVersions.V1,
+}
+
+
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 5030636c7e..01db729847 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -18,6 +18,7 @@ from distutils.util import strtobool
import six
+from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@@ -183,6 +184,8 @@ class EventBase(object):
class FrozenEvent(EventBase):
+ format_version = EventFormatVersions.V1 # All events of this type are V1
+
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
event_dict = dict(event_dict)
@@ -236,3 +239,18 @@ class FrozenEvent(EventBase):
self.get("type", None),
self.get("state_key", None),
)
+
+
+def room_version_to_event_format(room_version):
+ """Converts a room version string to the event format
+
+ Args:
+ room_version (str)
+
+ Returns:
+ int
+ """
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ raise
+
+ return EventFormatVersions.V1
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index b7ad729c63..d749bfdd3a 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -43,8 +43,8 @@ class FederationBase(object):
self._clock = hs.get_clock()
@defer.inlineCallbacks
- def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
- include_none=False):
+ def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version,
+ outlier=False, include_none=False):
"""Takes a list of PDUs and checks the signatures and hashs of each
one. If a PDU fails its signature check then we check if we have it in
the database and if not then request if from the originating server of
@@ -56,8 +56,12 @@ class FederationBase(object):
a new list.
Args:
+ origin (str)
pdu (list)
- outlier (bool)
+ room_version (str)
+ outlier (bool): Whether the events are outliers or not
+ include_none (str): Whether to include None in the returned list
+ for events that have failed their checks
Returns:
Deferred : A list of PDUs that have valid signatures and hashes.
@@ -84,6 +88,7 @@ class FederationBase(object):
res = yield self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
+ room_version=room_version,
outlier=outlier,
timeout=10000,
)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f4adcb556d..9ba3e1c42f 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,13 +25,19 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
+from synapse.api.constants import (
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ Membership,
+ RoomVersions,
+)
from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
HttpResponseException,
SynapseError,
)
+from synapse.events import room_version_to_event_format
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.util import logcontext, unwrapFirstError
@@ -204,7 +210,8 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
- def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
+ def get_pdu(self, destinations, event_id, room_version, outlier=False,
+ timeout=None):
"""Requests the PDU with given origin and ID from the remote home
servers.
@@ -214,6 +221,7 @@ class FederationClient(FederationBase):
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
+ room_version (str): version of the room
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
@@ -354,10 +362,13 @@ class FederationClient(FederationBase):
ev.event_id for ev in itertools.chain(pdus, auth_chain)
])
+ room_version = yield self.store.get_room_version(room_id)
+
signed_pdus = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in pdus if p.event_id not in seen_events],
- outlier=True
+ outlier=True,
+ room_version=room_version,
)
signed_pdus.extend(
seen_events[p.event_id] for p in pdus if p.event_id in seen_events
@@ -366,7 +377,8 @@ class FederationClient(FederationBase):
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in auth_chain if p.event_id not in seen_events],
- outlier=True
+ outlier=True,
+ room_version=room_version,
)
signed_auth.extend(
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
@@ -413,6 +425,8 @@ class FederationClient(FederationBase):
random.shuffle(srvs)
return srvs
+ room_version = yield self.store.get_room_version(room_id)
+
batch_size = 20
missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size):
@@ -423,6 +437,7 @@ class FederationClient(FederationBase):
self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
+ room_version=room_version,
)
for e_id in batch
]
@@ -452,8 +467,11 @@ class FederationClient(FederationBase):
for p in res["auth_chain"]
]
+ room_version = yield self.store.get_room_version(room_id)
+
signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True
+ destination, auth_chain,
+ outlier=True, room_version=room_version,
)
signed_auth.sort(key=lambda e: e.depth)
@@ -538,8 +556,9 @@ class FederationClient(FederationBase):
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Return:
- Deferred: resolves to a tuple of (origin (str), event (object))
- where origin is the remote homeserver which generated the event.
+ Deferred[tuple[str, dict, int]]: resolves to a tuple of
+ `(origin, event, event_format)` where origin is the remote
+ homeserver which generated the event.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
@@ -559,6 +578,11 @@ class FederationClient(FederationBase):
destination, room_id, user_id, membership, params,
)
+ # Note: If not supplied, the room version may be either v1 or v2,
+ # however either way the event format version will be v1.
+ room_version = ret.get("room_version", RoomVersions.V1)
+ event_format = room_version_to_event_format(room_version)
+
pdu_dict = ret.get("event", None)
if not isinstance(pdu_dict, dict):
raise InvalidResponseError("Bad 'event' field in response")
@@ -587,7 +611,7 @@ class FederationClient(FederationBase):
ev = builder.build()
defer.returnValue(
- (destination, ev)
+ (destination, ev, event_format)
)
return self._try_destination_list(
@@ -663,9 +687,20 @@ class FederationClient(FederationBase):
for p in itertools.chain(state, auth_chain)
}
+ room_version = None
+ for e in state:
+ if (e.type, e.state_key) == (EventTypes.Create, ""):
+ room_version = e.content.get("room_version", RoomVersions.V1)
+ break
+
+ if room_version is None:
+ # We use this error has that is what
+ raise SynapseError(400, "No create event in state")
+
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, list(pdus.values()),
outlier=True,
+ room_version=room_version,
)
valid_pdus_map = {
@@ -803,8 +838,10 @@ class FederationClient(FederationBase):
for e in content["auth_chain"]
]
+ room_version = yield self.store.get_room_version(room_id)
+
signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True
+ destination, auth_chain, outlier=True, room_version=room_version,
)
signed_auth.sort(key=lambda e: e.depth)
@@ -851,8 +888,10 @@ class FederationClient(FederationBase):
for e in content.get("events", [])
]
+ room_version = yield self.store.get_room_version(room_id)
+
signed_events = yield self._check_sigs_and_hash_and_fetch(
- destination, events, outlier=False
+ destination, events, outlier=False, room_version=room_version,
)
except HttpResponseException as e:
if not e.code == 400:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 37d29e7027..dde166e295 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -400,8 +400,14 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
pdu = yield self.handler.on_make_leave_request(room_id, user_id)
+
+ room_version = yield self.store.get_room_version(room_id)
+
time_now = self._clock.time_msec()
- defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+ defer.returnValue({
+ "event": pdu.get_pdu_json(time_now),
+ "room_version": room_version,
+ })
@defer.inlineCallbacks
def on_send_leave_request(self, origin, content):
@@ -457,8 +463,10 @@ class FederationServer(FederationBase):
for e in content["auth_chain"]
]
+ room_version = yield self.store.get_room_version(room_id)
+
signed_auth = yield self._check_sigs_and_hash_and_fetch(
- origin, auth_chain, outlier=True
+ origin, auth_chain, outlier=True, room_version=room_version,
)
ret = yield self.handler.on_query_auth(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e017cab777..d4d945030e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -34,6 +34,7 @@ from synapse.api.constants import (
EventTypes,
Membership,
RejectedReason,
+ RoomVersions,
)
from synapse.api.errors import (
AuthError,
@@ -338,6 +339,8 @@ class FederationHandler(BaseHandler):
room_id, event_id, p,
)
+ room_version = yield self.store.get_room_version(room_id)
+
with logcontext.nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
@@ -351,7 +354,7 @@ class FederationHandler(BaseHandler):
# we want the state *after* p; get_state_for_room returns the
# state *before* p.
remote_event = yield self.federation_client.get_pdu(
- [origin], p, outlier=True,
+ [origin], p, room_version, outlier=True,
)
if remote_event is None:
@@ -375,7 +378,6 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
- room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_store(
room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store),
@@ -651,6 +653,8 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
+ room_version = yield self.store.get_room_version(room_id)
+
events = yield self.federation_client.backfill(
dest,
room_id,
@@ -744,6 +748,7 @@ class FederationHandler(BaseHandler):
self.federation_client.get_pdu,
[dest],
event_id,
+ room_version=room_version,
outlier=True,
timeout=10000,
)
@@ -1333,7 +1338,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={}, params=None):
- origin, pdu = yield self.federation_client.make_membership_event(
+ origin, pdu, _ = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
@@ -1635,6 +1640,8 @@ class FederationHandler(BaseHandler):
create_event = e
break
+ room_version = create_event.content.get("room_version", RoomVersions.V1)
+
missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):
for e_id in e.auth_event_ids():
@@ -1645,6 +1652,7 @@ class FederationHandler(BaseHandler):
m_ev = yield self.federation_client.get_pdu(
[origin],
e_id,
+ room_version=room_version,
outlier=True,
timeout=10000,
)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 79e0276de6..3e1915fb87 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1268,6 +1268,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
event.internal_metadata.get_dict()
),
"json": encode_json(event_dict(event)),
+ "format_version": event.format_version,
}
for event, _ in events_and_contexts
],
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a8326f5296..599f892858 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -21,10 +21,10 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.api.constants import EventFormatVersions
from synapse.api.errors import NotFoundError
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
+# these are only included to make the type annotations work
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -353,6 +353,7 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
+ format_version=row["format_version"],
)
for row in rows
],
@@ -377,6 +378,7 @@ class EventsWorkerStore(SQLBaseStore):
" e.event_id as event_id, "
" e.internal_metadata,"
" e.json,"
+ " e.format_version, "
" r.redacts as redacts,"
" rej.event_id as rejects "
" FROM event_json as e"
@@ -392,7 +394,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- rejected_reason=None):
+ format_version, rejected_reason=None):
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -405,8 +407,17 @@ class EventsWorkerStore(SQLBaseStore):
desc="_get_event_from_row_rejected_reason",
)
+ if format_version is None:
+ # This means that we stored the event before we had the concept
+ # of a event format version, so it must be a V1 event.
+ format_version = EventFormatVersions.V1
+
+ # TODO: When we implement new event formats we'll need to use a
+ # different event python type
+ assert format_version == EventFormatVersions.V1
+
original_ev = FrozenEvent(
- d,
+ event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
diff --git a/synapse/storage/schema/delta/53/event_format_version.sql b/synapse/storage/schema/delta/53/event_format_version.sql
new file mode 100644
index 0000000000..1d977c2834
--- /dev/null
+++ b/synapse/storage/schema/delta/53/event_format_version.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE event_json ADD COLUMN format_version INTEGER;
|