diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 46c4b4b9dc..51ee078bc3 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -120,6 +120,19 @@ KNOWN_ROOM_VERSIONS = {
RoomVersions.STATE_V2_TEST,
}
+
+class EventFormatVersions(object):
+ """This is an internal enum for tracking the version of the event format,
+ independently from the room version.
+ """
+ V1 = 1
+
+
+KNOWN_EVENT_FORMAT_VERSIONS = {
+ EventFormatVersions.V1,
+}
+
+
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 16ad654864..3906475403 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -444,6 +444,20 @@ class Filter(object):
def include_redundant_members(self):
return self.filter_json.get("include_redundant_members", False)
+ def with_room_ids(self, room_ids):
+ """Returns a new filter with the given room IDs appended.
+
+ Args:
+ room_ids (iterable[unicode]): The room_ids to add
+
+ Returns:
+ filter: A new filter including the given rooms and the old
+ filter's rooms.
+ """
+ newFilter = Filter(self.filter_json)
+ newFilter.rooms += room_ids
+ return newFilter
+
def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index fe520d6855..d808a989f3 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -84,11 +84,11 @@ class RegistrationConfig(Config):
#
# allowed_local_3pids:
# - medium: email
- # pattern: ".*@matrix\\.org"
+ # pattern: '.*@matrix\\.org'
# - medium: email
- # pattern: ".*@vector\\.im"
+ # pattern: '.*@vector\\.im'
# - medium: msisdn
- # pattern: "\\+44"
+ # pattern: '\\+44'
# If set, allows registration by anyone who also has the shared
# secret, even if registration is otherwise disabled.
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 84c75495d5..38470ad176 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -18,6 +18,7 @@ from distutils.util import strtobool
import six
+from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@@ -41,8 +42,13 @@ class _EventInternalMetadata(object):
def is_outlier(self):
return getattr(self, "outlier", False)
- def is_invite_from_remote(self):
- return getattr(self, "invite_from_remote", False)
+ def is_out_of_band_membership(self):
+ """Whether this is an out of band membership, like an invite or an invite
+ rejection. This is needed as those events are marked as outliers, but
+ they still need to be processed as if they're new events (e.g. updating
+ invite state in the database, relaying to clients, etc).
+ """
+ return getattr(self, "out_of_band_membership", False)
def get_send_on_behalf_of(self):
"""Whether this server should send the event on behalf of another server.
@@ -179,6 +185,8 @@ class EventBase(object):
class FrozenEvent(EventBase):
+ format_version = EventFormatVersions.V1 # All events of this type are V1
+
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
event_dict = dict(event_dict)
@@ -232,3 +240,19 @@ class FrozenEvent(EventBase):
self.get("type", None),
self.get("state_key", None),
)
+
+
+def room_version_to_event_format(room_version):
+ """Converts a room version string to the event format
+
+ Args:
+ room_version (str)
+
+ Returns:
+ int
+ """
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ # We should have already checked version, so this should not happen
+ raise RuntimeError("Unrecognized room version %s" % (room_version,))
+
+ return EventFormatVersions.V1
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index b7ad729c63..d749bfdd3a 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -43,8 +43,8 @@ class FederationBase(object):
self._clock = hs.get_clock()
@defer.inlineCallbacks
- def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
- include_none=False):
+ def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version,
+ outlier=False, include_none=False):
"""Takes a list of PDUs and checks the signatures and hashs of each
one. If a PDU fails its signature check then we check if we have it in
the database and if not then request if from the originating server of
@@ -56,8 +56,12 @@ class FederationBase(object):
a new list.
Args:
+ origin (str)
pdu (list)
- outlier (bool)
+ room_version (str)
+ outlier (bool): Whether the events are outliers or not
+ include_none (str): Whether to include None in the returned list
+ for events that have failed their checks
Returns:
Deferred : A list of PDUs that have valid signatures and hashes.
@@ -84,6 +88,7 @@ class FederationBase(object):
res = yield self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
+ room_version=room_version,
outlier=outlier,
timeout=10000,
)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index d05ed91d64..33ecabca29 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,14 +25,20 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
+from synapse.api.constants import (
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ Membership,
+ RoomVersions,
+)
from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
HttpResponseException,
SynapseError,
)
-from synapse.events import builder
+from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
@@ -66,6 +72,8 @@ class FederationClient(FederationBase):
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
+ self.event_builder_factory = hs.get_event_builder_factory()
+
self._get_pdu_cache = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
@@ -202,7 +210,8 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
- def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
+ def get_pdu(self, destinations, event_id, room_version, outlier=False,
+ timeout=None):
"""Requests the PDU with given origin and ID from the remote home
servers.
@@ -212,6 +221,7 @@ class FederationClient(FederationBase):
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
+ room_version (str): version of the room
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
@@ -352,10 +362,13 @@ class FederationClient(FederationBase):
ev.event_id for ev in itertools.chain(pdus, auth_chain)
])
+ room_version = yield self.store.get_room_version(room_id)
+
signed_pdus = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in pdus if p.event_id not in seen_events],
- outlier=True
+ outlier=True,
+ room_version=room_version,
)
signed_pdus.extend(
seen_events[p.event_id] for p in pdus if p.event_id in seen_events
@@ -364,7 +377,8 @@ class FederationClient(FederationBase):
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in auth_chain if p.event_id not in seen_events],
- outlier=True
+ outlier=True,
+ room_version=room_version,
)
signed_auth.extend(
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
@@ -411,6 +425,8 @@ class FederationClient(FederationBase):
random.shuffle(srvs)
return srvs
+ room_version = yield self.store.get_room_version(room_id)
+
batch_size = 20
missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size):
@@ -421,6 +437,7 @@ class FederationClient(FederationBase):
self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
+ room_version=room_version,
)
for e_id in batch
]
@@ -450,8 +467,11 @@ class FederationClient(FederationBase):
for p in res["auth_chain"]
]
+ room_version = yield self.store.get_room_version(room_id)
+
signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True
+ destination, auth_chain,
+ outlier=True, room_version=room_version,
)
signed_auth.sort(key=lambda e: e.depth)
@@ -522,6 +542,8 @@ class FederationClient(FederationBase):
Does so by asking one of the already participating servers to create an
event with proper context.
+ Returns a fully signed and hashed event.
+
Note that this does not append any events to any graphs.
Args:
@@ -536,8 +558,10 @@ class FederationClient(FederationBase):
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Return:
- Deferred: resolves to a tuple of (origin (str), event (object))
- where origin is the remote homeserver which generated the event.
+ Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
+ `(origin, event, event_format)` where origin is the remote
+ homeserver which generated the event, and event_format is one of
+ `synapse.api.constants.EventFormatVersions`.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
@@ -557,6 +581,11 @@ class FederationClient(FederationBase):
destination, room_id, user_id, membership, params,
)
+ # Note: If not supplied, the room version may be either v1 or v2,
+ # however either way the event format version will be v1.
+ room_version = ret.get("room_version", RoomVersions.V1)
+ event_format = room_version_to_event_format(room_version)
+
pdu_dict = ret.get("event", None)
if not isinstance(pdu_dict, dict):
raise InvalidResponseError("Bad 'event' field in response")
@@ -571,10 +600,21 @@ class FederationClient(FederationBase):
if "prev_state" not in pdu_dict:
pdu_dict["prev_state"] = []
- ev = builder.EventBuilder(pdu_dict)
+ # Strip off the fields that we want to clobber.
+ pdu_dict.pop("origin", None)
+ pdu_dict.pop("origin_server_ts", None)
+ pdu_dict.pop("unsigned", None)
+
+ builder = self.event_builder_factory.new(pdu_dict)
+ add_hashes_and_signatures(
+ builder,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ ev = builder.build()
defer.returnValue(
- (destination, ev)
+ (destination, ev, event_format)
)
return self._try_destination_list(
@@ -650,9 +690,21 @@ class FederationClient(FederationBase):
for p in itertools.chain(state, auth_chain)
}
+ room_version = None
+ for e in state:
+ if (e.type, e.state_key) == (EventTypes.Create, ""):
+ room_version = e.content.get("room_version", RoomVersions.V1)
+ break
+
+ if room_version is None:
+ # If the state doesn't have a create event then the room is
+ # invalid, and it would fail auth checks anyway.
+ raise SynapseError(400, "No create event in state")
+
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, list(pdus.values()),
outlier=True,
+ room_version=room_version,
)
valid_pdus_map = {
@@ -790,8 +842,10 @@ class FederationClient(FederationBase):
for e in content["auth_chain"]
]
+ room_version = yield self.store.get_room_version(room_id)
+
signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True
+ destination, auth_chain, outlier=True, room_version=room_version,
)
signed_auth.sort(key=lambda e: e.depth)
@@ -838,8 +892,10 @@ class FederationClient(FederationBase):
for e in content.get("events", [])
]
+ room_version = yield self.store.get_room_version(room_id)
+
signed_events = yield self._check_sigs_and_hash_and_fetch(
- destination, events, outlier=False
+ destination, events, outlier=False, room_version=room_version,
)
except HttpResponseException as e:
if not e.code == 400:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 37d29e7027..dde166e295 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -400,8 +400,14 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
pdu = yield self.handler.on_make_leave_request(room_id, user_id)
+
+ room_version = yield self.store.get_room_version(room_id)
+
time_now = self._clock.time_msec()
- defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+ defer.returnValue({
+ "event": pdu.get_pdu_json(time_now),
+ "room_version": room_version,
+ })
@defer.inlineCallbacks
def on_send_leave_request(self, origin, content):
@@ -457,8 +463,10 @@ class FederationServer(FederationBase):
for e in content["auth_chain"]
]
+ room_version = yield self.store.get_room_version(room_id)
+
signed_auth = yield self._check_sigs_and_hash_and_fetch(
- origin, auth_chain, outlier=True
+ origin, auth_chain, outlier=True, room_version=room_version,
)
ret = yield self.handler.on_query_auth(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a3bb864bb2..c52dca1b81 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -34,6 +34,7 @@ from synapse.api.constants import (
EventTypes,
Membership,
RejectedReason,
+ RoomVersions,
)
from synapse.api.errors import (
AuthError,
@@ -43,10 +44,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
-from synapse.crypto.event_signing import (
- add_hashes_and_signatures,
- compute_event_signature,
-)
+from synapse.crypto.event_signing import compute_event_signature
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -58,7 +56,6 @@ from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
-from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
@@ -342,6 +339,8 @@ class FederationHandler(BaseHandler):
room_id, event_id, p,
)
+ room_version = yield self.store.get_room_version(room_id)
+
with logcontext.nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
@@ -355,7 +354,7 @@ class FederationHandler(BaseHandler):
# we want the state *after* p; get_state_for_room returns the
# state *before* p.
remote_event = yield self.federation_client.get_pdu(
- [origin], p, outlier=True,
+ [origin], p, room_version, outlier=True,
)
if remote_event is None:
@@ -379,7 +378,6 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
- room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_store(
room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store),
@@ -655,6 +653,8 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
+ room_version = yield self.store.get_room_version(room_id)
+
events = yield self.federation_client.backfill(
dest,
room_id,
@@ -748,6 +748,7 @@ class FederationHandler(BaseHandler):
self.federation_client.get_pdu,
[dest],
event_id,
+ room_version=room_version,
outlier=True,
timeout=10000,
)
@@ -1083,7 +1084,6 @@ class FederationHandler(BaseHandler):
handled_events = set()
try:
- event = self._sign_event(event)
# Try the host we successfully got a response to /make_join/
# request first.
try:
@@ -1287,7 +1287,7 @@ class FederationHandler(BaseHandler):
)
event.internal_metadata.outlier = True
- event.internal_metadata.invite_from_remote = True
+ event.internal_metadata.out_of_band_membership = True
event.signatures.update(
compute_event_signature(
@@ -1313,7 +1313,7 @@ class FederationHandler(BaseHandler):
# Mark as outlier as we don't have any state for this event; we're not
# even in the room.
event.internal_metadata.outlier = True
- event = self._sign_event(event)
+ event.internal_metadata.out_of_band_membership = True
# Try the host that we succesfully called /make_leave/ on first for
# the /send_leave/ request.
@@ -1336,7 +1336,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={}, params=None):
- origin, pdu = yield self.federation_client.make_membership_event(
+ origin, pdu, _ = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
@@ -1357,27 +1357,6 @@ class FederationHandler(BaseHandler):
assert(event.room_id == room_id)
defer.returnValue((origin, event))
- def _sign_event(self, event):
- event.internal_metadata.outlier = False
-
- builder = self.event_builder_factory.new(
- unfreeze(event.get_pdu_json())
- )
-
- builder.event_id = self.event_builder_factory.create_event_id()
- builder.origin = self.hs.hostname
-
- if not hasattr(event, "signatures"):
- builder.signatures = {}
-
- add_hashes_and_signatures(
- builder,
- self.hs.hostname,
- self.hs.config.signing_key[0],
- )
-
- return builder.build()
-
@defer.inlineCallbacks
@log_function
def on_make_leave_request(self, room_id, user_id):
@@ -1659,6 +1638,13 @@ class FederationHandler(BaseHandler):
create_event = e
break
+ if create_event is None:
+ # If the state doesn't have a create event then the room is
+ # invalid, and it would fail auth checks anyway.
+ raise SynapseError(400, "No create event in state")
+
+ room_version = create_event.content.get("room_version", RoomVersions.V1)
+
missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):
for e_id in e.auth_event_ids():
@@ -1669,6 +1655,7 @@ class FederationHandler(BaseHandler):
m_ev = yield self.federation_client.get_pdu(
[origin],
e_id,
+ room_version=room_version,
outlier=True,
timeout=10000,
)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index ec936bbb4e..49c439313e 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -38,6 +38,41 @@ class SearchHandler(BaseHandler):
super(SearchHandler, self).__init__(hs)
@defer.inlineCallbacks
+ def get_old_rooms_from_upgraded_room(self, room_id):
+ """Retrieves room IDs of old rooms in the history of an upgraded room.
+
+ We do so by checking the m.room.create event of the room for a
+ `predecessor` key. If it exists, we add the room ID to our return
+ list and then check that room for a m.room.create event and so on
+ until we can no longer find any more previous rooms.
+
+ The full list of all found rooms in then returned.
+
+ Args:
+ room_id (str): id of the room to search through.
+
+ Returns:
+ Deferred[iterable[unicode]]: predecessor room ids
+ """
+
+ historical_room_ids = []
+
+ while True:
+ predecessor = yield self.store.get_room_predecessor(room_id)
+
+ # If no predecessor, assume we've hit a dead end
+ if not predecessor:
+ break
+
+ # Add predecessor's room ID
+ historical_room_ids.append(predecessor["room_id"])
+
+ # Scan through the old room for further predecessors
+ room_id = predecessor["room_id"]
+
+ defer.returnValue(historical_room_ids)
+
+ @defer.inlineCallbacks
def search(self, user, content, batch=None):
"""Performs a full text search for a user.
@@ -137,6 +172,18 @@ class SearchHandler(BaseHandler):
)
room_ids = set(r.room_id for r in rooms)
+ # If doing a subset of all rooms seearch, check if any of the rooms
+ # are from an upgraded room, and search their contents as well
+ if search_filter.rooms:
+ historical_room_ids = []
+ for room_id in search_filter.rooms:
+ # Add any previous rooms to the search if they exist
+ ids = yield self.get_old_rooms_from_upgraded_room(room_id)
+ historical_room_ids += ids
+
+ # Prevent any historical events from being filtered
+ search_filter = search_filter.with_room_ids(historical_room_ids)
+
room_ids = search_filter.filter_rooms(room_ids)
if batch_group == "room_id":
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 0ec28c6696..1788e9a34a 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -19,6 +19,7 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web.client import URI, Agent, HTTPConnectionPool
+from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from synapse.http.endpoint import parse_server_name
@@ -109,6 +110,15 @@ class MatrixFederationAgent(object):
else:
target = pick_server_from_list(server_list)
+ # make sure that the Host header is set correctly
+ if headers is None:
+ headers = Headers()
+ else:
+ headers = headers.copy()
+
+ if not headers.hasHeader(b'host'):
+ headers.addRawHeader(b'host', server_name_bytes)
+
class EndpointFactory(object):
@staticmethod
def endpointForURI(_uri):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 980e912348..bb2e64ed80 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -255,7 +255,6 @@ class MatrixFederationHttpClient(object):
headers_dict = {
b"User-Agent": [self.version_string_bytes],
- b"Host": [destination_bytes],
}
with limiter:
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index c64d73ff21..31b8449ca1 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -15,7 +15,6 @@
import struct
import threading
-from sqlite3 import sqlite_version_info
from synapse.storage.prepare_database import prepare_database
@@ -34,10 +33,14 @@ class Sqlite3Engine(object):
@property
def can_native_upsert(self):
"""
- Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
- more work we haven't done yet to tell what was inserted vs updated.
+ Do we support native UPSERTs?
"""
- return sqlite_version_info >= (3, 24, 0)
+ # SQLite3 3.24+ supports them, but empirically the unit tests don't work
+ # when its enabled.
+ # FIXME: Figure out what is wrong so we can re-enable native upserts
+
+ # return self.module.sqlite_version_info >= (3, 24, 0)
+ return False
def check_database(self, txn):
pass
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 79e0276de6..3e1915fb87 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1268,6 +1268,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
event.internal_metadata.get_dict()
),
"json": encode_json(event_dict(event)),
+ "format_version": event.format_version,
}
for event, _ in events_and_contexts
],
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a8326f5296..599f892858 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -21,10 +21,10 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.api.constants import EventFormatVersions
from synapse.api.errors import NotFoundError
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
+# these are only included to make the type annotations work
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -353,6 +353,7 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
+ format_version=row["format_version"],
)
for row in rows
],
@@ -377,6 +378,7 @@ class EventsWorkerStore(SQLBaseStore):
" e.event_id as event_id, "
" e.internal_metadata,"
" e.json,"
+ " e.format_version, "
" r.redacts as redacts,"
" rej.event_id as rejects "
" FROM event_json as e"
@@ -392,7 +394,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- rejected_reason=None):
+ format_version, rejected_reason=None):
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -405,8 +407,17 @@ class EventsWorkerStore(SQLBaseStore):
desc="_get_event_from_row_rejected_reason",
)
+ if format_version is None:
+ # This means that we stored the event before we had the concept
+ # of a event format version, so it must be a V1 event.
+ format_version = EventFormatVersions.V1
+
+ # TODO: When we implement new event formats we'll need to use a
+ # different event python type
+ assert format_version == EventFormatVersions.V1
+
original_ev = FrozenEvent(
- d,
+ event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d384526ba5..2874dabbd1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -595,12 +595,12 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
# We update the local_invites table only if the event is "current",
- # i.e., its something that has just happened.
- # The only current event that can also be an outlier is if its an
- # invite that has come in across federation.
+ # i.e., its something that has just happened. If the event is an
+ # outlier it is only current if its an "out of band membership",
+ # like a remote invite or a rejection of a remote invite.
is_new_state = not backfilled and (
not event.internal_metadata.is_outlier()
- or event.internal_metadata.is_invite_from_remote()
+ or event.internal_metadata.is_out_of_band_membership()
)
is_mine = self.hs.is_mine_id(event.state_key)
if is_new_state and is_mine:
diff --git a/synapse/storage/schema/delta/53/event_format_version.sql b/synapse/storage/schema/delta/53/event_format_version.sql
new file mode 100644
index 0000000000..1d977c2834
--- /dev/null
+++ b/synapse/storage/schema/delta/53/event_format_version.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE event_json ADD COLUMN format_version INTEGER;
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a134e9b3e8..c3ab7db7ae 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -437,6 +437,30 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
create_event = yield self.get_event(create_id)
defer.returnValue(create_event.content.get("room_version", "1"))
+ @defer.inlineCallbacks
+ def get_room_predecessor(self, room_id):
+ """Get the predecessor room of an upgraded room if one exists.
+ Otherwise return None.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[unicode|None]: predecessor room id
+ """
+ state_ids = yield self.get_current_state_ids(room_id)
+ create_id = state_ids.get((EventTypes.Create, ""))
+
+ # If we can't find the create event, assume we've hit a dead end
+ if not create_id:
+ defer.returnValue(None)
+
+ # Retrieve the room's create event
+ create_event = yield self.get_event(create_id)
+
+ # Return predecessor if present
+ defer.returnValue(create_event.content.get("predecessor", None))
+
@cached(max_entries=100000, iterable=True)
def get_current_state_ids(self, room_id):
"""Get the current state event ids for a room based on the
|