diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 38bc67191c..2d7e6df6e4 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -38,9 +38,10 @@ class AccountDataEventSource(object):
{"type": "m.tag", "content": {"tags": room_tags}, "room_id": room_id}
)
- account_data, room_account_data = (
- yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
- )
+ (
+ account_data,
+ room_account_data,
+ ) = yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
for account_data_type, content in account_data.items():
results.append({"type": account_data_type, "content": content})
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 1a87b58838..6407d56f8e 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -30,6 +30,9 @@ class AdminHandler(BaseHandler):
def __init__(self, hs):
super(AdminHandler, self).__init__(hs)
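+ # hs.get_storage() exposes the higher-level storage API; its .state
+ # attribute wraps the state-group storage used below.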
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
+
@defer.inlineCallbacks
def get_whois(self, user):
connections = []
@@ -205,7 +208,7 @@ class AdminHandler(BaseHandler):
from_key = events[-1].internal_metadata.after
- events = yield filter_events_for_client(self.store, user_id, events)
+ events = yield filter_events_for_client(self.storage, user_id, events)
writer.write_events(room_id, events)
@@ -241,7 +244,7 @@ class AdminHandler(BaseHandler):
for event_id in extremities:
if not event_to_unseen_prevs[event_id]:
continue
- state = yield self.store.get_state_for_event(event_id)
+ state = yield self.state_store.get_state_for_event(event_id)
writer.write_state(room_id, event_id, state)
return writer.finished()
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3e9b298154..fe62f78e67 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -73,7 +73,10 @@ class ApplicationServicesHandler(object):
try:
limit = 100
while True:
- upper_bound, events = yield self.store.get_new_events_for_appservice(
+ (
+ upper_bound,
+ events,
+ ) = yield self.store.get_new_events_for_appservice(
self.current_max, limit
)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 333eb30625..7a0f54ca24 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -525,7 +525,7 @@ class AuthHandler(BaseHandler):
result = None
if not user_infos:
- logger.warn("Attempted to login as %s but they do not exist", user_id)
+ logger.warning("Attempted to login as %s but they do not exist", user_id)
elif len(user_infos) == 1:
# a single match (possibly not exact)
result = user_infos.popitem()
@@ -534,7 +534,7 @@ class AuthHandler(BaseHandler):
result = (user_id, user_infos[user_id])
else:
# multiple matches, none of them exact
- logger.warn(
+ logger.warning(
"Attempted to login as %s but it matches more than one user "
"inexactly: %r",
user_id,
@@ -728,7 +728,7 @@ class AuthHandler(BaseHandler):
result = yield self.validate_hash(password, password_hash)
if not result:
- logger.warn("Failed password login for user %s", user_id)
+ logger.warning("Failed password login for user %s", user_id)
return None
return user_id
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5f23ee4488..26ef5e150c 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -46,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
self.hs = hs
self.state = hs.get_state_handler()
+ self.state_store = hs.get_storage().state
self._auth_handler = hs.get_auth_handler()
@trace
@@ -178,7 +179,7 @@ class DeviceWorkerHandler(BaseHandler):
continue
# mapping from event_id -> state_dict
- prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+ prev_state_ids = yield self.state_store.get_state_ids_for_events(event_ids)
# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
@@ -458,7 +459,18 @@ class DeviceHandler(DeviceWorkerHandler):
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
- return {"user_id": user_id, "stream_id": stream_id, "devices": devices}
+ master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
+ self_signing_key = yield self.store.get_e2e_cross_signing_key(
+ user_id, "self_signing"
+ )
+
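+ # Note: either key may be None if the user has no cross-signing keys.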
+ return {
+ "user_id": user_id,
+ "stream_id": stream_id,
+ "devices": devices,
+ "master_key": master_key,
+ "self_signing_key": self_signing_key,
+ }
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
@@ -656,7 +668,7 @@ class DeviceListUpdater(object):
except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
# TODO: Remember that we are now out of sync and try again
# later
- logger.warn("Failed to handle device list update for %s", user_id)
+ logger.warning("Failed to handle device list update for %s", user_id)
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
@@ -694,7 +706,7 @@ class DeviceListUpdater(object):
# up on storing the total list of devices and only handle the
# delta instead.
if len(devices) > 1000:
- logger.warn(
+ logger.warning(
"Ignoring device list snapshot for %s as it has >1K devs (%d)",
user_id,
len(devices),
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 0043cbea17..73b9e120f5 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -52,7 +52,7 @@ class DeviceMessageHandler(object):
local_messages = {}
sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id):
- logger.warn(
+ logger.warning(
"Dropping device message from %r with spoofed sender %r",
origin,
sender_user_id,
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 526379c6f7..c4632f8984 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -250,7 +250,7 @@ class DirectoryHandler(BaseHandler):
ignore_backoff=True,
)
except CodeMessageException as e:
- logging.warn("Error retrieving alias")
+ logger.warning("Error retrieving alias")
if e.code == 404:
result = None
else:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5ea54f60be..f09a0b73c8 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -36,6 +36,8 @@ from synapse.types import (
get_verify_key_from_cross_signing_key,
)
from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -49,10 +51,19 @@ class E2eKeysHandler(object):
self.is_mine = hs.is_mine
self.clock = hs.get_clock()
+ self._edu_updater = SigningKeyEduUpdater(hs, self)
+
+ federation_registry = hs.get_federation_registry()
+
+ # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+ federation_registry.register_edu_handler(
+ "org.matrix.signing_key_update",
+ self._edu_updater.incoming_signing_key_update,
+ )
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
- hs.get_federation_registry().register_query_handler(
+ federation_registry.register_query_handler(
"client_keys", self.on_federation_query_client_keys
)
@@ -119,9 +130,10 @@ class E2eKeysHandler(object):
else:
query_list.append((user_id, None))
- user_ids_not_in_cache, remote_results = (
- yield self.store.get_user_devices_from_cache(query_list)
- )
+ (
+ user_ids_not_in_cache,
+ remote_results,
+ ) = yield self.store.get_user_devices_from_cache(query_list)
for user_id, devices in iteritems(remote_results):
user_devices = results.setdefault(user_id, {})
for device_id, device in iteritems(devices):
@@ -207,13 +219,15 @@ class E2eKeysHandler(object):
if user_id in destination_query:
results[user_id] = keys
- for user_id, key in remote_result["master_keys"].items():
- if user_id in destination_query:
- cross_signing_keys["master_keys"][user_id] = key
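+ # Servers without cross-signing support may omit these fields.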
+ if "master_keys" in remote_result:
+ for user_id, key in remote_result["master_keys"].items():
+ if user_id in destination_query:
+ cross_signing_keys["master_keys"][user_id] = key
- for user_id, key in remote_result["self_signing_keys"].items():
- if user_id in destination_query:
- cross_signing_keys["self_signing_keys"][user_id] = key
+ if "self_signing_keys" in remote_result:
+ for user_id, key in remote_result["self_signing_keys"].items():
+ if user_id in destination_query:
+ cross_signing_keys["self_signing_keys"][user_id] = key
except Exception as e:
failure = _exception_to_failure(e)
@@ -251,7 +265,7 @@ class E2eKeysHandler(object):
Returns:
defer.Deferred[dict[str, dict[str, dict]]]: map from
- (master|self_signing|user_signing) -> user_id -> key
+ (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
"""
master_keys = {}
self_signing_keys = {}
@@ -343,7 +357,16 @@ class E2eKeysHandler(object):
"""
device_keys_query = query_body.get("device_keys", {})
res = yield self.query_local_devices(device_keys_query)
- return {"device_keys": res}
+ ret = {"device_keys": res}
+
+ # add in the cross-signing keys
+ cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
+ device_keys_query, None
+ )
+
+ ret.update(cross_signing_keys)
+
+ return ret
@trace
@defer.inlineCallbacks
@@ -688,17 +711,21 @@ class E2eKeysHandler(object):
try:
# get our self-signing key to verify the signatures
- _, self_signing_key_id, self_signing_verify_key = yield self._get_e2e_cross_signing_verify_key(
- user_id, "self_signing"
- )
+ (
+ _,
+ self_signing_key_id,
+ self_signing_verify_key,
+ ) = yield self._get_e2e_cross_signing_verify_key(user_id, "self_signing")
# get our master key, since we may have received a signature of it.
# We need to fetch it here so that we know what its key ID is, so
# that we can check if a signature that was sent is a signature of
# the master key or of a device
- master_key, _, master_verify_key = yield self._get_e2e_cross_signing_verify_key(
- user_id, "master"
- )
+ (
+ master_key,
+ _,
+ master_verify_key,
+ ) = yield self._get_e2e_cross_signing_verify_key(user_id, "master")
# fetch our stored devices. This is used to 1. verify
# signatures on the master key, and 2. to compare with what
@@ -838,9 +865,11 @@ class E2eKeysHandler(object):
try:
# get our user-signing key to verify the signatures
- user_signing_key, user_signing_key_id, user_signing_verify_key = yield self._get_e2e_cross_signing_verify_key(
- user_id, "user_signing"
- )
+ (
+ user_signing_key,
+ user_signing_key_id,
+ user_signing_verify_key,
+ ) = yield self._get_e2e_cross_signing_verify_key(user_id, "user_signing")
except SynapseError as e:
failure = _exception_to_failure(e)
for user, devicemap in signatures.items():
@@ -859,7 +888,11 @@ class E2eKeysHandler(object):
try:
# get the target user's master key, to make sure it matches
# what was sent
- master_key, master_key_id, _ = yield self._get_e2e_cross_signing_verify_key(
+ (
+ master_key,
+ master_key_id,
+ _,
+ ) = yield self._get_e2e_cross_signing_verify_key(
target_user, "master", user_id
)
@@ -1047,3 +1080,100 @@ class SignatureListItem:
target_user_id = attr.ib()
target_device_id = attr.ib()
signature = attr.ib()
+
+
+class SigningKeyEduUpdater(object):
+ """Handles incoming signing key updates from federation and updates the DB"""
+
+ def __init__(self, hs, e2e_keys_handler):
+ self.store = hs.get_datastore()
+ self.federation = hs.get_federation_client()
+ self.clock = hs.get_clock()
+ self.e2e_keys_handler = e2e_keys_handler
+
+ self._remote_edu_linearizer = Linearizer(name="remote_signing_key")
+
+ # user_id -> list of updates waiting to be handled.
+ self._pending_updates = {}
+
+ # Recently seen stream ids. We don't bother keeping these in the DB,
+ # but they're useful to have them about to reduce the number of spurious
+ # resyncs.
+ self._seen_updates = ExpiringCache(
+ cache_name="signing_key_update_edu",
+ clock=self.clock,
+ max_len=10000,
+ expiry_ms=30 * 60 * 1000,
+ iterable=True,
+ )
+
+ @defer.inlineCallbacks
+ def incoming_signing_key_update(self, origin, edu_content):
+ """Called on incoming signing key update from federation. Responsible for
+ parsing the EDU and adding it to the pending updates list.
+
+ Args:
+ origin (string): the server that sent the EDU
+ edu_content (dict): the contents of the EDU
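+
+ The EDU content is expected to look something like:
+
+ {
+ "user_id": "@alice:example.org",
+ "master_key": { ... },
+ "self_signing_key": { ... },
+ }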
+ """
+
+ user_id = edu_content.pop("user_id")
+ master_key = edu_content.pop("master_key", None)
+ self_signing_key = edu_content.pop("self_signing_key", None)
+
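+ # Ignore updates for users who do not belong to the sending server.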
+ if get_domain_from_id(user_id) != origin:
+ logger.warning("Got signing key update edu for %r from %r", user_id, origin)
+ return
+
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+ if not room_ids:
+ # We don't share any rooms with this user. Ignore the update, as we
+ # probably won't get any further updates.
+ return
+
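+ # Queue up the update; _handle_signing_key_updates drains the queue under
+ # a per-user linearizer, so concurrent EDUs for a user are batched.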
+ self._pending_updates.setdefault(user_id, []).append(
+ (master_key, self_signing_key)
+ )
+
+ yield self._handle_signing_key_updates(user_id)
+
+ @defer.inlineCallbacks
+ def _handle_signing_key_updates(self, user_id):
+ """Actually handle pending updates.
+
+ Args:
+ user_id (string): the user whose updates we are processing
+ """
+
+ device_handler = self.e2e_keys_handler.device_handler
+
+ with (yield self._remote_edu_linearizer.queue(user_id)):
+ pending_updates = self._pending_updates.pop(user_id, [])
+ if not pending_updates:
+ # This can happen since we batch updates
+ return
+
+ device_ids = []
+
+ logger.info("pending updates: %r", pending_updates)
+
+ for master_key, self_signing_key in pending_updates:
+ if master_key:
+ yield self.store.set_e2e_cross_signing_key(
+ user_id, "master", master_key
+ )
+ _, verify_key = get_verify_key_from_cross_signing_key(master_key)
+ # verify_key is a VerifyKey from signedjson, which uses
+ # .version to denote the portion of the key ID after the
+ # algorithm and colon, which is the device ID
+ device_ids.append(verify_key.version)
+ if self_signing_key:
+ yield self.store.set_e2e_cross_signing_key(
+ user_id, "self_signing", self_signing_key
+ )
+ _, verify_key = get_verify_key_from_cross_signing_key(
+ self_signing_key
+ )
+ device_ids.append(verify_key.version)
+
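+ # Cross-signing keys are surfaced as devices, so announce the change in
+ # the same way as a normal device list update.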
+ yield device_handler.notify_device_update(user_id, device_ids)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 5e748687e3..45fe13c62f 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -147,6 +147,10 @@ class EventStreamHandler(BaseHandler):
class EventHandler(BaseHandler):
+ def __init__(self, hs):
+ super(EventHandler, self).__init__(hs)
+ self.storage = hs.get_storage()
+
@defer.inlineCallbacks
def get_event(self, user, room_id, event_id):
"""Retrieve a single specified event.
@@ -172,7 +176,7 @@ class EventHandler(BaseHandler):
is_peeking = user.to_string() not in users
filtered = yield filter_events_for_client(
- self.store, user.to_string(), [event], is_peeking=is_peeking
+ self.storage, user.to_string(), [event], is_peeking=is_peeking
)
if not filtered:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 488058fe68..8cafcfdab0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.crypto.event_signing import compute_event_signature
from synapse.event_auth import auth_types_for_event
+from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.logging.context import (
make_deferred_yieldable,
@@ -109,6 +110,8 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
@@ -180,7 +183,7 @@ class FederationHandler(BaseHandler):
try:
self._sanity_check_event(pdu)
except SynapseError as err:
- logger.warn(
+ logger.warning(
"[%s %s] Received event failed sanity checks", room_id, event_id
)
raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
@@ -301,7 +304,7 @@ class FederationHandler(BaseHandler):
# following.
if sent_to_us_directly:
- logger.warn(
+ logger.warning(
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
room_id,
event_id,
@@ -324,7 +327,7 @@ class FederationHandler(BaseHandler):
event_map = {event_id: pdu}
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups_ids(room_id, seen)
+ ours = yield self.state_store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
state_maps = list(
@@ -350,10 +353,11 @@ class FederationHandler(BaseHandler):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
- remote_state, got_auth_chain = (
- yield self.federation_client.get_state_for_room(
- origin, room_id, p
- )
+ (
+ remote_state,
+ got_auth_chain,
+ ) = yield self.federation_client.get_state_for_room(
+ origin, room_id, p
)
# we want the state *after* p; get_state_for_room returns the
@@ -405,7 +409,7 @@ class FederationHandler(BaseHandler):
state = [event_map[e] for e in six.itervalues(state_map)]
auth_chain = list(auth_chains)
except Exception:
- logger.warn(
+ logger.warning(
"[%s %s] Error attempting to resolve state at missing "
"prev_events",
room_id,
@@ -518,7 +522,9 @@ class FederationHandler(BaseHandler):
# We failed to get the missing events, but since we need to handle
# the case of `get_missing_events` not returning the necessary
# events anyway, it is safe to simply log the error and continue.
- logger.warn("[%s %s]: Failed to get prev_events: %s", room_id, event_id, e)
+ logger.warning(
+ "[%s %s]: Failed to get prev_events: %s", room_id, event_id, e
+ )
return
logger.info(
@@ -545,7 +551,7 @@ class FederationHandler(BaseHandler):
yield self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
except FederationError as e:
if e.code == 403:
- logger.warn(
+ logger.warning(
"[%s %s] Received prev_event %s failed history check.",
room_id,
event_id,
@@ -888,7 +894,7 @@ class FederationHandler(BaseHandler):
# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = yield filter_events_for_server(
- self.store,
+ self.storage,
self.server_name,
list(extremities_events.values()),
redact=False,
@@ -1059,7 +1065,7 @@ class FederationHandler(BaseHandler):
SynapseError if the event does not pass muster
"""
if len(ev.prev_event_ids()) > 20:
- logger.warn(
+ logger.warning(
"Rejecting event %s which has %i prev_events",
ev.event_id,
len(ev.prev_event_ids()),
@@ -1067,7 +1073,7 @@ class FederationHandler(BaseHandler):
raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
if len(ev.auth_event_ids()) > 10:
- logger.warn(
+ logger.warning(
"Rejecting event %s which has %i auth_events",
ev.event_id,
len(ev.auth_event_ids()),
@@ -1101,7 +1107,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def do_invite_join(self, target_hosts, room_id, joinee, content):
""" Attempts to join the `joinee` to the room `room_id` via the
- server `target_host`.
+ servers contained in `target_hosts`.
This first triggers a /make_join/ request that returns a partial
event that we can fill out and sign. This is then sent to the
@@ -1110,6 +1116,15 @@ class FederationHandler(BaseHandler):
We suspend processing of any received events from this room until we
have finished processing the join.
+
+ Args:
+ target_hosts (Iterable[str]): List of servers to attempt to join the room with.
+
+ room_id (str): The ID of the room to join.
+
+ joinee (str): The User ID of the joining user.
+
+ content (dict): The event content to use for the join event.
"""
logger.debug("Joining %s to %s", joinee, room_id)
@@ -1169,6 +1184,22 @@ class FederationHandler(BaseHandler):
yield self._persist_auth_tree(origin, auth_chain, state, event)
+ # Check whether this room is the result of an upgrade of a room we already know
+ # about. If so, migrate over user information
+ predecessor = yield self.store.get_room_predecessor(room_id)
+ if not predecessor:
+ return
+ old_room_id = predecessor["room_id"]
+ logger.debug(
+ "Found predecessor for %s during remote join: %s", room_id, old_room_id
+ )
+
+ # We retrieve the room member handler here so as not to cause a cyclic dependency
+ member_handler = self.hs.get_room_member_handler()
+ yield member_handler.transfer_room_state_on_room_upgrade(
+ old_room_id, room_id
+ )
+
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -1203,7 +1234,7 @@ class FederationHandler(BaseHandler):
with nested_logging_context(p.event_id):
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
- logger.warn(
+ logger.warning(
"Error handling queued PDU %s from %s: %s", p.event_id, origin, e
)
@@ -1250,7 +1281,7 @@ class FederationHandler(BaseHandler):
builder=builder
)
except AuthError as e:
- logger.warn("Failed to create join %r because %s", event, e)
+ logger.warning("Failed to create join to %s because %s", room_id, e)
raise e
event_allowed = yield self.third_party_event_rules.check_event_allowed(
@@ -1494,7 +1525,7 @@ class FederationHandler(BaseHandler):
room_version, event, context, do_sig_check=False
)
except AuthError as e:
- logger.warn("Failed to create new leave %r because %s", event, e)
+ logger.warning("Failed to create new leave %r because %s", event, e)
raise e
return event
@@ -1549,7 +1580,7 @@ class FederationHandler(BaseHandler):
event_id, allow_none=False, check_room_id=room_id
)
- state_groups = yield self.store.get_state_groups(room_id, [event_id])
+ state_groups = yield self.state_store.get_state_groups(room_id, [event_id])
if state_groups:
_, state = list(iteritems(state_groups)).pop()
@@ -1578,7 +1609,7 @@ class FederationHandler(BaseHandler):
event_id, allow_none=False, check_room_id=room_id
)
- state_groups = yield self.store.get_state_groups_ids(room_id, [event_id])
+ state_groups = yield self.state_store.get_state_groups_ids(room_id, [event_id])
if state_groups:
_, state = list(state_groups.items()).pop()
@@ -1606,7 +1637,7 @@ class FederationHandler(BaseHandler):
events = yield self.store.get_backfill_events(room_id, pdu_list, limit)
- events = yield filter_events_for_server(self.store, origin, events)
+ events = yield filter_events_for_server(self.storage, origin, events)
return events
@@ -1636,7 +1667,7 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
- events = yield filter_events_for_server(self.store, origin, [event])
+ events = yield filter_events_for_server(self.storage, origin, [event])
event = events[0]
return event
else:
@@ -1788,7 +1819,7 @@ class FederationHandler(BaseHandler):
# cause SynapseErrors in auth.check. We don't want to give up
# the attempt to federate altogether in such cases.
- logger.warn("Rejecting %s because %s", e.event_id, err.msg)
+ logger.warning("Rejecting %s because %s", e.event_id, err.msg)
if e == event:
raise
@@ -1841,12 +1872,7 @@ class FederationHandler(BaseHandler):
if c and c.type == EventTypes.Create:
auth_events[(c.type, c.state_key)] = c
- try:
- yield self.do_auth(origin, event, context, auth_events=auth_events)
- except AuthError as e:
- logger.warn("[%s %s] Rejecting: %s", event.room_id, event.event_id, e.msg)
-
- context.rejected = RejectedReason.AUTH_ERROR
+ context = yield self.do_auth(origin, event, context, auth_events=auth_events)
if not context.rejected:
yield self._check_for_soft_fail(event, state, backfilled)
@@ -1902,7 +1928,7 @@ class FederationHandler(BaseHandler):
# given state at the event. This should correctly handle cases
# like bans, especially with state res v2.
- state_sets = yield self.store.get_state_groups(
+ state_sets = yield self.state_store.get_state_groups(
event.room_id, extrem_ids
)
state_sets = list(state_sets.values())
@@ -1938,7 +1964,7 @@ class FederationHandler(BaseHandler):
try:
event_auth.check(room_version, event, auth_events=current_auth_events)
except AuthError as e:
- logger.warn("Soft-failing %r because %s", event, e)
+ logger.warning("Soft-failing %r because %s", event, e)
event.internal_metadata.soft_failed = True
@defer.inlineCallbacks
@@ -1993,7 +2019,7 @@ class FederationHandler(BaseHandler):
)
missing_events = yield filter_events_for_server(
- self.store, origin, missing_events
+ self.storage, origin, missing_events
)
return missing_events
@@ -2015,12 +2041,12 @@ class FederationHandler(BaseHandler):
Also NB that this function adds entries to it.
Returns:
- defer.Deferred[None]
+ defer.Deferred[EventContext]: updated context object
"""
room_version = yield self.store.get_room_version(event.room_id)
try:
- yield self._update_auth_events_and_context_for_auth(
+ context = yield self._update_auth_events_and_context_for_auth(
origin, event, context, auth_events
)
except Exception:
@@ -2037,8 +2063,10 @@ class FederationHandler(BaseHandler):
try:
event_auth.check(room_version, event, auth_events=auth_events)
except AuthError as e:
- logger.warn("Failed auth resolution for %r because %s", event, e)
- raise e
+ logger.warning("Failed auth resolution for %r because %s", event, e)
+ context.rejected = RejectedReason.AUTH_ERROR
+
+ return context
@defer.inlineCallbacks
def _update_auth_events_and_context_for_auth(
@@ -2062,7 +2090,7 @@ class FederationHandler(BaseHandler):
auth_events (dict[(str, str)->synapse.events.EventBase]):
Returns:
- defer.Deferred[None]
+ defer.Deferred[EventContext]: updated context
"""
event_auth_events = set(event.auth_event_ids())
@@ -2101,7 +2129,7 @@ class FederationHandler(BaseHandler):
# The other side isn't around or doesn't implement the
# endpoint, so lets just bail out.
logger.info("Failed to get event auth from remote: %s", e)
- return
+ return context
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
@@ -2142,7 +2170,7 @@ class FederationHandler(BaseHandler):
if event.internal_metadata.is_outlier():
logger.info("Skipping auth_event fetch for outlier")
- return
+ return context
# FIXME: Assumes we have and stored all the state for all the
# prev_events
@@ -2151,7 +2179,7 @@ class FederationHandler(BaseHandler):
)
if not different_auth:
- return
+ return context
logger.info(
"auth_events refers to events which are not in our calculated auth "
@@ -2198,10 +2226,12 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state)
- yield self._update_context_for_auth_events(
+ context = yield self._update_context_for_auth_events(
event, context, auth_events, event_key
)
+ return context
+
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events, event_key):
"""Update the state_ids in an event context after auth event resolution,
@@ -2210,14 +2240,16 @@ class FederationHandler(BaseHandler):
Args:
event (Event): The event we're handling the context for
- context (synapse.events.snapshot.EventContext): event context
- to be updated
+ context (synapse.events.snapshot.EventContext): initial event context
auth_events (dict[(str, str)->str]): Events to update in the event
context.
event_key ((str, str)): (type, state_key) for the current event.
this will not be included in the current_state in the context.
+
+ Returns:
+ Deferred[EventContext]: new event context
"""
state_updates = {
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
@@ -2234,7 +2266,7 @@ class FederationHandler(BaseHandler):
# create a new state group as a delta from the existing one.
prev_group = context.state_group
- state_group = yield self.store.store_state_group(
+ state_group = yield self.state_store.store_state_group(
event.event_id,
event.room_id,
prev_group=prev_group,
@@ -2242,7 +2274,7 @@ class FederationHandler(BaseHandler):
current_state_ids=current_state_ids,
)
- yield context.update_state(
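+ # Build a new EventContext rather than mutating the existing one; the
+ # updated context is returned to the caller.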
+ return EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
@@ -2431,10 +2463,12 @@ class FederationHandler(BaseHandler):
try:
yield self.auth.check_from_context(room_version, event, context)
except AuthError as e:
- logger.warn("Denying new third party invite %r because %s", event, e)
+ logger.warning("Denying new third party invite %r because %s", event, e)
raise e
yield self._check_signature(event, context)
+
+ # We retrieve the room member handler here so as not to cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
else:
@@ -2487,7 +2521,7 @@ class FederationHandler(BaseHandler):
try:
yield self.auth.check_from_context(room_version, event, context)
except AuthError as e:
- logger.warn("Denying third party invite %r because %s", event, e)
+ logger.warning("Denying third party invite %r because %s", event, e)
raise e
yield self._check_signature(event, context)
@@ -2495,6 +2529,7 @@ class FederationHandler(BaseHandler):
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
+ # We retrieve the room member handler here so as not to cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
@@ -2664,7 +2699,7 @@ class FederationHandler(BaseHandler):
backfilled=backfilled,
)
else:
- max_stream_id = yield self.store.persist_events(
+ max_stream_id = yield self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 46eb9ee88b..92fecbfc44 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -392,7 +392,7 @@ class GroupsLocalHandler(object):
try:
user_profile = yield self.profile_handler.get_profile(user_id)
except Exception as e:
- logger.warn("No profile for user %s: %s", user_id, e)
+ logger.warning("No profile for user %s: %s", user_id, e)
user_profile = {}
return {"state": "invite", "user_profile": user_profile}
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index ba99ddf76d..000fbf090f 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -272,7 +272,7 @@ class IdentityHandler(BaseHandler):
changed = False
if e.code in (400, 404, 501):
# The remote server probably doesn't support unbinding (yet)
- logger.warn("Received %d response while unbinding threepid", e.code)
+ logger.warning("Received %d response while unbinding threepid", e.code)
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(500, "Failed to contact identity server")
@@ -403,7 +403,7 @@ class IdentityHandler(BaseHandler):
if self.hs.config.using_identity_server_from_trusted_list:
# Warn that a deprecated config option is in use
- logger.warn(
+ logger.warning(
'The config option "trust_identity_server_for_password_resets" '
'has been replaced by "account_threepid_delegate". '
"Please consult the sample config at docs/sample_config.yaml for "
@@ -457,7 +457,7 @@ class IdentityHandler(BaseHandler):
if self.hs.config.using_identity_server_from_trusted_list:
# Warn that a deprecated config option is in use
- logger.warn(
+ logger.warning(
'The config option "trust_identity_server_for_password_resets" '
'has been replaced by "account_threepid_delegate". '
"Please consult the sample config at docs/sample_config.yaml for "
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index f991efeee3..81dce96f4b 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -43,6 +43,8 @@ class InitialSyncHandler(BaseHandler):
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()
self._event_serializer = hs.get_event_client_serializer()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
def snapshot_all_rooms(
self,
@@ -126,8 +128,8 @@ class InitialSyncHandler(BaseHandler):
tags_by_room = yield self.store.get_tags_for_user(user_id)
- account_data, account_data_by_room = (
- yield self.store.get_account_data_for_user(user_id)
+ account_data, account_data_by_room = yield self.store.get_account_data_for_user(
+ user_id
)
public_room_ids = yield self.store.get_public_room_ids()
@@ -169,7 +171,7 @@ class InitialSyncHandler(BaseHandler):
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = run_in_background(
- self.store.get_state_for_events, [event.event_id]
+ self.state_store.get_state_for_events, [event.event_id]
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@@ -189,7 +191,9 @@ class InitialSyncHandler(BaseHandler):
)
).addErrback(unwrapFirstError)
- messages = yield filter_events_for_client(self.store, user_id, messages)
+ messages = yield filter_events_for_client(
+ self.storage, user_id, messages
+ )
start_token = now_token.copy_and_replace("room_key", token)
end_token = now_token.copy_and_replace("room_key", room_end_token)
@@ -307,7 +311,7 @@ class InitialSyncHandler(BaseHandler):
def _room_initial_sync_parted(
self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking
):
- room_state = yield self.store.get_state_for_events([member_event_id])
+ room_state = yield self.state_store.get_state_for_events([member_event_id])
room_state = room_state[member_event_id]
@@ -322,7 +326,7 @@ class InitialSyncHandler(BaseHandler):
)
messages = yield filter_events_for_client(
- self.store, user_id, messages, is_peeking=is_peeking
+ self.storage, user_id, messages, is_peeking=is_peeking
)
start_token = StreamToken.START.copy_and_replace("room_key", token)
@@ -414,7 +418,7 @@ class InitialSyncHandler(BaseHandler):
)
messages = yield filter_events_for_client(
- self.store, user_id, messages, is_peeking=is_peeking
+ self.storage, user_id, messages, is_peeking=is_peeking
)
start_token = now_token.copy_and_replace("room_key", token)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0f8cce8ffe..d682dc2b7a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -59,6 +59,8 @@ class MessageHandler(object):
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
@@ -74,15 +76,16 @@ class MessageHandler(object):
Raises:
SynapseError if something went wrong.
"""
- membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
- room_id, user_id
- )
+ (
+ membership,
+ membership_event_id,
+ ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id)
if membership == Membership.JOIN:
data = yield self.state.get_current_state(room_id, event_type, state_key)
elif membership == Membership.LEAVE:
key = (event_type, state_key)
- room_state = yield self.store.get_state_for_events(
+ room_state = yield self.state_store.get_state_for_events(
[membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
@@ -135,12 +138,12 @@ class MessageHandler(object):
raise NotFoundError("Can't find event for token %s" % (at_token,))
visible_events = yield filter_events_for_client(
- self.store, user_id, last_events
+ self.storage, user_id, last_events
)
event = last_events[0]
if visible_events:
- room_state = yield self.store.get_state_for_events(
+ room_state = yield self.state_store.get_state_for_events(
[event.event_id], state_filter=state_filter
)
room_state = room_state[event.event_id]
@@ -151,9 +154,10 @@ class MessageHandler(object):
% (user_id, room_id, at_token),
)
else:
- membership, membership_event_id = (
- yield self.auth.check_in_room_or_world_readable(room_id, user_id)
- )
+ (
+ membership,
+ membership_event_id,
+ ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id)
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
@@ -161,7 +165,7 @@ class MessageHandler(object):
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
- room_state = yield self.store.get_state_for_events(
+ room_state = yield self.state_store.get_state_for_events(
[membership_event_id], state_filter=state_filter
)
room_state = room_state[membership_event_id]
@@ -234,6 +238,7 @@ class EventCreationHandler(object):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
@@ -687,7 +692,7 @@ class EventCreationHandler(object):
try:
yield self.auth.check_from_context(room_version, event, context)
except AuthError as err:
- logger.warn("Denying new event %r because %s", event, err)
+ logger.warning("Denying new event %r because %s", event, err)
raise err
# Ensure that we can round trip before trying to persist in db
@@ -868,7 +873,7 @@ class EventCreationHandler(object):
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
- (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event_stream_id, max_stream_id = yield self.storage.persistence.persist_event(
event, context=context
)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5744f4579d..97f15a1c32 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -69,6 +69,8 @@ class PaginationHandler(object):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
self.clock = hs.get_clock()
self._server_name = hs.hostname
@@ -210,9 +212,10 @@ class PaginationHandler(object):
source_config = pagin_config.get_source_config("room")
with (yield self.pagination_lock.read(room_id)):
- membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
- room_id, user_id
- )
+ (
+ membership,
+ member_event_id,
+ ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id)
if source_config.direction == "b":
# if we're going backwards, we might need to backfill. This
@@ -255,7 +258,7 @@ class PaginationHandler(object):
events = event_filter.filter(events)
events = yield filter_events_for_client(
- self.store, user_id, events, is_peeking=(member_event_id is None)
+ self.storage, user_id, events, is_peeking=(member_event_id is None)
)
if not events:
@@ -274,7 +277,7 @@ class PaginationHandler(object):
(EventTypes.Member, event.sender) for event in events
)
- state_ids = yield self.store.get_state_ids_for_event(
+ state_ids = yield self.state_store.get_state_ids_for_event(
events[0].event_id, state_filter=state_filter
)
@@ -295,10 +298,8 @@ class PaginationHandler(object):
}
if state:
- chunk["state"] = (
- yield self._event_serializer.serialize_events(
- state, time_now, as_client_event=as_client_event
- )
+ chunk["state"] = yield self._event_serializer.serialize_events(
+ state, time_now, as_client_event=as_client_event
)
return chunk
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 8690f69d45..22e0a04da4 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -275,7 +275,7 @@ class BaseProfileHandler(BaseHandler):
ratelimit=False, # Try to hide that these events aren't atomic.
)
except Exception as e:
- logger.warn(
+ logger.warning(
"Failed to update join event for room %s - %s", room_id, str(e)
)
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 3e4d8c93a4..e3b528d271 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -15,8 +15,6 @@
import logging
-from twisted.internet import defer
-
from synapse.util.async_helpers import Linearizer
from ._base import BaseHandler
@@ -32,8 +30,7 @@ class ReadMarkerHandler(BaseHandler):
self.read_marker_linearizer = Linearizer(name="read_marker")
self.notifier = hs.get_notifier()
- @defer.inlineCallbacks
- def received_client_read_marker(self, room_id, user_id, event_id):
+ async def received_client_read_marker(self, room_id, user_id, event_id):
"""Updates the read marker for a given user in a given room if the event ID given
is ahead in the stream relative to the current read marker.
@@ -41,8 +38,8 @@ class ReadMarkerHandler(BaseHandler):
the read marker has changed.
"""
- with (yield self.read_marker_linearizer.queue((room_id, user_id))):
- existing_read_marker = yield self.store.get_account_data_for_room_and_type(
+ with await self.read_marker_linearizer.queue((room_id, user_id)):
+ existing_read_marker = await self.store.get_account_data_for_room_and_type(
user_id, room_id, "m.fully_read"
)
@@ -50,13 +47,13 @@ class ReadMarkerHandler(BaseHandler):
if existing_read_marker:
# Only update if the new marker is ahead in the stream
- should_update = yield self.store.is_event_after(
+ should_update = await self.store.is_event_after(
event_id, existing_read_marker["event_id"]
)
if should_update:
content = {"event_id": event_id}
- max_id = yield self.store.add_account_data_to_room(
+ max_id = await self.store.add_account_data_to_room(
user_id, room_id, "m.fully_read", content
)
self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 6854c751a6..9283c039e3 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.handlers._base import BaseHandler
from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__)
@@ -36,8 +37,7 @@ class ReceiptsHandler(BaseHandler):
self.clock = self.hs.get_clock()
self.state = hs.get_state_handler()
- @defer.inlineCallbacks
- def _received_remote_receipt(self, origin, content):
+ async def _received_remote_receipt(self, origin, content):
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
receipts = []
@@ -62,17 +62,16 @@ class ReceiptsHandler(BaseHandler):
)
)
- yield self._handle_new_receipts(receipts)
+ await self._handle_new_receipts(receipts)
- @defer.inlineCallbacks
- def _handle_new_receipts(self, receipts):
+ async def _handle_new_receipts(self, receipts):
"""Takes a list of receipts, stores them and informs the notifier.
"""
min_batch_id = None
max_batch_id = None
for receipt in receipts:
- res = yield self.store.insert_receipt(
+ res = await self.store.insert_receipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
@@ -99,14 +98,15 @@ class ReceiptsHandler(BaseHandler):
self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
# Note that the min here shouldn't be relied upon to be accurate.
- yield self.hs.get_pusherpool().on_new_receipts(
- min_batch_id, max_batch_id, affected_room_ids
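+ # on_new_receipts may or may not return an awaitable, so normalise it
+ # with maybe_awaitable before awaiting.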
+ await maybe_awaitable(
+ self.hs.get_pusherpool().on_new_receipts(
+ min_batch_id, max_batch_id, affected_room_ids
+ )
)
return True
- @defer.inlineCallbacks
- def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
+ async def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
@@ -118,24 +118,11 @@ class ReceiptsHandler(BaseHandler):
data={"ts": int(self.clock.time_msec())},
)
- is_new = yield self._handle_new_receipts([receipt])
+ is_new = await self._handle_new_receipts([receipt])
if not is_new:
return
- yield self.federation.send_read_receipt(receipt)
-
- @defer.inlineCallbacks
- def get_receipts_for_room(self, room_id, to_key):
- """Gets all receipts for a room, upto the given key.
- """
- result = yield self.store.get_linearized_receipts_for_room(
- room_id, to_key=to_key
- )
-
- if not result:
- return []
-
- return result
+ await self.federation.send_read_receipt(receipt)
class ReceiptEventSource(object):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 53410f120b..cff6b0d375 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -396,8 +396,8 @@ class RegistrationHandler(BaseHandler):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
- room_id, remote_room_hosts = (
- yield room_member_handler.lookup_room_alias(room_alias)
+ room_id, remote_room_hosts = yield room_member_handler.lookup_room_alias(
+ room_alias
)
room_id = room_id.to_string()
else:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2816bd8f87..e92b2eafd5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -129,6 +129,7 @@ class RoomCreationHandler(BaseHandler):
old_room_id,
new_version, # args for _upgrade_room
)
+
return ret
@defer.inlineCallbacks
@@ -147,21 +148,22 @@ class RoomCreationHandler(BaseHandler):
# we create and auth the tombstone event before properly creating the new
# room, to check our user has perms in the old room.
- tombstone_event, tombstone_context = (
- yield self.event_creation_handler.create_event(
- requester,
- {
- "type": EventTypes.Tombstone,
- "state_key": "",
- "room_id": old_room_id,
- "sender": user_id,
- "content": {
- "body": "This room has been replaced",
- "replacement_room": new_room_id,
- },
+ (
+ tombstone_event,
+ tombstone_context,
+ ) = yield self.event_creation_handler.create_event(
+ requester,
+ {
+ "type": EventTypes.Tombstone,
+ "state_key": "",
+ "room_id": old_room_id,
+ "sender": user_id,
+ "content": {
+ "body": "This room has been replaced",
+ "replacement_room": new_room_id,
},
- token_id=requester.access_token_id,
- )
+ },
+ token_id=requester.access_token_id,
)
old_room_version = yield self.store.get_room_version(old_room_id)
yield self.auth.check_from_context(
@@ -188,7 +190,12 @@ class RoomCreationHandler(BaseHandler):
requester, old_room_id, new_room_id, old_room_state
)
- # and finally, shut down the PLs in the old room, and update them in the new
+ # Copy over user push rules, tags and migrate room directory state
+ yield self.room_member_handler.transfer_room_state_on_room_upgrade(
+ old_room_id, new_room_id
+ )
+
+ # finally, shut down the PLs in the old room, and update them in the new
# room.
yield self._update_upgraded_room_pls(
requester, old_room_id, new_room_id, old_room_state
@@ -822,6 +829,8 @@ class RoomContextHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
@defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit, event_filter):
@@ -848,7 +857,7 @@ class RoomContextHandler(object):
def filter_evts(events):
return filter_events_for_client(
- self.store, user.to_string(), events, is_peeking=is_peeking
+ self.storage, user.to_string(), events, is_peeking=is_peeking
)
event = yield self.store.get_event(
@@ -890,7 +899,7 @@ class RoomContextHandler(object):
# first? Shouldn't we be consistent with /sync?
# https://github.com/matrix-org/matrix-doc/issues/687
- state = yield self.store.get_state_for_events(
+ state = yield self.state_store.get_state_for_events(
[last_event_id], state_filter=state_filter
)
results["state"] = list(state[last_event_id].values())
@@ -922,7 +931,7 @@ class RoomEventSource(object):
from_token = RoomStreamToken.parse(from_key)
if from_token.topological:
- logger.warn("Stream has topological part!!!! %r", from_key)
+ logger.warning("Stream has topological part!!!! %r", from_key)
from_key = "s%s" % (from_token.stream,)
app_service = self.store.get_app_service_by_user_id(user.to_string())
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 380e2fad5e..06d09c2947 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -203,10 +203,6 @@ class RoomMemberHandler(object):
prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
- # Copy over user state if we're joining an upgraded room
- yield self.copy_user_state_if_room_upgrade(
- room_id, requester.user.to_string()
- )
yield self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
@@ -455,11 +451,6 @@ class RoomMemberHandler(object):
requester, remote_room_hosts, room_id, target, content
)
- # Copy over user state if this is a join on an remote upgraded room
- yield self.copy_user_state_if_room_upgrade(
- room_id, requester.user.to_string()
- )
-
return remote_join_response
elif effective_membership_state == Membership.LEAVE:
@@ -498,36 +489,72 @@ class RoomMemberHandler(object):
return res
@defer.inlineCallbacks
- def copy_user_state_if_room_upgrade(self, new_room_id, user_id):
- """Copy user-specific information when they join a new room if that new room is the
+ def transfer_room_state_on_room_upgrade(self, old_room_id, room_id):
+ """Upon our server becoming aware of an upgraded room, either by upgrading a room
+ ourselves or joining one, we can transfer over information from the previous room.
+
+ Copies user state (tags/push rules) for every local user that was in the old room, and
+ migrates the room directory state.
+
+ Args:
+ old_room_id (str): The ID of the old room
+
+ room_id (str): The ID of the new room
+
+ Returns:
+ Deferred
+ """
+ # Find all local users that were in the old room and copy over each user's state
+ users = yield self.store.get_users_in_room(old_room_id)
+ yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
+
+ # If the old room was in the room directory, remove it and add the new
+ # room in its place.
+ old_room = yield self.store.get_room(old_room_id)
+ if old_room and old_room["is_public"]:
+ yield self.store.set_room_is_public(old_room_id, False)
+ yield self.store.set_room_is_public(room_id, True)
+
+ @defer.inlineCallbacks
+ def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
+ """Copy user-specific information when they join a new room when that new room is the
result of a room upgrade
Args:
- new_room_id (str): The ID of the room the user is joining
- user_id (str): The ID of the user
+ old_room_id (str): The ID of the upgraded room
+ new_room_id (str): The ID of the new room
+ user_ids (Iterable[str]): User IDs to copy state for
Returns:
Deferred
"""
- # Check if the new room is an upgraded room
- predecessor = yield self.store.get_room_predecessor(new_room_id)
- if not predecessor:
- return
logger.debug(
- "Found predecessor for %s: %s. Copying over room tags and push " "rules",
+ "Copying over room tags and push rules from %s to %s for users %s",
+ old_room_id,
new_room_id,
- predecessor,
+ user_ids,
)
- # It is an upgraded room. Copy over old tags
- yield self.copy_room_tags_and_direct_to_room(
- predecessor["room_id"], new_room_id, user_id
- )
- # Copy over push rules
- yield self.store.copy_push_rules_from_room_to_room_for_user(
- predecessor["room_id"], new_room_id, user_id
- )
+ for user_id in user_ids:
+ try:
+ # Copy over old tags
+ yield self.copy_room_tags_and_direct_to_room(
+ old_room_id, new_room_id, user_id
+ )
+ # Copy over push rules
+ yield self.store.copy_push_rules_from_room_to_room_for_user(
+ old_room_id, new_room_id, user_id
+ )
+ except Exception:
+ logger.exception(
+ "Error copying tags and/or push rules from rooms %s to %s for user %s. "
+ "Skipping...",
+ old_room_id,
+ new_room_id,
+ user_id,
+ )
+ continue
@defer.inlineCallbacks
def send_membership_event(self, requester, event, context, ratelimit=True):
@@ -759,22 +786,25 @@ class RoomMemberHandler(object):
if room_avatar_event:
room_avatar_url = room_avatar_event.content.get("url", "")
- token, public_keys, fallback_public_key, display_name = (
- yield self.identity_handler.ask_id_server_for_third_party_invite(
- requester=requester,
- id_server=id_server,
- medium=medium,
- address=address,
- room_id=room_id,
- inviter_user_id=user.to_string(),
- room_alias=canonical_room_alias,
- room_avatar_url=room_avatar_url,
- room_join_rules=room_join_rules,
- room_name=room_name,
- inviter_display_name=inviter_display_name,
- inviter_avatar_url=inviter_avatar_url,
- id_access_token=id_access_token,
- )
+ (
+ token,
+ public_keys,
+ fallback_public_key,
+ display_name,
+ ) = yield self.identity_handler.ask_id_server_for_third_party_invite(
+ requester=requester,
+ id_server=id_server,
+ medium=medium,
+ address=address,
+ room_id=room_id,
+ inviter_user_id=user.to_string(),
+ room_alias=canonical_room_alias,
+ room_avatar_url=room_avatar_url,
+ room_join_rules=room_join_rules,
+ room_name=room_name,
+ inviter_display_name=inviter_display_name,
+ inviter_avatar_url=inviter_avatar_url,
+ id_access_token=id_access_token,
)
yield self.event_creation_handler.create_and_send_nonmember_event(
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index cd5e90bacb..56ed262a1f 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -35,6 +35,8 @@ class SearchHandler(BaseHandler):
def __init__(self, hs):
super(SearchHandler, self).__init__(hs)
self._event_serializer = hs.get_event_client_serializer()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
@@ -221,7 +223,7 @@ class SearchHandler(BaseHandler):
filtered_events = search_filter.filter([r["event"] for r in results])
events = yield filter_events_for_client(
- self.store, user.to_string(), filtered_events
+ self.storage, user.to_string(), filtered_events
)
events.sort(key=lambda e: -rank_map[e.event_id])
@@ -271,7 +273,7 @@ class SearchHandler(BaseHandler):
filtered_events = search_filter.filter([r["event"] for r in results])
events = yield filter_events_for_client(
- self.store, user.to_string(), filtered_events
+ self.storage, user.to_string(), filtered_events
)
room_events.extend(events)
@@ -340,11 +342,11 @@ class SearchHandler(BaseHandler):
)
res["events_before"] = yield filter_events_for_client(
- self.store, user.to_string(), res["events_before"]
+ self.storage, user.to_string(), res["events_before"]
)
res["events_after"] = yield filter_events_for_client(
- self.store, user.to_string(), res["events_after"]
+ self.storage, user.to_string(), res["events_after"]
)
res["start"] = now_token.copy_and_replace(
@@ -372,7 +374,7 @@ class SearchHandler(BaseHandler):
[(EventTypes.Member, sender) for sender in senders]
)
- state = yield self.store.get_state_for_event(
+ state = yield self.state_store.get_state_for_event(
last_event_id, state_filter
)
@@ -394,15 +396,11 @@ class SearchHandler(BaseHandler):
time_now = self.clock.time_msec()
for context in contexts.values():
- context["events_before"] = (
- yield self._event_serializer.serialize_events(
- context["events_before"], time_now
- )
+ context["events_before"] = yield self._event_serializer.serialize_events(
+ context["events_before"], time_now
)
- context["events_after"] = (
- yield self._event_serializer.serialize_events(
- context["events_after"], time_now
- )
+ context["events_after"] = yield self._event_serializer.serialize_events(
+ context["events_after"], time_now
)
state_results = {}
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 26bc276692..7f7d56390e 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -108,7 +108,10 @@ class StatsHandler(StateDeltasHandler):
user_deltas = {}
# Then count deltas for total_events and total_event_bytes.
- room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
+ (
+ room_count,
+ user_count,
+ ) = yield self.store.get_changes_room_total_events_and_bytes(
self.pos, max_pos
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d99160e9d7..b536d410e5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -230,6 +230,8 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
@@ -417,7 +419,7 @@ class SyncHandler(object):
current_state_ids = frozenset(itervalues(current_state_ids))
recents = yield filter_events_for_client(
- self.store,
+ self.storage,
sync_config.user.to_string(),
recents,
always_include_ids=current_state_ids,
@@ -470,7 +472,7 @@ class SyncHandler(object):
current_state_ids = frozenset(itervalues(current_state_ids))
loaded_recents = yield filter_events_for_client(
- self.store,
+ self.storage,
sync_config.user.to_string(),
loaded_recents,
always_include_ids=current_state_ids,
@@ -509,7 +511,7 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- state_ids = yield self.store.get_state_ids_for_event(
+ state_ids = yield self.state_store.get_state_ids_for_event(
event.event_id, state_filter=state_filter
)
if event.is_state():
@@ -580,7 +582,7 @@ class SyncHandler(object):
return None
last_event = last_events[-1]
- state_ids = yield self.store.get_state_ids_for_event(
+ state_ids = yield self.state_store.get_state_ids_for_event(
last_event.event_id,
state_filter=StateFilter.from_types(
[(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
@@ -757,11 +759,11 @@ class SyncHandler(object):
if full_state:
if batch:
- current_state_ids = yield self.store.get_state_ids_for_event(
+ current_state_ids = yield self.state_store.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
)
- state_ids = yield self.store.get_state_ids_for_event(
+ state_ids = yield self.state_store.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
)
@@ -781,7 +783,7 @@ class SyncHandler(object):
)
elif batch.limited:
if batch:
- state_at_timeline_start = yield self.store.get_state_ids_for_event(
+ state_at_timeline_start = yield self.state_store.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
)
else:
@@ -810,7 +812,7 @@ class SyncHandler(object):
)
if batch:
- current_state_ids = yield self.store.get_state_ids_for_event(
+ current_state_ids = yield self.state_store.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
)
else:
@@ -841,7 +843,7 @@ class SyncHandler(object):
# So we fish out all the member events corresponding to the
# timeline here, and then dedupe any redundant ones below.
- state_ids = yield self.store.get_state_ids_for_event(
+ state_ids = yield self.state_store.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
@@ -1204,10 +1206,11 @@ class SyncHandler(object):
since_token = sync_result_builder.since_token
if since_token and not sync_result_builder.full_state:
- account_data, account_data_by_room = (
- yield self.store.get_updated_account_data_for_user(
- user_id, since_token.account_data_key
- )
+ (
+ account_data,
+ account_data_by_room,
+ ) = yield self.store.get_updated_account_data_for_user(
+ user_id, since_token.account_data_key
)
push_rules_changed = yield self.store.have_push_rules_changed_for_user(
@@ -1219,9 +1222,10 @@ class SyncHandler(object):
sync_config.user
)
else:
- account_data, account_data_by_room = (
- yield self.store.get_account_data_for_user(sync_config.user.to_string())
- )
+ (
+ account_data,
+ account_data_by_room,
+ ) = yield self.store.get_account_data_for_user(sync_config.user.to_string())
account_data["m.push_rules"] = yield self.push_rules_for_user(
sync_config.user
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 29aa1e5aaf..8363d887a9 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -81,7 +81,7 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
def __init__(self, hs):
super().__init__(hs)
self._enabled = bool(hs.config.recaptcha_private_key)
- self._http_client = hs.get_simple_http_client()
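+ # Use the proxied HTTP client so that recaptcha verification requests
+ # respect any configured outbound proxy settings.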
+ self._http_client = hs.get_proxied_http_client()
self._url = hs.config.recaptcha_siteverify_api
self._secret = hs.config.recaptcha_private_key
|