diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index e83adc8339..faa5609c0c 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -53,7 +53,20 @@ class BaseHandler(object):
self.event_builder_factory = hs.get_event_builder_factory()
- def ratelimit(self, requester):
+ @defer.inlineCallbacks
+ def ratelimit(self, requester, update=True):
+ """Ratelimits requests.
+
+ Args:
+ requester (Requester)
+ update (bool): Whether to record that a request is being processed.
+ Set to False when doing multiple checks for one request (e.g.
+ to check up front if we would reject the request), and set to
+ True for the last call for a given request.
+
+ Raises:
+ LimitExceededError if the request should be ratelimited
+ """
time_now = self.clock.time()
user_id = requester.user.to_string()
@@ -67,10 +80,25 @@ class BaseHandler(object):
if requester.app_service and not requester.app_service.is_rate_limited():
return
+ # Check if there is a per user override in the DB.
+ override = yield self.store.get_ratelimit_for_user(user_id)
+ if override:
+ # If overridden with a null Hz then ratelimiting has been entirely
+ # disabled for the user
+ if not override.messages_per_second:
+ return
+
+ messages_per_second = override.messages_per_second
+ burst_count = override.burst_count
+ else:
+ messages_per_second = self.hs.config.rc_messages_per_second
+ burst_count = self.hs.config.rc_message_burst_count
+
allowed, time_allowed = self.ratelimiter.send_message(
user_id, time_now,
- msg_rate_hz=self.hs.config.rc_messages_per_second,
- burst_count=self.hs.config.rc_message_burst_count,
+ msg_rate_hz=messages_per_second,
+ burst_count=burst_count,
+ update=update,
)
if not allowed:
raise LimitExceededError(
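
The new `update` flag only works if the limiter can check a request without recording it. Below is a minimal token-bucket sketch of that check-vs-record split — an illustrative stand-in, not Synapse's actual `Ratelimiter`; the class name and numbers are made up:

```python
import time


class TokenBucketLimiter(object):
    """Illustrative model of send_message(..., update=...)."""

    def __init__(self):
        # user_id -> (actions_performed, window_start_time)
        self.message_counts = {}

    def send_message(self, user_id, time_now_s, msg_rate_hz, burst_count,
                     update=True):
        performed, time_start = self.message_counts.get(
            user_id, (0.0, time_now_s)
        )
        # Decay the counter at msg_rate_hz for the time elapsed.
        performed -= (time_now_s - time_start) * msg_rate_hz
        if performed < 0:
            performed, time_start = 0.0, time_now_s

        allowed = performed + 1.0 <= burst_count
        if allowed and update:
            # Only record the action on the final, authoritative check.
            self.message_counts[user_id] = (performed + 1.0, time_start)

        # Earliest time at which another message would be allowed.
        time_allowed = time_start + (performed + 1.0 - burst_count) / msg_rate_hz
        return allowed, max(time_allowed, time_now_s)


limiter = TokenBucketLimiter()
now = time.time()
# Probe without consuming budget (update=False), then record for real:
print(limiter.send_message("@alice:hs", now, 0.2, 10.0, update=False))
print(limiter.send_message("@alice:hs", now, 0.2, 10.0, update=True))
```
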
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c22f65ce5d..982cda3edf 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,6 +17,7 @@ from synapse.api.constants import EventTypes
from synapse.util import stringutils
from synapse.util.async import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.retryutils import NotRetryingDestination
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id, RoomStreamToken
from twisted.internet import defer
@@ -425,12 +426,38 @@ class DeviceListEduUpdater(object):
# This can happen since we batch updates
return
+ # Given a list of updates, check whether we need to resync. This
+ # is the case if we've missed updates.
resync = yield self._need_to_do_resync(user_id, pending_updates)
if resync:
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
- result = yield self.federation.query_user_devices(origin, user_id)
+ try:
+ result = yield self.federation.query_user_devices(origin, user_id)
+ except NotRetryingDestination:
+ # TODO: Remember that we are now out of sync and try again
+ # later
+ logger.warn(
+ "Failed to handle device list update for %s,"
+ " we're not retrying the remote",
+ user_id,
+ )
+ # We abort on exceptions rather than accepting the update
+ # as otherwise synapse will 'forget' that its device list
+ # is out of date. If we bail then we will retry the resync
+ # next time we get a device list update for this user_id.
+ # This makes it more likely that the device lists will
+ # eventually become consistent.
+ return
+ except Exception:
+ # TODO: Remember that we are now out of sync and try again
+ # later
+ logger.exception(
+ "Failed to handle device list update for %s", user_id
+ )
+ return
+
stream_id = result["stream_id"]
devices = result["devices"]
yield self.store.update_remote_device_list_cache(
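
`NotRetryingDestination` comes from the transaction-retry bookkeeping in `synapse.util.retryutils`: outbound federation calls consult per-destination retry timings and refuse to even attempt a request while the remote is in backoff. A rough sketch of that gate — illustrative shapes only, not the real `retryutils` API:

```python
class NotRetryingDestination(Exception):
    """Raised instead of attempting a request to a backed-off remote."""


def check_retry_window(retry_last_ts_ms, retry_interval_ms, now_ms):
    # retry_last_ts_ms == 0 means the destination is not in backoff at all.
    if retry_last_ts_ms and now_ms < retry_last_ts_ms + retry_interval_ms:
        raise NotRetryingDestination()


check_retry_window(0, 0, 1000)            # healthy remote: no exception
try:
    check_retry_window(900, 60000, 1000)  # still backing off: refuse
except NotRetryingDestination:
    print("not retrying the remote; resync on the next device list update")
```
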
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index c2b38d72a9..668a90e495 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -21,7 +21,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, CodeMessageException
from synapse.types import get_domain_from_id
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -145,7 +145,7 @@ class E2eKeysHandler(object):
"status": 503, "message": e.message
}
- yield preserve_context_over_deferred(defer.gatherResults([
+ yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
for destination in remote_queries_not_in_cache
]))
@@ -257,11 +257,21 @@ class E2eKeysHandler(object):
"status": 503, "message": e.message
}
- yield preserve_context_over_deferred(defer.gatherResults([
+ yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(claim_client_keys)(destination)
for destination in remote_queries
]))
+ logger.info(
+ "Claimed one-time-keys: %s",
+ ",".join((
+ "%s for %s:%s" % (key_id, user_id, device_id)
+ for user_id, user_keys in json_result.iteritems()
+ for device_id, device_keys in user_keys.iteritems()
+ for key_id, _ in device_keys.iteritems()
+ )),
+ )
+
defer.returnValue({
"one_time_keys": json_result,
"failures": failures
@@ -288,19 +298,8 @@ class E2eKeysHandler(object):
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
- logger.info(
- "Adding %d one_time_keys for device %r for user %r at %d",
- len(one_time_keys), device_id, user_id, time_now
- )
- key_list = []
- for key_id, key_json in one_time_keys.items():
- algorithm, key_id = key_id.split(":")
- key_list.append((
- algorithm, key_id, encode_canonical_json(key_json)
- ))
-
- yield self.store.add_e2e_one_time_keys(
- user_id, device_id, time_now, key_list
+ yield self._upload_one_time_keys_for_user(
+ user_id, device_id, time_now, one_time_keys,
)
# the device should have been registered already, but it may have been
@@ -313,3 +312,58 @@ class E2eKeysHandler(object):
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
defer.returnValue({"one_time_key_counts": result})
+
+ @defer.inlineCallbacks
+ def _upload_one_time_keys_for_user(self, user_id, device_id, time_now,
+ one_time_keys):
+ logger.info(
+ "Adding one_time_keys %r for device %r for user %r at %d",
+ one_time_keys.keys(), device_id, user_id, time_now,
+ )
+
+ # make a list of (alg, id, key) tuples
+ key_list = []
+ for key_id, key_obj in one_time_keys.items():
+ algorithm, key_id = key_id.split(":")
+ key_list.append((
+ algorithm, key_id, key_obj
+ ))
+
+ # First we check if we have already persisted any of the keys.
+ existing_key_map = yield self.store.get_e2e_one_time_keys(
+ user_id, device_id, [k_id for _, k_id, _ in key_list]
+ )
+
+ new_keys = [] # Keys that we need to insert. (alg, id, json) tuples.
+ for algorithm, key_id, key in key_list:
+ ex_json = existing_key_map.get((algorithm, key_id), None)
+ if ex_json:
+ if not _one_time_keys_match(ex_json, key):
+ raise SynapseError(
+ 400,
+ ("One time key %s:%s already exists. "
+ "Old key: %s; new key: %r") %
+ (algorithm, key_id, ex_json, key)
+ )
+ else:
+ new_keys.append((algorithm, key_id, encode_canonical_json(key)))
+
+ yield self.store.add_e2e_one_time_keys(
+ user_id, device_id, time_now, new_keys
+ )
+
+
+def _one_time_keys_match(old_key_json, new_key):
+ old_key = json.loads(old_key_json)
+
+ # if either is a string rather than an object, they must match exactly
+ if not isinstance(old_key, dict) or not isinstance(new_key, dict):
+ return old_key == new_key
+
+ # otherwise, we strip off the 'signatures' if any, because it's legitimate
+ # for different upload attempts to have different signatures.
+ old_key.pop("signatures", None)
+ new_key_copy = dict(new_key)
+ new_key_copy.pop("signatures", None)
+
+ return old_key == new_key_copy
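
To make the signature-stripping concrete, here is how `_one_time_keys_match` (defined above) treats a re-upload of the same key signed twice; the key material and signatures below are made up:

```python
import json

old_key_json = json.dumps({
    "key": "zKbLg+NrIjpnagy+pIY6uPL4ZwEG2v+8F9lmgsnlZzs",
    "signatures": {"@alice:hs": {"ed25519:DEVICEID": "first-signature"}},
})

# Same key material, re-signed: still a match, so the duplicate upload
# is accepted rather than rejected.
resigned = {
    "key": "zKbLg+NrIjpnagy+pIY6uPL4ZwEG2v+8F9lmgsnlZzs",
    "signatures": {"@alice:hs": {"ed25519:DEVICEID": "second-signature"}},
}
assert _one_time_keys_match(old_key_json, resigned)

# Different key material under the same key_id: rejected with a 400 above.
assert not _one_time_keys_match(old_key_json, {"key": "something-else"})
```
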
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 53f9296399..52d97dfbf3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -28,7 +28,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import (
- PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+ preserve_fn, preserve_context_over_deferred
)
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
@@ -172,8 +172,22 @@ class FederationHandler(BaseHandler):
origin, pdu, prevs, min_depth
)
- prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
+ # Update the set of things we've seen after trying to
+ # fetch the missing stuff
+ have_seen = yield self.store.have_events(prevs)
+ seen = set(have_seen.iterkeys())
+
+ if not prevs - seen:
+ logger.info(
+ "Found all missing prev events for %s", pdu.event_id
+ )
+ elif prevs - seen:
+ logger.info(
+ "Not fetching %d missing events for room %r,event %s: %r...",
+ len(prevs - seen), pdu.room_id, pdu.event_id,
+ list(prevs - seen)[:5],
+ )
+
if prevs - seen:
logger.info(
"Still missing %d events for room %r: %r...",
@@ -208,19 +222,15 @@ class FederationHandler(BaseHandler):
Args:
origin (str): Origin of the pdu. Will be called to get the missing events
pdu: received pdu
- prevs (str[]): List of event ids which we are missing
+ prevs (set(str)): Set of event ids which we are missing
min_depth (int): Minimum depth of events to return.
-
- Returns:
- Deferred<dict(str, str?)>: updated have_seen dictionary
"""
# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())
if not prevs - seen:
- # nothing left to do
- defer.returnValue(have_seen)
+ return
latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
@@ -232,8 +242,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
- "Missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ "Missing %d events for room %r pdu %s: %r...",
+ len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
)
# XXX: we set timeout to 10s to help workaround
@@ -265,22 +275,23 @@ class FederationHandler(BaseHandler):
timeout=10000,
)
+ logger.info(
+ "Got %d events: %r...",
+ len(missing_events), [e.event_id for e in missing_events[:5]]
+ )
+
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
for e in missing_events:
+ logger.info("Handling found event %s", e.event_id)
yield self.on_receive_pdu(
origin,
e,
get_missing=False
)
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
- defer.returnValue(have_seen)
-
@log_function
@defer.inlineCallbacks
def _process_received_pdu(self, origin, pdu, state, auth_chain):
@@ -369,13 +380,6 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- # if we're receiving valid events from an origin,
- # it's probably a good idea to mark it as not in retry-state
- # for sending (although this is a bit of a leap)
- retry_timings = yield self.store.get_destination_retry_timings(origin)
- if retry_timings and retry_timings["retry_last_ts"]:
- self.store.set_destination_retry_timings(origin, 0, 0)
-
room = yield self.store.get_room(event.room_id)
if not room:
@@ -394,11 +398,10 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -916,11 +919,10 @@ class FederationHandler(BaseHandler):
origin, auth_chain, state, event
)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[joinee]
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=[joinee]
+ )
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
@@ -1035,10 +1037,9 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id, extra_users=extra_users
+ )
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
@@ -1084,29 +1085,22 @@ class FederationHandler(BaseHandler):
)
target_user = UserID.from_string(event.state_key)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=[target_user],
+ )
defer.returnValue(event)
@defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
- try:
- origin, event = yield self._make_and_verify_event(
- target_hosts,
- room_id,
- user_id,
- "leave"
- )
- event = self._sign_event(event)
- except SynapseError:
- raise
- except CodeMessageException as e:
- logger.warn("Failed to reject invite: %s", e)
- raise SynapseError(500, "Failed to reject invite")
+ origin, event = yield self._make_and_verify_event(
+ target_hosts,
+ room_id,
+ user_id,
+ "leave"
+ )
+ event = self._sign_event(event)
# Try the host that we successfully called /make_leave/ on first for
# the /send_leave/ request.
@@ -1116,16 +1110,10 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
- try:
- yield self.replication_layer.send_leave(
- target_hosts,
- event
- )
- except SynapseError:
- raise
- except CodeMessageException as e:
- logger.warn("Failed to reject invite: %s", e)
- raise SynapseError(500, "Failed to reject invite")
+ yield self.replication_layer.send_leave(
+ target_hosts,
+ event
+ )
context = yield self.state_handler.compute_event_context(event)
@@ -1246,10 +1234,9 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id, extra_users=extra_users
+ )
defer.returnValue(None)
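
The branching in the hunks above is all set arithmetic over event IDs: `prevs` is what the new PDU references, `seen` is what `store.have_events` reports we already hold. A toy run-through with hypothetical event IDs (the `have_seen` values are elided; only its keys matter here):

```python
prevs = {"$a:remote", "$b:remote", "$c:remote"}
have_seen = {"$a:remote": True}  # stand-in for store.have_events(prevs)
seen = set(have_seen.keys())

missing = prevs - seen
if not missing:
    print("Found all missing prev events")
else:
    print("Still missing %d events: %r..." % (len(missing), sorted(missing)[:5]))
```
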
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 6a53c5eb47..9efcdff1d6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -18,7 +18,7 @@
from twisted.internet import defer
from synapse.api.errors import (
- CodeMessageException
+ MatrixCodeMessageException, CodeMessageException
)
from ._base import BaseHandler
from synapse.util.async import run_on_reactor
@@ -90,6 +90,9 @@ class IdentityHandler(BaseHandler):
),
{'sid': creds['sid'], 'client_secret': client_secret}
)
+ except MatrixCodeMessageException as e:
+ logger.info("getValidated3pid failed with Matrix error: %r", e)
+ raise SynapseError(e.code, e.msg, e.errcode)
except CodeMessageException as e:
data = json.loads(e.msg)
@@ -159,6 +162,9 @@ class IdentityHandler(BaseHandler):
params
)
defer.returnValue(data)
+ except MatrixCodeMessageException as e:
+ logger.info("Proxied requestToken failed with Matrix error: %r", e)
+ raise SynapseError(e.code, e.msg, e.errcode)
except CodeMessageException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e
@@ -193,6 +199,9 @@ class IdentityHandler(BaseHandler):
params
)
defer.returnValue(data)
+ except MatrixCodeMessageException as e:
+ logger.info("Proxied requestToken failed with Matrix error: %r", e)
+ raise SynapseError(e.code, e.msg, e.errcode)
except CodeMessageException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e
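
The repeated pattern in this file is a straight translation: a `MatrixCodeMessageException` already carries a parsed Matrix error triple, so it can be re-raised to the client as a `SynapseError` without guessing at the body, whereas a bare `CodeMessageException` still needs its JSON decoded. A minimal model of the two exception shapes (illustrative only; in Synapse the Matrix variant subclasses `CodeMessageException`, which is why the more specific `except` clause must come first):

```python
class CodeMessageException(Exception):
    def __init__(self, code, msg):
        super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
        self.code, self.msg = code, msg


class MatrixCodeMessageException(CodeMessageException):
    """A CodeMessageException whose body was already a Matrix error."""
    def __init__(self, code, msg, errcode="M_UNKNOWN"):
        super(MatrixCodeMessageException, self).__init__(code, msg)
        self.errcode = errcode


def to_client_error(e):
    # Mirrors `raise SynapseError(e.code, e.msg, e.errcode)` above.
    return (e.code, e.msg, e.errcode)


print(to_client_error(MatrixCodeMessageException(
    400, "Missing param", "M_MISSING_PARAM",
)))
```
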
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7a498af5a2..196925edad 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError
+from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@@ -175,7 +175,8 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
- def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
+ def create_event(self, requester, event_dict, token_id=None, txn_id=None,
+ prev_event_ids=None):
"""
Given a dict from a client, create a new event.
@@ -185,6 +186,7 @@ class MessageHandler(BaseHandler):
Adds display names to Join membership events.
Args:
+ requester (Requester)
event_dict (dict): An entire event
token_id (str)
txn_id (str)
@@ -226,6 +228,7 @@ class MessageHandler(BaseHandler):
event, context = yield self._create_new_client_event(
builder=builder,
+ requester=requester,
prev_event_ids=prev_event_ids,
)
@@ -251,17 +254,7 @@ class MessageHandler(BaseHandler):
# We check here if we are currently being rate limited, so that we
# don't do unnecessary work. We check again just before we actually
# send the event.
- time_now = self.clock.time()
- allowed, time_allowed = self.ratelimiter.send_message(
- event.sender, time_now,
- msg_rate_hz=self.hs.config.rc_messages_per_second,
- burst_count=self.hs.config.rc_message_burst_count,
- update=False,
- )
- if not allowed:
- raise LimitExceededError(
- retry_after_ms=int(1000 * (time_allowed - time_now)),
- )
+ yield self.ratelimit(requester, update=False)
user = UserID.from_string(event.sender)
@@ -319,6 +312,7 @@ class MessageHandler(BaseHandler):
See self.create_event and self.send_nonmember_event.
"""
event, context = yield self.create_event(
+ requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id
@@ -416,7 +410,7 @@ class MessageHandler(BaseHandler):
@measure_func("_create_new_client_event")
@defer.inlineCallbacks
- def _create_new_client_event(self, builder, prev_event_ids=None):
+ def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
if prev_event_ids:
prev_events = yield self.store.add_event_hashes(prev_event_ids)
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
@@ -456,6 +450,8 @@ class MessageHandler(BaseHandler):
state_handler = self.state_handler
context = yield state_handler.compute_event_context(builder)
+ if requester:
+ context.app_service = requester.app_service
if builder.is_state():
builder.prev_state = yield self.store.add_event_hashes(
@@ -493,7 +489,7 @@ class MessageHandler(BaseHandler):
# We now need to go and hit out to wherever we need to hit out to.
if ratelimit:
- self.ratelimit(requester)
+ yield self.ratelimit(requester)
try:
yield self.auth.check_from_context(event, context)
@@ -531,9 +527,9 @@ class MessageHandler(BaseHandler):
state_to_include_ids = [
e_id
- for k, e_id in context.current_state_ids.items()
+ for k, e_id in context.current_state_ids.iteritems()
if k[0] in self.hs.config.room_invite_state_types
- or k[0] == EventTypes.Member and k[1] == event.sender
+ or k == (EventTypes.Member, event.sender)
]
state_to_include = yield self.store.get_events(state_to_include_ids)
@@ -545,7 +541,7 @@ class MessageHandler(BaseHandler):
"content": e.content,
"sender": e.sender,
}
- for e in state_to_include.values()
+ for e in state_to_include.itervalues()
]
invitee = UserID.from_string(event.state_key)
@@ -612,12 +608,9 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
- yield self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
preserve_fn(_notify)()
-
- # If invite, remove room_state from unsigned before sending.
- event.unsigned.pop("invite_room_state", None)
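
The tightened comprehension (`k == (EventTypes.Member, event.sender)`) keeps the configured stripped-state types plus exactly the inviter's own membership. A toy model with made-up config and event IDs:

```python
# current_state_ids maps (event_type, state_key) -> event_id.
room_invite_state_types = ["m.room.name", "m.room.avatar", "m.room.join_rules"]
sender = "@alice:example.com"

current_state_ids = {
    ("m.room.name", ""): "$name",
    ("m.room.member", sender): "$alice",
    ("m.room.member", "@bob:example.com"): "$bob",
    ("m.room.power_levels", ""): "$pl",
}

state_to_include_ids = [
    e_id
    for k, e_id in current_state_ids.items()
    if k[0] in room_invite_state_types
    or k == ("m.room.member", sender)
]
print(sorted(state_to_include_ids))  # ['$alice', '$name']
```
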
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1ede117c79..c7c0b0a1e2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.async import Linearizer
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -187,6 +188,7 @@ class PresenceHandler(object):
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = {}
+ self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
@@ -316,11 +318,7 @@ class PresenceHandler(object):
if to_federation_ping:
federation_presence_out_counter.inc_by(len(to_federation_ping))
- _, _, hosts_to_states = yield self._get_interested_parties(
- to_federation_ping.values()
- )
-
- self._push_to_remotes(hosts_to_states)
+ self._push_to_remotes(to_federation_ping.values())
def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
@@ -509,6 +507,73 @@ class PresenceHandler(object):
self.external_process_to_current_syncs[process_id] = syncing_user_ids
@defer.inlineCallbacks
+ def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+ """Update the syncing users for an external process as a delta.
+
+ Args:
+ process_id (str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+ as users start and stop syncing against a given process.
+ user_id (str): The user who has started or stopped syncing
+ is_syncing (bool): Whether or not the user is now syncing
+ sync_time_msec (int): Time in ms when the user was last syncing
+ """
+ with (yield self.external_sync_linearizer.queue(process_id)):
+ prev_state = yield self.current_state_for_user(user_id)
+
+ process_presence = self.external_process_to_current_syncs.setdefault(
+ process_id, set()
+ )
+
+ updates = []
+ if is_syncing and user_id not in process_presence:
+ if prev_state.state == PresenceState.OFFLINE:
+ updates.append(prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=sync_time_msec,
+ last_user_sync_ts=sync_time_msec,
+ ))
+ else:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec,
+ ))
+ process_presence.add(user_id)
+ elif user_id in process_presence:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec,
+ ))
+
+ if not is_syncing:
+ process_presence.discard(user_id)
+
+ if updates:
+ yield self._update_states(updates)
+
+ self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+
+ @defer.inlineCallbacks
+ def update_external_syncs_clear(self, process_id):
+ """Marks all users that had been marked as syncing by a given process
+ as offline.
+
+ Used when the process has stopped/disappeared.
+ """
+ with (yield self.external_sync_linearizer.queue(process_id)):
+ process_presence = self.external_process_to_current_syncs.pop(
+ process_id, set()
+ )
+ prev_states = yield self.current_state_for_users(process_presence)
+ time_now_ms = self.clock.time_msec()
+
+ yield self._update_states([
+ prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ )
+ for prev_state in prev_states.itervalues()
+ ])
+ self.external_process_last_updated_ms.pop(process_id, None)
+
+ @defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
"""
@@ -527,14 +592,14 @@ class PresenceHandler(object):
for user_id in user_ids
}
- missing = [user_id for user_id, state in states.items() if not state]
+ missing = [user_id for user_id, state in states.iteritems() if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
states.update(res)
- missing = [user_id for user_id, state in states.items() if not state]
+ missing = [user_id for user_id, state in states.iteritems() if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id)
@@ -546,88 +611,39 @@ class PresenceHandler(object):
defer.returnValue(states)
@defer.inlineCallbacks
- def _get_interested_parties(self, states, calculate_remote_hosts=True):
- """Given a list of states return which entities (rooms, users, servers)
- are interested in the given states.
-
- Returns:
- 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
- with each item being a dict of `entity_name` -> `[UserPresenceState]`
- """
- room_ids_to_states = {}
- users_to_states = {}
- for state in states:
- room_ids = yield self.store.get_rooms_for_user(state.user_id)
- for room_id in room_ids:
- room_ids_to_states.setdefault(room_id, []).append(state)
-
- plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
- for u in plist:
- users_to_states.setdefault(u, []).append(state)
-
- # Always notify self
- users_to_states.setdefault(state.user_id, []).append(state)
-
- hosts_to_states = {}
- if calculate_remote_hosts:
- for room_id, states in room_ids_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
-
- hosts = yield self.store.get_hosts_in_room(room_id)
-
- for host in hosts:
- hosts_to_states.setdefault(host, []).extend(local_states)
-
- for user_id, states in users_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
-
- host = get_domain_from_id(user_id)
- hosts_to_states.setdefault(host, []).extend(local_states)
-
- # TODO: de-dup hosts_to_states, as a single host might have multiple
- # of same presence
-
- defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
-
- @defer.inlineCallbacks
def _persist_and_notify(self, states):
"""Persist states in the database, poke the notifier and send to
interested remote servers
"""
stream_id, max_token = yield self.store.update_presence(states)
- parties = yield self._get_interested_parties(states)
- room_ids_to_states, users_to_states, hosts_to_states = parties
+ parties = yield get_interested_parties(self.store, states)
+ room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states.keys()]
+ users=[UserID.from_string(u) for u in users_to_states]
)
- self._push_to_remotes(hosts_to_states)
+ self._push_to_remotes(states)
@defer.inlineCallbacks
def notify_for_states(self, state, stream_id):
- parties = yield self._get_interested_parties([state])
- room_ids_to_states, users_to_states, hosts_to_states = parties
+ parties = yield get_interested_parties(self.store, [state])
+ room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states.keys()]
+ users=[UserID.from_string(u) for u in users_to_states]
)
- def _push_to_remotes(self, hosts_to_states):
+ def _push_to_remotes(self, states):
"""Sends state updates to remote servers.
Args:
- hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+ states (list(UserPresenceState))
"""
- for host, states in hosts_to_states.items():
- self.federation.send_presence(host, states)
+ self.federation.send_presence(states)
@defer.inlineCallbacks
def incoming_presence(self, origin, content):
@@ -764,18 +780,17 @@ class PresenceHandler(object):
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
- user_ids = yield self.store.get_users_in_room(room_id)
if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
- hosts = set(get_domain_from_id(u) for u in user_ids)
- self._push_to_remotes({host: (state,) for host in hosts})
+ self._push_to_remotes([state])
else:
+ user_ids = yield self.store.get_users_in_room(room_id)
user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids)
- self._push_to_remotes({user.domain: states.values()})
+ self._push_to_remotes(states.values())
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
@@ -1275,3 +1290,66 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
persist_and_notify = True
return new_state, persist_and_notify, federation_ping
+
+
+@defer.inlineCallbacks
+def get_interested_parties(store, states):
+ """Given a list of states return which entities (rooms, users)
+ are interested in the given states.
+
+ Args:
+ store (DataStore)
+ states (list(UserPresenceState))
+
+ Returns:
+ 2-tuple: `(room_ids_to_states, users_to_states)`,
+ with each item being a dict of `entity_name` -> `[UserPresenceState]`
+ """
+ room_ids_to_states = {}
+ users_to_states = {}
+ for state in states:
+ room_ids = yield store.get_rooms_for_user(state.user_id)
+ for room_id in room_ids:
+ room_ids_to_states.setdefault(room_id, []).append(state)
+
+ plist = yield store.get_presence_list_observers_accepted(state.user_id)
+ for u in plist:
+ users_to_states.setdefault(u, []).append(state)
+
+ # Always notify self
+ users_to_states.setdefault(state.user_id, []).append(state)
+
+ defer.returnValue((room_ids_to_states, users_to_states))
+
+
+@defer.inlineCallbacks
+def get_interested_remotes(store, states, state_handler):
+ """Given a list of presence states figure out which remote servers
+ should be sent which.
+
+ All the presence states should be for local users only.
+
+ Args:
+ store (DataStore)
+ states (list(UserPresenceState))
+ state_handler (StateHandler)
+
+ Returns:
+ Deferred list of ([destinations], [UserPresenceState]) rows, where
+ the states in each row should be sent to every destination in that
+ row's destination list
+ """
+ hosts_and_states = []
+
+ # First we look up the rooms each user is in (as well as any explicit
+ # subscriptions), then for each distinct room we look up the remote
+ # hosts in those rooms.
+ room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
+
+ for room_id, states in room_ids_to_states.iteritems():
+ hosts = yield state_handler.get_current_hosts_in_room(room_id)
+ hosts_and_states.append((hosts, states))
+
+ for user_id, states in users_to_states.iteritems():
+ host = get_domain_from_id(user_id)
+ hosts_and_states.append(([host], states))
+
+ defer.returnValue(hosts_and_states)
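
The return shape of `get_interested_remotes` is a list of rows, each pairing a destination list with the states to send there, so callers can fan out per row without recomputing room membership. A sketch with made-up data:

```python
hosts_and_states = [
    (["remote-a.example.com", "remote-b.example.com"], ["<@alice:hs state>"]),
    (["remote-c.example.com"], ["<@bob:hs state>"]),
]
for destinations, states in hosts_and_states:
    for destination in destinations:
        print("sending %d presence state(s) to %s" % (len(states), destination))
```
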
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9bf638f818..7abee98dea 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler):
if not self.hs.is_mine(user):
return
- self.ratelimit(requester)
+ yield self.ratelimit(requester)
room_ids = yield self.store.get_rooms_for_user(
user.to_string(),
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
new file mode 100644
index 0000000000..b5b0303d54
--- /dev/null
+++ b/synapse/handlers/read_marker.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseHandler
+
+from twisted.internet import defer
+
+from synapse.util.async import Linearizer
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class ReadMarkerHandler(BaseHandler):
+ def __init__(self, hs):
+ super(ReadMarkerHandler, self).__init__(hs)
+ self.server_name = hs.config.server_name
+ self.store = hs.get_datastore()
+ self.read_marker_linearizer = Linearizer(name="read_marker")
+ self.notifier = hs.get_notifier()
+
+ @defer.inlineCallbacks
+ def received_client_read_marker(self, room_id, user_id, event_id):
+ """Updates the read marker for a given user in a given room if the event ID given
+ is ahead in the stream relative to the current read marker.
+
+ This uses a notifier to indicate that account data should be sent down /sync if
+ the read marker has changed.
+ """
+
+ with (yield self.read_marker_linearizer.queue((room_id, user_id))):
+ account_data = yield self.store.get_account_data_for_room(user_id, room_id)
+
+ existing_read_marker = account_data.get("m.fully_read", None)
+
+ should_update = True
+
+ if existing_read_marker:
+ # Only update if the new marker is ahead in the stream
+ should_update = yield self.store.is_event_after(
+ event_id,
+ existing_read_marker['event_id']
+ )
+
+ if should_update:
+ content = {
+ "event_id": event_id
+ }
+ max_id = yield self.store.add_account_data_to_room(
+ user_id, room_id, "m.fully_read", content
+ )
+ self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
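
The `(room_id, user_id)`-keyed `Linearizer` is what makes the read-check-write above safe: two updates for the same user and room are serialised, while unrelated rooms proceed concurrently. A sketch of the usage pattern, assuming Synapse's `Linearizer` from `synapse.util.async` as used in this file:

```python
from twisted.internet import defer

from synapse.util.async import Linearizer

read_marker_linearizer = Linearizer(name="read_marker")


@defer.inlineCallbacks
def serialised_update(room_id, user_id):
    # Acquire the per-(room, user) lock; released when the block exits.
    with (yield read_marker_linearizer.queue((room_id, user_id))):
        # fetch marker, compare stream ordering, maybe write: race-free here
        yield defer.succeed(None)
```
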
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 03c6a85fc6..ee3a2269a8 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -54,6 +54,13 @@ class RegistrationHandler(BaseHandler):
Codes.INVALID_USERNAME
)
+ if not localpart:
+ raise SynapseError(
+ 400,
+ "User ID cannot be empty",
+ Codes.INVALID_USERNAME
+ )
+
if localpart[0] == '_':
raise SynapseError(
400,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 99cb7db0db..d2a0d6520a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -75,7 +75,7 @@ class RoomCreationHandler(BaseHandler):
"""
user_id = requester.user.to_string()
- self.ratelimit(requester)
+ yield self.ratelimit(requester)
if "room_alias_name" in config:
for wchar in string.whitespace:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 2052d6d05f..1ca88517a2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler):
content["kind"] = "guest"
event, context = yield msg_handler.create_event(
+ requester,
{
"type": EventTypes.Member,
"content": content,
@@ -139,13 +140,6 @@ class RoomMemberHandler(BaseHandler):
)
yield user_joined_room(self.distributor, user, room_id)
- def reject_remote_invite(self, user_id, room_id, remote_room_hosts):
- return self.hs.get_handlers().federation_handler.do_remotely_reject_invite(
- remote_room_hosts,
- room_id,
- user_id
- )
-
@defer.inlineCallbacks
def update_membership(
self,
@@ -286,13 +280,21 @@ class RoomMemberHandler(BaseHandler):
else:
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
-
+ fed_handler = self.hs.get_handlers().federation_handler
try:
- ret = yield self.reject_remote_invite(
- target.to_string(), room_id, remote_room_hosts
+ ret = yield fed_handler.do_remotely_reject_invite(
+ remote_room_hosts,
+ room_id,
+ target.to_string(),
)
defer.returnValue(ret)
- except SynapseError as e:
+ except Exception as e:
+ # if we were unable to reject the invite, just mark
+ # it as rejected on our end and plough ahead.
+ #
+ # The 'except' clause is very broad, but we need to
+ # capture everything from DNS failures upwards
+ #
logger.warn("Failed to reject invite: %s", e)
yield self.store.locally_reject_invite(
@@ -737,10 +739,11 @@ class RoomMemberHandler(BaseHandler):
if len(current_state_ids) == 1 and create_event_id:
defer.returnValue(self.hs.is_mine_id(create_event_id))
- for (etype, state_key), event_id in current_state_ids.items():
+ for etype, state_key in current_state_ids:
if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):
continue
+ event_id = current_state_ids[(etype, state_key)]
event = yield self.store.get_event(event_id, allow_none=True)
if not event:
continue
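
The loop rewrite above avoids materialising every `(key, event_id)` pair up front (`.items()` builds a full list under Python 2) and only indexes into the map for local membership entries. A toy equivalent with a made-up state map:

```python
# current_state_ids maps (event_type, state_key) -> event_id.
current_state_ids = {
    ("m.room.member", "@alice:local"): "$alice",
    ("m.room.member", "@bob:remote"): "$bob",
    ("m.room.create", ""): "$create",
}


def is_mine_id(user_id):
    return user_id.endswith(":local")


for etype, state_key in current_state_ids:
    if etype != "m.room.member" or not is_mine_id(state_key):
        continue
    event_id = current_state_ids[(etype, state_key)]
    print(event_id)  # only "$alice"
```
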
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0eea7f8f9c..3b7818af5c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id
import logging
from collections import namedtuple
-import ujson as json
logger = logging.getLogger(__name__)
@@ -288,11 +287,13 @@ class TypingHandler(object):
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
typing = self._room_typing[room_id]
- typing_bytes = json.dumps(list(typing), ensure_ascii=False)
- rows.append((serial, room_id, typing_bytes))
+ rows.append((serial, room_id, list(typing)))
rows.sort()
return rows
+ def get_current_token(self):
+ return self._latest_room_serial
+
class TypingNotificationEventSource(object):
def __init__(self, hs):
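
After this change the typing replication rows carry the user list itself rather than pre-encoded JSON, leaving serialisation to the replication transport, and `get_current_token` exposes the latest serial for stream consumers. A toy model of the row shape, with made-up serials and users:

```python
_room_serials = {"!a:hs": 3, "!b:hs": 7}
_room_typing = {"!a:hs": {"@x:hs"}, "!b:hs": {"@y:hs", "@z:hs"}}


def get_all_typing_updates(last_id, current_id):
    rows = []
    for room_id, serial in _room_serials.items():
        if last_id < serial and serial <= current_id:
            rows.append((serial, room_id, list(_room_typing[room_id])))
    rows.sort()
    return rows


print(get_all_typing_updates(0, 10))
# e.g. [(3, '!a:hs', ['@x:hs']), (7, '!b:hs', ['@y:hs', '@z:hs'])]
```
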
|