diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 8f8fd82eb0..d358842b3e 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -14,9 +14,7 @@
# limitations under the License.
from .register import RegistrationHandler
-from .room import (
- RoomCreationHandler, RoomContextHandler,
-)
+from .room import RoomContextHandler
from .message import MessageHandler
from .federation import FederationHandler
from .directory import DirectoryHandler
@@ -47,7 +45,6 @@ class Handlers(object):
def __init__(self, hs):
self.registration_handler = RegistrationHandler(hs)
self.message_handler = MessageHandler(hs)
- self.room_creation_handler = RoomCreationHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a27..d9f35a5dba 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,15 +18,16 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+ make_deferred_yieldable, run_in_background,
+)
+from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-events_processed_counter = metrics.register_counter("events_processed")
+events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
def log_failure(failure):
@@ -84,11 +85,16 @@ class ApplicationServicesHandler(object):
if not events:
break
+ events_by_room = {}
for event in events:
+ events_by_room.setdefault(event.room_id, []).append(event)
+
+ @defer.inlineCallbacks
+ def handle_event(event):
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
- continue # no services need notifying
+ return # no services need notifying
# Do we know this user exists? If not, poke the user
# query API for all services which match that user regex.
@@ -104,13 +110,32 @@ class ApplicationServicesHandler(object):
# Fork off pushes to these services
for service in services:
- preserve_fn(self.scheduler.submit_event_for_as)(
- service, event
- )
+ self.scheduler.submit_event_for_as(service, event)
- events_processed_counter.inc_by(len(events))
+ @defer.inlineCallbacks
+ def handle_room_events(events):
+ for event in events:
+ yield handle_event(event)
+
+ yield make_deferred_yieldable(defer.gatherResults([
+ run_in_background(handle_room_events, evs)
+ for evs in events_by_room.itervalues()
+ ], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound)
+
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(events[-1].event_id)
+
+ synapse.metrics.event_processing_positions.labels(
+ "appservice_sender").set(upper_bound)
+
+ events_processed_counter.inc(len(events))
+
+ synapse.metrics.event_processing_lag.labels(
+ "appservice_sender").set(now - ts)
+ synapse.metrics.event_processing_last_ts.labels(
+ "appservice_sender").set(ts)
finally:
self.is_processing = False
@@ -167,7 +192,10 @@ class ApplicationServicesHandler(object):
services = yield self._get_services_for_3pn(protocol)
results = yield make_deferred_yieldable(defer.DeferredList([
- preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+ run_in_background(
+ self.appservice_api.query_3pe,
+ service, kind, protocol, fields,
+ )
for service in services
], consumeErrors=True))
@@ -228,11 +256,15 @@ class ApplicationServicesHandler(object):
event based on the service regex.
"""
services = self.store.get_app_services()
- interested_list = [
- s for s in services if (
- yield s.is_interested(event, self.store)
- )
- ]
+
+ # We can't use a list comprehension here: since Python 3, list
+ # comprehensions run in their own scope, so a yield inside one no
+ # longer belongs to the enclosing inlineCallbacks generator.
+ interested_list = []
+ for s in services:
+ if (yield s.is_interested(event, self.store)):
+ interested_list.append(s)
+
defer.returnValue(interested_list)
def _get_services_for_user(self, user_id):
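# --- Illustrative sketch (standalone, not part of this patch) of the per-room
# grouping used in the appservice changes above: events are bucketed by
# room_id so each room's events keep their order while different rooms can be
# handled concurrently.  The Event tuple below is a stand-in, not a Synapse type.
from collections import namedtuple

Event = namedtuple("Event", ["event_id", "room_id"])


def group_events_by_room(events):
    """Return a dict mapping room_id to that room's events, in arrival order."""
    events_by_room = {}
    for event in events:
        events_by_room.setdefault(event.room_id, []).append(event)
    return events_by_room


if __name__ == "__main__":
    batch = [Event("$1", "!a"), Event("$2", "!b"), Event("$3", "!a")]
    for room_id, evs in group_events_by_room(batch).items():
        print(room_id, [e.event_id for e in evs])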
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b1d3814909..c5e92f6214 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2017 New Vector Ltd
+# Copyright 2017, 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,9 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
from ._base import BaseHandler
+from synapse.types import UserID, create_requester
+from synapse.util.logcontext import run_in_background
import logging
@@ -27,6 +29,15 @@ class DeactivateAccountHandler(BaseHandler):
super(DeactivateAccountHandler, self).__init__(hs)
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
+ self._room_member_handler = hs.get_room_member_handler()
+ self.user_directory_handler = hs.get_user_directory_handler()
+
+ # Flag that indicates whether the process to part users from rooms is running
+ self._user_parter_running = False
+
+ # Start the user parter loop so it can resume parting users from rooms where
+ # it left off (if it has work left to do).
+ reactor.callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
def deactivate_account(self, user_id):
@@ -50,3 +61,73 @@ class DeactivateAccountHandler(BaseHandler):
yield self.store.user_delete_threepids(user_id)
yield self.store.user_set_password_hash(user_id, None)
+
+ # Add the user to a table of users pending deactivation (i.e.
+ # removal from all the rooms they're a member of)
+ yield self.store.add_user_pending_deactivation(user_id)
+
+ # delete from user directory
+ yield self.user_directory_handler.handle_user_deactivated(user_id)
+
+ # Now start the process that goes through that list and
+ # parts users from rooms (if it isn't already running)
+ self._start_user_parting()
+
+ def _start_user_parting(self):
+ """
+ Start the process that goes through the table of users
+ pending deactivation, if it isn't already running.
+
+ Returns:
+ None
+ """
+ if not self._user_parter_running:
+ run_in_background(self._user_parter_loop)
+
+ @defer.inlineCallbacks
+ def _user_parter_loop(self):
+ """Loop that parts deactivated users from rooms
+
+ Returns:
+ None
+ """
+ self._user_parter_running = True
+ logger.info("Starting user parter")
+ try:
+ while True:
+ user_id = yield self.store.get_user_pending_deactivation()
+ if user_id is None:
+ break
+ logger.info("User parter parting %r", user_id)
+ yield self._part_user(user_id)
+ yield self.store.del_user_pending_deactivation(user_id)
+ logger.info("User parter finished parting %r", user_id)
+ logger.info("User parter finished: stopping")
+ finally:
+ self._user_parter_running = False
+
+ @defer.inlineCallbacks
+ def _part_user(self, user_id):
+ """Causes the given user_id to leave all the rooms they're joined to
+
+ Returns:
+ None
+ """
+ user = UserID.from_string(user_id)
+
+ rooms_for_user = yield self.store.get_rooms_for_user(user_id)
+ for room_id in rooms_for_user:
+ logger.info("User parter parting %r from %r", user_id, room_id)
+ try:
+ yield self._room_member_handler.update_membership(
+ create_requester(user),
+ user,
+ room_id,
+ "leave",
+ ratelimit=False,
+ )
+ except Exception:
+ logger.exception(
+ "Failed to part user %r from room %r: ignoring and continuing",
+ user_id, room_id,
+ )
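# --- Illustrative sketch (standalone, not Synapse code) of the pattern the
# deactivation handler adds above: a persistent "pending" list is drained by a
# single background loop, and a boolean flag prevents a second loop from being
# started while one is already running.  A deque stands in for the database
# table, and everything runs synchronously here for simplicity.
from collections import deque


class Parter(object):
    def __init__(self):
        self._pending = deque()   # stand-in for the pending-deactivation table
        self._running = False

    def deactivate(self, user_id):
        self._pending.append(user_id)
        self._start()

    def _start(self):
        # mirrors _start_user_parting(): only one loop at a time
        if not self._running:
            self._loop()

    def _loop(self):
        self._running = True
        try:
            while self._pending:
                user_id = self._pending.popleft()
                print("parting %s from their rooms" % (user_id,))
        finally:
            self._running = False


if __name__ == "__main__":
    p = Parter()
    p.deactivate("@alice:example.com")
    p.deactivate("@bob:example.com")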
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 40f3d24678..31bd0e60c6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -26,6 +26,8 @@ from ._base import BaseHandler
import logging
+from six import itervalues, iteritems
+
logger = logging.getLogger(__name__)
@@ -155,7 +157,7 @@ class DeviceHandler(BaseHandler):
try:
yield self.store.delete_device(user_id, device_id)
- except errors.StoreError, e:
+ except errors.StoreError as e:
if e.code == 404:
# no match
pass
@@ -204,7 +206,7 @@ class DeviceHandler(BaseHandler):
try:
yield self.store.delete_devices(user_id, device_ids)
- except errors.StoreError, e:
+ except errors.StoreError as e:
if e.code == 404:
# no match
pass
@@ -243,7 +245,7 @@ class DeviceHandler(BaseHandler):
new_display_name=content.get("display_name")
)
yield self.notify_device_update(user_id, [device_id])
- except errors.StoreError, e:
+ except errors.StoreError as e:
if e.code == 404:
raise errors.NotFoundError()
else:
@@ -318,7 +320,7 @@ class DeviceHandler(BaseHandler):
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
- for key, event_id in current_state_ids.iteritems():
+ for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -338,7 +340,7 @@ class DeviceHandler(BaseHandler):
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
- for key, event_id in current_state_ids.iteritems():
+ for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -354,10 +356,10 @@ class DeviceHandler(BaseHandler):
# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
- for state_dict in prev_state_ids.itervalues():
+ for state_dict in itervalues(prev_state_ids):
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
- for key, event_id in current_state_ids.iteritems():
+ for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -367,14 +369,14 @@ class DeviceHandler(BaseHandler):
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
- for key, event_id in current_state_ids.iteritems():
+ for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
- for state_dict in prev_state_ids.itervalues():
+ for state_dict in itervalues(prev_state_ids):
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
if state_key != user_id:
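# --- Quick standalone illustration of the Python 2/3 compatibility changes in
# device.py above: `except X, e:` is Python-2-only syntax, and dicts no longer
# have iteritems()/itervalues() on Python 3, so six's helpers are used instead.
from six import iteritems, itervalues

current_state_ids = {
    ("m.room.member", "@alice:example.com"): "$event1",
    ("m.room.topic", ""): "$event2",
}

for (etype, state_key), event_id in iteritems(current_state_ids):
    print(etype, state_key, event_id)

for event_id in itervalues(current_state_ids):
    print(event_id)

try:
    raise KeyError(404)
except KeyError as e:   # `except KeyError, e:` would be a SyntaxError on Python 3
    print("caught", e)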
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 31b1ece13e..8a2d177539 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,17 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import ujson as json
+import simplejson as json
import logging
from canonicaljson import encode_canonical_json
from twisted.internet import defer
+from six import iteritems
from synapse.api.errors import (
SynapseError, CodeMessageException, FederationDeniedError,
)
from synapse.types import get_domain_from_id, UserID
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -91,7 +93,7 @@ class E2eKeysHandler(object):
remote_queries_not_in_cache = {}
if remote_queries:
query_list = []
- for user_id, device_ids in remote_queries.iteritems():
+ for user_id, device_ids in iteritems(remote_queries):
if device_ids:
query_list.extend((user_id, device_id) for device_id in device_ids)
else:
@@ -102,9 +104,9 @@ class E2eKeysHandler(object):
query_list
)
)
- for user_id, devices in remote_results.iteritems():
+ for user_id, devices in iteritems(remote_results):
user_devices = results.setdefault(user_id, {})
- for device_id, device in devices.iteritems():
+ for device_id, device in iteritems(devices):
keys = device.get("keys", None)
device_display_name = device.get("device_display_name", None)
if keys:
@@ -134,28 +136,13 @@ class E2eKeysHandler(object):
if user_id in destination_query:
results[user_id] = keys
- except CodeMessageException as e:
- failures[destination] = {
- "status": e.code, "message": e.message
- }
- except NotRetryingDestination as e:
- failures[destination] = {
- "status": 503, "message": "Not ready for retry",
- }
- except FederationDeniedError as e:
- failures[destination] = {
- "status": 403, "message": "Federation Denied",
- }
except Exception as e:
- # include ConnectionRefused and other errors
- failures[destination] = {
- "status": 503, "message": e.message
- }
+ failures[destination] = _exception_to_failure(e)
yield make_deferred_yieldable(defer.gatherResults([
- preserve_fn(do_remote_query)(destination)
+ run_in_background(do_remote_query, destination)
for destination in remote_queries_not_in_cache
- ]))
+ ], consumeErrors=True))
defer.returnValue({
"device_keys": results, "failures": failures,
@@ -252,32 +239,21 @@ class E2eKeysHandler(object):
for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys:
json_result[user_id] = keys
- except CodeMessageException as e:
- failures[destination] = {
- "status": e.code, "message": e.message
- }
- except NotRetryingDestination as e:
- failures[destination] = {
- "status": 503, "message": "Not ready for retry",
- }
except Exception as e:
- # include ConnectionRefused and other errors
- failures[destination] = {
- "status": 503, "message": e.message
- }
+ failures[destination] = _exception_to_failure(e)
yield make_deferred_yieldable(defer.gatherResults([
- preserve_fn(claim_client_keys)(destination)
+ run_in_background(claim_client_keys, destination)
for destination in remote_queries
- ]))
+ ], consumeErrors=True))
logger.info(
"Claimed one-time-keys: %s",
",".join((
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in json_result.iteritems()
- for device_id, device_keys in user_keys.iteritems()
- for key_id, _ in device_keys.iteritems()
+ for user_id, user_keys in iteritems(json_result)
+ for device_id, device_keys in iteritems(user_keys)
+ for key_id, _ in iteritems(device_keys)
)),
)
@@ -362,6 +338,31 @@ class E2eKeysHandler(object):
)
+def _exception_to_failure(e):
+ if isinstance(e, CodeMessageException):
+ return {
+ "status": e.code, "message": e.message,
+ }
+
+ if isinstance(e, NotRetryingDestination):
+ return {
+ "status": 503, "message": "Not ready for retry",
+ }
+
+ if isinstance(e, FederationDeniedError):
+ return {
+ "status": 403, "message": "Federation Denied",
+ }
+
+ # include ConnectionRefused and other errors
+ #
+ # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
+ # give a string for e.message, which simplejson then fails to serialize.
+ return {
+ "status": 503, "message": str(e.message),
+ }
+
+
def _one_time_keys_match(old_key_json, new_key):
old_key = json.loads(old_key_json)
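# --- Standalone sketch of the _exception_to_failure() refactor above: rather
# than repeating the same chain of except clauses at each call site, a single
# helper maps exception types to a JSON-serialisable failure dict.  The
# exception classes here are local stand-ins for Synapse's.
class CodeMessageException(Exception):
    def __init__(self, code, msg):
        super(CodeMessageException, self).__init__(msg)
        self.code = code
        self.msg = msg


class NotRetryingDestination(Exception):
    pass


def exception_to_failure(e):
    if isinstance(e, CodeMessageException):
        return {"status": e.code, "message": e.msg}
    if isinstance(e, NotRetryingDestination):
        return {"status": 503, "message": "Not ready for retry"}
    # str() guards against exceptions whose message is not a plain string,
    # which the JSON encoder would otherwise fail to serialise.
    return {"status": 503, "message": str(e)}


failures = {}
try:
    raise CodeMessageException(502, "Bad gateway")
except Exception as e:
    failures["remote.example.com"] = exception_to_failure(e)
print(failures)  # {'remote.example.com': {'status': 502, 'message': 'Bad gateway'}}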
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d3685fb12a..8bc642675f 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -48,6 +48,7 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
+ self._server_notices_sender = hs.get_server_notices_sender()
@defer.inlineCallbacks
@log_function
@@ -58,6 +59,10 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down.
"""
+
+ # send any outstanding server notices to the user.
+ yield self._server_notices_sender.on_user_syncing(auth_user_id)
+
auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_presence_handler()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d71..87c0615820 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -15,8 +15,17 @@
# limitations under the License.
"""Contains handlers for federation events."""
+
+import itertools
+import logging
+import sys
+
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
+import six
+from six.moves import http_client
+from six import iteritems
+from twisted.internet import defer
from unpaddedbase64 import decode_base64
from ._base import BaseHandler
@@ -43,10 +52,6 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room
-from twisted.internet import defer
-
-import itertools
-import logging
logger = logging.getLogger(__name__)
@@ -77,6 +82,7 @@ class FederationHandler(BaseHandler):
self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
+ self._server_notices_mxid = hs.config.server_notices_mxid
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -115,6 +121,19 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id)
return
+ # do some initial sanity-checking of the event. In particular, make
+ # sure it doesn't have hundreds of prev_events or auth_events, which
+ # could cause a huge state resolution or cascade of event fetches.
+ try:
+ self._sanity_check_event(pdu)
+ except SynapseError as err:
+ raise FederationError(
+ "ERROR",
+ err.code,
+ err.msg,
+ affected=pdu.event_id,
+ )
+
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
@@ -149,10 +168,6 @@ class FederationHandler(BaseHandler):
auth_chain = []
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
-
fetch_state = False
# Get missing pdus if necessary.
@@ -168,7 +183,7 @@ class FederationHandler(BaseHandler):
)
prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
+ seen = yield self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
@@ -196,8 +211,7 @@ class FederationHandler(BaseHandler):
# Update the set of things we've seen after trying to
# fetch the missing stuff
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.iterkeys())
+ seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
logger.info(
@@ -248,8 +262,7 @@ class FederationHandler(BaseHandler):
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.keys())
+ seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
@@ -361,9 +374,7 @@ class FederationHandler(BaseHandler):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
- seen_ids = set(
- (yield self.store.have_events(event_ids)).keys()
- )
+ seen_ids = yield self.store.have_seen_events(event_ids)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
@@ -469,18 +480,18 @@ class FederationHandler(BaseHandler):
# to get all state ids that we're interested in.
event_map = yield self.store.get_events([
e_id
- for key_to_eid in event_to_state_ids.values()
- for key, e_id in key_to_eid.items()
+ for key_to_eid in event_to_state_ids.itervalues()
+ for key, e_id in key_to_eid.iteritems()
if key[0] != EventTypes.Member or check_match(key[1])
])
event_to_state = {
e_id: {
key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.items()
+ for key, inner_e_id in key_to_eid.iteritems()
if inner_e_id in event_map
}
- for e_id, key_to_eid in event_to_state_ids.items()
+ for e_id, key_to_eid in event_to_state_ids.iteritems()
}
def redact_disallowed(event, state):
@@ -495,7 +506,7 @@ class FederationHandler(BaseHandler):
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
- for ev in state.values():
+ for ev in state.itervalues():
if ev.type != EventTypes.Member:
continue
try:
@@ -527,9 +538,16 @@ class FederationHandler(BaseHandler):
def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
- This will attempt to get more events from the remote. This may return
- be successfull and still return no events if the other side has no new
- events to offer.
+ This will attempt to get more events from the remote. If the other side
+ has no new events to offer, this will return an empty list.
+
+ As the events are received, we check their signatures, and also do some
+ sanity-checking on them. If any of the backfilled events are invalid,
+ this method throws a SynapseError.
+
+ TODO: make this more useful to distinguish failures of the remote
+ server from invalid events (there is probably no point in trying to
+ re-fetch invalid events from every other HS in the room.)
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +559,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
+ # ideally we'd sanity check the events here for excess prev_events etc,
+ # but it's hard to reject events at this point without completely
+ # breaking backfill in the same way that it is currently broken by
+ # events whose signature we cannot verify (#3121).
+ #
+ # So for now we accept the events anyway. #3124 tracks this.
+ #
+ # for ev in events:
+ # self._sanity_check_event(ev)
+
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
@@ -613,7 +641,8 @@ class FederationHandler(BaseHandler):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- logcontext.preserve_fn(self.replication_layer.get_pdu)(
+ logcontext.run_in_background(
+ self.replication_layer.get_pdu,
[dest],
event_id,
outlier=True,
@@ -633,7 +662,7 @@ class FederationHandler(BaseHandler):
failed_to_fetch = missing_auth - set(auth_events)
- seen_events = yield self.store.have_events(
+ seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)
@@ -723,9 +752,19 @@ class FederationHandler(BaseHandler):
curr_state = yield self.state_handler.get_current_state(room_id)
def get_domains_from_state(state):
+ """Get joined domains from state
+
+ Args:
+ state (dict[tuple, FrozenEvent]): State map from type/state
+ key to event.
+
+ Returns:
+ list[tuple[str, int]]: a list of (server, depth) tuples, where
+ depth is the lowest depth of any of that server's join events,
+ sorted with the lowest depth first.
+ """
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.items()
+ for (e_type, state_key), event in state.iteritems()
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -742,7 +781,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
- return sorted(joined_domains.items(), key=lambda d: d[1])
+ return sorted(joined_domains.iteritems(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@@ -759,7 +798,7 @@ class FederationHandler(BaseHandler):
yield self.backfill(
dom, room_id,
limit=100,
- extremities=[e for e in extremities.keys()]
+ extremities=extremities,
)
# If this succeeded then we probably already have the
# appropriate stuff.
@@ -805,7 +844,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
- event_ids = list(extremities.keys())
+ event_ids = list(extremities.iterkeys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -815,34 +854,69 @@ class FederationHandler(BaseHandler):
[resolve(room_id, [e]) for e in event_ids],
consumeErrors=True,
))
+
+ # dict[str, dict[tuple, str]], a map from event_id to state map of
+ # event_ids.
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.values() for e_id in ids],
+ [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.items()
+ for k, e_id in state_dict.iteritems()
if e_id in state_map
- } for key, state_dict in states.items()
+ } for key, state_dict in states.iteritems()
}
for e_id, _ in sorted_extremeties_tuple:
likely_domains = get_domains_from_state(states[e_id])
success = yield try_backfill([
- dom for dom in likely_domains
+ dom for dom, _ in likely_domains
if dom not in tried_domains
])
if success:
defer.returnValue(True)
- tried_domains.update(likely_domains)
+ tried_domains.update(dom for dom, _ in likely_domains)
defer.returnValue(False)
+ def _sanity_check_event(self, ev):
+ """
+ Do some early sanity checks of a received event
+
+ In particular, checks it doesn't have an excessive number of
+ prev_events or auth_events, which could cause a huge state resolution
+ or cascade of event fetches.
+
+ Args:
+ ev (synapse.events.EventBase): event to be checked
+
+ Returns: None
+
+ Raises:
+ SynapseError if the event does not pass muster
+ """
+ if len(ev.prev_events) > 20:
+ logger.warn("Rejecting event %s which has %i prev_events",
+ ev.event_id, len(ev.prev_events))
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Too many prev_events",
+ )
+
+ if len(ev.auth_events) > 10:
+ logger.warn("Rejecting event %s which has %i auth_events",
+ ev.event_id, len(ev.auth_events))
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Too many auth_events",
+ )
+
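# --- Standalone sketch of the _sanity_check_event() idea above: reject events
# with implausibly long prev_events/auth_events lists before doing any
# expensive state resolution.  The limits and error type here are illustrative.
MAX_PREV_EVENTS = 20
MAX_AUTH_EVENTS = 10


def sanity_check_event(prev_events, auth_events):
    if len(prev_events) > MAX_PREV_EVENTS:
        raise ValueError("Too many prev_events: %i" % len(prev_events))
    if len(auth_events) > MAX_AUTH_EVENTS:
        raise ValueError("Too many auth_events: %i" % len(auth_events))


sanity_check_event(["$a", "$b"], ["$c"])   # passes
try:
    sanity_check_event(["$x"] * 100, [])
except ValueError as e:
    print("rejected:", e)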
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -967,7 +1041,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
- logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
+ logcontext.run_in_background(self._handle_queued_pdus, room_queue)
defer.returnValue(True)
@@ -1121,6 +1195,13 @@ class FederationHandler(BaseHandler):
if not self.is_mine_id(event.state_key):
raise SynapseError(400, "The invite event must be for this server")
+ # block any attempts to invite the server notices mxid
+ if event.state_key == self._server_notices_mxid:
+ raise SynapseError(
+ http_client.FORBIDDEN,
+ "Cannot invite this user",
+ )
+
event.internal_metadata.outlier = True
event.internal_metadata.invite_from_remote = True
@@ -1308,7 +1389,7 @@ class FederationHandler(BaseHandler):
)
if state_groups:
- _, state = state_groups.items().pop()
+ _, state = list(iteritems(state_groups)).pop()
results = {
(e.type, e.state_key): e for e in state
}
@@ -1457,18 +1538,21 @@ class FederationHandler(BaseHandler):
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
- # Ensure that we actually remove the entries in the push actions
- # staging area
- logcontext.preserve_fn(
- self.store.remove_push_actions_from_staging
- )(event.event_id)
- raise
+ tp, value, tb = sys.exc_info()
+
+ logcontext.run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
+
+ six.reraise(tp, value, tb)
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
- event_stream_id, max_stream_id
+ logcontext.run_in_background(
+ self.pusher_pool.on_new_notifications,
+ event_stream_id, max_stream_id,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1482,7 +1566,8 @@ class FederationHandler(BaseHandler):
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- logcontext.preserve_fn(self._prep_event)(
+ logcontext.run_in_background(
+ self._prep_event,
origin,
ev_info["event"],
state=ev_info.get("state"),
@@ -1736,7 +1821,8 @@ class FederationHandler(BaseHandler):
event_key = None
if event_auth_events - current_state:
- have_events = yield self.store.have_events(
+ # TODO: can we use store.have_seen_events here instead?
+ have_events = yield self.store.get_seen_events_with_rejections(
event_auth_events - current_state
)
else:
@@ -1759,12 +1845,12 @@ class FederationHandler(BaseHandler):
origin, event.room_id, event.event_id
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
- if e.event_id in seen_remotes.keys():
+ if e.event_id in seen_remotes:
continue
if e.event_id == event.event_id:
@@ -1791,7 +1877,7 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
- have_events = yield self.store.have_events(
+ have_events = yield self.store.get_seen_events_with_rejections(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
@@ -1810,7 +1896,8 @@ class FederationHandler(BaseHandler):
different_events = yield logcontext.make_deferred_yieldable(
defer.gatherResults([
- logcontext.preserve_fn(self.store.get_event)(
+ logcontext.run_in_background(
+ self.store.get_event,
d,
allow_none=True,
allow_rejected=False,
@@ -1876,13 +1963,13 @@ class FederationHandler(BaseHandler):
local_auth_chain,
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
)
# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes.keys():
+ if ev.event_id in seen_remotes:
continue
if ev.event_id == event.event_id:
@@ -1948,7 +2035,7 @@ class FederationHandler(BaseHandler):
this will not be included in the current_state in the context.
"""
state_updates = {
- k: a.event_id for k, a in auth_events.iteritems()
+ k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
context.current_state_ids = dict(context.current_state_ids)
@@ -1958,7 +2045,7 @@ class FederationHandler(BaseHandler):
context.delta_ids.update(state_updates)
context.prev_state_ids = dict(context.prev_state_ids)
context.prev_state_ids.update({
- k: a.event_id for k, a in auth_events.iteritems()
+ k: a.event_id for k, a in iteritems(auth_events)
})
context.state_group = yield self.store.store_state_group(
event.event_id,
@@ -2010,7 +2097,7 @@ class FederationHandler(BaseHandler):
def get_next(it, opt=None):
try:
- return it.next()
+ return next(it)
except Exception:
return opt
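# --- Standalone sketch of the cleanup-then-reraise pattern used above: capture
# the exception info with sys.exc_info(), kick off best-effort cleanup, then
# re-raise the *original* exception (traceback included) with six.reraise().
# persist() and cleanup() below are stand-ins.
import sys

import six


def cleanup():
    print("removing staged push actions (stand-in)")


def persist(event):
    raise RuntimeError("persistence failed")


try:
    try:
        persist({"event_id": "$abc"})
    except Exception:
        tp, value, tb = sys.exc_info()
        cleanup()
        six.reraise(tp, value, tb)   # re-raise with the original traceback
except RuntimeError as e:
    print("original exception propagated:", e)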
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index e4d0cc8b02..dcae083734 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,6 +15,7 @@
# limitations under the License.
from twisted.internet import defer
+from six import iteritems
from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id
@@ -90,6 +92,8 @@ class GroupsLocalHandler(object):
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
+ set_group_join_policy = _create_rerouter("set_group_join_policy")
+
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group.
@@ -226,7 +230,45 @@ class GroupsLocalHandler(object):
def join_group(self, group_id, user_id, content):
"""Request to join a group
"""
- raise NotImplementedError() # TODO
+ if self.is_mine_id(group_id):
+ yield self.groups_server_handler.join_group(
+ group_id, user_id, content
+ )
+ local_attestation = None
+ remote_attestation = None
+ else:
+ local_attestation = self.attestations.create_attestation(group_id, user_id)
+ content["attestation"] = local_attestation
+
+ res = yield self.transport_client.join_group(
+ get_domain_from_id(group_id), group_id, user_id, content,
+ )
+
+ remote_attestation = res["attestation"]
+
+ yield self.attestations.verify_attestation(
+ remote_attestation,
+ group_id=group_id,
+ user_id=user_id,
+ server_name=get_domain_from_id(group_id),
+ )
+
+ # TODO: Check that the group is public and we're being added publicly
+ is_publicised = content.get("publicise", False)
+
+ token = yield self.store.register_user_group_membership(
+ group_id, user_id,
+ membership="join",
+ is_admin=False,
+ local_attestation=local_attestation,
+ remote_attestation=remote_attestation,
+ is_publicised=is_publicised,
+ )
+ self.notifier.on_new_event(
+ "groups_key", token, users=[user_id],
+ )
+
+ defer.returnValue({})
@defer.inlineCallbacks
def accept_invite(self, group_id, user_id, content):
@@ -408,7 +450,7 @@ class GroupsLocalHandler(object):
results = {}
failed_results = []
- for destination, dest_user_ids in destinations.iteritems():
+ for destination, dest_user_ids in iteritems(destinations):
try:
r = yield self.transport_client.bulk_get_publicised_groups(
destination, list(dest_user_ids),
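# --- Standalone sketch of the routing decision join_group() makes above: a
# group hosted on this server is handled by the local groups server handler,
# otherwise the request is forwarded to the server named in the group id.
# get_domain_from_id() is re-implemented here just for illustration.
def get_domain_from_id(group_id):
    return group_id.split(":", 1)[1]


def route_join(group_id, my_server_name):
    if get_domain_from_id(group_id) == my_server_name:
        return "local groups server handler"
    return "forward to %s" % (get_domain_from_id(group_id),)


print(route_join("+cats:example.com", "example.com"))   # local groups server handler
print(route_join("+dogs:other.org", "example.com"))     # forward to other.org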
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 9efcdff1d6..91a0898860 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -15,6 +15,11 @@
# limitations under the License.
"""Utilities for interacting with Identity Servers"""
+
+import logging
+
+import simplejson as json
+
from twisted.internet import defer
from synapse.api.errors import (
@@ -24,9 +29,6 @@ from ._base import BaseHandler
from synapse.util.async import run_on_reactor
from synapse.api.errors import SynapseError, Codes
-import json
-import logging
-
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index c5267b4b84..71af86fe21 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -27,7 +27,7 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler):
(messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
- preserve_fn(self.store.get_recent_events_for_room)(
+ run_in_background(
+ self.store.get_recent_events_for_room,
event.room_id,
limit=limit,
end_token=room_end_token,
@@ -180,8 +181,8 @@ class InitialSyncHandler(BaseHandler):
self.store, user_id, messages
)
- start_token = now_token.copy_and_replace("room_key", token[0])
- end_token = now_token.copy_and_replace("room_key", token[1])
+ start_token = now_token.copy_and_replace("room_key", token)
+ end_token = now_token.copy_and_replace("room_key", room_end_token)
time_now = self.clock.time_msec()
d["messages"] = {
@@ -324,8 +325,8 @@ class InitialSyncHandler(BaseHandler):
self.store, user_id, messages, is_peeking=is_peeking
)
- start_token = StreamToken.START.copy_and_replace("room_key", token[0])
- end_token = StreamToken.START.copy_and_replace("room_key", token[1])
+ start_token = StreamToken.START.copy_and_replace("room_key", token)
+ end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
time_now = self.clock.time_msec()
@@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler):
presence, receipts, (messages, token) = yield defer.gatherResults(
[
- preserve_fn(get_presence)(),
- preserve_fn(get_receipts)(),
- preserve_fn(self.store.get_recent_events_for_room)(
+ run_in_background(get_presence),
+ run_in_background(get_receipts),
+ run_in_background(
+ self.store.get_recent_events_for_room,
room_id,
limit=limit,
end_token=now_token.room_key,
@@ -406,8 +408,8 @@ class InitialSyncHandler(BaseHandler):
self.store, user_id, messages, is_peeking=is_peeking,
)
- start_token = now_token.copy_and_replace("room_key", token[0])
- end_token = now_token.copy_and_replace("room_key", token[1])
+ start_token = now_token.copy_and_replace("room_key", token)
+ end_token = now_token
time_now = self.clock.time_msec()
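# --- Standalone Twisted sketch (requires Twisted) of the concurrency pattern
# above: several independent lookups are started at once and their results
# gathered.  In Synapse the calls go through run_in_background() /
# make_deferred_yieldable() to keep log contexts straight; that machinery is
# omitted here.
from twisted.internet import defer, task


@defer.inlineCallbacks
def main(reactor):
    def get_presence():
        return defer.succeed("online")

    def get_receipts():
        return defer.succeed(["receipt-1", "receipt-2"])

    presence, receipts = yield defer.gatherResults(
        [get_presence(), get_receipts()], consumeErrors=True,
    )
    print(presence, receipts)


task.react(main)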
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4f97c8db79..81cff0870e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,11 +13,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import simplejson
+import sys
+
+from canonicaljson import encode_canonical_json
+import six
+from six import string_types, itervalues, iteritems
from twisted.internet import defer, reactor
+from twisted.internet.defer import succeed
from twisted.python.failure import Failure
-from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
+from synapse.api.errors import (
+ AuthError, Codes, SynapseError,
+ ConsentNotGivenError,
+)
+from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@@ -25,21 +37,15 @@ from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn, run_in_background
+from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import unfreeze
+from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler
-from canonicaljson import encode_canonical_json
-
-import logging
-import random
-import ujson
-
logger = logging.getLogger(__name__)
@@ -86,14 +92,14 @@ class MessageHandler(BaseHandler):
# map from purge id to PurgeStatus
self._purges_by_id = {}
- def start_purge_history(self, room_id, topological_ordering,
+ def start_purge_history(self, room_id, token,
delete_local_events=False):
"""Start off a history purge on a room.
Args:
room_id (str): The room to purge from
- topological_ordering (int): minimum topo ordering to preserve
+ token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
@@ -115,19 +121,19 @@ class MessageHandler(BaseHandler):
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
self._purge_history,
- purge_id, room_id, topological_ordering, delete_local_events,
+ purge_id, room_id, token, delete_local_events,
)
return purge_id
@defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, topological_ordering,
+ def _purge_history(self, purge_id, room_id, token,
delete_local_events):
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
- topological_ordering (int): minimum topo ordering to preserve
+ token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
@@ -138,7 +144,7 @@ class MessageHandler(BaseHandler):
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(
- room_id, topological_ordering, delete_local_events,
+ room_id, token, delete_local_events,
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
@@ -397,7 +403,7 @@ class MessageHandler(BaseHandler):
"avatar_url": profile.avatar_url,
"display_name": profile.display_name,
}
- for user_id, profile in users_with_profile.iteritems()
+ for user_id, profile in iteritems(users_with_profile)
})
@@ -431,9 +437,12 @@ class EventCreationHandler(object):
self.spam_checker = hs.get_spam_checker()
+ if self.config.block_events_without_consent_error is not None:
+ self._consent_uri_builder = ConsentURIBuilder(self.config)
+
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
- prev_event_ids=None):
+ prev_events_and_hashes=None):
"""
Given a dict from a client, create a new event.
@@ -447,50 +456,136 @@ class EventCreationHandler(object):
event_dict (dict): An entire event
token_id (str)
txn_id (str)
- prev_event_ids (list): The prev event ids to use when creating the event
+
+ prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+ the forward extremities to use as the prev_events for the
+ new event. For each event, a tuple of (event_id, hashes, depth)
+ where *hashes* is a map from algorithm to hash.
+
+ If None, they will be requested from the database.
Returns:
Tuple of created event (FrozenEvent), Context
"""
builder = self.event_builder_factory.new(event_dict)
- with (yield self.limiter.queue(builder.room_id)):
- self.validator.validate_new(builder)
-
- if builder.type == EventTypes.Member:
- membership = builder.content.get("membership", None)
- target = UserID.from_string(builder.state_key)
-
- if membership in {Membership.JOIN, Membership.INVITE}:
- # If event doesn't include a display name, add one.
- profile = self.profile_handler
- content = builder.content
-
- try:
- if "displayname" not in content:
- content["displayname"] = yield profile.get_displayname(target)
- if "avatar_url" not in content:
- content["avatar_url"] = yield profile.get_avatar_url(target)
- except Exception as e:
- logger.info(
- "Failed to get profile information for %r: %s",
- target, e
- )
+ self.validator.validate_new(builder)
+
+ if builder.type == EventTypes.Member:
+ membership = builder.content.get("membership", None)
+ target = UserID.from_string(builder.state_key)
+
+ if membership in {Membership.JOIN, Membership.INVITE}:
+ # If event doesn't include a display name, add one.
+ profile = self.profile_handler
+ content = builder.content
+
+ try:
+ if "displayname" not in content:
+ content["displayname"] = yield profile.get_displayname(target)
+ if "avatar_url" not in content:
+ content["avatar_url"] = yield profile.get_avatar_url(target)
+ except Exception as e:
+ logger.info(
+ "Failed to get profile information for %r: %s",
+ target, e
+ )
- if token_id is not None:
- builder.internal_metadata.token_id = token_id
+ is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+ if not is_exempt:
+ yield self.assert_accepted_privacy_policy(requester)
- if txn_id is not None:
- builder.internal_metadata.txn_id = txn_id
+ if token_id is not None:
+ builder.internal_metadata.token_id = token_id
- event, context = yield self.create_new_client_event(
- builder=builder,
- requester=requester,
- prev_event_ids=prev_event_ids,
- )
+ if txn_id is not None:
+ builder.internal_metadata.txn_id = txn_id
+
+ event, context = yield self.create_new_client_event(
+ builder=builder,
+ requester=requester,
+ prev_events_and_hashes=prev_events_and_hashes,
+ )
defer.returnValue((event, context))
+ def _is_exempt_from_privacy_policy(self, builder):
+ """Determine if an event to be sent is exempt from having to consent
+ to the privacy policy
+
+ Args:
+ builder (synapse.events.builder.EventBuilder): event being created
+
+ Returns:
+ Deferred[bool]: true if the event can be sent without the user
+ consenting
+ """
+ # the only thing an un-consented user can do is join the server notices room.
+ if builder.type == EventTypes.Member:
+ membership = builder.content.get("membership", None)
+ if membership == Membership.JOIN:
+ return self._is_server_notices_room(builder.room_id)
+ return succeed(False)
+
+ @defer.inlineCallbacks
+ def _is_server_notices_room(self, room_id):
+ if self.config.server_notices_mxid is None:
+ defer.returnValue(False)
+ user_ids = yield self.store.get_users_in_room(room_id)
+ defer.returnValue(self.config.server_notices_mxid in user_ids)
+
+ @defer.inlineCallbacks
+ def assert_accepted_privacy_policy(self, requester):
+ """Check if a user has accepted the privacy policy
+
+ Called when the given user is about to do something that requires
+ privacy consent. We see if the user is exempt and otherwise check that
+ they have given consent. If they have not, a ConsentNotGiven error is
+ raised.
+
+ Args:
+ requester (synapse.types.Requester):
+ The user making the request
+
+ Returns:
+ Deferred[None]: returns normally if the user has consented or is
+ exempt
+
+ Raises:
+ ConsentNotGivenError: if the user has not given consent yet
+ """
+ if self.config.block_events_without_consent_error is None:
+ return
+
+ # exempt AS users from needing consent
+ if requester.app_service is not None:
+ return
+
+ user_id = requester.user.to_string()
+
+ # exempt the system notices user
+ if (
+ self.config.server_notices_mxid is not None and
+ user_id == self.config.server_notices_mxid
+ ):
+ return
+
+ u = yield self.store.get_user_by_id(user_id)
+ assert u is not None
+ if u["consent_version"] == self.config.user_consent_version:
+ return
+
+ consent_uri = self._consent_uri_builder.build_user_consent_uri(
+ requester.user.localpart,
+ )
+ msg = self.config.block_events_without_consent_error % {
+ 'consent_uri': consent_uri,
+ }
+ raise ConsentNotGivenError(
+ msg=msg,
+ consent_uri=consent_uri,
+ )
+
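# --- Standalone sketch of the consent gate added above: the consent version
# stored for the user is compared with the configured one, and a failure
# carries a per-user consent URI in its message.  The URI format, template and
# error class below are illustrative, not Synapse's real ones.
class ConsentNotGivenError(Exception):
    def __init__(self, msg, consent_uri):
        super(ConsentNotGivenError, self).__init__(msg)
        self.consent_uri = consent_uri


ERROR_TEMPLATE = "Please accept the privacy policy at %(consent_uri)s"
CURRENT_VERSION = "1.0"


def assert_accepted(stored_consent_version, localpart):
    if stored_consent_version == CURRENT_VERSION:
        return
    consent_uri = "https://example.com/_matrix/consent?u=%s" % (localpart,)
    raise ConsentNotGivenError(
        msg=ERROR_TEMPLATE % {"consent_uri": consent_uri},
        consent_uri=consent_uri,
    )


try:
    assert_accepted(None, "alice")
except ConsentNotGivenError as e:
    print(e)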
@defer.inlineCallbacks
def send_nonmember_event(self, requester, event, context, ratelimit=True):
"""
@@ -557,64 +652,80 @@ class EventCreationHandler(object):
See self.create_event and self.send_nonmember_event.
"""
- event, context = yield self.create_event(
- requester,
- event_dict,
- token_id=requester.access_token_id,
- txn_id=txn_id
- )
- spam_error = self.spam_checker.check_event_for_spam(event)
- if spam_error:
- if not isinstance(spam_error, basestring):
- spam_error = "Spam is not permitted here"
- raise SynapseError(
- 403, spam_error, Codes.FORBIDDEN
+ # We limit the number of concurrent event sends in a room so that we
+ # don't fork the DAG too much. If we don't limit then we can end up in
+ # a situation where event persistence can't keep up, causing
+ # extremities to pile up, which in turn leads to state resolution
+ # taking longer.
+ with (yield self.limiter.queue(event_dict["room_id"])):
+ event, context = yield self.create_event(
+ requester,
+ event_dict,
+ token_id=requester.access_token_id,
+ txn_id=txn_id
)
- yield self.send_nonmember_event(
- requester,
- event,
- context,
- ratelimit=ratelimit,
- )
+ spam_error = self.spam_checker.check_event_for_spam(event)
+ if spam_error:
+ if not isinstance(spam_error, string_types):
+ spam_error = "Spam is not permitted here"
+ raise SynapseError(
+ 403, spam_error, Codes.FORBIDDEN
+ )
+
+ yield self.send_nonmember_event(
+ requester,
+ event,
+ context,
+ ratelimit=ratelimit,
+ )
defer.returnValue(event)
@measure_func("create_new_client_event")
@defer.inlineCallbacks
- def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
- if prev_event_ids:
- prev_events = yield self.store.add_event_hashes(prev_event_ids)
- prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
- depth = prev_max_depth + 1
- else:
- latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
- builder.room_id,
- )
+ def create_new_client_event(self, builder, requester=None,
+ prev_events_and_hashes=None):
+ """Create a new event for a local client
- # We want to limit the max number of prev events we point to in our
- # new event
- if len(latest_ret) > 10:
- # Sort by reverse depth, so we point to the most recent.
- latest_ret.sort(key=lambda a: -a[2])
- new_latest_ret = latest_ret[:5]
-
- # We also randomly point to some of the older events, to make
- # sure that we don't completely ignore the older events.
- if latest_ret[5:]:
- sample_size = min(5, len(latest_ret[5:]))
- new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
- latest_ret = new_latest_ret
-
- if latest_ret:
- depth = max([d for _, _, d in latest_ret]) + 1
- else:
- depth = 1
+ Args:
+ builder (EventBuilder):
+
+ requester (synapse.types.Requester|None):
+
+ prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+ the forward extremities to use as the prev_events for the
+ new event. For each event, a tuple of (event_id, hashes, depth)
+ where *hashes* is a map from algorithm to hash.
- prev_events = [
- (event_id, prev_hashes)
- for event_id, prev_hashes, _ in latest_ret
- ]
+ If None, they will be requested from the database.
+
+ Returns:
+ Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+ """
+
+ if prev_events_and_hashes is not None:
+ assert len(prev_events_and_hashes) <= 10, \
+ "Attempting to create an event with %i prev_events" % (
+ len(prev_events_and_hashes),
+ )
+ else:
+ prev_events_and_hashes = \
+ yield self.store.get_prev_events_for_room(builder.room_id)
+
+ if prev_events_and_hashes:
+ depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+ # we cap depth of generated events, to ensure that they are not
+ # rejected by other servers (and so that they can be persisted in
+ # the db)
+ depth = min(depth, MAX_DEPTH)
+ else:
+ depth = 1
+
+ prev_events = [
+ (event_id, prev_hashes)
+ for event_id, prev_hashes, _ in prev_events_and_hashes
+ ]
builder.prev_events = prev_events
builder.depth = depth
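# --- Standalone sketch of the depth calculation in create_new_client_event()
# above: the new event's depth is one more than the deepest prev_event, capped
# at MAX_DEPTH so generated events remain persistable.  The cap value below is
# illustrative; the real constant lives in synapse.api.constants.
MAX_DEPTH = 2 ** 63 - 1


def compute_depth(prev_events_and_hashes):
    """prev_events_and_hashes: list of (event_id, hashes, depth) tuples."""
    if prev_events_and_hashes:
        depth = max(d for _, _, d in prev_events_and_hashes) + 1
        return min(depth, MAX_DEPTH)
    return 1


print(compute_depth([("$a", {"sha256": "..."}, 41), ("$b", {"sha256": "..."}, 40)]))  # 42
print(compute_depth([]))  # 1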
@@ -678,8 +789,8 @@ class EventCreationHandler(object):
# Ensure that we can round trip before trying to persist in db
try:
- dump = ujson.dumps(unfreeze(event.content))
- ujson.loads(dump)
+ dump = frozendict_json_encoder.encode(event.content)
+ simplejson.loads(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
@@ -713,8 +824,14 @@ class EventCreationHandler(object):
except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
- preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
- raise
+ tp, value, tb = sys.exc_info()
+
+ run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
+
+ six.reraise(tp, value, tb)
@defer.inlineCallbacks
def persist_and_notify_client_event(
@@ -765,7 +882,7 @@ class EventCreationHandler(object):
state_to_include_ids = [
e_id
- for k, e_id in context.current_state_ids.iteritems()
+ for k, e_id in iteritems(context.current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -779,7 +896,7 @@ class EventCreationHandler(object):
"content": e.content,
"sender": e.sender,
}
- for e in state_to_include.itervalues()
+ for e in itervalues(state_to_include)
]
invitee = UserID.from_string(event.state_key)
@@ -834,22 +951,33 @@ class EventCreationHandler(object):
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- preserve_fn(self.pusher_pool.on_new_notifications)(
+ run_in_background(
+ self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
)
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
+ try:
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+ except Exception:
+ logger.exception("Error notifying about new room event")
- preserve_fn(_notify)()
+ run_in_background(_notify)
if event.type == EventTypes.Message:
- presence = self.hs.get_presence_handler()
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
- preserve_fn(presence.bump_presence_active_time)(requester.user)
+ run_in_background(self._bump_active_time, requester.user)
+
+ @defer.inlineCallbacks
+ def _bump_active_time(self, user):
+ try:
+ presence = self.hs.get_presence_handler()
+ yield presence.bump_presence_active_time(user)
+ except Exception:
+ logger.exception("Error bumping presence active time")
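# --- Standalone sketch of the round-trip check performed above before
# persisting an event: encode the content to JSON and decode it again, so
# unserialisable content fails fast.  simplejson is used as in the patch
# (Synapse additionally uses a custom frozendict-aware encoder, omitted here).
import simplejson


def check_round_trips(content):
    dump = simplejson.dumps(content)
    simplejson.loads(dump)
    return dump


print(check_round_trips({"msgtype": "m.text", "body": "hello"}))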
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a5e501897c..26fc0d3ec7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,38 +25,42 @@ The methods that define policy are:
from twisted.internet import defer, reactor
from contextlib import contextmanager
+from six import itervalues, iteritems
+
from synapse.api.errors import SynapseError
from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.async import Linearizer
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
-import synapse.metrics
+from synapse.metrics import LaterGauge
import logging
+from prometheus_client import Counter
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
-notified_presence_counter = metrics.register_counter("notified_presence")
-federation_presence_out_counter = metrics.register_counter("federation_presence_out")
-presence_updates_counter = metrics.register_counter("presence_updates")
-timers_fired_counter = metrics.register_counter("timers_fired")
-federation_presence_counter = metrics.register_counter("federation_presence")
-bump_active_time_counter = metrics.register_counter("bump_active_time")
+notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
+federation_presence_out_counter = Counter(
+ "synapse_handler_presence_federation_presence_out", "")
+presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
+timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
+federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
+bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
-get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
-notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
-state_transition_counter = metrics.register_counter(
- "state_transition", labels=["from", "to"]
+notify_reason_counter = Counter(
+ "synapse_handler_presence_notify_reason", "", ["reason"])
+state_transition_counter = Counter(
+ "synapse_handler_presence_state_transition", "", ["from", "to"]
)
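# --- Standalone sketch (requires prometheus_client) of the metrics idioms the
# presence handler is migrated to above: module-level Counters, labelled
# Counters, and .inc(n) in place of the old registry's inc_by(n).  Metric
# names here are examples only.
from prometheus_client import Counter

presence_updates = Counter("example_presence_updates", "Number of presence updates")
get_updates = Counter("example_get_updates", "Presence update requests", ["type"])

presence_updates.inc(3)                  # replaces presence_updates_counter.inc_by(3)
get_updates.labels("sync").inc()
get_updates.labels("federation").inc(2)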
@@ -87,6 +91,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
def __init__(self, hs):
+ """
+
+ Args:
+ hs (synapse.server.HomeServer):
+ """
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
@@ -94,7 +103,6 @@ class PresenceHandler(object):
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
-
self.state = hs.get_state_handler()
federation_registry = hs.get_federation_registry()
@@ -137,8 +145,9 @@ class PresenceHandler(object):
for state in active_presence
}
- metrics.register_callback(
- "user_to_current_state_size", lambda: len(self.user_to_current_state)
+ LaterGauge(
+ "synapse_handlers_presence_user_to_current_state_size", "", [],
+ lambda: len(self.user_to_current_state)
)
now = self.clock.time_msec()
@@ -208,7 +217,8 @@ class PresenceHandler(object):
60 * 1000,
)
- metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
+ LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
+ lambda: len(self.wheel_timer))
@defer.inlineCallbacks
def _on_shutdown(self):
@@ -255,6 +265,14 @@ class PresenceHandler(object):
logger.info("Finished _persist_unpersisted_changes")
@defer.inlineCallbacks
+ def _update_states_and_catch_exception(self, new_states):
+ try:
+ res = yield self._update_states(new_states)
+ defer.returnValue(res)
+ except Exception:
+ logger.exception("Error updating presence")
+
+ @defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
@@ -303,10 +321,10 @@ class PresenceHandler(object):
# TODO: We should probably ensure there are no races hereafter
- presence_updates_counter.inc_by(len(new_states))
+ presence_updates_counter.inc(len(new_states))
if to_notify:
- notified_presence_counter.inc_by(len(to_notify))
+ notified_presence_counter.inc(len(to_notify))
yield self._persist_and_notify(to_notify.values())
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
@@ -317,7 +335,7 @@ class PresenceHandler(object):
if user_id not in to_notify
}
if to_federation_ping:
- federation_presence_out_counter.inc_by(len(to_federation_ping))
+ federation_presence_out_counter.inc(len(to_federation_ping))
self._push_to_remotes(to_federation_ping.values())
@@ -355,7 +373,7 @@ class PresenceHandler(object):
for user_id in users_to_check
]
- timers_fired_counter.inc_by(len(states))
+ timers_fired_counter.inc(len(states))
changes = handle_timeouts(
states,
@@ -364,7 +382,7 @@ class PresenceHandler(object):
now=now,
)
- preserve_fn(self._update_states)(changes)
+ run_in_background(self._update_states_and_catch_exception, changes)
except Exception:
logger.exception("Exception in _handle_timeouts loop")
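
The new _update_states_and_catch_exception wrapper exists because nothing waits on the Deferred returned by run_in_background, so failures would otherwise go largely unreported; each fire-and-forget task catches and logs its own exceptions. The general shape, with placeholder names:

    import logging

    from twisted.internet import defer

    from synapse.util.logcontext import run_in_background

    logger = logging.getLogger(__name__)


    @defer.inlineCallbacks
    def _do_work(arg):
        # placeholder for the real asynchronous work
        yield defer.succeed(None)
        defer.returnValue(arg)


    @defer.inlineCallbacks
    def _do_work_and_catch_exception(arg):
        try:
            res = yield _do_work(arg)
            defer.returnValue(res)
        except Exception:
            logger.exception("Error doing background work")


    def kick_off(arg):
        # fired without yielding on the result; errors are handled inside
        run_in_background(_do_work_and_catch_exception, arg)
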
@@ -422,20 +440,23 @@ class PresenceHandler(object):
@defer.inlineCallbacks
def _end():
- if affect_presence:
+ try:
self.user_to_num_current_syncs[user_id] -= 1
prev_state = yield self.current_state_for_user(user_id)
yield self._update_states([prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec(),
)])
+ except Exception:
+ logger.exception("Error updating presence after sync")
@contextmanager
def _user_syncing():
try:
yield
finally:
- preserve_fn(_end)()
+ if affect_presence:
+ run_in_background(_end)
defer.returnValue(_user_syncing())
@@ -453,61 +474,6 @@ class PresenceHandler(object):
return syncing_user_ids
@defer.inlineCallbacks
- def update_external_syncs(self, process_id, syncing_user_ids):
- """Update the syncing users for an external process
-
- Args:
- process_id(str): An identifier for the process the users are
- syncing against. This allows synapse to process updates
- as user start and stop syncing against a given process.
- syncing_user_ids(set(str)): The set of user_ids that are
- currently syncing on that server.
- """
-
- # Grab the previous list of user_ids that were syncing on that process
- prev_syncing_user_ids = (
- self.external_process_to_current_syncs.get(process_id, set())
- )
- # Grab the current presence state for both the users that are syncing
- # now and the users that were syncing before this update.
- prev_states = yield self.current_state_for_users(
- syncing_user_ids | prev_syncing_user_ids
- )
- updates = []
- time_now_ms = self.clock.time_msec()
-
- # For each new user that is syncing check if we need to mark them as
- # being online.
- for new_user_id in syncing_user_ids - prev_syncing_user_ids:
- prev_state = prev_states[new_user_id]
- if prev_state.state == PresenceState.OFFLINE:
- updates.append(prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=time_now_ms,
- last_user_sync_ts=time_now_ms,
- ))
- else:
- updates.append(prev_state.copy_and_replace(
- last_user_sync_ts=time_now_ms,
- ))
-
- # For each user that is still syncing or stopped syncing update the
- # last sync time so that we will correctly apply the grace period when
- # they stop syncing.
- for old_user_id in prev_syncing_user_ids:
- prev_state = prev_states[old_user_id]
- updates.append(prev_state.copy_and_replace(
- last_user_sync_ts=time_now_ms,
- ))
-
- yield self._update_states(updates)
-
- # Update the last updated time for the process. We expire the entries
- # if we don't receive an update in the given timeframe.
- self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
- self.external_process_to_current_syncs[process_id] = syncing_user_ids
-
- @defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
"""Update the syncing users for an external process as a delta.
@@ -570,7 +536,7 @@ class PresenceHandler(object):
prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
)
- for prev_state in prev_states.itervalues()
+ for prev_state in itervalues(prev_states)
])
self.external_process_last_updated_ms.pop(process_id, None)
@@ -593,14 +559,14 @@ class PresenceHandler(object):
for user_id in user_ids
}
- missing = [user_id for user_id, state in states.iteritems() if not state]
+ missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
states.update(res)
- missing = [user_id for user_id, state in states.iteritems() if not state]
+ missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id)
@@ -696,7 +662,7 @@ class PresenceHandler(object):
updates.append(prev_state.copy_and_replace(**new_fields))
if updates:
- federation_presence_counter.inc_by(len(updates))
+ federation_presence_counter.inc(len(updates))
yield self._update_states(updates)
@defer.inlineCallbacks
@@ -971,28 +937,28 @@ def should_notify(old_state, new_state):
return False
if old_state.status_msg != new_state.status_msg:
- notify_reason_counter.inc("status_msg_change")
+ notify_reason_counter.labels("status_msg_change").inc()
return True
if old_state.state != new_state.state:
- notify_reason_counter.inc("state_change")
- state_transition_counter.inc(old_state.state, new_state.state)
+ notify_reason_counter.labels("state_change").inc()
+ state_transition_counter.labels(old_state.state, new_state.state).inc()
return True
if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
- notify_reason_counter.inc("current_active_change")
+ notify_reason_counter.labels("current_active_change").inc()
return True
if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Only notify about last active bumps if we're not currently acive
if not new_state.currently_active:
- notify_reason_counter.inc("last_active_change_online")
+ notify_reason_counter.labels("last_active_change_online").inc()
return True
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
- notify_reason_counter.inc("last_active_change_not_online")
+ notify_reason_counter.labels("last_active_change_not_online").inc()
return True
return False
@@ -1066,14 +1032,14 @@ class PresenceEventSource(object):
if changed is not None and len(changed) < 500:
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
- get_updates_counter.inc("stream")
+ get_updates_counter.labels("stream").inc()
for other_user_id in changed:
if other_user_id in users_interested_in:
user_ids_changed.add(other_user_id)
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
- get_updates_counter.inc("full")
+ get_updates_counter.labels("full").inc()
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
@@ -1088,7 +1054,7 @@ class PresenceEventSource(object):
defer.returnValue((updates.values(), max_token))
else:
defer.returnValue(([
- s for s in updates.itervalues()
+ s for s in itervalues(updates)
if s.state != PresenceState.OFFLINE
], max_token))
@@ -1345,11 +1311,11 @@ def get_interested_remotes(store, states, state_handler):
# hosts in those rooms.
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
- for room_id, states in room_ids_to_states.iteritems():
+ for room_id, states in iteritems(room_ids_to_states):
hosts = yield state_handler.get_current_hosts_in_room(room_id)
hosts_and_states.append((hosts, states))
- for user_id, states in users_to_states.iteritems():
+ for user_id, states in iteritems(users_to_states):
host = get_domain_from_id(user_id)
hosts_and_states.append(([host], states))
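
The iteritems/itervalues changes throughout this file use the six compatibility helpers, so the same code runs on Python 2 and Python 3 (where dicts no longer have iteritems). A tiny illustration:

    from six import iteritems, itervalues

    states = {"@alice:example.com": "online", "@bob:example.com": ""}

    # works on Python 2 (dict.iteritems) and Python 3 (dict.items) alike
    missing = [user_id for user_id, state in iteritems(states) if not state]
    values = list(itervalues(states))
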
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 3f215c2b4e..2e0672161c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler):
"""Given a list of receipts, works out which remote servers should be
poked and pokes them.
"""
- # TODO: Some of this stuff should be coallesced.
- for receipt in receipts:
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": event_ids,
- "data": data,
+ try:
+            # TODO: Some of this stuff should be coalesced.
+ for receipt in receipts:
+ room_id = receipt["room_id"]
+ receipt_type = receipt["receipt_type"]
+ user_id = receipt["user_id"]
+ event_ids = receipt["event_ids"]
+ data = receipt["data"]
+
+ users = yield self.state.get_current_user_in_room(room_id)
+ remotedomains = set(get_domain_from_id(u) for u in users)
+ remotedomains = remotedomains.copy()
+ remotedomains.discard(self.server_name)
+
+ logger.debug("Sending receipt to: %r", remotedomains)
+
+ for domain in remotedomains:
+ self.federation.send_edu(
+ destination=domain,
+ edu_type="m.receipt",
+ content={
+ room_id: {
+ receipt_type: {
+ user_id: {
+ "event_ids": event_ids,
+ "data": data,
+ }
}
- }
+ },
},
- },
- key=(room_id, receipt_type, user_id),
- )
+ key=(room_id, receipt_type, user_id),
+ )
+ except Exception:
+ logger.exception("Error pushing receipts to remote servers")
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
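
For orientation, the nested dict built above is the body of an m.receipt EDU, keyed by room ID, receipt type and user ID. Roughly (the identifiers below are made up):

    content = {
        "!room:example.com": {
            "m.read": {
                "@alice:example.com": {
                    "event_ids": ["$event1:example.com"],
                    "data": {"ts": 1500000000000},
                },
            },
        },
    }
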
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index ed5939880a..7e52adda3c 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,8 +23,8 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
-from synapse.types import UserID
-from synapse.util.async import run_on_reactor
+from synapse.types import UserID, create_requester, RoomID, RoomAlias
+from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -34,6 +34,11 @@ logger = logging.getLogger(__name__)
class RegistrationHandler(BaseHandler):
def __init__(self, hs):
+ """
+
+ Args:
+ hs (synapse.server.HomeServer):
+ """
super(RegistrationHandler, self).__init__(hs)
self.auth = hs.get_auth()
@@ -46,6 +51,11 @@ class RegistrationHandler(BaseHandler):
self.macaroon_gen = hs.get_macaroon_generator()
+ self._generate_user_id_linearizer = Linearizer(
+ name="_generate_user_id_linearizer",
+ )
+ self._server_notices_mxid = hs.config.server_notices_mxid
+
@defer.inlineCallbacks
def check_username(self, localpart, guest_access_token=None,
assigned_user_id=None):
@@ -201,10 +211,17 @@ class RegistrationHandler(BaseHandler):
token = None
attempts += 1
+ # auto-join the user to any rooms we're supposed to dump them into
+ fake_requester = create_requester(user_id)
+ for r in self.hs.config.auto_join_rooms:
+ try:
+ yield self._join_user_to_room(fake_requester, r)
+ except Exception as e:
+ logger.error("Failed to join new user to %r: %r", r, e)
+
# We used to generate default identicons here, but nowadays
# we want clients to generate their own as part of their branding
# rather than there being consistent matrix-wide ones, so we don't.
-
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@@ -327,6 +344,14 @@ class RegistrationHandler(BaseHandler):
yield identity_handler.bind_threepid(c, user_id)
def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
+ # don't allow people to register the server notices mxid
+ if self._server_notices_mxid is not None:
+ if user_id == self._server_notices_mxid:
+ raise SynapseError(
+ 400, "This user ID is reserved.",
+ errcode=Codes.EXCLUSIVE
+ )
+
# valid user IDs must not clash with any user ID namespaces claimed by
# application services.
services = self.store.get_app_services()
@@ -345,9 +370,11 @@ class RegistrationHandler(BaseHandler):
@defer.inlineCallbacks
def _generate_user_id(self, reseed=False):
if reseed or self._next_generated_user_id is None:
- self._next_generated_user_id = (
- yield self.store.find_next_generated_user_id_localpart()
- )
+ with (yield self._generate_user_id_linearizer.queue(())):
+ if reseed or self._next_generated_user_id is None:
+ self._next_generated_user_id = (
+ yield self.store.find_next_generated_user_id_localpart()
+ )
id = self._next_generated_user_id
self._next_generated_user_id += 1
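
The Linearizer added here serialises concurrent callers of _generate_user_id so that only one of them hits the database; the re-check inside the lock is the usual double-checked pattern. A condensed sketch of the idiom (placeholder names):

    from twisted.internet import defer

    from synapse.util.async import Linearizer

    _linearizer = Linearizer(name="example_init")
    _cached_value = None


    @defer.inlineCallbacks
    def get_value(load_from_db):
        global _cached_value
        if _cached_value is None:
            with (yield _linearizer.queue(())):
                # re-check: another caller may have initialised the value
                # while we were waiting for the lock
                if _cached_value is None:
                    _cached_value = yield load_from_db()
        defer.returnValue(_cached_value)
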
@@ -477,3 +504,28 @@ class RegistrationHandler(BaseHandler):
)
defer.returnValue((user_id, access_token))
+
+ @defer.inlineCallbacks
+ def _join_user_to_room(self, requester, room_identifier):
+ room_id = None
+ room_member_handler = self.hs.get_room_member_handler()
+ if RoomID.is_valid(room_identifier):
+ room_id = room_identifier
+ elif RoomAlias.is_valid(room_identifier):
+ room_alias = RoomAlias.from_string(room_identifier)
+ room_id, remote_room_hosts = (
+ yield room_member_handler.lookup_room_alias(room_alias)
+ )
+ room_id = room_id.to_string()
+ else:
+            raise SynapseError(400, "%s was not a legal room ID or room alias" % (
+ room_identifier,
+ ))
+
+ yield room_member_handler.update_membership(
+ requester=requester,
+ target=requester.user,
+ room_id=room_id,
+ remote_room_hosts=remote_room_hosts,
+ action="join",
+ )
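
The helper above accepts either a room ID or a room alias, which is why RoomID and RoomAlias are now imported; only the alias branch needs the extra lookup to find the room ID and candidate servers. A trimmed illustration of that distinction (the literals are examples only):

    from synapse.types import RoomID, RoomAlias

    identifier = "#welcome:example.com"

    if RoomID.is_valid(identifier):        # e.g. "!abcdef:example.com"
        room_id = identifier
    elif RoomAlias.is_valid(identifier):   # e.g. "#welcome:example.com"
        room_alias = RoomAlias.from_string(identifier)
        # lookup_room_alias(room_alias) would then resolve this to
        # (RoomID, remote_room_hosts), as in the patch above
        ...
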
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 8df8fcbbad..b5850db42f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -68,14 +68,27 @@ class RoomCreationHandler(BaseHandler):
self.event_creation_handler = hs.get_event_creation_handler()
@defer.inlineCallbacks
- def create_room(self, requester, config, ratelimit=True):
+ def create_room(self, requester, config, ratelimit=True,
+ creator_join_profile=None):
""" Creates a new room.
Args:
- requester (Requester): The user who requested the room creation.
+ requester (synapse.types.Requester):
+ The user who requested the room creation.
config (dict) : A dict of configuration options.
+ ratelimit (bool): set to False to disable the rate limiter
+
+ creator_join_profile (dict|None):
+ Set to override the displayname and avatar for the creating
+ user in this room. If unset, displayname and avatar will be
+ derived from the user's profile. If set, should contain the
+ values to go in the body of the 'join' event (typically
+                `avatar_url` and/or `displayname`).
+
Returns:
- The new room ID.
+ Deferred[dict]:
+ a dict containing the keys `room_id` and, if an alias was
+ requested, `room_alias`.
Raises:
SynapseError if the room ID couldn't be stored, or something went
horribly wrong.
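
Per the updated docstring, create_room now resolves to a dict rather than a bare room ID, so callers unpack the result. A hedged sketch of the calling convention (handler, requester and config here are placeholders):

    from twisted.internet import defer


    @defer.inlineCallbacks
    def _make_room(room_creation_handler, requester):
        info = yield room_creation_handler.create_room(
            requester,
            config={"visibility": "private"},  # same shape as a /createRoom body
            ratelimit=False,
        )
        defer.returnValue(info["room_id"])
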
@@ -113,6 +126,10 @@ class RoomCreationHandler(BaseHandler):
except Exception:
raise SynapseError(400, "Invalid user_id: %s" % (i,))
+ yield self.event_creation_handler.assert_accepted_privacy_policy(
+ requester,
+ )
+
invite_3pid_list = config.get("invite_3pid", [])
visibility = config.get("visibility", None)
@@ -176,7 +193,8 @@ class RoomCreationHandler(BaseHandler):
initial_state=initial_state,
creation_content=creation_content,
room_alias=room_alias,
- power_level_content_override=config.get("power_level_content_override", {})
+ power_level_content_override=config.get("power_level_content_override", {}),
+ creator_join_profile=creator_join_profile,
)
if "name" in config:
@@ -256,6 +274,7 @@ class RoomCreationHandler(BaseHandler):
creation_content,
room_alias,
power_level_content_override,
+ creator_join_profile,
):
def create(etype, content, **kwargs):
e = {
@@ -299,6 +318,7 @@ class RoomCreationHandler(BaseHandler):
room_id,
"join",
ratelimit=False,
+ content=creator_join_profile,
)
# We treat the power levels override specially as this needs to be one
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 5d81f59b44..5757bb7f8a 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -15,12 +15,13 @@
from twisted.internet import defer
+from six.moves import range
+
from ._base import BaseHandler
from synapse.api.constants import (
EventTypes, JoinRules,
)
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -44,8 +45,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
- self.response_cache = ResponseCache(hs)
- self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+ self.response_cache = ResponseCache(hs, "room_list")
+ self.remote_response_cache = ResponseCache(hs, "remote_room_list",
+ timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
@@ -77,18 +79,11 @@ class RoomListHandler(BaseHandler):
)
key = (limit, since_token, network_tuple)
- result = self.response_cache.get(key)
- if not result:
- logger.info("No cached result, calculating one.")
- result = self.response_cache.set(
- key,
- preserve_fn(self._get_public_room_list)(
- limit, since_token, network_tuple=network_tuple
- )
- )
- else:
- logger.info("Using cached deferred result.")
- return make_deferred_yieldable(result)
+ return self.response_cache.wrap(
+ key,
+ self._get_public_room_list,
+ limit, since_token, network_tuple=network_tuple,
+ )
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
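
ResponseCache.wrap replaces the old get/set dance: it looks up the key and, on a miss, calls the supplied function with the given arguments and caches the resulting Deferred, so callers no longer juggle preserve_fn and make_deferred_yieldable themselves. A stripped-down sketch (names are placeholders):

    from synapse.util.caches.response_cache import ResponseCache


    class ExampleHandler(object):
        def __init__(self, hs):
            # the second argument names the cache for metrics
            self.response_cache = ResponseCache(hs, "example", timeout_ms=30 * 1000)

        def get_thing(self, key, limit):
            return self.response_cache.wrap(
                key,
                self._get_thing,  # only invoked on a cache miss
                key, limit=limit,
            )

        def _get_thing(self, key, limit):
            ...
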
@@ -207,7 +202,7 @@ class RoomListHandler(BaseHandler):
step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
chunk = []
- for i in xrange(0, len(rooms_to_scan), step):
+ for i in range(0, len(rooms_to_scan), step):
batch = rooms_to_scan[i:i + step]
logger.info("Processing %i rooms for result", len(batch))
yield concurrently_execute(
@@ -422,18 +417,14 @@ class RoomListHandler(BaseHandler):
server_name, limit, since_token, include_all_networks,
third_party_instance_id,
)
- result = self.remote_response_cache.get(key)
- if not result:
- result = self.remote_response_cache.set(
- key,
- repl_layer.get_public_rooms(
- server_name, limit=limit, since_token=since_token,
- search_filter=search_filter,
- include_all_networks=include_all_networks,
- third_party_instance_id=third_party_instance_id,
- )
- )
- return result
+ return self.remote_response_cache.wrap(
+ key,
+ repl_layer.get_public_rooms,
+ server_name, limit=limit, since_token=since_token,
+ search_filter=search_filter,
+ include_all_networks=include_all_networks,
+ third_party_instance_id=third_party_instance_id,
+ )
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 9977be8831..f930e939e8 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,11 +17,14 @@
import abc
import logging
+from six.moves import http_client
+
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from twisted.internet import defer
from unpaddedbase64 import decode_base64
+import synapse.server
import synapse.types
from synapse.api.constants import (
EventTypes, Membership,
@@ -46,6 +49,11 @@ class RoomMemberHandler(object):
__metaclass__ = abc.ABCMeta
def __init__(self, hs):
+ """
+
+ Args:
+ hs (synapse.server.HomeServer):
+ """
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@@ -63,6 +71,7 @@ class RoomMemberHandler(object):
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
+ self._server_notices_mxid = self.config.server_notices_mxid
@abc.abstractmethod
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -149,7 +158,7 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks
def _local_membership_update(
self, requester, target, room_id, membership,
- prev_event_ids,
+ prev_events_and_hashes,
txn_id=None,
ratelimit=True,
content=None,
@@ -175,7 +184,7 @@ class RoomMemberHandler(object):
},
token_id=requester.access_token_id,
txn_id=txn_id,
- prev_event_ids=prev_event_ids,
+ prev_events_and_hashes=prev_events_and_hashes,
)
# Check if this event matches the previous membership event for the user.
@@ -290,11 +299,26 @@ class RoomMemberHandler(object):
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
- if effective_membership_state == "invite":
+ if effective_membership_state == Membership.INVITE:
+ # block any attempts to invite the server notices mxid
+ if target.to_string() == self._server_notices_mxid:
+ raise SynapseError(
+ http_client.FORBIDDEN,
+ "Cannot invite this user",
+ )
+
block_invite = False
- is_requester_admin = yield self.auth.is_server_admin(
- requester.user,
- )
+
+ if (self._server_notices_mxid is not None and
+ requester.user.to_string() == self._server_notices_mxid):
+ # allow the server notices mxid to send invites
+ is_requester_admin = True
+
+ else:
+ is_requester_admin = yield self.auth.is_server_admin(
+ requester.user,
+ )
+
if not is_requester_admin:
if self.config.block_non_admin_invites:
logger.info(
@@ -314,7 +338,12 @@ class RoomMemberHandler(object):
403, "Invites have been disabled on this server",
)
- latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+ prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+ room_id,
+ )
+ latest_event_ids = (
+ event_id for (event_id, _, _) in prev_events_and_hashes
+ )
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)
@@ -344,6 +373,20 @@ class RoomMemberHandler(object):
if same_sender and same_membership and same_content:
defer.returnValue(old_state)
+ # we don't allow people to reject invites to the server notice
+ # room, but they can leave it once they are joined.
+ if (
+ old_membership == Membership.INVITE and
+ effective_membership_state == Membership.LEAVE
+ ):
+ is_blocked = yield self._is_server_notice_room(room_id)
+ if is_blocked:
+ raise SynapseError(
+ http_client.FORBIDDEN,
+ "You cannot reject this invite",
+ errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
+ )
+
is_host_in_room = yield self._is_host_in_room(current_state_ids)
if effective_membership_state == Membership.JOIN:
@@ -403,7 +446,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
- prev_event_ids=latest_event_ids,
+ prev_events_and_hashes=prev_events_and_hashes,
content=content,
)
defer.returnValue(res)
@@ -839,6 +882,13 @@ class RoomMemberHandler(object):
defer.returnValue(False)
+ @defer.inlineCallbacks
+ def _is_server_notice_room(self, room_id):
+ if self._server_notices_mxid is None:
+ defer.returnValue(False)
+ user_ids = yield self.store.get_users_in_room(room_id)
+ defer.returnValue(self._server_notices_mxid in user_ids)
+
class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
@@ -852,6 +902,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
+ # filter ourselves out of remote_room_hosts: do_invite_join ignores it
+ # and if it is the only entry we'd like to return a 404 rather than a
+ # 500.
+
+ remote_room_hosts = [
+ host for host in remote_room_hosts if host != self.hs.hostname
+ ]
+
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f521d22e91..05bf6d46dd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -16,7 +16,7 @@
from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
@@ -29,6 +29,8 @@ import collections
import logging
import itertools
+from six import itervalues, iteritems
+
logger = logging.getLogger(__name__)
@@ -53,6 +55,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
+ __bool__ = __nonzero__ # python3
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -77,6 +80,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
+ __bool__ = __nonzero__ # python3
class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -96,6 +100,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
or self.state
or self.account_data
)
+ __bool__ = __nonzero__ # python3
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -107,6 +112,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
def __nonzero__(self):
"""Invited rooms should always be reported to the client"""
return True
+ __bool__ = __nonzero__ # python3
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -118,6 +124,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
def __nonzero__(self):
return bool(self.join or self.invite or self.leave)
+ __bool__ = __nonzero__ # python3
class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -128,6 +135,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
def __nonzero__(self):
return bool(self.changed or self.left)
+ __bool__ = __nonzero__ # python3
class SyncResult(collections.namedtuple("SyncResult", [
@@ -160,6 +168,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.device_lists or
self.groups
)
+ __bool__ = __nonzero__ # python3
class SyncHandler(object):
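
The repeated `__bool__ = __nonzero__` lines are the standard Python 2/3 shim: Python 2 consults __nonzero__ for truthiness while Python 3 consults __bool__, so aliasing one to the other keeps `bool(result)` working on both. In miniature:

    import collections


    class ExampleResult(collections.namedtuple("ExampleResult", ["events"])):
        def __nonzero__(self):
            return bool(self.events)
        __bool__ = __nonzero__  # python3


    assert not ExampleResult(events=[])
    assert ExampleResult(events=["something"])
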
@@ -170,7 +179,7 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs)
+ self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -181,15 +190,11 @@ class SyncHandler(object):
Returns:
A Deferred SyncResult.
"""
- result = self.response_cache.get(sync_config.request_key)
- if not result:
- result = self.response_cache.set(
- sync_config.request_key,
- preserve_fn(self._wait_for_sync_for_user)(
- sync_config, since_token, timeout, full_state
- )
- )
- return make_deferred_yieldable(result)
+ return self.response_cache.wrap(
+ sync_config.request_key,
+ self._wait_for_sync_for_user,
+ sync_config, since_token, timeout, full_state,
+ )
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
@@ -273,7 +278,7 @@ class SyncHandler(object):
# result returned by the event source is poor form (it might cache
# the object)
room_id = event["room_id"]
- event_copy = {k: v for (k, v) in event.iteritems()
+ event_copy = {k: v for (k, v) in iteritems(event)
if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
@@ -292,7 +297,7 @@ class SyncHandler(object):
for event in receipts:
room_id = event["room_id"]
# exclude room id, as above
- event_copy = {k: v for (k, v) in event.iteritems()
+ event_copy = {k: v for (k, v) in iteritems(event)
if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
@@ -323,7 +328,7 @@ class SyncHandler(object):
current_state_ids = frozenset()
if any(e.is_state() for e in recents):
current_state_ids = yield self.state.get_current_state_ids(room_id)
- current_state_ids = frozenset(current_state_ids.itervalues())
+ current_state_ids = frozenset(itervalues(current_state_ids))
recents = yield filter_events_for_client(
self.store,
@@ -352,12 +357,24 @@ class SyncHandler(object):
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
- events, end_key = yield self.store.get_room_events_stream_for_room(
- room_id,
- limit=load_limit + 1,
- from_key=since_key,
- to_key=end_key,
- )
+ # If we have a since_key then we are trying to get any events
+ # that have happened since `since_key` up to `end_key`, so we
+ # can just use `get_room_events_stream_for_room`.
+ # Otherwise, we want to return the last N events in the room
+            # in topological ordering.
+ if since_key:
+ events, end_key = yield self.store.get_room_events_stream_for_room(
+ room_id,
+ limit=load_limit + 1,
+ from_key=since_key,
+ to_key=end_key,
+ )
+ else:
+ events, end_key = yield self.store.get_recent_events_for_room(
+ room_id,
+ limit=load_limit + 1,
+ end_token=end_key,
+ )
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
)
@@ -368,7 +385,7 @@ class SyncHandler(object):
current_state_ids = frozenset()
if any(e.is_state() for e in loaded_recents):
current_state_ids = yield self.state.get_current_state_ids(room_id)
- current_state_ids = frozenset(current_state_ids.itervalues())
+ current_state_ids = frozenset(itervalues(current_state_ids))
loaded_recents = yield filter_events_for_client(
self.store,
@@ -427,7 +444,7 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- last_events, token = yield self.store.get_recent_events_for_room(
+ last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1,
)
@@ -1026,7 +1043,7 @@ class SyncHandler(object):
if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
- joined_sync.timeline.events, joined_sync.state.itervalues()
+ joined_sync.timeline.events, itervalues(joined_sync.state)
)
for event in it:
if event.type == EventTypes.Member:
@@ -1104,7 +1121,7 @@ class SyncHandler(object):
newly_left_rooms = []
room_entries = []
invited = []
- for room_id, events in mem_change_events_by_room_id.iteritems():
+ for room_id, events in iteritems(mem_change_events_by_room_id):
non_joins = [e for e in events if e.membership != Membership.JOIN]
has_join = len(non_joins) != len(events)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 77c0cf146f..5d9736e88f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
@@ -97,7 +97,8 @@ class TypingHandler(object):
if self.hs.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
- preserve_fn(self._push_remote)(
+ run_in_background(
+ self._push_remote,
member=member,
typing=True
)
@@ -196,7 +197,7 @@ class TypingHandler(object):
def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
- preserve_fn(self._push_remote)(member, typing)
+ run_in_background(self._push_remote, member, typing)
self._push_update_local(
member=member,
@@ -205,28 +206,31 @@ class TypingHandler(object):
@defer.inlineCallbacks
def _push_remote(self, member, typing):
- users = yield self.state.get_current_user_in_room(member.room_id)
- self._member_last_federation_poke[member] = self.clock.time_msec()
+ try:
+ users = yield self.state.get_current_user_in_room(member.room_id)
+ self._member_last_federation_poke[member] = self.clock.time_msec()
- now = self.clock.time_msec()
- self.wheel_timer.insert(
- now=now,
- obj=member,
- then=now + FEDERATION_PING_INTERVAL,
- )
+ now = self.clock.time_msec()
+ self.wheel_timer.insert(
+ now=now,
+ obj=member,
+ then=now + FEDERATION_PING_INTERVAL,
+ )
- for domain in set(get_domain_from_id(u) for u in users):
- if domain != self.server_name:
- self.federation.send_edu(
- destination=domain,
- edu_type="m.typing",
- content={
- "room_id": member.room_id,
- "user_id": member.user_id,
- "typing": typing,
- },
- key=member,
- )
+ for domain in set(get_domain_from_id(u) for u in users):
+ if domain != self.server_name:
+ self.federation.send_edu(
+ destination=domain,
+ edu_type="m.typing",
+ content={
+ "room_id": member.room_id,
+ "user_id": member.user_id,
+ "typing": typing,
+ },
+ key=member,
+ )
+ except Exception:
+ logger.exception("Error pushing typing notif to remotes")
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 714f0195c8..a39f0f7343 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -22,6 +22,7 @@ from synapse.util.metrics import Measure
from synapse.util.async import sleep
from synapse.types import get_localpart_from_id
+from six import iteritems
logger = logging.getLogger(__name__)
@@ -123,6 +124,13 @@ class UserDirectoryHandler(object):
)
@defer.inlineCallbacks
+ def handle_user_deactivated(self, user_id):
+ """Called when a user ID is deactivated
+ """
+ yield self.store.remove_from_user_dir(user_id)
+ yield self.store.remove_from_user_in_public_room(user_id)
+
+ @defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
@@ -403,7 +411,7 @@ class UserDirectoryHandler(object):
if change:
users_with_profile = yield self.state.get_current_user_in_room(room_id)
- for user_id, profile in users_with_profile.iteritems():
+ for user_id, profile in iteritems(users_with_profile):
yield self._handle_new_user(room_id, user_id, profile)
else:
users = yield self.store.get_users_in_public_due_to_room(room_id)
|