diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index fe62f78e67..f7d9fd621e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,8 +15,6 @@
import logging
-from six import itervalues
-
from prometheus_client import Counter
from twisted.internet import defer
@@ -116,6 +114,12 @@ class ApplicationServicesHandler(object):
for service in services:
self.scheduler.submit_event_for_as(service, event)
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(event.event_id)
+ synapse.metrics.event_processing_lag_by_event.labels(
+ "appservice_sender"
+ ).observe(now - ts)
+
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
@@ -125,7 +129,7 @@ class ApplicationServicesHandler(object):
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
- for evs in itervalues(events_by_room)
+ for evs in events_by_room.values()
],
consumeErrors=True,
)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index bb3b43d5ae..c3f86e7414 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -297,7 +297,7 @@ class AuthHandler(BaseHandler):
# Convert the URI and method to strings.
uri = request.uri.decode("utf-8")
- method = request.uri.decode("utf-8")
+ method = request.method.decode("utf-8")
# If there's no session ID, create a new session.
if not sid:
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 64aaa1335c..76f213723a 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -14,11 +14,10 @@
# limitations under the License.
import logging
+import urllib
import xml.etree.ElementTree as ET
from typing import Dict, Optional, Tuple
-from six.moves import urllib
-
from twisted.web.client import PartialDownloadError
from synapse.api.errors import Codes, LoginError
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 230d170258..31346b56c3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,8 +17,6 @@
import logging
from typing import Any, Dict, Optional
-from six import iteritems, itervalues
-
from twisted.internet import defer
from synapse.api import errors
@@ -159,7 +157,7 @@ class DeviceWorkerHandler(BaseHandler):
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
- for key, event_id in iteritems(current_state_ids):
+ for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -182,7 +180,7 @@ class DeviceWorkerHandler(BaseHandler):
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
- for key, event_id in iteritems(current_state_ids):
+ for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -198,10 +196,10 @@ class DeviceWorkerHandler(BaseHandler):
# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
- for state_dict in itervalues(prev_state_ids):
+ for state_dict in prev_state_ids.values():
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
- for key, event_id in iteritems(current_state_ids):
+ for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -211,14 +209,14 @@ class DeviceWorkerHandler(BaseHandler):
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
- for key, event_id in iteritems(current_state_ids):
+ for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
- for state_dict in itervalues(prev_state_ids):
+ for state_dict in prev_state_ids.values():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
if state_key != user_id:
@@ -693,6 +691,7 @@ class DeviceListUpdater(object):
return False
+ @trace
@defer.inlineCallbacks
def _maybe_retry_device_resync(self):
"""Retry to resync device lists that are out of sync, except if another retry is
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 05c4b3eec0..610b08d00b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -18,8 +18,6 @@ from typing import Any, Dict
from canonicaljson import json
-from twisted.internet import defer
-
from synapse.api.errors import SynapseError
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
@@ -51,8 +49,7 @@ class DeviceMessageHandler(object):
self._device_list_updater = hs.get_device_handler().device_list_updater
- @defer.inlineCallbacks
- def on_direct_to_device_edu(self, origin, content):
+ async def on_direct_to_device_edu(self, origin, content):
local_messages = {}
sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id):
@@ -82,11 +79,11 @@ class DeviceMessageHandler(object):
}
local_messages[user_id] = messages_by_device
- yield self._check_for_unknown_devices(
+ await self._check_for_unknown_devices(
message_type, sender_user_id, by_device
)
- stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
+ stream_id = await self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
)
@@ -94,14 +91,13 @@ class DeviceMessageHandler(object):
"to_device_key", stream_id, users=local_messages.keys()
)
- @defer.inlineCallbacks
- def _check_for_unknown_devices(
+ async def _check_for_unknown_devices(
self,
message_type: str,
sender_user_id: str,
by_device: Dict[str, Dict[str, Any]],
):
- """Checks inbound device messages for unkown remote devices, and if
+ """Checks inbound device messages for unknown remote devices, and if
found marks the remote cache for the user as stale.
"""
@@ -115,7 +111,7 @@ class DeviceMessageHandler(object):
requesting_device_ids.add(device_id)
# Check if we are tracking the devices of the remote user.
- room_ids = yield self.store.get_rooms_for_user(sender_user_id)
+ room_ids = await self.store.get_rooms_for_user(sender_user_id)
if not room_ids:
logger.info(
"Received device message from remote device we don't"
@@ -127,7 +123,7 @@ class DeviceMessageHandler(object):
# If we are tracking check that we know about the sending
# devices.
- cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)
+ cached_devices = await self.store.get_cached_devices_for_user(sender_user_id)
unknown_devices = requesting_device_ids - set(cached_devices)
if unknown_devices:
@@ -136,15 +132,14 @@ class DeviceMessageHandler(object):
sender_user_id,
unknown_devices,
)
- yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
+ await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
# Immediately attempt a resync in the background
run_in_background(
self._device_list_updater.user_device_resync, sender_user_id
)
- @defer.inlineCallbacks
- def send_device_message(self, sender_user_id, message_type, messages):
+ async def send_device_message(self, sender_user_id, message_type, messages):
set_tag("number_of_messages", len(messages))
set_tag("sender", sender_user_id)
local_messages = {}
@@ -183,7 +178,7 @@ class DeviceMessageHandler(object):
}
log_kv({"local_messages": local_messages})
- stream_id = yield self.store.add_messages_to_device_inbox(
+ stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index f2f16b1e43..79a2df6201 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,8 +17,6 @@ import logging
import string
from typing import Iterable, List, Optional
-from twisted.internet import defer
-
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
from synapse.api.errors import (
AuthError,
@@ -55,8 +53,7 @@ class DirectoryHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
- @defer.inlineCallbacks
- def _create_association(
+ async def _create_association(
self,
room_alias: RoomAlias,
room_id: str,
@@ -76,13 +73,13 @@ class DirectoryHandler(BaseHandler):
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
- users = yield self.state.get_current_users_in_room(room_id)
+ users = await self.state.get_current_users_in_room(room_id)
servers = {get_domain_from_id(u) for u in users}
if not servers:
raise SynapseError(400, "Failed to get server list")
- yield self.store.create_room_alias_association(
+ await self.store.create_room_alias_association(
room_alias, room_id, servers, creator=creator
)
@@ -93,7 +90,7 @@ class DirectoryHandler(BaseHandler):
room_id: str,
servers: Optional[List[str]] = None,
check_membership: bool = True,
- ):
+ ) -> None:
"""Attempt to create a new alias
Args:
@@ -103,9 +100,6 @@ class DirectoryHandler(BaseHandler):
servers: Iterable of servers that others servers should try and join via
check_membership: Whether to check if the user is in the room
before the alias can be set (if the server's config requires it).
-
- Returns:
- Deferred
"""
user_id = requester.user.to_string()
@@ -148,7 +142,7 @@ class DirectoryHandler(BaseHandler):
# per alias creation rule?
raise SynapseError(403, "Not allowed to create alias")
- can_create = await self.can_modify_alias(room_alias, user_id=user_id)
+ can_create = self.can_modify_alias(room_alias, user_id=user_id)
if not can_create:
raise AuthError(
400,
@@ -158,7 +152,9 @@ class DirectoryHandler(BaseHandler):
await self._create_association(room_alias, room_id, servers, creator=user_id)
- async def delete_association(self, requester: Requester, room_alias: RoomAlias):
+ async def delete_association(
+ self, requester: Requester, room_alias: RoomAlias
+ ) -> str:
"""Remove an alias from the directory
(this is only meant for human users; AS users should call
@@ -169,7 +165,7 @@ class DirectoryHandler(BaseHandler):
room_alias
Returns:
- Deferred[unicode]: room id that the alias used to point to
+ room id that the alias used to point to
Raises:
NotFoundError: if the alias doesn't exist
@@ -191,7 +187,7 @@ class DirectoryHandler(BaseHandler):
if not can_delete:
raise AuthError(403, "You don't have permission to delete the alias.")
- can_delete = await self.can_modify_alias(room_alias, user_id=user_id)
+ can_delete = self.can_modify_alias(room_alias, user_id=user_id)
if not can_delete:
raise SynapseError(
400,
@@ -208,8 +204,7 @@ class DirectoryHandler(BaseHandler):
return room_id
- @defer.inlineCallbacks
- def delete_appservice_association(
+ async def delete_appservice_association(
self, service: ApplicationService, room_alias: RoomAlias
):
if not service.is_interested_in_alias(room_alias.to_string()):
@@ -218,29 +213,27 @@ class DirectoryHandler(BaseHandler):
"This application service has not reserved this kind of alias",
errcode=Codes.EXCLUSIVE,
)
- yield self._delete_association(room_alias)
+ await self._delete_association(room_alias)
- @defer.inlineCallbacks
- def _delete_association(self, room_alias: RoomAlias):
+ async def _delete_association(self, room_alias: RoomAlias):
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
- room_id = yield self.store.delete_room_alias(room_alias)
+ room_id = await self.store.delete_room_alias(room_alias)
return room_id
- @defer.inlineCallbacks
- def get_association(self, room_alias: RoomAlias):
+ async def get_association(self, room_alias: RoomAlias):
room_id = None
if self.hs.is_mine(room_alias):
- result = yield self.get_association_from_room_alias(room_alias)
+ result = await self.get_association_from_room_alias(room_alias)
if result:
room_id = result.room_id
servers = result.servers
else:
try:
- result = yield self.federation.make_query(
+ result = await self.federation.make_query(
destination=room_alias.domain,
query_type="directory",
args={"room_alias": room_alias.to_string()},
@@ -265,7 +258,7 @@ class DirectoryHandler(BaseHandler):
Codes.NOT_FOUND,
)
- users = yield self.state.get_current_users_in_room(room_id)
+ users = await self.state.get_current_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
servers = set(extra_servers) | set(servers)
@@ -277,13 +270,12 @@ class DirectoryHandler(BaseHandler):
return {"room_id": room_id, "servers": servers}
- @defer.inlineCallbacks
- def on_directory_query(self, args):
+ async def on_directory_query(self, args):
room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room Alias is not hosted on this homeserver")
- result = yield self.get_association_from_room_alias(room_alias)
+ result = await self.get_association_from_room_alias(room_alias)
if result is not None:
return {"room_id": result.room_id, "servers": result.servers}
@@ -344,16 +336,15 @@ class DirectoryHandler(BaseHandler):
ratelimit=False,
)
- @defer.inlineCallbacks
- def get_association_from_room_alias(self, room_alias: RoomAlias):
- result = yield self.store.get_association_from_room_alias(room_alias)
+ async def get_association_from_room_alias(self, room_alias: RoomAlias):
+ result = await self.store.get_association_from_room_alias(room_alias)
if not result:
# Query AS to see if it exists
as_handler = self.appservice_handler
- result = yield as_handler.query_room_alias_exists(room_alias)
+ result = await as_handler.query_room_alias_exists(room_alias)
return result
- def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None):
+ def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None) -> bool:
# Any application service "interested" in an alias they are regexing on
# can modify the alias.
# Users can only modify the alias if ALL the interested services have
@@ -366,12 +357,12 @@ class DirectoryHandler(BaseHandler):
for service in interested_services:
if user_id == service.sender:
# this user IS the app service so they can do whatever they like
- return defer.succeed(True)
+ return True
elif service.is_exclusive_alias(alias.to_string()):
# another service has an exclusive lock on this alias.
- return defer.succeed(False)
+ return False
# either no interested services, or no service with an exclusive lock
- return defer.succeed(True)
+ return True
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
"""Determine whether a user can delete an alias.
@@ -459,8 +450,7 @@ class DirectoryHandler(BaseHandler):
await self.store.set_room_is_public(room_id, making_public)
- @defer.inlineCallbacks
- def edit_published_appservice_room_list(
+ async def edit_published_appservice_room_list(
self, appservice_id: str, network_id: str, room_id: str, visibility: str
):
"""Add or remove a room from the appservice/network specific public
@@ -475,7 +465,7 @@ class DirectoryHandler(BaseHandler):
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
- yield self.store.set_room_is_public_appservice(
+ await self.store.set_room_is_public_appservice(
room_id, appservice_id, network_id, visibility == "public"
)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 774a252619..a7e60cbc26 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -17,8 +17,6 @@
import logging
-from six import iteritems
-
import attr
from canonicaljson import encode_canonical_json, json
from signedjson.key import decode_verify_key_bytes
@@ -135,7 +133,7 @@ class E2eKeysHandler(object):
remote_queries_not_in_cache = {}
if remote_queries:
query_list = []
- for user_id, device_ids in iteritems(remote_queries):
+ for user_id, device_ids in remote_queries.items():
if device_ids:
query_list.extend((user_id, device_id) for device_id in device_ids)
else:
@@ -145,9 +143,9 @@ class E2eKeysHandler(object):
user_ids_not_in_cache,
remote_results,
) = yield self.store.get_user_devices_from_cache(query_list)
- for user_id, devices in iteritems(remote_results):
+ for user_id, devices in remote_results.items():
user_devices = results.setdefault(user_id, {})
- for device_id, device in iteritems(devices):
+ for device_id, device in devices.items():
keys = device.get("keys", None)
device_display_name = device.get("device_display_name", None)
if keys:
@@ -446,9 +444,9 @@ class E2eKeysHandler(object):
",".join(
(
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in iteritems(json_result)
- for device_id, device_keys in iteritems(user_keys)
- for key_id, _ in iteritems(device_keys)
+ for user_id, user_keys in json_result.items()
+ for device_id, device_keys in user_keys.items()
+ for key_id, _ in device_keys.items()
)
),
)
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 9abaf13b8f..f55470a707 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -16,8 +16,6 @@
import logging
-from six import iteritems
-
from twisted.internet import defer
from synapse.api.errors import (
@@ -205,8 +203,8 @@ class E2eRoomKeysHandler(object):
)
to_insert = [] # batch the inserts together
changed = False # if anything has changed, we need to update the etag
- for room_id, room in iteritems(room_keys["rooms"]):
- for session_id, room_key in iteritems(room["sessions"]):
+ for room_id, room in room_keys["rooms"].items():
+ for session_id, room_key in room["sessions"].items():
if not isinstance(room_key["is_verified"], bool):
msg = (
"is_verified must be a boolean in keys for session %s in"
@@ -351,6 +349,7 @@ class E2eRoomKeysHandler(object):
raise
res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
+ res["etag"] = str(res["etag"])
return res
@trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d0b62f4cf2..4dbd8e1d98 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -19,12 +19,9 @@
import itertools
import logging
+from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
-import six
-from six import iteritems, itervalues
-from six.moves import http_client, zip
-
import attr
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
@@ -33,7 +30,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.api.constants import (
+ EventTypes,
+ Membership,
+ RejectedReason,
+ RoomEncryptionAlgorithms,
+)
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -374,6 +376,7 @@ class FederationHandler(BaseHandler):
room_version = await self.store.get_room_version_id(room_id)
state_map = await resolve_events_with_store(
+ self.clock,
room_id,
room_version,
state_maps,
@@ -393,7 +396,7 @@ class FederationHandler(BaseHandler):
)
event_map.update(evs)
- state = [event_map[e] for e in six.itervalues(state_map)]
+ state = [event_map[e] for e in state_map.values()]
except Exception:
logger.warning(
"[%s %s] Error attempting to resolve state at missing "
@@ -742,7 +745,10 @@ class FederationHandler(BaseHandler):
if device:
keys = device.get("keys", {}).get("keys", {})
- if event.content.get("algorithm") == "m.megolm.v1.aes-sha2":
+ if (
+ event.content.get("algorithm")
+ == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
+ ):
# For this algorithm we expect a curve25519 key.
key_name = "curve25519:%s" % (device_id,)
current_keys = [keys.get(key_name)]
@@ -1001,7 +1007,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in iteritems(state)
+ for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member and event.membership == Membership.JOIN
]
@@ -1091,16 +1097,16 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = await self.store.get_events(
- [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
+ [e_id for ids in states.values() for e_id in ids.values()],
get_prev_content=False,
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in iteritems(state_dict)
+ for k, e_id in state_dict.items()
if e_id in state_map
}
- for key, state_dict in iteritems(states)
+ for key, state_dict in states.items()
}
for e_id, _ in sorted_extremeties_tuple:
@@ -1188,7 +1194,7 @@ class FederationHandler(BaseHandler):
ev.event_id,
len(ev.prev_event_ids()),
)
- raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
+ raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
if len(ev.auth_event_ids()) > 10:
logger.warning(
@@ -1196,7 +1202,7 @@ class FederationHandler(BaseHandler):
ev.event_id,
len(ev.auth_event_ids()),
)
- raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
+ raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
async def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -1539,7 +1545,7 @@ class FederationHandler(BaseHandler):
# block any attempts to invite the server notices mxid
if event.state_key == self._server_notices_mxid:
- raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+ raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
# keep a record of the room version, if we don't yet know it.
# (this may get overwritten if we later get a different room version in a
@@ -1725,7 +1731,7 @@ class FederationHandler(BaseHandler):
state_groups = await self.state_store.get_state_groups(room_id, [event_id])
if state_groups:
- _, state = list(iteritems(state_groups)).pop()
+ _, state = list(state_groups.items()).pop()
results = {(e.type, e.state_key): e for e in state}
if event.is_state():
@@ -2088,7 +2094,7 @@ class FederationHandler(BaseHandler):
room_version, state_sets, event
)
current_state_ids = {
- k: e.event_id for k, e in iteritems(current_state_ids)
+ k: e.event_id for k, e in current_state_ids.items()
}
else:
current_state_ids = await self.state_handler.get_current_state_ids(
@@ -2104,7 +2110,7 @@ class FederationHandler(BaseHandler):
# Now check if event pass auth against said current state
auth_types = auth_types_for_event(event)
current_state_ids = [
- e for k, e in iteritems(current_state_ids) if k in auth_types
+ e for k, e in current_state_ids.items() if k in auth_types
]
current_auth_events = await self.store.get_events(current_state_ids)
@@ -2420,7 +2426,7 @@ class FederationHandler(BaseHandler):
else:
event_key = None
state_updates = {
- k: a.event_id for k, a in iteritems(auth_events) if k != event_key
+ k: a.event_id for k, a in auth_events.items() if k != event_key
}
current_state_ids = await context.get_current_state_ids()
@@ -2431,7 +2437,7 @@ class FederationHandler(BaseHandler):
prev_state_ids = await context.get_prev_state_ids()
prev_state_ids = dict(prev_state_ids)
- prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
+ prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
# create a new state group as a delta from the existing one.
prev_group = context.state_group
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index ebe8d25bd8..7cb106e365 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -16,8 +16,6 @@
import logging
-from six import iteritems
-
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.types import get_domain_from_id
@@ -227,7 +225,7 @@ class GroupsLocalWorkerHandler(object):
results = {}
failed_results = []
- for destination, dest_user_ids in iteritems(destinations):
+ for destination, dest_user_ids in destinations.items():
try:
r = await self.transport_client.bulk_get_publicised_groups(
destination, list(dest_user_ids)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 649ca1f08a..70b5cb0f89 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,8 +17,6 @@
import logging
from typing import Optional, Tuple
-from six import iteritems, itervalues, string_types
-
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
@@ -233,7 +231,7 @@ class MessageHandler(object):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
@@ -246,7 +244,7 @@ class MessageHandler(object):
"avatar_url": profile.avatar_url,
"display_name": profile.display_name,
}
- for user_id, profile in iteritems(users_with_profile)
+ for user_id, profile in users_with_profile.items()
}
def maybe_schedule_expiry(self, event):
@@ -404,8 +402,10 @@ class EventCreationHandler(object):
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
+ self._is_worker_app = self.config.worker_app is not None
+
if (
- not self.config.worker_app
+ not self._is_worker_app
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
@@ -715,7 +715,7 @@ class EventCreationHandler(object):
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
- if not isinstance(spam_error, string_types):
+ if not isinstance(spam_error, str):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
@@ -881,7 +881,9 @@ class EventCreationHandler(object):
"""
room_alias = RoomAlias.from_string(room_alias_str)
try:
- mapping = yield directory_handler.get_association(room_alias)
+ mapping = yield defer.ensureDeferred(
+ directory_handler.get_association(room_alias)
+ )
except SynapseError as e:
# Turn M_NOT_FOUND errors into M_BAD_ALIAS errors.
if e.errcode == Codes.NOT_FOUND:
@@ -988,7 +990,7 @@ class EventCreationHandler(object):
state_to_include_ids = [
e_id
- for k, e_id in iteritems(current_state_ids)
+ for k, e_id in current_state_ids.items()
if k[0] in self.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -1002,7 +1004,7 @@ class EventCreationHandler(object):
"content": e.content,
"sender": e.sender,
}
- for e in itervalues(state_to_include)
+ for e in state_to_include.values()
]
invitee = UserID.from_string(event.state_key)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d7442c62a7..da06582d4b 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,9 +15,6 @@
# limitations under the License.
import logging
-from six import iteritems
-
-from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
@@ -99,8 +96,7 @@ class PaginationHandler(object):
job["longest_max_lifetime"],
)
- @defer.inlineCallbacks
- def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+ async def purge_history_for_rooms_in_range(self, min_ms, max_ms):
"""Purge outdated events from rooms within the given retention range.
If a default retention policy is defined in the server's configuration and its
@@ -139,13 +135,13 @@ class PaginationHandler(object):
include_null,
)
- rooms = yield self.store.get_rooms_for_retention_period_in_range(
+ rooms = await self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)
logger.debug("[purge] Rooms to purge: %s", rooms)
- for room_id, retention_policy in iteritems(rooms):
+ for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
if room_id in self._purges_in_progress_by_room:
@@ -167,9 +163,9 @@ class PaginationHandler(object):
# Figure out what token we should start purging at.
ts = self.clock.time_msec() - max_lifetime
- stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+ stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
- r = yield self.store.get_room_event_before_stream_ordering(
+ r = await self.store.get_room_event_before_stream_ordering(
room_id, stream_ordering,
)
if not r:
@@ -229,8 +225,7 @@ class PaginationHandler(object):
)
return purge_id
- @defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token, delete_local_events):
+ async def _purge_history(self, purge_id, room_id, token, delete_local_events):
"""Carry out a history purge on a room.
Args:
@@ -239,14 +234,11 @@ class PaginationHandler(object):
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
-
- Returns:
- Deferred
"""
self._purges_in_progress_by_room.add(room_id)
try:
- with (yield self.pagination_lock.write(room_id)):
- yield self.storage.purge_events.purge_history(
+ with await self.pagination_lock.write(room_id):
+ await self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
@@ -284,9 +276,7 @@ class PaginationHandler(object):
await self.store.get_room_version_id(room_id)
# first check that we have no users in this room
- joined = await defer.maybeDeferred(
- self.store.is_host_joined, room_id, self._server_name
- )
+ joined = await self.store.is_host_joined(room_id, self._server_name)
if joined:
raise SynapseError(400, "Users are still joined to this room")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3594f3b00f..d2f25ae12a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,9 +25,7 @@ The methods that define policy are:
import abc
import logging
from contextlib import contextmanager
-from typing import Dict, Iterable, List, Set
-
-from six import iteritems, itervalues
+from typing import Dict, Iterable, List, Set, Tuple
from prometheus_client import Counter
from typing_extensions import ContextManager
@@ -170,14 +168,14 @@ class BasePresenceHandler(abc.ABC):
for user_id in user_ids
}
- missing = [user_id for user_id, state in iteritems(states) if not state]
+ missing = [user_id for user_id, state in states.items() if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = await self.store.get_presence_for_users(missing)
states.update(res)
- missing = [user_id for user_id, state in iteritems(states) if not state]
+ missing = [user_id for user_id, state in states.items() if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id) for user_id in missing
@@ -632,7 +630,7 @@ class PresenceHandler(BasePresenceHandler):
await self._update_states(
[
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
- for prev_state in itervalues(prev_states)
+ for prev_state in prev_states.values()
]
)
self.external_process_last_updated_ms.pop(process_id, None)
@@ -775,7 +773,9 @@ class PresenceHandler(BasePresenceHandler):
return False
- async def get_all_presence_updates(self, last_id, current_id, limit):
+ async def get_all_presence_updates(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
"""
Gets a list of presence update rows from between the given stream ids.
Each row has:
@@ -787,10 +787,31 @@ class PresenceHandler(BasePresenceHandler):
- last_user_sync_ts(int)
- status_msg(int)
- currently_active(int)
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
"""
+
# TODO(markjh): replicate the unpersisted changes.
# This could use the in-memory stores for recent changes.
- rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
+ rows = await self.store.get_all_presence_updates(
+ instance_name, last_id, current_id, limit
+ )
return rows
def notify_new_event(self):
@@ -1087,7 +1108,7 @@ class PresenceEventSource(object):
return (list(updates.values()), max_token)
else:
return (
- [s for s in itervalues(updates) if s.state != PresenceState.OFFLINE],
+ [s for s in updates.values() if s.state != PresenceState.OFFLINE],
max_token,
)
@@ -1323,11 +1344,11 @@ def get_interested_remotes(store, states, state_handler):
# hosts in those rooms.
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
- for room_id, states in iteritems(room_ids_to_states):
+ for room_id, states in room_ids_to_states.items():
hosts = yield state_handler.get_current_hosts_in_room(room_id)
hosts_and_states.append((hosts, states))
- for user_id, states in iteritems(users_to_states):
+ for user_id, states in users_to_states.items():
host = get_domain_from_id(user_id)
hosts_and_states.append(([host], states))
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 302efc1b9a..4b1e3073a8 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,8 +15,6 @@
import logging
-from six import raise_from
-
from twisted.internet import defer
from synapse.api.errors import (
@@ -84,7 +82,7 @@ class BaseProfileHandler(BaseHandler):
)
return result
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to fetch profile"), e)
+ raise SynapseError(502, "Failed to fetch profile") from e
except HttpResponseException as e:
raise e.to_synapse_error()
@@ -135,7 +133,7 @@ class BaseProfileHandler(BaseHandler):
ignore_backoff=True,
)
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to fetch profile"), e)
+ raise SynapseError(502, "Failed to fetch profile") from e
except HttpResponseException as e:
raise e.to_synapse_error()
@@ -212,7 +210,7 @@ class BaseProfileHandler(BaseHandler):
ignore_backoff=True,
)
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to fetch profile"), e)
+ raise SynapseError(502, "Failed to fetch profile") from e
except HttpResponseException as e:
raise e.to_synapse_error()
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 51979ea43e..78c3772ac1 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -17,7 +17,7 @@
import logging
from synapse import types
-from synapse.api.constants import MAX_USERID_LENGTH, LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, EventTypes, JoinRules, LoginType
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict
@@ -26,7 +26,8 @@ from synapse.replication.http.register import (
ReplicationPostRegisterActionsServlet,
ReplicationRegisterServlet,
)
-from synapse.types import RoomAlias, RoomID, UserID, create_requester
+from synapse.storage.state import StateFilter
+from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from ._base import BaseHandler
@@ -270,51 +271,157 @@ class RegistrationHandler(BaseHandler):
return user_id
- async def _auto_join_rooms(self, user_id):
- """Automatically joins users to auto join rooms - creating the room in the first place
- if the user is the first to be created.
+ async def _create_and_join_rooms(self, user_id: str):
+ """
+ Create the auto-join rooms and join or invite the user to them.
+
+ This should only be called when the first "real" user registers.
Args:
- user_id(str): The user to join
+ user_id: The user to join
"""
- # auto-join the user to any rooms we're supposed to dump them into
- fake_requester = create_requester(user_id)
+ # Getting the handlers during init gives a dependency loop.
+ room_creation_handler = self.hs.get_room_creation_handler()
+ room_member_handler = self.hs.get_room_member_handler()
- # try to create the room if we're the first real user on the server. Note
- # that an auto-generated support or bot user is not a real user and will never be
- # the user to create the room
- should_auto_create_rooms = False
- is_real_user = await self.store.is_real_user(user_id)
- if self.hs.config.autocreate_auto_join_rooms and is_real_user:
- count = await self.store.count_real_users()
- should_auto_create_rooms = count == 1
- for r in self.hs.config.auto_join_rooms:
+ # Generate a stub for how the rooms will be configured.
+ stub_config = {
+ "preset": self.hs.config.registration.autocreate_auto_join_room_preset,
+ }
+
+ # If the configuration providers a user ID to create rooms with, use
+ # that instead of the first user registered.
+ requires_join = False
+ if self.hs.config.registration.auto_join_user_id:
+ fake_requester = create_requester(
+ self.hs.config.registration.auto_join_user_id
+ )
+
+ # If the room requires an invite, add the user to the list of invites.
+ if self.hs.config.registration.auto_join_room_requires_invite:
+ stub_config["invite"] = [user_id]
+
+ # If the room is being created by a different user, the first user
+ # registered needs to join it. Note that in the case of an invitation
+ # being necessary this will occur after the invite was sent.
+ requires_join = True
+ else:
+ fake_requester = create_requester(user_id)
+
+ # Choose whether to federate the new room.
+ if not self.hs.config.registration.autocreate_auto_join_rooms_federated:
+ stub_config["creation_content"] = {"m.federate": False}
+
+ for r in self.hs.config.registration.auto_join_rooms:
logger.info("Auto-joining %s to %s", user_id, r)
+
try:
- if should_auto_create_rooms:
- room_alias = RoomAlias.from_string(r)
- if self.hs.hostname != room_alias.domain:
- logger.warning(
- "Cannot create room alias %s, "
- "it does not match server domain",
- r,
- )
- else:
- # create room expects the localpart of the room alias
- room_alias_localpart = room_alias.localpart
-
- # getting the RoomCreationHandler during init gives a dependency
- # loop
- await self.hs.get_room_creation_handler().create_room(
- fake_requester,
- config={
- "preset": "public_chat",
- "room_alias_name": room_alias_localpart,
- },
+ room_alias = RoomAlias.from_string(r)
+
+ if self.hs.hostname != room_alias.domain:
+ logger.warning(
+ "Cannot create room alias %s, "
+ "it does not match server domain",
+ r,
+ )
+ else:
+ # A shallow copy is OK here since the only key that is
+ # modified is room_alias_name.
+ config = stub_config.copy()
+ # create room expects the localpart of the room alias
+ config["room_alias_name"] = room_alias.localpart
+
+ info, _ = await room_creation_handler.create_room(
+ fake_requester, config=config, ratelimit=False,
+ )
+
+ # If the room does not require an invite, but another user
+ # created it, then ensure the first user joins it.
+ if requires_join:
+ await room_member_handler.update_membership(
+ requester=create_requester(user_id),
+ target=UserID.from_string(user_id),
+ room_id=info["room_id"],
+ # Since it was just created, there are no remote hosts.
+ remote_room_hosts=[],
+ action="join",
ratelimit=False,
)
+
+ except ConsentNotGivenError as e:
+ # Technically not necessary to pull out this error though
+ # moving away from bare excepts is a good thing to do.
+ logger.error("Failed to join new user to %r: %r", r, e)
+ except Exception as e:
+ logger.error("Failed to join new user to %r: %r", r, e)
+
+ async def _join_rooms(self, user_id: str):
+ """
+ Join or invite the user to the auto-join rooms.
+
+ Args:
+ user_id: The user to join
+ """
+ room_member_handler = self.hs.get_room_member_handler()
+
+ for r in self.hs.config.registration.auto_join_rooms:
+ logger.info("Auto-joining %s to %s", user_id, r)
+
+ try:
+ room_alias = RoomAlias.from_string(r)
+
+ if RoomAlias.is_valid(r):
+ (
+ room_id,
+ remote_room_hosts,
+ ) = await room_member_handler.lookup_room_alias(room_alias)
+ room_id = room_id.to_string()
else:
- await self._join_user_to_room(fake_requester, r)
+ raise SynapseError(
+ 400, "%s was not legal room ID or room alias" % (r,)
+ )
+
+ # Calculate whether the room requires an invite or can be
+ # joined directly. Note that unless a join rule of public exists,
+ # it is treated as requiring an invite.
+ requires_invite = True
+
+ state = await self.store.get_filtered_current_state_ids(
+ room_id, StateFilter.from_types([(EventTypes.JoinRules, "")])
+ )
+
+ event_id = state.get((EventTypes.JoinRules, ""))
+ if event_id:
+ join_rules_event = await self.store.get_event(
+ event_id, allow_none=True
+ )
+ if join_rules_event:
+ join_rule = join_rules_event.content.get("join_rule", None)
+ requires_invite = join_rule and join_rule != JoinRules.PUBLIC
+
+ # Send the invite, if necessary.
+ if requires_invite:
+ await room_member_handler.update_membership(
+ requester=create_requester(
+ self.hs.config.registration.auto_join_user_id
+ ),
+ target=UserID.from_string(user_id),
+ room_id=room_id,
+ remote_room_hosts=remote_room_hosts,
+ action="invite",
+ ratelimit=False,
+ )
+
+ # Send the join.
+ await room_member_handler.update_membership(
+ requester=create_requester(user_id),
+ target=UserID.from_string(user_id),
+ room_id=room_id,
+ remote_room_hosts=remote_room_hosts,
+ action="join",
+ ratelimit=False,
+ )
+
except ConsentNotGivenError as e:
# Technically not necessary to pull out this error though
# moving away from bare excepts is a good thing to do.
@@ -322,6 +429,29 @@ class RegistrationHandler(BaseHandler):
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
+ async def _auto_join_rooms(self, user_id: str):
+ """Automatically joins users to auto join rooms - creating the room in the first place
+ if the user is the first to be created.
+
+ Args:
+ user_id: The user to join
+ """
+ # auto-join the user to any rooms we're supposed to dump them into
+
+ # try to create the room if we're the first real user on the server. Note
+ # that an auto-generated support or bot user is not a real user and will never be
+ # the user to create the room
+ should_auto_create_rooms = False
+ is_real_user = await self.store.is_real_user(user_id)
+ if self.hs.config.registration.autocreate_auto_join_rooms and is_real_user:
+ count = await self.store.count_real_users()
+ should_auto_create_rooms = count == 1
+
+ if should_auto_create_rooms:
+ await self._create_and_join_rooms(user_id)
+ else:
+ await self._join_rooms(user_id)
+
async def post_consent_actions(self, user_id):
"""A series of registration actions that can only be carried out once consent
has been granted
@@ -392,30 +522,6 @@ class RegistrationHandler(BaseHandler):
self._next_generated_user_id += 1
return str(id)
- async def _join_user_to_room(self, requester, room_identifier):
- room_member_handler = self.hs.get_room_member_handler()
- if RoomID.is_valid(room_identifier):
- room_id = room_identifier
- elif RoomAlias.is_valid(room_identifier):
- room_alias = RoomAlias.from_string(room_identifier)
- room_id, remote_room_hosts = await room_member_handler.lookup_room_alias(
- room_alias
- )
- room_id = room_id.to_string()
- else:
- raise SynapseError(
- 400, "%s was not legal room ID or room alias" % (room_identifier,)
- )
-
- await room_member_handler.update_membership(
- requester=requester,
- target=requester.user,
- room_id=room_id,
- remote_room_hosts=remote_room_hosts,
- action="join",
- ratelimit=False,
- )
-
def check_registration_ratelimit(self, address):
"""A simple helper method to check whether the registration rate limit has been hit
for a given IP address
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 61db3ccc43..950a84acd0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,9 +24,12 @@ import string
from collections import OrderedDict
from typing import Tuple
-from six import iteritems, string_types
-
-from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
+from synapse.api.constants import (
+ EventTypes,
+ JoinRules,
+ RoomCreationPreset,
+ RoomEncryptionAlgorithms,
+)
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events.utils import copy_power_levels_contents
@@ -56,31 +59,6 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000
class RoomCreationHandler(BaseHandler):
-
- PRESETS_DICT = {
- RoomCreationPreset.PRIVATE_CHAT: {
- "join_rules": JoinRules.INVITE,
- "history_visibility": "shared",
- "original_invitees_have_ops": False,
- "guest_can_join": True,
- "power_level_content_override": {"invite": 0},
- },
- RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
- "join_rules": JoinRules.INVITE,
- "history_visibility": "shared",
- "original_invitees_have_ops": True,
- "guest_can_join": True,
- "power_level_content_override": {"invite": 0},
- },
- RoomCreationPreset.PUBLIC_CHAT: {
- "join_rules": JoinRules.PUBLIC,
- "history_visibility": "shared",
- "original_invitees_have_ops": False,
- "guest_can_join": False,
- "power_level_content_override": {},
- },
- }
-
def __init__(self, hs):
super(RoomCreationHandler, self).__init__(hs)
@@ -89,6 +67,39 @@ class RoomCreationHandler(BaseHandler):
self.room_member_handler = hs.get_room_member_handler()
self.config = hs.config
+ # Room state based off defined presets
+ self._presets_dict = {
+ RoomCreationPreset.PRIVATE_CHAT: {
+ "join_rules": JoinRules.INVITE,
+ "history_visibility": "shared",
+ "original_invitees_have_ops": False,
+ "guest_can_join": True,
+ "power_level_content_override": {"invite": 0},
+ },
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
+ "join_rules": JoinRules.INVITE,
+ "history_visibility": "shared",
+ "original_invitees_have_ops": True,
+ "guest_can_join": True,
+ "power_level_content_override": {"invite": 0},
+ },
+ RoomCreationPreset.PUBLIC_CHAT: {
+ "join_rules": JoinRules.PUBLIC,
+ "history_visibility": "shared",
+ "original_invitees_have_ops": False,
+ "guest_can_join": False,
+ "power_level_content_override": {},
+ },
+ }
+
+ # Modify presets to selectively enable encryption by default per homeserver config
+ for preset_name, preset_config in self._presets_dict.items():
+ encrypted = (
+ preset_name
+ in self.config.encryption_enabled_by_default_for_room_presets
+ )
+ preset_config["encrypted"] = encrypted
+
self._replication = hs.get_replication_data_handler()
# linearizer to stop two upgrades happening at once
@@ -364,7 +375,7 @@ class RoomCreationHandler(BaseHandler):
# map from event_id to BaseEvent
old_room_state_events = await self.store.get_events(old_room_state_ids.values())
- for k, old_event_id in iteritems(old_room_state_ids):
+ for k, old_event_id in old_room_state_ids.items():
old_event = old_room_state_events.get(old_event_id)
if old_event:
initial_state[k] = old_event.content
@@ -417,7 +428,7 @@ class RoomCreationHandler(BaseHandler):
old_room_member_state_events = await self.store.get_events(
old_room_member_state_ids.values()
)
- for k, old_event in iteritems(old_room_member_state_events):
+ for k, old_event in old_room_member_state_events.items():
# Only transfer ban events
if (
"membership" in old_event.content
@@ -582,7 +593,7 @@ class RoomCreationHandler(BaseHandler):
"room_version", self.config.default_room_version.identifier
)
- if not isinstance(room_version_id, string_types):
+ if not isinstance(room_version_id, str):
raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
@@ -798,7 +809,7 @@ class RoomCreationHandler(BaseHandler):
)
return last_stream_id
- config = RoomCreationHandler.PRESETS_DICT[preset_config]
+ config = self._presets_dict[preset_config]
creator_id = creator.user.to_string()
@@ -888,6 +899,13 @@ class RoomCreationHandler(BaseHandler):
etype=etype, state_key=state_key, content=content
)
+ if config["encrypted"]:
+ last_sent_stream_id = await send(
+ etype=EventTypes.RoomEncryption,
+ state_key="",
+ content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
+ )
+
return last_sent_stream_id
async def _generate_room_id(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 4cbc02b0d0..e3dbbcc052 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,8 +17,6 @@ import logging
from collections import namedtuple
from typing import Any, Dict, Optional
-from six import iteritems
-
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
@@ -44,7 +42,8 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
- self.response_cache = ResponseCache(hs, "room_list")
+
+ self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(
hs, "remote_room_list", timeout_ms=30 * 1000
)
@@ -271,7 +270,7 @@ class RoomListHandler(BaseHandler):
event_map = yield self.store.get_events(
[
event_id
- for key, event_id in iteritems(current_state_ids)
+ for key, event_id in current_state_ids.items()
if key[0]
in (
EventTypes.Create,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0f7af982f0..c0b8742586 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,10 +17,9 @@
import abc
import logging
+from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Tuple
-from six.moves import http_client
-
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
@@ -61,6 +60,7 @@ class RoomMemberHandler(object):
self.event_creation_handler = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
+ self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -292,19 +292,38 @@ class RoomMemberHandler(object):
) -> Tuple[Optional[str], int]:
key = (room_id,)
- with (await self.member_linearizer.queue(key)):
- result = await self._update_membership(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- require_consent=require_consent,
- )
+ as_id = object()
+ if requester.app_service:
+ as_id = requester.app_service.id
+
+ then = self.clock.time_msec()
+
+ with (await self.member_limiter.queue(as_id)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ with (await self.member_linearizer.queue(key)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ result = await self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ require_consent=require_consent,
+ )
return result
@@ -361,7 +380,7 @@ class RoomMemberHandler(object):
if effective_membership_state == Membership.INVITE:
# block any attempts to invite the server notices mxid
if target.to_string() == self._server_notices_mxid:
- raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+ raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
block_invite = False
@@ -444,7 +463,7 @@ class RoomMemberHandler(object):
is_blocked = await self._is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
- http_client.FORBIDDEN,
+ HTTPStatus.FORBIDDEN,
"You cannot reject this invite",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6bdb24baff..f631c34f92 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,8 +18,6 @@ import itertools
import logging
from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
-from six import iteritems, itervalues
-
import attr
from prometheus_client import Counter
@@ -50,6 +48,7 @@ logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -246,7 +245,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs, "sync")
+ self.response_cache = ResponseCache(
+ hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS
+ )
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
@@ -390,7 +391,7 @@ class SyncHandler(object):
# result returned by the event source is poor form (it might cache
# the object)
room_id = event["room_id"]
- event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
+ event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
receipt_key = since_token.receipt_key if since_token else "0"
@@ -408,7 +409,7 @@ class SyncHandler(object):
for event in receipts:
room_id = event["room_id"]
# exclude room id, as above
- event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
+ event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
return now_token, ephemeral_by_room
@@ -454,7 +455,7 @@ class SyncHandler(object):
current_state_ids_map = await self.state.get_current_state_ids(
room_id
)
- current_state_ids = frozenset(itervalues(current_state_ids_map))
+ current_state_ids = frozenset(current_state_ids_map.values())
recents = await filter_events_for_client(
self.storage,
@@ -509,7 +510,7 @@ class SyncHandler(object):
current_state_ids_map = await self.state.get_current_state_ids(
room_id
)
- current_state_ids = frozenset(itervalues(current_state_ids_map))
+ current_state_ids = frozenset(current_state_ids_map.values())
loaded_recents = await filter_events_for_client(
self.storage,
@@ -909,7 +910,7 @@ class SyncHandler(object):
logger.debug("filtering state from %r...", state_ids)
state_ids = {
t: event_id
- for t, event_id in iteritems(state_ids)
+ for t, event_id in state_ids.items()
if cache.get(t[1]) != event_id
}
logger.debug("...to %r", state_ids)
@@ -1430,7 +1431,7 @@ class SyncHandler(object):
if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
- joined_sync.timeline.events, itervalues(joined_sync.state)
+ joined_sync.timeline.events, joined_sync.state.values()
)
for event in it:
if event.type == EventTypes.Member:
@@ -1505,7 +1506,7 @@ class SyncHandler(object):
newly_left_rooms = []
room_entries = []
invited = []
- for room_id, events in iteritems(mem_change_events_by_room_id):
+ for room_id, events in mem_change_events_by_room_id.items():
logger.debug(
"Membership changes in %s: [%s]",
room_id,
@@ -1993,17 +1994,17 @@ def _calculate_state(
event_id_to_key = {
e: key
for key, e in itertools.chain(
- iteritems(timeline_contains),
- iteritems(previous),
- iteritems(timeline_start),
- iteritems(current),
+ timeline_contains.items(),
+ previous.items(),
+ timeline_start.items(),
+ current.items(),
)
}
- c_ids = set(itervalues(current))
- ts_ids = set(itervalues(timeline_start))
- p_ids = set(itervalues(previous))
- tc_ids = set(itervalues(timeline_contains))
+ c_ids = set(current.values())
+ ts_ids = set(timeline_start.values())
+ p_ids = set(previous.values())
+ tc_ids = set(timeline_contains.values())
# If we are lazyloading room members, we explicitly add the membership events
# for the senders in the timeline into the state block returned by /sync,
@@ -2017,7 +2018,7 @@ def _calculate_state(
if lazy_load_members:
p_ids.difference_update(
- e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member
+ e for t, e in timeline_start.items() if t[0] == EventTypes.Member
)
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..6c7abaa578 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,9 +15,7 @@
import logging
from collections import namedtuple
-from typing import List
-
-from twisted.internet import defer
+from typing import List, Tuple
from synapse.api.errors import AuthError, SynapseError
from synapse.logging.context import run_in_background
@@ -115,8 +113,7 @@ class TypingHandler(object):
def is_typing(self, member):
return member.user_id in self._room_typing.get(member.room_id, [])
- @defer.inlineCallbacks
- def started_typing(self, target_user, auth_user, room_id, timeout):
+ async def started_typing(self, target_user, auth_user, room_id, timeout):
target_user_id = target_user.to_string()
auth_user_id = auth_user.to_string()
@@ -126,7 +123,7 @@ class TypingHandler(object):
if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_user_in_room(room_id, target_user_id)
+ await self.auth.check_user_in_room(room_id, target_user_id)
logger.debug("%s has started typing in %s", target_user_id, room_id)
@@ -145,8 +142,7 @@ class TypingHandler(object):
self._push_update(member=member, typing=True)
- @defer.inlineCallbacks
- def stopped_typing(self, target_user, auth_user, room_id):
+ async def stopped_typing(self, target_user, auth_user, room_id):
target_user_id = target_user.to_string()
auth_user_id = auth_user.to_string()
@@ -156,7 +152,7 @@ class TypingHandler(object):
if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_user_in_room(room_id, target_user_id)
+ await self.auth.check_user_in_room(room_id, target_user_id)
logger.debug("%s has stopped typing in %s", target_user_id, room_id)
@@ -164,12 +160,11 @@ class TypingHandler(object):
self._stopped_typing(member)
- @defer.inlineCallbacks
def user_left_room(self, user, room_id):
user_id = user.to_string()
if self.is_mine_id(user_id):
member = RoomMember(room_id=room_id, user_id=user_id)
- yield self._stopped_typing(member)
+ self._stopped_typing(member)
def _stopped_typing(self, member):
if member.user_id not in self._room_typing.get(member.room_id, set()):
@@ -188,10 +183,9 @@ class TypingHandler(object):
self._push_update_local(member=member, typing=typing)
- @defer.inlineCallbacks
- def _push_remote(self, member, typing):
+ async def _push_remote(self, member, typing):
try:
- users = yield self.state.get_current_users_in_room(member.room_id)
+ users = await self.state.get_current_users_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
now = self.clock.time_msec()
@@ -215,8 +209,7 @@ class TypingHandler(object):
except Exception:
logger.exception("Error pushing typing notif to remotes")
- @defer.inlineCallbacks
- def _recv_edu(self, origin, content):
+ async def _recv_edu(self, origin, content):
room_id = content["room_id"]
user_id = content["user_id"]
@@ -231,7 +224,7 @@ class TypingHandler(object):
)
return
- users = yield self.state.get_current_users_in_room(room_id)
+ users = await self.state.get_current_users_in_room(room_id)
domains = {get_domain_from_id(u) for u in users}
if self.server_name in domains:
@@ -259,14 +252,31 @@ class TypingHandler(object):
)
async def get_all_typing_updates(
- self, last_id: int, current_id: int, limit: int
- ) -> List[dict]:
- """Get up to `limit` typing updates between the given tokens, earliest
- updates first.
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for typing replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
"""
if last_id == current_id:
- return []
+ return [], current_id, False
changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
@@ -280,9 +290,16 @@ class TypingHandler(object):
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
- rows.append((serial, room_id, list(typing)))
+ rows.append((serial, [room_id, list(typing)]))
rows.sort()
- return rows[:limit]
+
+ limited = False
+ if len(rows) > limit:
+ rows = rows[:limit]
+ current_id = rows[-1][0]
+ limited = True
+
+ return rows, current_id, limited
def get_current_token(self):
return self._latest_room_serial
@@ -306,7 +323,7 @@ class TypingNotificationEventSource(object):
"content": {"user_ids": list(typing)},
}
- def get_new_events(self, from_key, room_ids, **kwargs):
+ async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()
@@ -320,7 +337,7 @@ class TypingNotificationEventSource(object):
events.append(self._make_event_for(room_id))
- return defer.succeed((events, handler._latest_room_serial))
+ return (events, handler._latest_room_serial)
def get_current_key(self):
return self.get_typing_handler()._latest_room_serial
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 12423b909a..521b6d620d 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,8 +15,6 @@
import logging
-from six import iteritems, iterkeys
-
import synapse.metrics
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
@@ -289,7 +287,7 @@ class UserDirectoryHandler(StateDeltasHandler):
users_with_profile = await self.state.get_current_users_in_room(room_id)
# Remove every user from the sharing tables for that room.
- for user_id in iterkeys(users_with_profile):
+ for user_id in users_with_profile.keys():
await self.store.remove_user_who_share_room(user_id, room_id)
# Then, re-add them to the tables.
@@ -298,7 +296,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# which when ran over an entire room, will result in the same values
# being added multiple times. The batching upserts shouldn't make this
# too bad, though.
- for user_id, profile in iteritems(users_with_profile):
+ for user_id, profile in users_with_profile.items():
await self._handle_new_user(room_id, user_id, profile)
async def _handle_new_user(self, room_id, user_id, profile):
|